JavaRush /Курси /C# SELF /ConcurrentBag і

ConcurrentBag і ConcurrentDictionary

C# SELF
Рівень 58 , Лекція 2
Відкрита

1. Вступ

ConcurrentBag<T> — це потокобезпечна, неупорядкована колекція. Її ключова особливість закладена в слові «Bag» („мішок“): колекція не гарантує порядок елементів під час вилучення. Тобто елемент, який ви дістанете, може не відповідати очікуваному порядку додавання. Натомість ConcurrentBag має унікальну оптимізацію, що робить її дуже швидкою в окремих сценаріях.

Особливості ConcurrentBag

Відсутність порядку: На відміну від черг (FIFO) і стеків (LIFO), ConcurrentBag не гарантує, що TryTake() поверне елемент у будь-якому конкретному порядку щодо того, як його було додано. Це ключова відмінність.

Оптимізація для локального доступу (Thread-Local Storage): Головний сенс використання ConcurrentBag — її продуктивність у сценаріях, коли потік, що додав елемент, із великою ймовірністю буде тим самим потоком, який його вилучить.

Приклад: ConcurrentBag — додавання та вилучення

using System.Collections.Concurrent;

ConcurrentBag<string> itemBag = new ConcurrentBag<string>();

// Додавання елементів
itemBag.Add("Пункт А");
itemBag.Add("Пункт Б");
itemBag.Add("Пункт В");

Console.WriteLine($"Елементів у мішку: {itemBag.Count}"); // Вивід: Елементів у мішку: 3

// Вилучення елементів (порядок не гарантується!)
if (itemBag.TryTake(out string item1))
{
    Console.WriteLine($"Вилучено: {item1}"); // Може бути "Пункт В", "Пункт Б" або "Пункт А"
}

if (itemBag.TryTake(out string item2))
{
    Console.WriteLine($"Вилучено: {item2}");
}

Console.WriteLine($"Залишилось елементів: {itemBag.Count}"); // Вивід: Залишилось елементів: 1

Можете запустити цей код кілька разів і побачите, що порядок вилучених елементів може змінюватися.

Методи Add(), TryTake()

Add(T item): використовують для додавання елемента в ConcurrentBag. Операція потокобезпечна.

TryTake(out T item): спроба вилучення елемента з ConcurrentBag. Повертає true, якщо елемент успішно вилучено, і false, якщо мішок порожній. Важливо, що TryTake не блокує потік.

2. Сценарії використання

ConcurrentBag — це не заміна ConcurrentQueue або ConcurrentStack. Вона найкраще підходить для окремих випадків:

Пули об’єктів/ресурсів: коли у вас є пул повторно використовуваних об’єктів, бажано, щоб потік, який повернув об’єкт, найчастіше забирав його знову. Це зменшує конкуренцію за спільний ресурс.

Динамічний розподіл завдань у TPL: внутрішній устрій таких конструкцій, як Parallel.ForEach і Parallel.For, використовує локальні мішки й механізм «work-stealing» для ефективного розподілу роботи.

Пул завдань із ConcurrentBag і оптимізацією локальності

using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading;

ConcurrentBag<string> taskPool = new ConcurrentBag<string>();

// Заповнюємо пул початковими завданнями
for (int i = 0; i < 10; i++)
{
    taskPool.Add($"Завдання {i}");
}

void Worker()
{
    // Кожен потік намагається взяти завдання
    while (taskPool.TryTake(out string task))
    {
        Console.WriteLine($"Потік {Thread.CurrentThread.ManagedThreadId}: Обробляє {task}");
        Thread.Sleep(50); // Імітація роботи
    }
    Console.WriteLine($"Потік {Thread.CurrentThread.ManagedThreadId}: Завершив роботу.");
}

// Запускаємо кілька потоків-робітників
// Task.Run(Worker);
// Task.Run(Worker);
// Task.Run(Worker);
// Thread.Sleep(1000); // Даємо час на виконання

У цьому прикладі ConcurrentBag дозволяє потокам ефективно брати завдання, мінімізуючи блокування завдяки внутрішній структурі.

Внутрішня механіка

ConcurrentBag досягає високої продуктивності завдяки використанню потоково-локального сховища (TLS). Коли потік додає елемент, той потрапляє до структури, локальної для цього потоку. Під час TryTake() спершу читається локальна структура; якщо вона порожня — виконується «work-stealing» з інших потоків або глобального пулу. Це зменшує конкуренцію й робить колекцію вдалим вибором, коли важлива локальність доступу, а порядок не має значення.

3. Потокобезпечний словник

ConcurrentDictionary<TKey, TValue> — одна з найчастіше використовуваних потокобезпечних колекцій: високопродуктивний словник для безпечних операцій додавання, читання, оновлення й видалення з кількох потоків.

Звичайний Dictionary<TKey, TValue> абсолютно не потокобезпечний. Будь-який запис (додавання/зміна/видалення) або навіть читання під час запису може призвести до винятків (InvalidOperationException) чи пошкодження даних.

Приклад: проблема звичайного Dictionary (нагадування)

using System.Collections.Generic;
using System.Threading.Tasks;

Dictionary<int, int> concurrentDictProblem = new Dictionary<int, int>();

void AddToDict(int start, int count)
{
    for (int i = 0; i < count; i++)
    {
        // Спроба одночасного додавання/зміни
        // Призведе до винятків або некоректної поведінки
        concurrentDictProblem[start + i] = start + i;
    }
}

