Non-Blocking Queues

Потокобезпечні та найважливіше – неблокуючі імплементації Queue на зв'язаних нодах (linked nodes).

ConcurrentLinkedQueue<E> — тут використовується wait-free алгоритм, адаптований до роботи з garbage collector. Цей алгоритм є досить ефективним і дуже швидким, оскільки побудований на CAS. Метод size() може працювати довго, тому краще постійно його не смикати.

ConcurrentLinkedDeque<E> — Deque розшифровується як Double ended queue. Це означає, що дані можна додавати та витягувати з обох сторін. Відповідно, клас підтримує обидва режими роботи: FIFO (First In First Out) та LIFO (Last In First Out).

На практиці ConcurrentLinkedDeque варто використовувати в тому випадку, якщо обов'язково потрібне саме LIFO, тому що за рахунок двоспрямованості нод цей клас програє за продуктивністю наполовину порівняно з ConcurrentLinkedQueue.


import java.util.concurrent.ConcurrentLinkedQueue;

public class  ConcurrentLinkedQueueExample {
   public static void main(String[] args) {
       ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

       Thread producer = new Thread(new Producer(queue));
       Thread consumer = new Thread(new Consumer(queue));

       producer.start();
       consumer.start();
   }
}

class Producer implements Runnable {

   ConcurrentLinkedQueue<String> queue;
   Producer(ConcurrentLinkedQueue<String> queue){
       this.queue = queue;
   }
   public void run() {
       System.out.println("Клас для додавання елементів до черги");
       try {
           for (int i = 1; i < 5; i++) {
               queue.add("Елемент #" + i);
               System.out.println("Додаили: Елемент #" + i);
               Thread.sleep(300);
           }
       } catch (InterruptedException ex) {
           ex.printStackTrace();
           Thread.currentThread().interrupt();
       }
   }
}

class Consumer implements Runnable {

   ConcurrentLinkedQueue<String> queue;
   Consumer(ConcurrentLinkedQueue<String> queue){
       this.queue = queue;
   }

   public void run() {
       String str;
       System.out.println("Клас для отримання елементів із черги");
       for (int x = 0; x < 5; x++) {
           while ((str = queue.poll()) != null) {
               System.out.println("Витягнули: " + str);
           }
           try {
               Thread.sleep(600);
           } catch (InterruptedException ex) {
               ex.printStackTrace();
               Thread.currentThread().interrupt();
           }
       }
   }
}

2. Blocking Queues

Інтерфейс BlockingQueue<E> – при великій кількості даних ConcurrentLinkedQueue не вистачає.

Коли потоки не можуть впоратися із завданням, ти легко можеш отримати OutOfMemmoryException. Щоб такого не траплялося, у нас для роботи є BlockingQueue з наявністю різних методів для заповнення та роботи з чергою і блокуванням згідно з умовами.

BlockingQueue не визнає нульових елементів (null) і викликає NullPointerException при спробі додати або отримати такий елемент. Нульовий елемент повертає метод poll, якщо протягом таймауту в черзі не було розміщено черговий елемент.

3. Реалізації BlockingQueue<E>

Давай розберемо докладно кожну з реалізацій нашої BlockingQueue:

ArrayBlockingQueue<E> – клас блокувальної черги, який побудовано на класичному кільцевому буфері. Тут нам доступна можливість керувати "чесністю" блокувань. Якщо fair=false (за умовчанням), черговість роботи потоків не гарантується.

DelayQueue<E extends Delayed> – клас, який дозволяє витягувати елементи з черги лише після деякої затримки, визначеної в кожному елементі через метод getDelay інтерфейсу Delayed.

LinkedBlockingQueue<E> – блокуюча черга на зв'язаних нодах, реалізована на "two lock queue" алгоритмі: перший лок - на додавання, другий - на витягування елемента з черги. За рахунок локів, у порівнянні з ArrayBlockingQueue, даний клас має високу продуктивність, але для нього потрібна більша кількість пам'яті. Розмір черги визначається через конструктор і за умовчанням дорівнює Integer.MAX_VALUE.

PriorityBlockingQueue<E> – багатопотокова обгортка над PriorityQueue. Comparator відповідає за те, за якою логікою буде додано елемент. Першим із черги виходить найменший елемент.

SynchronousQueue<E> – черга працює за принципом FIFO(first-in-first-out). Кожна операція вставки блокує потік "Producer", поки потік "Consumer" не витягне елемент з черги і навпаки, "Consumer" буде чекати, поки "Producer" не вставить елемент.

BlockingDeque<E> – інтерфейс, який описує додаткові методи для двонаправленої блокуючої черги. Дані можна вставляти та витягувати з обох боків черги.

LinkedBlockingDeque<E> – двонаправлена блокуюча черга на зв'язаних нодах, реалізована як простий двонаправлений список з одним локом. Розмір черги визначається через конструктор і за замовчуванням дорівнює Integer.MAX_VALUE.

TransferQueue<E> – інтерфейс цікавий тим, що під час додавання елемента до черги існує можливість заблокувати вставляючий потік Producer доти, доки інший потік Consumer не витягне елемент з черги. Також можна додати перевірку на певний тайм-аут або виставити перевірку на наявність тих, що очікують Consumer. Як результат, ми отримуємо механізм передачі даних із підтримкою асинхронних та синхронних повідомлень.

LinkedTransferQueue<E> – реалізація TransferQueue на основі алгоритму Dual Queues with Slack. Активно використовує CAS (дивись вище) та паркування потоків, коли вони знаходяться в режимі очікування.