| Uutiset | Koodikirjasto | Wiki | Keskustelut | FAQ | Info |
Producers and consumers — coderodde, 06.03.12 18:33. Kaksi tuottajasäiettä ja kolme kuluttajasäiettä äärellisellä puskurilla Producer/Consumer-idiomissa.
/*************************
 * Classes in this listing:
 *
 * (1) RingBuffer (originally RingBuffer.java)
 *
 * (2) Main       (originally Main.java)
 *
 * The listing is laid out as a single compilation unit: every import sits at
 * the top and Main is package-private. To split it into two files, move the
 * Main section together with the Random and Semaphore imports into Main.java.
 ************************/

import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.concurrent.Semaphore;

////
/////////////////
// RingBuffer.java //
/////////////////
////

/**
 * A fixed-capacity FIFO queue stored in a circular array.
 *
 * <p>The class performs no locking of its own. Code that shares one instance
 * between threads has to serialize access externally, as {@code Main} below
 * does with a semaphore.
 *
 * @param <T> element type
 */
public class RingBuffer<T> implements Iterable<T> {

    /** Backing array; the live elements start at {@link #head} and wrap around. */
    protected Object[] buffer;
    /** Index of the least recently added element. */
    protected int head;
    /** Number of elements currently stored. */
    protected int size;
    /** Maximum number of elements this buffer can hold. */
    protected final int capacity;
    /** Incremented on every add/remove so that iterators can detect changes. */
    private volatile long modCount = 0;

    /**
     * Creates an empty buffer.
     *
     * @param capacity maximum number of elements the buffer can hold
     * @throws IllegalArgumentException if {@code capacity} is negative
     */
    public RingBuffer( int capacity ){
        if( capacity < 0 ) {
            throw new IllegalArgumentException(
                    "Negative capacity: " + capacity );
        }

        buffer = new Object[capacity];
        this.capacity = capacity;
    }

    /**
     * Add an element into this FIFO RingBuffer.
     *
     * @param element element to add
     * @throws RingBuffer.FullBufferException if the buffer already holds
     *         {@code capacity} elements
     */
    public void add( T element ) throws FullBufferException {
        if( size == capacity ) {
            throw new FullBufferException( "Ring buffer is full." );
        }

        buffer[(head + size++) % capacity] = element;
        modCount++;
    }

    /**
     * Get the least recently added element without removing it from this
     * buffer.
     *
     * @return least recently added element
     * @throws NoSuchElementException if the buffer is empty
     */
    @SuppressWarnings( "unchecked" ) // Only T instances are ever stored.
    public T getFirst(){
        if( size == 0 ) {
            throw new NoSuchElementException(
                    "Reading from an empty ring buffer." );
        }

        return (T) buffer[head];
    }

    /**
     * Get the least recently added element and remove it from this buffer.
     *
     * @return least recently added element
     * @throws NoSuchElementException if the buffer is empty
     */
    @SuppressWarnings( "unchecked" ) // Only T instances are ever stored.
    public T removeFirst(){
        if( size == 0 ) {
            throw new NoSuchElementException(
                    "Reading from an empty ring buffer." );
        }

        T element = (T) buffer[head];
        // Drop the reference so the buffer does not keep a removed element
        // reachable until the slot happens to be overwritten.
        buffer[head] = null;
        head = (head + 1) % capacity;
        size--;
        modCount++;
        return element;
    }

    /**
     * @return number of elements currently stored
     */
    public int size(){
        return size;
    }

    /**
     * @return maximum number of elements this buffer can hold
     */
    public int getCapacity(){
        return capacity;
    }

    /**
     * Returns an iterator over the elements, least recently added first.
     * The iterator fails with {@link ConcurrentModificationException} once
     * the buffer is modified after the iterator was created.
     *
     * @return a read-only iterator over the stored elements
     */
    @Override
    public Iterator<T> iterator(){
        return new RingBufferIterator<T>();
    }

    /**
     * Read-only iterator over the enclosing buffer.
     *
     * @param <E> element type; the enclosing buffer always instantiates it
     *            with its own element type
     */
    public class RingBufferIterator<E> implements Iterator<E> {

        /** Number of elements already returned by {@link #next()}. */
        private int iterated;
        /** Value of the buffer's modification counter at creation time. */
        private final long expectedModCount;

        public RingBufferIterator(){
            expectedModCount = RingBuffer.this.modCount;
        }

        @Override
        public boolean hasNext(){
            checkForComodification();
            return iterated < RingBuffer.this.size;
        }

        /**
         * @throws NoSuchElementException if every element was already returned
         */
        @Override
        @SuppressWarnings( "unchecked" ) // Only element-typed values are stored.
        public E next(){
            checkForComodification();

            if( iterated >= RingBuffer.this.size ) {
                throw new NoSuchElementException(
                        "No more elements in the ring buffer." );
            }

            return (E) RingBuffer.this.buffer[(head + iterated++) % capacity];
        }

        /**
         * Removal through the iterator is not supported.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void remove(){
            throw new UnsupportedOperationException(
                    "RingBuffer iterators are read-only." );
        }

        /** Fails if the buffer changed after this iterator was created. */
        private void checkForComodification(){
            if( expectedModCount != RingBuffer.this.modCount ) {
                throw new ConcurrentModificationException();
            }
        }
    }

    /** Thrown when an element is added to a buffer that is already full. */
    public static class FullBufferException extends Exception {
        public FullBufferException( String msg ){
            super( msg );
        }
    }
}

////
///////////
// Main.java //
///////////
////