//Запуск прикладу в Main:
try
{
    Task t1 = Task.Run(() => AddToDict(0, 10000));
    Task t2 = Task.Run(() => AddToDict(5000, 10000)); // Перетин ключів
    Task.WaitAll(t1, t2);
    Console.WriteLine($"Елементів у словнику (проблемному): {concurrentDictProblem.Count}");
}
catch (Exception ex)
{
    Console.WriteLine($"Помилка у звичайному словнику: {ex.Message}");
}

Цей код майже гарантовано викине виняток або зависне через проблеми з потокобезпекою.

4. Базові операції

ConcurrentDictionary надає спеціалізовані атомарні операції «перевірка + дія».

TryAdd(TKey key, TValue value): атомарно додає пару ключ-значення. Повертає true, якщо ключ було додано, і false, якщо ключ уже існує.

ConcurrentDictionary<string, int> scores = new ConcurrentDictionary<string, int>();
if (scores.TryAdd("Alice", 100))
    Console.WriteLine("Alice додана."); // Вивід: Alice додана.
if (!scores.TryAdd("Alice", 150))
    Console.WriteLine("Alice вже є."); // Вивід: Alice вже є.

TryGetValue(TKey key, out TValue value): атомарно отримує значення за ключем.

if (scores.TryGetValue("Alice", out int aliceScore))
    Console.WriteLine($"Рахунок Alice: {aliceScore}"); // Вивід: Рахунок Alice: 100

TryUpdate(TKey key, TValue newValue, TValue comparisonValue): атомарно оновлює значення, лише якщо поточне дорівнює comparisonValue. Запобігає станам гонки.

// Поточне значення Alice = 100
if (scores.TryUpdate("Alice", 120, 100)) // Оновить 100 на 120
    Console.WriteLine("Рахунок Alice оновлено до 120."); // Вивід: Рахунок Alice оновлено до 120.
if (!scores.TryUpdate("Alice", 130, 100)) // Не оновить, бо поточне 120, а не 100
    Console.WriteLine("Рахунок Alice не оновлено (застарілі дані)."); // Вивід: ...

TryRemove(TKey key, out TValue value): атомарно видаляє елемент за ключем.

if (scores.TryRemove("Alice", out int removedScore))
    Console.WriteLine($"Alice видалена, рахунок був: {removedScore}"); // Вивід: Alice видалена, рахунок був: 120

5. Розширені атомарні операції

Ці два методи — ключові для ConcurrentDictionary: вони покривають безліч сценаріїв.

GetOrAdd(TKey key, TValue valueFactory(TKey key)): атомарно повертає існуюче значення за ключем або створює й додає нове через фабрику. Ідеально підходить для кешів і унікальних сутностей.

// Припустимо, ми кешуємо важкі об'єкти
ConcurrentDictionary<int, HeavyObject> objectCache = new ConcurrentDictionary<int, HeavyObject>();

HeavyObject GetOrCreateHeavyObject(int id)
{
    // Якщо вже є — поверне його, інакше створить і додасть
    return objectCache.GetOrAdd(id, (key) =>
    {
        Console.WriteLine($"Створюємо новий HeavyObject для ID: {key}");
        return new HeavyObject(key); // Імітація створення дорогого об'єкта
    });
}

// У Main:
HeavyObject obj1 = GetOrCreateHeavyObject(1); // Створить новий
HeavyObject obj2 = GetOrCreateHeavyObject(2); // Створить новий
HeavyObject obj3 = GetOrCreateHeavyObject(1); // Поверне наявний obj1

AddOrUpdate(TKey key, TValue addValue, Func<TKey, TValue, TValue> updateValueFactory): атомарно додає значення, якщо ключ відсутній, або оновлює наявне через фабрику.

  • addValue: значення для додавання, якщо ключ не знайдено.
  • updateValueFactory: функція, що обчислює нове значення на основі ключа й поточного значення.
// Підрахунок кількості відвідувань сторінки
ConcurrentDictionary<string, int> pageViews = new ConcurrentDictionary<string, int>();

void IncrementPageView(string page)
{
    pageViews.AddOrUpdate(page, 1, // Якщо сторінка нова, додати 1
                          (key, existingVal) => existingVal + 1); // Інакше збільшити на 1
    Console.WriteLine($"Сторінку '{page}' відвідали {pageViews[page]} разів.");
}

// У Main:
IncrementPageView("Home");   // Home: 1
IncrementPageView("About");  // About: 1
IncrementPageView("Home");   // Home: 2
IncrementPageView("Home");   // Home: 3
IncrementPageView("Contact"); // Contact: 1

6. Приклади використання для кешування або керування станами

Кешування даних: ConcurrentDictionary — чудовий вибір для in-memory кешу: GetOrAdd запобігає повторному створенню дорогих об’єктів.

Керування користувацькими сеансами: безпечне зберігання й оновлення даних сеансів із різних запитів.

Підрахунок статистики: за допомогою AddOrUpdate зручно інкрементувати лічильники подій, переглядів, голосів тощо.

Реєстри/Service Locator: зберігання зареєстрованих сервісів або плагінів, доступних із різних потоків.

ConcurrentDictionary<TKey, TValue> — високооптимізована колекція, що суттєво спрощує багатопотокову розробку зі словниками завдяки набору атомарних операцій без ручної синхронізації.

Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