Friday, 16 September 2016

Java 5 - Producer/Consumer threads using executor framework


Creating a thread consumes a significant amount of memory. In an application that serves a lot of client programs, creating one thread per client will not scale. So Java 5 introduced the executor framework, which provides thread pools that limit the number of threads serving client requests at any point in time. This improves performance and reduces the memory requirement.

Java 5 also provides blocking queue implementations, so we no longer need to coordinate producer/consumer applications by hand using wait/notify. The blocking and signalling are taken care of automatically by the BlockingQueue implementations.

An example producer/consumer making use of a blocking queue implementation and executor framework is as follows:


package com.prasune.test.concurrent;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TestExecutor {
   
    private static ExecutorService producerExecutors = Executors.newFixedThreadPool(5);
    private static ExecutorService consumerExecutors = Executors.newFixedThreadPool(5);
   
    private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

    public static void main(String[] args) {
       
        for (int i = 0; i < 1000; i++) {
            producerExecutors.execute(new Producer(queue));
        }

        for (int i = 0; i < 5; i++) {
            consumerExecutors.execute(new Consumer(queue));
        }
    }

}




Producer:


package com.prasune.test.concurrent;

import java.util.concurrent.BlockingQueue;

/**
 * Task that publishes a single message to a shared {@link BlockingQueue}.
 *
 * <p>{@link #run()} blocks while the queue is full and returns as soon as the
 * message has been enqueued or the executing thread is interrupted.
 */
public class Producer implements Runnable {

    private final BlockingQueue<String> queue;

    /**
     * @param queue queue the message is published to
     */
    public Producer(BlockingQueue<String> queue){
        this.queue = queue;
    }

    /**
     * Puts one {@code "Message"} element on the queue, waiting for space if
     * necessary. If the thread is interrupted while waiting, nothing is
     * enqueued and the thread's interrupt status is restored.
     */
    @Override
    public void run() {
        try {
            queue.put("Message");
        } catch (InterruptedException e) {
            // put() clears the interrupt status when it throws; restore it so
            // the executor running this task can still observe the interrupt.
            Thread.currentThread().interrupt();
        }
    }

}



Consumer:


package com.prasune.test.concurrent;

import java.util.concurrent.BlockingQueue;

/**
 * Task that continuously takes messages from a shared {@link BlockingQueue}
 * and prints them together with the name of the consuming thread.
 *
 * <p>The task runs until its thread is interrupted, which makes it
 * cancellable (for example via {@code ExecutorService.shutdownNow()}).
 */
public class Consumer implements Runnable {

    private final BlockingQueue<String> queue;

    /**
     * @param queue queue the messages are taken from
     */
    public Consumer(BlockingQueue<String> queue){
        this.queue = queue;
    }

    /**
     * Takes and prints messages until the executing thread is interrupted.
     * On interruption the loop ends and the interrupt status is restored.
     */
    @Override
    public void run() {
        try {
            while (true) {
                // take() blocks while the queue is empty.
                System.out.println(Thread.currentThread().getName() + " Received " + queue.take());
            }
        } catch (InterruptedException e) {
            // Interruption is the stop signal: leave the loop instead of
            // retrying, and restore the status that take() cleared.
            Thread.currentThread().interrupt();
        }
    }

}