Saturday, September 10, 2005

JDK 5 concurrency API: group / batch thread pool

First up, let me say that the new concurrency API in JDK 5 is indeed a boon for the Java community, especially for developers (yours truly included) who until now indulged in threads and concurrent programming only sparingly. Not because we didn’t know how to do it, but because getting it right was quite an ordeal. The concurrency API should make concurrent programming, so far the bastion of a select few, more "mainstream".

So here’s my scenario: I need a group / batch of tasks to execute concurrently, and I need to wait until all of them have finished executing before moving forward.

Leveraging the new concurrency API, I can implement this by using the Executors factory to create a new thread pool (a thread pool being an instance of ExecutorService). To this pool I submit the tasks of my batch, which are executed according to the policies of the pool. Then, to wait for all of them to complete, I call shutdown() followed by awaitTermination(). With this my code does block until all tasks have been executed, but the problem is that the thread pool no longer accepts any new tasks. So for my next batch of tasks I need to create a new thread pool all over again, which is needlessly expensive.
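To make the limitation concrete, here is a minimal sketch of the shutdown() + awaitTermination() approach. The class name ShutdownBatchDemo, the helper method, the pool size of 4 and the 60 second timeout are all made up for illustration; the counter increment stands in for real work.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownBatchDemo {

    /** Runs one batch, waits for it, then reports whether the pool still takes work. */
    static boolean poolAcceptsWorkAfterBatch(int taskCount, final AtomicInteger completed) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is arbitrary
        for (int i = 0; i < taskCount; i++) {
            pool.submit(new Runnable() {
                public void run() {
                    completed.incrementAndGet(); // stand-in for real work
                }
            });
        }

        pool.shutdown(); // no new tasks accepted; already submitted ones still run
        try {
            // Blocks until the batch has finished or the (arbitrary) timeout expires.
            pool.awaitTermination(60, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        try {
            pool.submit(new Runnable() {
                public void run() { }
            });
            return true;
        } catch (RejectedExecutionException rejected) {
            return false; // the default rejection policy refuses tasks after shutdown()
        }
    }

    public static void main(String[] args) {
        AtomicInteger completed = new AtomicInteger();
        boolean reusable = poolAcceptsWorkAfterBatch(10, completed);
        System.out.println("completed=" + completed.get() + ", reusable=" + reusable);
    }
}
```

With a pool created by Executors.newFixedThreadPool(), the second submit() is rejected, so the pool cannot be reused for the next batch.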

All said and done, I need an awaitExecution() method which, like awaitTermination(), blocks until all tasks have completed but, unlike the shutdown() + awaitTermination() combo, does not reject new tasks.

Below is a simple wrapper with the awaitExecution() method included. You can of course use any of the extension patterns - decorator, adapter, etc. - for a more refined solution.


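import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class GroupThreadPool {
    protected ExecutorService pool;
    // Futures of the current batch. The list is not thread-safe, so
    // submit() and awaitExecution() should be called from a single thread.
    protected List<Future<?>> futures = new ArrayList<Future<?>>();

    public GroupThreadPool(int poolSize) {
        pool = Executors.newFixedThreadPool(poolSize);
    }

    public void submit(Runnable command) {
        futures.add(pool.submit(command));
    }

    public void awaitExecution() {
        try {
            for (Future<?> future : futures) {
                try {
                    future.get(); // blocking call
                } catch (ExecutionException ignore) {
                    // this task failed; keep waiting for the rest of the batch
                }
            }
        } catch (InterruptedException e) {
            // stop waiting, but preserve the interrupt for the caller
            Thread.currentThread().interrupt();
        } finally {
            futures.clear();
        }
    }
}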

The user creates this GroupThreadPool just once, calls submit() for each task in a batch and then calls awaitExecution() to block until all tasks have executed. The same GroupThreadPool object can then be reused to execute subsequent batches.
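A usage sketch of that flow, running two batches on one pool. To keep the snippet compilable on its own it repeats a condensed copy of the wrapper; TwoBatchesDemo, runTwoBatches() and the counter task are hypothetical names invented for the example. The demo reaches into the protected pool field to shut it down at the end, only so that the JVM can exit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Condensed copy of the wrapper, repeated only so this snippet compiles alone.
class GroupThreadPool {
    protected ExecutorService pool;
    protected List<Future<?>> futures = new ArrayList<Future<?>>();

    GroupThreadPool(int poolSize) {
        pool = Executors.newFixedThreadPool(poolSize);
    }

    void submit(Runnable command) {
        futures.add(pool.submit(command));
    }

    void awaitExecution() {
        try {
            for (Future<?> future : futures) {
                future.get(); // blocks until this task has finished
            }
        } catch (Exception ignore) {
            // errors are swallowed in this condensed copy
        } finally {
            futures.clear();
        }
    }
}

class TwoBatchesDemo {

    /** Runs two batches on the same pool; returns the task count seen after each. */
    static int[] runTwoBatches(int batchSize) {
        final AtomicInteger done = new AtomicInteger();
        Runnable task = new Runnable() {
            public void run() {
                done.incrementAndGet(); // stand-in for real work
            }
        };

        GroupThreadPool group = new GroupThreadPool(4); // created once
        int[] counts = new int[2];
        try {
            for (int batch = 0; batch < 2; batch++) {
                for (int i = 0; i < batchSize; i++) {
                    group.submit(task);
                }
                group.awaitExecution();       // wait for this batch only
                counts[batch] = done.get();   // every task of the batch has run
            }
        } finally {
            group.pool.shutdown(); // demo-only cleanup so the JVM can exit
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = runTwoBatches(5);
        System.out.println(counts[0] + " then " + counts[1]);
    }
}
```

Run with a batch size of 5, this prints "5 then 10": the second batch is accepted by the same pool after the first has been awaited.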

The implementation adds the Future returned for each submitted task to a list. To block until all tasks have completed, it calls get() on each Future, which is itself a blocking operation. So awaitExecution() returns only after all tasks have been executed, and before returning it clears the list of Futures so that it is ready for the next batch of tasks.
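For comparison, ExecutorService already has an invokeAll() method that blocks until every task in a collection has completed and leaves the pool open for more work. It only fits when the whole batch is known up front, whereas the wrapper lets tasks be submitted one at a time. A minimal sketch, with InvokeAllDemo, runTwoBatches() and the counter task being names made up for the example:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class InvokeAllDemo {

    /** Runs the same batch twice on one pool; returns the task count seen after each run. */
    static int[] runTwoBatches(int batchSize) {
        final AtomicInteger done = new AtomicInteger();

        // invokeAll() takes Callables, so each Runnable is adapted with Executors.callable().
        List<Callable<Object>> batch = new ArrayList<Callable<Object>>();
        for (int i = 0; i < batchSize; i++) {
            batch.add(Executors.callable(new Runnable() {
                public void run() {
                    done.incrementAndGet(); // stand-in for real work
                }
            }));
        }

        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is arbitrary
        int[] counts = new int[2];
        try {
            for (int round = 0; round < 2; round++) {
                pool.invokeAll(batch);       // returns once every task has completed
                counts[round] = done.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown(); // demo-only cleanup so the JVM can exit
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = runTwoBatches(5);
        System.out.println(counts[0] + " then " + counts[1]);
    }
}
```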

I would love suggestions / feedback on this implementation. Is there a better approach? Also, is this a common use case which merits inclusion of awaitExecution() in ExecutorService itself?
