| Uutiset | Koodikirjasto | Wiki | Keskustelut | FAQ | Info |
forp coderodde 11.03.12 20:39 Paralleeli for-rakenne.
/*************************
 * Files in the snippet: *
 *                       *
 * (1) Main.java         *
 * (2) Task.java         *
 * (3) Processor.java    *
 *************************/

/////////////////
/// Main.java ///
/////////////////

import java.util.ArrayList;
import java.util.List;

import com.mureakuha.util.concurrent.Task;
import com.mureakuha.util.concurrent.Processor;
import static com.mureakuha.util.concurrent.Processor.forp;

/**
 * Demonstration of the parallel for construct: computes the Fibonacci numbers
 * F(35)..F(40) first serially and then through {@code forp}, printing both
 * result lists together with the elapsed wall-clock time.
 */
public class Main {

    /**
     * Our task class. The only method to implement is {@code call()} from the
     * {@code Callable} interface.
     *
     * THE TASK CLASS MUST BE DECLARED PUBLIC (AND STATIC WHEN NESTED) WITH AN
     * ACCESSIBLE NO-ARGUMENT CONSTRUCTOR: the processor creates one fresh
     * instance per input datum through reflection, and forp fails with an
     * IllegalStateException (caused by e.g. IllegalAccessException) when it
     * cannot do so.
     */
    public static class FibonacciTask extends Task<Integer, Long> {

        /**
         * Performs the heavy computation for this task's input datum.
         * @return the Fibonacci number of the input, or -1 if no input was
         * set or the input is negative
         */
        @Override
        public Long call(){
            return input == null ? -1L : fibonacci( input );
        }

        /**
         * Deliberately naive (exponential time) recursive Fibonacci, so that
         * every task has enough work to make parallelism measurable.
         * @param i index of the requested Fibonacci number
         * @return the ith Fibonacci number, or -1 if {@code i} is negative
         */
        static long fibonacci( int i ){
            if( i < 0 )
                return -1L;

            switch( i ) {
                case 0:
                    return 0L;
                case 1:
                    return 1L;
                default:
                    return fibonacci( i - 1 ) + fibonacci( i - 2 );
            }
        }
    }

    public static void main( String... args ){
        List<Integer> input = new ArrayList<Integer>();
        List<Long> output = new ArrayList<Long>();
        List<Long> serialOutput = new ArrayList<Long>();

        for( int i = 35; i <= 40; i++ )
            input.add( i );

        long ta = System.currentTimeMillis();

        // Compute a subsequence of Fibonacci numbers serially.
        for( Integer i : input )
            serialOutput.add( FibonacciTask.fibonacci( i ) );

        long tb = System.currentTimeMillis();

        System.out.print( "SERIAL: [ " );

        for( Long l : serialOutput )
            System.out.print( l + " " );

        System.out.println( "] in " + (tb - ta) + " ms." );

        // A processor using up to 4 worker threads and computing Fibonacci
        // numbers.
        Processor<Integer, Long> processor =
            new Processor<Integer, Long>( 4, new FibonacciTask() );

        ta = System.currentTimeMillis();

        // OUR NIFTY PARALLEL FOR. :=)
        // If you want to lower the time measurements, make sure you run this
        // code on a multicore or multi-CPU machine.
        forp( input, processor, output );

        tb = System.currentTimeMillis();

        System.out.print( "PARALLEL: [ " );

        for( Long l : output )
            System.out.print( l + " " );

        System.out.println( "] in " + (tb - ta) + " ms." );
    }
}

/////////////////
/// Task.java ///
/////////////////

package com.mureakuha.util.concurrent;

import java.util.concurrent.Callable;

/**
 * A unit of work translating a single input datum into a single output datum.
 * Subclasses implement {@code call()} and read the datum from {@link #input}.
 * @param <I> type of the input datum
 * @param <O> type of the output datum
 */
public abstract class Task<I, O> implements Callable<O>{

    /** The datum to process; stays {@code null} until init() is called. */
    protected I input;

    /**
     * Stores the datum this task will process when it is called.
     * @param input the input datum
     */
    public void init( I input ){
        this.input = input;
    }
}

