我有一个通用的映射迭代器:这样的东西:
class Mapper<F, T> implements Iterator<T> {
private Iterator<F> input;
private Action<F, T> action;
public Mapper(input, action) {...}
public boolean hasNext() {
return input.hasNext();
}
public T next() {
return action.process(input.next());
}
}
现在,假设action.process()可能非常耗时,我希望通过使用多个线程并行处理输入项来获得性能。我想分配一个N个工作线程池,并将项目分配给这些线程进行处理。这应该在“幕后”发生,因此客户端代码只能看到Iterator。代码应避免将输入或输出序列保存在内存中。
为了添加一个扭曲,我想要两个版本的解决方案,一个保留订单(最终迭代器以与输入迭代器相同的顺序交付项目),其中一个不一定保留订单(每个输出项目尽快交付)它是可用的)。
我有点工作,但代码似乎令人费解和不可靠,我不相信它正在使用最佳实践。
有关最简单,最可靠的实现方式的建议吗?我正在寻找适用于JDK 6的东西,我希望尽可能避免在外部库/框架上引入依赖。
我会为线程和a使用线程池 BlockingQueue
从池中喂养。
这似乎适用于我的简单测试用例。
interface Action<F, T> {
public T process(F f);
}
class Mapper<F, T> implements Iterator<T> {
protected final Iterator<F> input;
protected final Action<F, T> action;
public Mapper(Iterator<F> input, Action<F, T> action) {
this.input = input;
this.action = action;
}
@Override
public boolean hasNext() {
return input.hasNext();
}
@Override
public T next() {
return action.process(input.next());
}
}
class ParallelMapper<F, T> extends Mapper<F, T> {
// The pool.
final ExecutorService pool;
// The queue.
final BlockingQueue<T> queue;
// The next one to deliver.
private T next = null;
public ParallelMapper(Iterator<F> input, Action<F, T> action, int threads, int queueLength) {
super(input, action);
// Start my pool.
pool = Executors.newFixedThreadPool(threads);
// And the queue.
queue = new ArrayBlockingQueue<>(queueLength);
}
class Worker implements Runnable {
final F f;
private T t;
public Worker(F f) {
this.f = f;
}
@Override
public void run() {
try {
queue.put(action.process(f));
} catch (InterruptedException ex) {
// Not sure what you can do here.
}
}
}
@Override
public boolean hasNext() {
// All done if delivered it and the input is empty and the queue is empty and the threads are finished.
while (next == null && (input.hasNext() || !queue.isEmpty() || !pool.isTerminated())) {
// First look in the queue.
next = queue.poll();
if (next == null) {
// Queue empty.
if (input.hasNext()) {
// Start a new worker.
pool.execute(new Worker(input.next()));
}
} else {
// Input exhausted - shut down the pool - unless we already have.
if (!pool.isShutdown()) {
pool.shutdown();
}
}
}
return next != null;
}
@Override
public T next() {
T n = next;
if (n != null) {
// Delivered that one.
next = null;
} else {
// Fails.
throw new NoSuchElementException();
}
return n;
}
}
public void test() {
List<Integer> data = Arrays.asList(5, 4, 3, 2, 1, 0);
System.out.println("Data");
for (Integer i : Iterables.in(data)) {
System.out.println(i);
}
Action<Integer, Integer> action = new Action<Integer, Integer>() {
@Override
public Integer process(Integer f) {
try {
// Wait that many seconds.
Thread.sleep(1000L * f);
} catch (InterruptedException ex) {
// Just give up.
}
// Return it unchanged.
return f;
}
};
System.out.println("Processed");
for (Integer i : Iterables.in(new Mapper<Integer, Integer>(data.iterator(), action))) {
System.out.println(i);
}
System.out.println("Parallel Processed");
for (Integer i : Iterables.in(new ParallelMapper<Integer, Integer>(data.iterator(), action, 2, 2))) {
System.out.println(i);
}
}
注意: Iterables.in(Iterator<T>)
只是创造一个 Iterable<T>
封装传递的 Iterator<T>
。
对于你的有序,你可以处理 Pair<Integer,F>
并使用一个 PriorityQueue
用于线程输出。然后你可以安排按顺序拉它们。
我不认为它可以用于并行线程,因为hasNext()可能返回true但是当线程调用next()时可能没有更多的元素。最好只使用next(),当没有更多的元素时,它将返回null
好的,谢谢大家。这就是我所做的。
首先,我将ItemMappingFunction包装在Callable中:
private static class CallableAction<F extends Item, T extends Item>
implements Callable<T> {
private ItemMappingFunction<F, T> action;
private F input;
public CallableAction(ItemMappingFunction<F, T> action, F input) {
this.action = action;
this.input = input;
}
public T call() throws XPathException {
return action.mapItem(input);
}
}
我用标准的Iterator类描述了我的问题,但实际上我使用的是我自己的SequenceIterator接口,它有一个next()方法,它在序列结束时返回null。
我用这样的“普通”映射迭代器来声明该类:
public class MultithreadedMapper<F extends Item, T extends Item> extends Mapper<F, T> {
private ExecutorService service;
private BlockingQueue<Future<T>> resultQueue =
new LinkedBlockingQueue<Future<T>>();
在初始化时,我创建服务并填充队列:
public MultithreadedMapper(SequenceIterator base, ItemMappingFunction<F, T> action) throws XPathException {
super(base, action);
int maxThreads = Runtime.getRuntime().availableProcessors();
maxThreads = maxThreads > 0 ? maxThreads : 1;
service = Executors.newFixedThreadPool(maxThreads);
// prime the queue
int n = 0;
while (n++ < maxThreads) {
F item = (F) base.next();
if (item == null) {
return;
}
mapOneItem(item);
}
}
mapOneItem的位置是:
private void mapOneItem(F in) throws XPathException {
Future<T> future = service.submit(new CallableAction(action, in));
resultQueue.add(future);
}
当客户端请求下一个项目时,我首先将下一个输入项目提交给执行程序服务,然后获取下一个输出项目,等待它在必要时可用:
public T next() throws XPathException {
F nextIn = (F)base.next();
if (nextIn != null) {
mapOneItem(nextIn);
}
try {
Future<T> future = resultQueue.poll();
if (future == null) {
service.shutdown();
return null;
} else {
return future.get();
}
} catch (InterruptedException e) {
throw new XPathException(e);
} catch (ExecutionException e) {
if (e.getCause() instanceof XPathException) {
throw (XPathException)e.getCause();
}
throw new XPathException(e);
}
}
为了 action.process
被并行调用, next()
需要并行调用。那不是好习惯。相反,你可以使用 ExecutorCompletionService。
看到 https://stackoverflow.com/a/1228445/360211
不幸的是,我相信这只会让您选择保留订单。
我建议看一下JDK执行器框架。为您的操作创建任务(Runnables)。如果需要,使用线程池并行运行它们,否则按顺序运行它们。如果您最终需要订单,请提供任务序列号。但正如其他答案所述,迭代器对你来说效果不好,因为调用next()通常不是并行完成的。所以你甚至需要一个迭代器或者只是为了处理任务?