forp

coderodde 11.03.12 20:39

Paralleeli for - rakenne.

 Tekstiversio  Arvo: 0 (0 ääntä)  Äänestä: +  -
/************************
* Files in the snippet: *
* (1) Main.java         *
* (2) Task.java         *
* (3) Processor.java    *
************************/


/////  ////////////
 /// Main.java ///
////////////  /////

import java.util.ArrayList;

import com.mureakuha.util.concurrent.Task;
import com.mureakuha.util.concurrent.Processor;
import static com.mureakuha.util.concurrent.Processor.forp;

public class Main {

    // Our task class. The only method to implement is call() from Callable -
    // interface. THE TASK CLASS MUST BE DECLARED PUBLIC OR OTHERWISE forp WILL
    // THROW IllegalAccessException!
    public static class FibonacciTask extends Task<Integer, Long>
    {
        // Implement call() returning some heavy computation result.
        public Long call(){
            return input == null ? -1L : fibonacci( input );
        }

        static long fibonacci( int i ){
            if( i < 0 )
                return -1L;

            switch( i )
            {
                case 0:
                    return 0L;

                case 1:
                    return 1L;

                default:
                    return fibonacci( i - 1 ) + fibonacci( i - 2 );
            }
        }
    }

    public static void main( String... args ){
        ArrayList<Integer> input = new ArrayList<Integer>();
        ArrayList<Long> output = new ArrayList<Long>();
        ArrayList<Long> serialOutput = new ArrayList<Long>();

        for( int i = 35; i <= 40; i++ )
            input.add( i );

        long ta = System.currentTimeMillis();

        // Compute a subsequence of Fibonacci numbers serially.
        for( Integer i : input )
            serialOutput.add( FibonacciTask.fibonacci( i ) );

        long tb = System.currentTimeMillis();

        System.out.print( "SERIAL:   [ " );

        for( Long l : serialOutput )
            System.out.print( l + " " );

        System.out.println( "] in " + (tb - ta) + " ms." );

        // A processor using up to 4 worker threads and computing Fibonacci -
        // numbers.
        Processor<Integer, Long> processor =
                new Processor<Integer, Long>( 4, new FibonacciTask() );

        ta = System.currentTimeMillis();
       
        // OUR NIFTY PARALLEL FOR. :=)
        // If you want to lower the time measurements, make sure you run this
        // code on a multicore or multi-CPU machine.
        forp( input, processor, output );

        tb = System.currentTimeMillis();

        System.out.print( "PARALLEL: [ " );

        for( Long l : output )
            System.out.print( l + " " );

        System.out.println( "] in " + (tb - ta) + " ms." );
    }
}

/////  ////////////
 /// Task.java ///
////////////  /////

package com.mureakuha.util.concurrent;

import java.util.concurrent.Callable;

/**
 * A unit of work that turns one input datum of type {@code I} into a result
 * of type {@code O}. Subclasses provide the computation by implementing
 * {@code call()} from {@link Callable} and read the datum from {@link #input}.
 *
 * @param <I> type of the input datum
 * @param <O> type of the result returned by {@code call()}
 */
public abstract class Task<I, O>
        implements Callable<O>
{
    /** The datum to work on; {@code null} unless set through {@link #init}. */
    protected I input;

    /**
     * Stores the datum that a subsequent {@code call()} should process.
     *
     * @param input the datum to process
     */
    public void init( final I input )
    {
        this.input = input;
    }
}

/////  /////////////////
 /// Processor.java ///
/////////////////  /////

package com.mureakuha.util.concurrent;

import java.util.List;
import java.util.ArrayList;

import java.util.concurrent.Future;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ExecutionException;

public class Processor<I, O>{
    protected int threads;
    protected Task<I, O> task;

    /**
     * Constructs a new processor.
     * @param threads amount of worker threads to create.
     * @param task the task object performing translation of an input datum into
     * an output datum
     */

    public Processor( int threads, Task<I, O> task ){
        this.threads = threads;
        this.task = task;
    }

    /**
     * Computes in parallel output out of input using a single, specific
     * algorithm for converting an input datum into an output datum.
     * @param input the input data
     * @param output the list in which to store the output. In iteration order,
     * ith input datum will produce the ith output datum.
     */

    protected void process( List<I> input, List<O> output ){
        ExecutorService executorService =
                Executors.newFixedThreadPool( threads );

        ArrayList<Task<I, O>> tasks = new ArrayList<Task<I, O>>( input.size() );

        for( I i : input )
        {
            try {
                Task<I, O> t = this.task.getClass().newInstance();
                t.init( i );
                tasks.add( t );
            }
            catch( InstantiationException e ){
                e.printStackTrace();
            }
            catch( IllegalAccessException e ){
                e.printStackTrace();
            }
        }

        List<Future<O>> futures = null;

        try {
            futures = executorService.invokeAll( tasks );
        }
        catch( InterruptedException e ){
            e.printStackTrace();
            return;
        }

        // Stop taking new tasks and finnish executing each and every one.
        executorService.shutdown();

        // Load the output, if needed.
        if( output != null )
        {
            output.clear();

            for( Future<O> f : futures )
                try {
                    output.add( f.get() );
                }
                catch( InterruptedException e ){
                    e.printStackTrace();
                }
                catch( ExecutionException e ){
                    e.printStackTrace();
                }
        }
    }

    /**
     * Translates input data into output using a particular processor.
     * @param <I> type of input data
     * @param <O> type of output data
     * @param input input data
     * @param processor a processor to translate input data into output
     * @param output list in which output data will be stored
     */

    public static <I, O> void forp(
            List<I> input,
            Processor<I, O> processor,
            List<O> output
            )
    {
        processor.process( input, output );
    }
}