Producers and consumers

coderodde 06.03.12 18:33

Kaksi tuottajasäiettä ja kolme kuluttajasäiettä äärellisellä puskurilla Producer/Consumer - idiomissa.

 Tekstiversio  Arvo: 0 (0 ääntä)  Äänestä: +  -
/*************************
 * Files in the snippet: *
 * (1) RingBuffer.java   *
 * (2) Main.java         *
 ************************/


////  /////////////////
 // RingBuffer.java //
/////////////////  ////

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.ConcurrentModificationException;

public class RingBuffer<T> implements Iterable<T> {
    protected Object[] buffer;
    protected int head;
    protected int size;
    protected final int capacity;
    private volatile long modCount = 0;

    public RingBuffer( int capacity ){
        buffer = new Object[capacity];
        this.capacity = capacity;
    }

    /**
     * Add an element into this FIFO RingBuffer.
     * @param element element to add
     * @throws RingBuffer.FullBufferException
     */

    public void add( T element ) throws FullBufferException {
        if( size == capacity )
            throw new FullBufferException( "Ring buffer is full." );

        buffer[(head + size++) % capacity] = element;
        modCount++;
    }

    /**
     * Get the least recently added element without removing it from this
     * buffer.
     * @return least recently added element
     */

    public T getFirst(){
        if( size == 0 )
            throw new NoSuchElementException(
                    "Reading from an empty ring buffer."
                    );

        return (T) buffer[head];
    }

    /**
     * Get the least recently added element and remove it from this buffer.
     * @return least recently added element
     */

    public T removeFirst(){
        if( size == 0 )
            throw new NoSuchElementException(
                    "Reading from an empty ring buffer."
                    );

        size--;
        T element = (T) buffer[head];
        head = (head + 1) % capacity;
        modCount++;
        return element;
    }

    public int size(){
        return size;
    }

    public int getCapacity(){
        return capacity;
    }

    public Iterator<T> iterator(){
        return new RingBufferIterator<T>();
    }

    public class RingBufferIterator<T> implements Iterator<T> {
        private int iterated;
        private final long modCount;

        public RingBufferIterator(){
            modCount = RingBuffer.this.modCount;
        }

        public boolean hasNext(){
            if( modCount != RingBuffer.this.modCount )
                throw new ConcurrentModificationException();

            return iterated < RingBuffer.this.size;
        }

        public T next(){
            if( modCount != RingBuffer.this.modCount )
                throw new ConcurrentModificationException();

            return (T) RingBuffer.this.buffer[(head + iterated++) % capacity];
        }

        public void remove(){
            if( modCount != RingBuffer.this.modCount )
                throw new ConcurrentModificationException();
        }
    }

    public static class FullBufferException extends Exception {
        public FullBufferException( String msg ){
            super( msg );
        }
    }
}

////  ///////////
 // Main.java //
///////////  ////

import java.util.Random;
import java.util.concurrent.Semaphore;

public class Main {
    static final int PRODUCERS = 2;
    static final int CONSUMERS = 3;
    static final int N = 7;

    // Guards against an empty buffer. 0 indicates the amount of initial
    // semaphore "permits", so that consumers block on an empty buffer.
    static final Semaphore notEmpty = new Semaphore( 0, true );

    // Guards against a full buffer. N indicates the amount of semaphore
    // "permits", which should be the same as the capacity of an empty buffer.
    static final Semaphore notFull = new Semaphore( N, true );

    // Excludes concurrent operations on buffer. 1 indicates the amount of
    // initial "permits" so that only one thread may operate on our buffer.
    static final Semaphore mutex = new Semaphore( 1, true );

    static class Consumer extends Thread {
        private final RingBuffer<Integer> buffer;

        Consumer( RingBuffer<Integer> buffer ){
            this.buffer = buffer;
        }

        public void run(){
            for( int iterations = 0;;)
            {
                try {
                    // Wait until can read stuff from buffer.
                    notEmpty.acquire();
                    // Wait until no any other thread operates on buffer.
                    mutex.acquire();
                }
                catch( InterruptedException ie ){
                    continue;
                }

                // Do something with a datum. We restrict ourselves only to
                // printing the datum to stdout.
                consume( buffer.removeFirst() );

                // Allows other threads to perform operations on the buffer.
                mutex.release();

                // Indicates that buffer is not full so that producer threads
                // may produce more data for buffer.
                notFull.release();

                if( iterations > 8 )
                    try {
                        // Simulate a full buffer.
                        Thread.sleep( 500 );
                    }
                    catch( InterruptedException ie ){}
                else
                    iterations++;
            }
        }

        void consume( int integer ){
            System.out.println(
                    "Consumer " + this.getName() + " consumed " + integer + "."
                    );
        }
    }

    static class Producer extends Thread {
        private final RingBuffer<Integer> buffer;
        private final Random random;
        private int productAmount;

        Producer( RingBuffer<Integer> buffer, long seed ){
            this.buffer = buffer;
            this.random = new Random( seed );
        }

        public void run(){
            for(;;)
            {
                try {
                    // Wait until can fit a datum into the buffer.
                    notFull.acquire();
                    // Wait until no other threads operate on buffer.
                    mutex.acquire();
                }
                catch( InterruptedException ie ){
                    ie.printStackTrace();
                    continue;
                }

                try {
                    // Produce, add to buffer.
                    int product = produce();
                    buffer.add( product );
                    System.out.print(
                            "  Producer " + this.getName() + " produced " +
                            product + ", "
                            );
                }
                catch( RingBuffer.FullBufferException fbe ){
                    // Should never happen.
                    System.out.println(
                            "Error: buffer is full."
                            );

                    System.exit( -1 );
                }

                // Print the entire contents of the buffer.
                System.out.print( "buffer: " );
                printBuffer( buffer );

                // Allows other threads to operate on the buffer.
                mutex.release();

                // Indicates that buffer is not empty so that consumer threads
                // may proceed consuming.
                notEmpty.release();
               
                if( ++productAmount < 10 )
                    try {
                        // Simulate an empty buffer.
                        Thread.sleep( 300 );
                    }
                    catch( InterruptedException ie ){}
            }
        }

        int produce(){
            return random.nextInt( 100 );
        }

        /**
         * Prints to stdout the contents of the buffer.
         * @param buffer buffer to print
         */

        void printBuffer( RingBuffer<Integer> buffer ){
            System.out.print( "[ " );

            for( Integer i : buffer )
                System.out.print( i + " " );

            System.out.println( "]." );
        }
    }

    public static void main( String... args ){
        RingBuffer<Integer> buffer = new RingBuffer<Integer>( N );
        Producer[] producers = new Producer[PRODUCERS];
        Consumer[] consumers = new Consumer[CONSUMERS];

        for( int i = 0; i < PRODUCERS; i++ )
            producers[i] = new Producer( buffer, 100L + i );

        for( int i = 0; i < CONSUMERS; i++ )
            consumers[i] = new Consumer( buffer );

        // Creates 2 producer threads and 3 consumer threads and proceeds
        // executing them.
        for( Consumer c : consumers ) c.start();
        for( Producer p : producers ) p.start();
    }
}