/**
 * Producer/consumer demo: two producer threads and three consumer threads
 * share one bounded RingBuffer, coordinated by three semaphores.
 */
class Main {

    static final int PRODUCERS = 2;
    static final int CONSUMERS = 3;
    static final int N = 7;

    // Guards against an empty buffer. 0 indicates the amount of initial
    // semaphore "permits", so that consumers block on an empty buffer.
    static final Semaphore notEmpty = new Semaphore( 0, true );

    // Guards against a full buffer. N indicates the amount of semaphore
    // "permits", which should be the same as the capacity of an empty buffer.
    static final Semaphore notFull = new Semaphore( N, true );

    // Excludes concurrent operations on buffer. 1 indicates the amount of
    // initial "permits" so that only one thread may operate on our buffer.
    static final Semaphore mutex = new Semaphore( 1, true );

    /**
     * Removes integers from the shared buffer and prints them. Runs until the
     * thread is interrupted.
     */
    static class Consumer extends Thread {

        private final RingBuffer<Integer> buffer;

        Consumer( RingBuffer<Integer> buffer ){
            this.buffer = buffer;
        }

        @Override
        public void run(){
            int iterations = 0;

            try {
                for(;;) {
                    // Wait until can read stuff from buffer.
                    notEmpty.acquire();

                    try {
                        // Wait until no other thread operates on buffer.
                        mutex.acquire();
                    } catch( InterruptedException ie ){
                        // Nothing was consumed, so hand the permit back;
                        // otherwise one buffered element would be stranded.
                        notEmpty.release();
                        throw ie;
                    }

                    try {
                        // Do something with a datum. We restrict ourselves
                        // only to printing the datum to stdout.
                        consume( buffer.removeFirst() );
                    } finally {
                        // Allows other threads to perform operations on the
                        // buffer, even if the critical section failed.
                        mutex.release();
                    }

                    // Indicates that buffer is not full so that producer
                    // threads may produce more data for buffer.
                    notFull.release();

                    if( iterations > 8 ) {
                        // Simulate a full buffer.
                        Thread.sleep( 500 );
                    } else {
                        iterations++;
                    }
                }
            } catch( InterruptedException ie ){
                // An interrupt is the request to stop: keep the flag set for
                // whoever inspects this thread and leave the loop.
                Thread.currentThread().interrupt();
            }
        }

        void consume( int integer ){
            System.out.println( "Consumer " + this.getName() + " consumed "
                                + integer + "." );
        }
    }

    /**
     * Adds pseudo-random integers to the shared buffer and prints the buffer
     * after each addition. Runs until the thread is interrupted.
     */
    static class Producer extends Thread {

        private final RingBuffer<Integer> buffer;
        private final Random random;
        private int productAmount;

        Producer( RingBuffer<Integer> buffer, long seed ){
            this.buffer = buffer;
            this.random = new Random( seed );
        }

        @Override
        public void run(){
            try {
                for(;;) {
                    // Wait until can fit a datum into the buffer.
                    notFull.acquire();

                    try {
                        // Wait until no other threads operate on buffer.
                        mutex.acquire();
                    } catch( InterruptedException ie ){
                        // Nothing was produced, so hand the permit back;
                        // otherwise one buffer slot would be lost for good.
                        notFull.release();
                        throw ie;
                    }

                    try {
                        // Produce, add to buffer.
                        int product = produce();
                        buffer.add( product );
                        System.out.print( " Producer " + this.getName()
                                          + " produced " + product + ", " );

                        // Print the entire contents of the buffer.
                        System.out.print( "buffer: " );
                        printBuffer( buffer );
                    } catch( RingBuffer.FullBufferException fbe ){
                        // Should never happen.
                        System.out.println( "Error: buffer is full." );
                        System.exit( -1 );
                    } finally {
                        // Allows other threads to operate on the buffer, even
                        // if the critical section failed.
                        mutex.release();
                    }

                    // Indicates that buffer is not empty so that consumer
                    // threads may proceed consuming.
                    notEmpty.release();

                    // The first nine products are followed by a pause; the
                    // counter stops there so that it can never overflow.
                    if( productAmount < 9 ) {
                        productAmount++;
                        // Simulate an empty buffer.
                        Thread.sleep( 300 );
                    }
                }
            } catch( InterruptedException ie ){
                // An interrupt is the request to stop: keep the flag set for
                // whoever inspects this thread and leave the loop.
                Thread.currentThread().interrupt();
            }
        }

        int produce(){
            return random.nextInt( 100 );
        }

        /**
         * Prints to stdout the contents of the buffer.
         *
         * @param buffer buffer to print
         */
        void printBuffer( RingBuffer<Integer> buffer ){
            System.out.print( "[ " );

            for( Integer i : buffer ) {
                System.out.print( i + " " );
            }

            System.out.println( "]." );
        }
    }

    public static void main( String... args ){
        RingBuffer<Integer> buffer = new RingBuffer<Integer>( N );

        Producer[] producers = new Producer[PRODUCERS];
        Consumer[] consumers = new Consumer[CONSUMERS];

        for( int i = 0; i < PRODUCERS; i++ ) {
            producers[i] = new Producer( buffer, 100L + i );
        }

        for( int i = 0; i < CONSUMERS; i++ ) {
            consumers[i] = new Consumer( buffer );
        }

        // Creates 2 producer threads and 3 consumer threads and proceeds
        // executing them.
        for( Consumer c : consumers ) {
            c.start();
        }

        for( Producer p : producers ) {
            p.start();
        }
    }
}
![]() Haku
|