class QueueReentrantLock<T> { private volatile int size = 0; private final Object[] content; private final int capacity; private int out; private int in; private final ReentrantLock lock = new ReentrantLock(); private final Condition isEmpty = lock.newCondition(); private final Condition isFull = lock.newCondition(); QueueReentrantLock(int capacity) { try { lock.lock(); this.capacity = capacity; content = new Object[capacity]; out = 0; in = 0; } finally { lock.unlock(); } } private int cycleInc(int index) { return (++index == capacity) ? 0 : index; } @SuppressWarnings("unchecked") T get() throws InterruptedException { try { lock.lockInterruptibly(); if (size == 0) { while (size < 1) { isEmpty.await(); } } final Object value = content[out]; content[out] = null; if (size > 1) { out = cycleInc(out); } size--; isFull.signal(); return (T) value; } finally { lock.unlock(); } } QueueReentrantLock<T> put(T value) throws InterruptedException { try { lock.lockInterruptibly(); if (size == capacity) { while (size == capacity) { isFull.await(); } } if (size == 0) { content[in] = value; } else { in = cycleInc(in); content[in] = value; } size++; isEmpty.signal(); } finally { lock.unlock(); } return this; } }