Blocking Queue Implementation in Java

Blocking Queue Implementation in Java thumbnail
22K
By Dhiraj 20 March, 2020

In the last article, we created a custom implementation of the Queue data structure in Java. In this article, we will be creating a custom implementation of Blocking Queue in Java. We will provide a LinkedList implementation of it and perform multiple operations such as put() and take().

As we will be using a Linked List for the implementation, I would recommend checking this article first which talks about a custom implementation of the Linked list in Java.

What is Blocking Queue

It is a queue implementation inside java.util.concurrent package that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element. BlockingQueue implementations are designed to be used primarily for producer-consumer queues but additionally support the Collection interface.

Suppose, we have an empty queue and a thread invokes it take() method to retrieve an element then the thread gets blocked until there is an element available for retrieval.

blocking-queue

Node Class Implementation

As discussed above, we will be using LinkedList for the queue implementation and a linked list is a linear data structure with interconnected nodes. A simple node contains two elements - data and a pointer to the next node.

public class Node {

    private int data;
    private Node nextNode;

    //setters and getters

}

Blocking Queue Template Implementation

A queue implementation has two pointers as front and rear which points to front and rear of the queue respectively and this is the reason the time complexity for enQueue and deQueue process is O(1). We also have a capacity count of the blocking queue.

During the put() and take() operation, this capacity is checked. In case, the capacity is reached or the queue is empty the thread accessing the queue gets blocked.

public class BlockingQueue {

    private Node front;
    private Node rear;
    private int length;

    public BlockingQueue(){
    }

    public synchronized void put(int data) throws InterruptedException {
        
    }

    public synchronized int take() throws InterruptedException {
        
    }

    public boolean isEmpty(){
        return length == 0;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue queue = new BlockingQueue();
        
    }
}

Put() Implementation in Blocking Queue

This implementation is very similar to enQueue() method. Once the capacity is reached, the thread is blocked or else it's a simple enQueue operation using LinkedList. Once the element is queued, we notify in case other waiting threads are blocked due to an empty queue.

public synchronized void put(int data) throws InterruptedException {
        System.out.println("put method called...");
        if(this.length > 9){
            System.out.println("Maximum capacity reached. Hence waiting for take() operation");
            wait();
        }
        Node node = new Node(data);
        if(isEmpty()){
            front = node;
        }else {
            rear.setNextNode(node);
        }
        rear = node;
        length++;
        System.out.println("Data added in the queue. Going to notify the observers.");
        notifyAll();
    }

Take() Implementation in Blocking Queue

The take() implementation is similar to deQueue() implementation in our custom Queue implementation article. This method blocks the thread during an element retrieval from an empty qeueue

public synchronized int take() throws InterruptedException {
        System.out.println("take() method called");
        int data;
        if(isEmpty()){
            System.out.println("No data found in the queue. take()() method execution paused.");
            wait();
        }
        System.out.println("Data became available. take() method resumed.");
        data = front.getData();
        front = front.getNextNode();
        length--;
        return data;
    }

Blocking Queue Implementation Runner

We have a main method implementation below. At first, take() method is called. As we are trying to retrieving from an empty queue, the thread gets blocked until some data is added into the queue. Once, the data is added the take() method resumes.

public static void main(String[] args) {
        BlockingQueue queue = new BlockingQueue();
        new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                queue.put(4);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

    }
blocking-queue-result

Conclusion

In this article, we will be creating a custom implementation of Blocking Queue in Java. We provided a LinkedList implementation of it and performed multiple operations such as put() and take().

Share

If You Appreciate This, You Can Consider:

We are thankful for your never ending support.

About The Author

author-image
A technology savvy professional with an exceptional capacity to analyze, solve problems and multi-task. Technical expertise in highly scalable distributed systems, self-healing systems, and service-oriented architecture. Technical Skills: Java/J2EE, Spring, Hibernate, Reactive Programming, Microservices, Hystrix, Rest APIs, Java 8, Kafka, Kibana, Elasticsearch, etc.

Further Reading on Data Structure