Friday 16 September 2016

Java 8 - Producer/Consumer threads using executor framework


Creating a thread consumes a significant amount of memory. In an application that serves a lot of clients, creating one thread per client will not scale. To address this, Java 5 introduced the executor framework, which provides thread pools that limit the number of threads serving client requests at any point in time. This improves performance and reduces the memory requirement.

Java 5 also provides blocking queue implementations, so we no longer need to coordinate producer/consumer applications using wait/notify. That coordination is taken care of automatically by the BlockingQueue implementations.

An example producer/consumer making use of a blocking queue implementation and executor framework is as follows:


package com.prasune.coding.thread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Producer/consumer demo built on a bounded {@link BlockingQueue} and two
 * fixed-size thread pools.
 *
 * <p>{@link #NUM_OF_MSGS} producer tasks each put one message on the queue and
 * the same number of consumer tasks each take one message off it. The queue
 * holds at most 5 messages, so {@code put} blocks producers while it is full
 * and {@code take} blocks the consumer while it is empty; no explicit
 * wait/notify is needed.
 *
 * <p>The pools are static and are shut down by {@link #main(String[])}, so
 * {@code main} can only be run once per class load.
 */
public class TestProducerConsumer {

    /** Number of messages produced and, symmetrically, consumed. */
    private static final int NUM_OF_MSGS = 20;

    /** Maximum time, in seconds, that main waits for each pool to finish. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 10;

    /** Bounded hand-off buffer shared by producers and consumers. */
    private static final BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);

    private static final ExecutorService producerPool = Executors.newFixedThreadPool(3);
    private static final ExecutorService consumerPool = Executors.newFixedThreadPool(1);

    private static final Logger logger =
            Logger.getLogger(TestProducerConsumer.class.getName());

    /**
     * Submits the producer and consumer tasks, shuts both pools down and waits
     * for the submitted work to complete before returning.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Runnable producerTask = () -> {
            try {
                queue.put("test Message");
                // size() is only a snapshot; other threads may have changed the
                // queue between the put above and this read.
                System.out.println(Thread.currentThread().getName()
                                   + " put message queue.size() " + queue.size());
            } catch (InterruptedException e) {
                logger.log(Level.SEVERE, e.getMessage(), e);
                // Restore the flag so the owning pool can observe the interrupt.
                Thread.currentThread().interrupt();
            }
        };
        Runnable consumerTask = () -> {
            try {
                System.out.println(Thread.currentThread().getName()
                                   + " received msg " + queue.take());
            } catch (InterruptedException e) {
                logger.log(Level.SEVERE, e.getMessage(), e);
                // Restore the flag so the owning pool can observe the interrupt.
                Thread.currentThread().interrupt();
            }
        };

        try {
            for (int i = 0; i < NUM_OF_MSGS; i++) {
                producerPool.submit(producerTask);
            }
            for (int i = 0; i < NUM_OF_MSGS; i++) {
                consumerPool.submit(consumerTask);
            }
        } finally {
            // shutdown() stops new submissions but lets queued tasks run.
            producerPool.shutdown();
            consumerPool.shutdown();
        }

        awaitCompletion(producerPool, "producer");
        awaitCompletion(consumerPool, "consumer");
    }

    /**
     * Blocks until the given, already shut down, pool has finished its tasks.
     * If the timeout elapses or the waiting thread is interrupted, the pool's
     * remaining tasks are cancelled via {@code shutdownNow()}.
     */
    private static void awaitCompletion(ExecutorService pool, String poolName) {
        try {
            if (!pool.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                logger.log(Level.WARNING,
                        "{0} pool did not finish within {1} seconds; cancelling remaining tasks",
                        new Object[] {poolName, SHUTDOWN_TIMEOUT_SECONDS});
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}



No comments:

Post a Comment