问题 Clojure传感器与Java中的流中间操作具有相同的概念吗?


当我在Clojure中学习传感器时,它突然让我想起了他们提醒我的:Java 8流!

传感器 是可组合的算法转换。它们独立于输入和输出源的上下文,并且仅根据单个元素指定转换的本质。

一个  不是存储元素的数据结构;相反,它通过计算操作管道传递来自诸如数据结构,数组,生成器函数或I / O通道的源的元素。

Clojure的:

(def xf
  (comp
    (filter odd?)
    (map inc)
    (take 5)))

(println
  (transduce xf + (range 100)))  ; => 30
(println
  (into [] xf (range 100)))      ; => [2 4 6 8 10]

Java的:

// Purposely using Function and boxed primitive streams (instead of
// UnaryOperator<LongStream>) in order to keep it general.
Function<Stream<Long>, Stream<Long>> xf =
        s -> s.filter(n -> n % 2L == 1L)
                .map(n -> n + 1L)
                .limit(5L);

System.out.println(
        xf.apply(LongStream.range(0L, 100L).boxed())
                .reduce(0L, Math::addExact));    // => 30
System.out.println(
        xf.apply(LongStream.range(0L, 100L).boxed())
                .collect(Collectors.toList()));  // => [2, 4, 6, 8, 10]

除了静态/动态类型的不同之外,这些在目的和用途上看起来与我非常相似。

与Java流转换的类比是否是一种合理的传感器思考方式?如果没有,它是如何有缺陷的,或者两者在概念上有何不同(不是说实施)?


2164
2018-02-01 17:57


起源

对于我未经训练的眼睛,它似乎是这样。但他们对换能器有如此高的评价,也许还有更多的问题? :) - ZhongYu
语义相似性比比皆是! - Frank C.


答案:


主要区别在于动词(操作)的集合在某种程度上对于流是关闭的,而它对于传感器是开放的:试图例如实现 partition 在溪流上,感觉有点二等:

import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.Stream.Builder;

public class StreamUtils {
    static <T> Stream<T> delay(final Supplier<Stream<T>> thunk) {
        return Stream.of((Object) null).flatMap(x -> thunk.get());
    }

    static class Partitioner<T> implements Function<T, Stream<Stream<T>>> {
        final Function<T, ?> f;

        Object prev;
        Builder<T> sb;

        public Partitioner(Function<T, ?> f) {
            this.f = f;
        }

        public Stream<Stream<T>> apply(T t) {
            Object tag = f.apply(t);
            if (sb != null && prev.equals(tag)) {
                sb.accept(t);
                return Stream.empty();
            }
            Stream<Stream<T>> partition = sb == null ? Stream.empty() : Stream.of(sb.build());
            sb = Stream.builder();
            sb.accept(t);
            prev = tag;
            return partition;
        }

        Stream<Stream<T>> flush() {
            return sb == null ? Stream.empty() : Stream.of(sb.build());
        }
    }

    static <T> Stream<Stream<T>> partitionBy(Stream<T> in, Function<T, ?> f) {
        Partitioner<T> partitioner = new Partitioner<>(f);
        return Stream.concat(in.flatMap(partitioner), delay(() -> partitioner.flush()));
    }
}

与序列和缩减器一样,当您进行变换时,不要创建“更大”的计算,您可以创建“更大”的源。

为了能够通过计算,你已经介绍过了 xf 从Stream到Stream的函数,用于将操作从方法提升到第一类实体(以便从源中解开它们)。通过这样做,您已经创建了一个传感器,尽管接口太大。

以下是将任何(clojure)传感器应用于Stream的上述代码的更通用版本:

import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.Stream.Builder;

import clojure.lang.AFn;
import clojure.lang.IFn;
import clojure.lang.Reduced;

public class StreamUtils {
    static <T> Stream<T> delay(final Supplier<Stream<T>> thunk) {
        return Stream.of((Object) null).flatMap(x -> thunk.get());
    }

    static class Transducer implements Function {
        IFn rf;

        public Transducer(IFn xf) {
            rf = (IFn) xf.invoke(new AFn() {
                public Object invoke(Object acc) {
                    return acc;
                }

                public Object invoke(Object acc, Object item) {
                    ((Builder<Object>) acc).accept(item);
                    return acc;
                }
            });
        }

        public Stream<?> apply(Object t) {
            if (rf == null) return Stream.empty();
            Object ret = rf.invoke(Stream.builder(), t);
            if (ret instanceof Reduced) {
                Reduced red = (Reduced) ret;
                Builder<?> sb = (Builder<?>) red.deref();
                return Stream.concat(sb.build(), flush());
            }
            return ((Builder<?>) ret).build();
        }

        Stream<?> flush() {
            if (rf == null) return Stream.empty();
            Builder<?> sb = (Builder<?>) rf.invoke(Stream.builder());
            rf = null;
            return sb.build();
        }
    }

    static <T> Stream<?> withTransducer(Stream<T> in, IFn xf) {
        Transducer transducer = new Transducer(xf);
        return Stream.concat(in.flatMap(transducer), delay(() -> transducer.flush()));
    }
}

9
2018-02-01 22:33



先生,我可以请更多的话吗? - Arthur Ulfeldt
@ArthurUlfeldt更多的话,不确定它更好:) - cgrand
这确实更好!它表明,只有在很少或没有减少背景的微不足道的情况下它们才是相同的。我怀疑这正是要求的。 - Arthur Ulfeldt