问题 在Java中编写多线程映射迭代器


我有一个通用的映射迭代器:这样的东西:

class Mapper<F, T> implements Iterator<T> {

  private Iterator<F> input;
  private Action<F, T> action;

  public Mapper(input, action) {...}

  public boolean hasNext() {
    return input.hasNext();
  }

  public T next() {
    return action.process(input.next());
  }
}

现在,假设action.process()可能非常耗时,我希望通过使用多个线程并行处理输入项来获得性能。我想分配一个N个工作线程池,并将项目分配给这些线程进行处理。这应该在“幕后”发生,因此客户端代码只能看到Iterator。代码应避免将输入或输出序列保存在内存中。

为了添加一个扭曲,我想要两个版本的解决方案,一个保留订单(最终迭代器以与输入迭代器相同的顺序交付项目),其中一个不一定保留订单(每个输出项目尽快交付)它是可用的)。

我有点工作,但代码似乎令人费解和不可靠,我不相信它正在使用最佳实践。

有关最简单,最可靠的实现方式的建议吗?我正在寻找适用于JDK 6的东西,我希望尽可能避免在外部库/框架上引入依赖。


3322
2018-01-06 09:49


起源

我不知道答案,但这是一个非常好的问题。当我编码这个时,我睡了一夜。附:虽然对编程很有热情,但我对它有点慢,因为我掌握问题并慢慢提出解决方案,而不是那些能在几分钟内完成冒泡的摇滚明星程序员。因此整夜。 - Sid
“......所以客户端代码只看到迭代器......”如果你实现了怎么办? Iterator<Future<T>> 代替 Iterator<T>? - Solomon Slow


答案:


我会为线程和a使用线程池 BlockingQueue 从池中喂养。

这似乎适用于我的简单测试用例。

interface Action<F, T> {

    public T process(F f);

}

class Mapper<F, T> implements Iterator<T> {

    protected final Iterator<F> input;
    protected final Action<F, T> action;

    public Mapper(Iterator<F> input, Action<F, T> action) {
        this.input = input;
        this.action = action;
    }

    @Override
    public boolean hasNext() {
        return input.hasNext();
    }

    @Override
    public T next() {
        return action.process(input.next());
    }
}

class ParallelMapper<F, T> extends Mapper<F, T> {

    // The pool.
    final ExecutorService pool;
    // The queue.
    final BlockingQueue<T> queue;
    // The next one to deliver.
    private T next = null;

    public ParallelMapper(Iterator<F> input, Action<F, T> action, int threads, int queueLength) {
        super(input, action);
        // Start my pool.
        pool = Executors.newFixedThreadPool(threads);
        // And the queue.
        queue = new ArrayBlockingQueue<>(queueLength);
    }

    class Worker implements Runnable {

        final F f;
        private T t;

        public Worker(F f) {
            this.f = f;
        }

        @Override
        public void run() {
            try {
                queue.put(action.process(f));
            } catch (InterruptedException ex) {
                // Not sure what you can do here.
            }
        }

    }

    @Override
    public boolean hasNext() {
        // All done if delivered it and the input is empty and the queue is empty and the threads are finished.
        while (next == null && (input.hasNext() || !queue.isEmpty() || !pool.isTerminated())) {
            // First look in the queue.
            next = queue.poll();
            if (next == null) {
                // Queue empty.
                if (input.hasNext()) {
                    // Start a new worker.
                    pool.execute(new Worker(input.next()));
                }
            } else {
                // Input exhausted - shut down the pool - unless we already have.
                if (!pool.isShutdown()) {
                    pool.shutdown();
                }
            }
        }
        return next != null;
    }

    @Override
    public T next() {
        T n = next;
        if (n != null) {
            // Delivered that one.
            next = null;
        } else {
            // Fails.
            throw new NoSuchElementException();
        }
        return n;
    }
}

public void test() {
    List<Integer> data = Arrays.asList(5, 4, 3, 2, 1, 0);
    System.out.println("Data");
    for (Integer i : Iterables.in(data)) {
        System.out.println(i);
    }
    Action<Integer, Integer> action = new Action<Integer, Integer>() {

        @Override
        public Integer process(Integer f) {
            try {
                // Wait that many seconds.
                Thread.sleep(1000L * f);
            } catch (InterruptedException ex) {
                // Just give up.
            }
            // Return it unchanged.
            return f;
        }

    };
    System.out.println("Processed");
    for (Integer i : Iterables.in(new Mapper<Integer, Integer>(data.iterator(), action))) {
        System.out.println(i);
    }
    System.out.println("Parallel Processed");
    for (Integer i : Iterables.in(new ParallelMapper<Integer, Integer>(data.iterator(), action, 2, 2))) {
        System.out.println(i);
    }

}

