/******************************************************************************
* File: RingBuffer.java
* Author: Keith Schwarz (htiek@cs.stanford.edu)
*
* An implementation of a synchronized queue backed by a ring buffer. This
* functionality and implementation is similar to the ArrayBlockingQueue class,
* but I thought that I'd implement my own version to get a better feel for how
* it works.
*
* A ring buffer is a space-efficient, locality-friendly implementation of a
* FIFO queue. It is implemented as a fixed-sized array that is treated as
* though it wraps around like a ring; it has no well-defined start or end
* point. This array stores two pointers, a read pointer and a write pointer,
* delineating where the next insert should take place and from where the next
* element should be dequeued. For example:
*
* [2] [3] [ ] [ ] [ ] [ ] [0] [1]
* ^ ^
* | |
* write read
*
* When using a ring buffer, one must be careful not to let the read and write
* pointers cross one another. If this happens, future write operations will
* start overwriting old elements that have not yet been consumed. For this
* reason, most ring buffers adopt one of two strategies. First, the ring
* buffer can increase its size whenever it runs out of room. This approach
* allows the buffer to grow arbitrarily large if need be. The second option,
* and the one used in this implementation, is simply to block on a read or
* write when data is not available. This allows the ring buffer to implement
* the producer/consumer pattern fairly easily; any number of threads can begin
* creating data while some number of threads consume it, and at no time are
* too many elements kept in memory waiting to be read.
*/
public final class RingBuffer<T> {
/* The actual ring buffer. */
private final T[] elements;
/* The write pointer, represented as an offset into the array. */
private int offset = 0;
/* The read pointer is encoded implicitly by keeping track of the number of
* unconsumed elements. We can then determine its position by backing up
* that many positions before the read position.
*/
private int unconsumedElements = 0;
/**
* Constructs a new RingBuffer with the specified capacity, which must be
* positive.
*
* @param size The capacity of the new ring buffer.
* @throws IllegalArgumentException If the capacity is negative.
*/
@SuppressWarnings("unchecked")
public RingBuffer(int size) {
/* Validate the size. */
if (size <= 0)
throw new IllegalArgumentException("RingBuffer capacity must be positive.");
/* Construct the array to be that size. */
elements = (T[]) new Object[size];
}
/**
* Appends an element to the ring buffer, blocking until space becomes
* available.
*
* @param elem The element to add to the ring buffer.
* @throws InterruptedException If the thread is interrupted before the
* insertion completes.
*/
public synchronized void add(T elem) throws InterruptedException {
/* Block until the capacity is nonzero. Otherwise we don't have any
* space to write.
*/
while (unconsumedElements == elements.length)
wait();
/* Write the element into the next open spot, then advance the write
* pointer forward a step.
*/
elements[offset] = elem;
offset = (offset + 1) % elements.length;
/* Increase the number of unconsumed elements by one, then notify any
* threads that are waiting that more data is now available.
*/
++unconsumedElements;
notifyAll();
}
/**
* Returns the maximum capacity of the ring buffer.
*
* @return The maximum capacity of the ring buffer.
*/
public int capacity() {
return elements.length;
}
/**
* Observes, but does not dequeue, the next available element, blocking
* until data becomes available.
*
* @return The next available element.
* @throws InterruptedException If the caller is interrupted before data
* becomes available.
*/
public synchronized T peek() throws InterruptedException {
/* Wait for data to become available. */
while (unconsumedElements == 0)
wait();
/* Hand back the next value. The index of this next value is a bit
* tricky to compute. We know that there are unconsumedElements
* elements waiting to be read, and they're contiguously before the
* write position. However, the buffer wraps around itself, and so we
* can't just do a naive subtraction; that might end up giving us a
* negative index. To avoid this, we'll use a clever trick in which
* we'll add to the index the capacity minus the distance. This value
* must be positive, since the distance is never greater than the
* capacity, and if we then wrap this value around using the modulus
* operator we'll end up with a valid index. All of this machinery
* works because
*
* (x + (n - k)) mod n == (x - k) mod n
*
* And Java's modulus operator works best on positive values.
*/
return elements[(offset + (capacity() - unconsumedElements)) % capacity()];
}
/**
* Removes and returns the next available element, blocking until data
* becomes available.
*
* @return The next available element
* @throws InterruptedException If the caller is interrupted before data
* becomes available.
*/
public synchronized T remove() throws InterruptedException {
/* Use peek() to get the element to return. */
T result = peek();
/* Mark that one fewer elements are now available to read. */
--unconsumedElements;
/* Because there is more space left, wake up any waiting threads. */
notifyAll();
return result;
}
/**
* Returns the number of elements that are currently being stored in the
* ring buffer.
*
* @return The number of elements currently stored in the ring buffer.
*/
public synchronized int size() {
return unconsumedElements;
}
/**
* Returns whether the ring buffer is empty.
*
* @return Whether the ring buffer is empty.
*/
public synchronized boolean isEmpty() {
return size() == 0;
}
}