//////////////////////
/// Processor.java ///
//////////////////////

package com.mureakuha.util.concurrent;

import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ExecutionException;

/**
 * Runs one task per input datum on a fixed-size thread pool. The task object
 * given at construction serves only as a prototype: its class is instantiated
 * anew for every datum.
 * @param <I> type of input data
 * @param <O> type of output data
 */
public class Processor<I, O>{

    protected int threads;
    protected Task<I, O> task;

    /**
     * Constructs a new processor.
     * @param threads amount of worker threads to create.
     * @param task the task object performing translation of an input datum into
     * an output datum
     */
    public Processor( int threads, Task<I, O> task ){
        this.threads = threads;
        this.task = task;
    }

    /**
     * Computes in parallel output out of input using a single, specific
     * algorithm for converting an input datum into an output datum.
     *
     * A task that throws, or whose result cannot be retrieved, contributes
     * {@code null} at its position (the failure is printed to standard
     * error), so indices of input and output always line up. If the calling
     * thread is interrupted while waiting for the tasks, its interrupt flag is
     * restored and the method returns leaving {@code output} untouched.
     *
     * @param input the input data
     * @param output the list in which to store the output, or {@code null} if
     * the results are not needed. In iteration order, ith input datum will
     * produce the ith output datum.
     * @throws IllegalStateException if the task class cannot be instantiated
     */
    protected void process( List<I> input, List<O> output ){
        // Build every task before any thread is started: a task class that
        // cannot be instantiated then fails fast without leaking a pool.
        List<Task<I, O>> tasks = new ArrayList<Task<I, O>>( input.size() );

        for( I datum : input )
            tasks.add( newTask( datum ) );

        ExecutorService executorService =
            Executors.newFixedThreadPool( threads );

        List<Future<O>> futures;

        try {
            // Blocks until each and every task has finished.
            futures = executorService.invokeAll( tasks );
        } catch( InterruptedException e ){
            // Keep the interruption visible to the caller.
            Thread.currentThread().interrupt();
            return;
        } finally {
            // Release the worker threads on every path, including the
            // interrupted one.
            executorService.shutdown();
        }

        // Load the output, if needed.
        if( output == null )
            return;

        output.clear();

        for( Future<O> f : futures ) {
            O result = null;

            try {
                result = f.get();
            } catch( InterruptedException e ){
                Thread.currentThread().interrupt();
            } catch( ExecutionException e ){
                e.printStackTrace();
            }

            // Always add exactly one element per input datum, so a failed
            // task cannot shift the results that follow it.
            output.add( result );
        }
    }

    /**
     * Creates a fresh instance of the prototype task's class and initializes
     * it with the given datum.
     */
    private Task<I, O> newTask( I datum ){
        try {
            // getClass() only yields the raw task type; the prototype is a
            // Task<I, O>, so instances of its class are as well.
            @SuppressWarnings( "unchecked" )
            Task<I, O> t = task.getClass()
                               .getDeclaredConstructor()
                               .newInstance();
            t.init( datum );
            return t;
        } catch( NoSuchMethodException e ){
            throw cannotInstantiate( e );
        } catch( InstantiationException e ){
            throw cannotInstantiate( e );
        } catch( IllegalAccessException e ){
            throw cannotInstantiate( e );
        } catch( InvocationTargetException e ){
            throw cannotInstantiate( e );
        }
    }

    /** Wraps a reflective failure into an unchecked exception, keeping the cause. */
    private IllegalStateException cannotInstantiate( Exception cause ){
        return new IllegalStateException(
            "Cannot instantiate task class " + task.getClass().getName()
            + "; it must be public and have an accessible no-argument"
            + " constructor",
            cause );
    }

    /**
     * Translates input data into output using a particular processor.
     * @param <I> type of input data
     * @param <O> type of output data
     * @param input input data
     * @param processor a processor to translate input data into output
     * @param output list in which output data will be stored
     */
    public static <I, O> void forp( List<I> input,
                                    Processor<I, O> processor,
                                    List<O> output ) {
        processor.process( input, output );
    }
}
|
![]() Haku
|