Geeks With Blogs
Josh Reuben


in java.util.concurrent package - JDK 7

A Framework for Divide and Conquer

recursively divides a task into smaller subtasks until threshold check indicates subtask size is small enough to execute serially. Optimal threshold is affected by specific computational steps & obtained through profiling – heuristic: between 100 and 10000.

abstracts multithreading - automatically scale up.

Leverages work-stealing - Each worker thread maintains a queue of tasks. If one worker thread’s queue is empty, it will take a task from another worker thread.

uses daemon threads - automatically terminated when all user threads have terminated -no need to explicitly shut down

Classes

  • ForkJoinTask<T> - abstract class that defines a task. implements Serializable.

  • ForkJoinPool – schedules execution of ForkJoinTask<T> objects on thread pool

  • RecursiveAction – extends ForkJoinTask<T> for tasks that do not return values

  • RecursiveTask<T> - extends ForkJoinTask<T> for tasks that return values. Must aggregate the results, so that when the first invocation finishes, it returns the overall result.

Using the framework requires custom classes that extend RecursiveAction / RecursiveTask<T> and @override its compute() method

ForkJoinTask<T> Members

  • compute( ) - implement the recursive divide-and-conquer strategy and its threshold branch – implementer must override.

  • fork() - submit the invoking task for asynchronous execution. Non-blocking. Can be called only from another ForkJoinTask running within a ForkJoinPool. use to execute subtasks.

  • join() - waits until the task on which it is called terminates and returns the task result

  • invoke() - combines fork and join operations into a single synchronous call – convenience method that starts a task and then waits for it to terminate.

  • invokeAll() - invoke multiple tasks

  • cancel( ) - returns true if the task is cancelled, returns false if task can’t be cancelled, has already completed or was already cancelled.

  • isCancelled( ) - determine if a task has been cancelled by calling

  • isCompletedNormally( ) / isCompletedAbnormally( ) - Determine a Task’s Completion Status

  • reinitialize( ) - Reset state of task after it has completed so it can be rerun

  • inForkJoinPool( ) - determine if code is executing inside a task

  • adapt() - convert a Runnable or Callable object into a ForkJoinTask ( the run() or call( ) method is executed)

  • getQueuedTaskCount( ) - obtain approx count of number of tasks in queue of invoking thread

  • getSurplusQueuedTaskCount( ) - obtain approx count of number of tasks the invoking thread has in its queue that exceed the number of other threads in the pool that might steal them

  • quietlyJoin() / quitlyInvoke() - don’t return values or throw exceptions.

  • tryUnfork( ) - unschedule a task

ForkJoinPool Members

  • Ctor - pLevel argument specifies level of parallelism – valid range: one to implementation defined limit. By default uses the number of cores via Runtime.availableProcessors().

  • getParallelism( ) - get the level of parallelism

  • execute() - initiate the asynchronous execution of a task

  • invoke() - initiate the synchronous execution of a task

  • shutdown( ) - force termination of all threads

  • toString( ) - displays state of the pool.

  • isQuiescent( ) - determine if a pool is currently idle - returns true if the pool has no active threads

  • getPoolSize( ) - obtain number of worker threads currently in the pool

  • getActiveThreadCount( ) - obtain approx count of active threads in the pool

  • shutdown( ) - shut down pool gracefully - currently active tasks will still be executed, but no new tasks can be started.

  • shutdownNow( ) - stop a pool immediately - cancel currently active tasks.

  • isShutdown( ) - determine if pool is shut down

  • isTerminated( ) - determine if pool has been shut down with all tasks completed


RecursiveAction Example

import java.util.*;

import java.util.concurrent.*;


class MySqrtAction extends RecursiveAction {

final int seqThreshold = 1000;

double[] data;

int start, end;


MySqrtAction(double[] vals, int s, int e) {

data = vals;

start = s;

end = e;

}


@override

protected void compute() {

// if #elements < threshold then process sequentially

// else invoke new tasks using subdivided data

if ((end – start) < seqThreshold) {

for (int i=0; i< seqThreshold; i++) {

data[i] = Math.sqrt(data[i]);

}

} else {

int middle = (start + end) / 2;

invokeAll ( new MySqrtAction(data, start, middle),

new MySqrtAction(data, middle, end));

}

}

}


void testForkJoinAction () {

double[] nums = new double[100000];

for (int i=0; i < nums.length; i++) nums[i] = (double)i;

ForkJoinPool fjp = new ForkJoinPool();

MySqrtAction action = new MySqrtAction(nums, 0, nums.length);

fjp.invoke (action);

}


RecursiveTask<T> Example

import java.util.*;

import java.util.concurrent.*;


class MySumTask extends RecursiveTask<Double> {

final int seqThreshold = 1000;

double[] data;

int start, end;


MySumTask(double[] vals, int s, int e) {

data = vals;

start = s;

end = e;

}


@override

protected double compute() {

double sum = 0;

// if #elements < threshold then process sequentially

// else invoke new tasks using subdivided data

if ((end – start) < seqThreshold) {

for (int i=0; i< seqThreshold; i++) {

sum += data[i];

}

} else {

int middle = (start + end) / 2;

MySumTask t1 = new MySumTask(data, start; middle),

MySumTask t2 = new MySumTask(data, middle, end);

// start each subtask by forking:

t1.fork();

t2.fork();

sum = t1.join() + t2.join();

}

return sum;

}

}


void testForkJoinTask () {

double[] nums = new double[100000];

for (int i=0; i < nums.length; i++) nums[i] = (double)i;

ForkJoinPool fjp = new ForkJoinPool();

MySumTask task = new MySumTask(nums, 0, nums.length);

double d = fjp.invoke (task);

}

Tips

  • Avoid an overly low sequential threshold – leads to task thrashing

  • Use the default level of parallelism

  • Avoid synchronization or other blocking code


Posted on Sunday, February 15, 2015 6:30 AM Parallelism , Java | Back to top


Comments on this post: Java Fork-Join

# re: Java Fork-Join
Requesting Gravatar...
The process is easy to understand and is a very useful information. - Mark Zokle
Left by Michael Douglas on Dec 20, 2016 6:07 PM

Your comment:
 (will show your gravatar)


Copyright © JoshReuben | Powered by: GeeksWithBlogs.net