Non-Blocking Queues

Потокобезпечні та найважливіше – неблокуючі імплементації Queue на зв'язаних нодах (linked nodes).
ConcurrentLinkedQueue<E> — тут використовується wait-free алгоритм, адаптований до роботи з garbage collector. Цей алгоритм є досить ефективним і дуже швидким, оскільки побудований на CAS. Метод size() може працювати довго, тому краще постійно його не смикати.
ConcurrentLinkedDeque<E> — Deque розшифровується як Double ended queue. Це означає, що дані можна додавати та витягувати з обох сторін. Відповідно, клас підтримує обидва режими роботи: FIFO (First In First Out) та LIFO (Last In First Out).
На практиці ConcurrentLinkedDeque варто використовувати в тому випадку, якщо обов'язково потрібне саме LIFO, тому що за рахунок двоспрямованості нод цей клас програє за продуктивністю наполовину порівняно з ConcurrentLinkedQueue.
import java.util.concurrent.ConcurrentLinkedQueue;
public class ConcurrentLinkedQueueExample {
public static void main(String[] args) {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
Thread producer = new Thread(new Producer(queue));
Thread consumer = new Thread(new Consumer(queue));
producer.start();
consumer.start();
}
}
class Producer implements Runnable {
ConcurrentLinkedQueue<String> queue;
Producer(ConcurrentLinkedQueue<String> queue){
this.queue = queue;
}
public void run() {
System.out.println("Клас для додавання елементів до черги");
try {
for (int i = 1; i < 5; i++) {
queue.add("Елемент #" + i);
System.out.println("Додаили: Елемент #" + i);
Thread.sleep(300);
}
} catch (InterruptedException ex) {
ex.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
class Consumer implements Runnable {
ConcurrentLinkedQueue<String> queue;
Consumer(ConcurrentLinkedQueue<String> queue){
this.queue = queue;
}
public void run() {
String str;
System.out.println("Клас для отримання елементів із черги");
for (int x = 0; x < 5; x++) {
while ((str = queue.poll()) != null) {
System.out.println("Витягнули: " + str);
}
try {
Thread.sleep(600);
} catch (InterruptedException ex) {
ex.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
}
2. Blocking Queues

Інтерфейс BlockingQueue<E> – при великій кількості даних ConcurrentLinkedQueue не вистачає.
Коли потоки не можуть впоратися із завданням, ти легко можеш отримати OutOfMemmoryException. Щоб такого не траплялося, у нас для роботи є BlockingQueue з наявністю різних методів для заповнення та роботи з чергою і блокуванням згідно з умовами.
BlockingQueue не визнає нульових елементів (null) і викликає NullPointerException при спробі додати або отримати такий елемент. Нульовий елемент повертає метод poll, якщо протягом таймауту в черзі не було розміщено черговий елемент.
3. Реалізації BlockingQueue<E>
Давай розберемо докладно кожну з реалізацій нашої BlockingQueue:
ArrayBlockingQueue<E> – клас блокувальної черги, який побудовано на класичному кільцевому буфері. Тут нам доступна можливість керувати "чесністю" блокувань. Якщо fair=false (за умовчанням), черговість роботи потоків не гарантується.
DelayQueue<E extends Delayed> – клас, який дозволяє витягувати елементи з черги лише після деякої затримки, визначеної в кожному елементі через метод getDelay інтерфейсу Delayed.
LinkedBlockingQueue<E> – блокуюча черга на зв'язаних нодах, реалізована на "two lock queue" алгоритмі: перший лок - на додавання, другий - на витягування елемента з черги. За рахунок локів, у порівнянні з ArrayBlockingQueue, даний клас має високу продуктивність, але для нього потрібна більша кількість пам'яті. Розмір черги визначається через конструктор і за умовчанням дорівнює Integer.MAX_VALUE.
PriorityBlockingQueue<E> – багатопотокова обгортка над PriorityQueue. Comparator відповідає за те, за якою логікою буде додано елемент. Першим із черги виходить найменший елемент.
SynchronousQueue<E> – черга працює за принципом FIFO(first-in-first-out). Кожна операція вставки блокує потік "Producer", поки потік "Consumer" не витягне елемент з черги і навпаки, "Consumer" буде чекати, поки "Producer" не вставить елемент.
BlockingDeque<E> – інтерфейс, який описує додаткові методи для двонаправленої блокуючої черги. Дані можна вставляти та витягувати з обох боків черги.
LinkedBlockingDeque<E> – двонаправлена блокуюча черга на зв'язаних нодах, реалізована як простий двонаправлений список з одним локом. Розмір черги визначається через конструктор і за замовчуванням дорівнює Integer.MAX_VALUE.
TransferQueue<E> – інтерфейс цікавий тим, що під час додавання елемента до черги існує можливість заблокувати вставляючий потік Producer доти, доки інший потік Consumer не витягне елемент з черги. Також можна додати перевірку на певний тайм-аут або виставити перевірку на наявність тих, що очікують Consumer. Як результат, ми отримуємо механізм передачі даних із підтримкою асинхронних та синхронних повідомлень.
LinkedTransferQueue<E> – реалізація TransferQueue на основі алгоритму Dual Queues with Slack. Активно використовує CAS (дивись вище) та паркування потоків, коли вони знаходяться в режимі очікування.
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