注意: Iterables.in(Iterator<T>) 只是创造一个 Iterable<T> 封装传递的 Iterator<T>

对于你的有序,你可以处理 Pair<Integer,F> 并使用一个 PriorityQueue 用于线程输出。然后你可以安排按顺序拉它们。


4
2018-01-06 11:07



非常感谢。我从这个答案中找到了基本的想法,并结合了其他一些答案的成分,我将在下面展示我自己的解决方案。 - Michael Kay


我不认为它可以用于并行线程,因为hasNext()可能返回true但是当线程调用next()时可能没有更多的元素。最好只使用next(),当没有更多的元素时,它将返回null


3
2018-01-06 10:11





好的,谢谢大家。这就是我所做的。

首先,我将ItemMappingFunction包装在Callable中:

private static class CallableAction<F extends Item, T extends Item> 
implements Callable<T> {
    private ItemMappingFunction<F, T> action;
    private F input;
    public CallableAction(ItemMappingFunction<F, T> action, F input) {
            this.action = action;
            this.input = input;
    }
    public T call() throws XPathException {
            return action.mapItem(input);
    }
}

我用标准的Iterator类描述了我的问题,但实际上我使用的是我自己的SequenceIterator接口,它有一个next()方法,它在序列结束时返回null。

我用这样的“普通”映射迭代器来声明该类:

public class MultithreadedMapper<F extends Item, T extends Item> extends Mapper<F, T> {

    private ExecutorService service;
    private BlockingQueue<Future<T>> resultQueue = 
        new LinkedBlockingQueue<Future<T>>();

在初始化时,我创建服务并填充队列:

public MultithreadedMapper(SequenceIterator base, ItemMappingFunction<F, T> action) throws XPathException {
        super(base, action);

        int maxThreads = Runtime.getRuntime().availableProcessors();
        maxThreads = maxThreads > 0 ? maxThreads : 1;
        service = Executors.newFixedThreadPool(maxThreads);

        // prime the queue
        int n = 0;
        while (n++ < maxThreads) {
            F item = (F) base.next();
            if (item == null) {
                return;
            }
            mapOneItem(item);
        }
    }

mapOneItem的位置是:

private void mapOneItem(F in) throws XPathException {
    Future<T> future = service.submit(new CallableAction(action, in));
    resultQueue.add(future);
}

当客户端请求下一个项目时,我首先将下一个输入项目提交给执行程序服务,然后获取下一个输出项目,等待它在必要时可用:

    public T next() throws XPathException {
        F nextIn = (F)base.next();
        if (nextIn != null) {
            mapOneItem(nextIn);
        }
        try {
            Future<T> future = resultQueue.poll();
            if (future == null) {
                service.shutdown();
                return null;
            } else {
                return future.get();
            }
        } catch (InterruptedException e) {
            throw new XPathException(e);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof XPathException) {
                throw (XPathException)e.getCause();
            }
            throw new XPathException(e);
        }
    }

3
2018-01-06 13:18



用一个 LinkedBlockingQueue 您可能会发现,如果供应比池可以处理的速度快得多,那么您最终可能会在队列中输入条目。显然这取决于您的环境。 - OldCurmudgeon
运用 resultQueue.poll() 您可以发现许多线程正在进行中,但它们都没有在队列中发布任何内容。看起来你会提前退出。这就是我使用的原因 while 循环在我的 hasNext。 哎呀 抱歉 - 我没注意到你用结果队列填充 Futures  - 划伤那一个。 - OldCurmudgeon


为了 action.process 被并行调用, next() 需要并行调用。那不是好习惯。相反,你可以使用 ExecutorCompletionService

看到 https://stackoverflow.com/a/1228445/360211

不幸的是,我相信这只会让您选择保留订单。


0
2018-01-06 10:02





我建议看一下JDK执行器框架。为您的操作创建任务(Runnables)。如果需要,使用线程池并行运行它们,否则按顺序运行它们。如果您最终需要订单,请提供任务序列号。但正如其他答案所述,迭代器对你来说效果不好,因为调用next()通常不是并行完成的。所以你甚至需要一个迭代器或者只是为了处理任务?


0
2018-01-06 10:28