JavaRush /Курси /C# SELF /Вступ до Concurrent...

Вступ до Concurrent-колекцій

C# SELF
Рівень 58 , Лекція 0
Відкрита

1. Передісторія проблеми

В однопотоковому застосунку колекції, такі, як List<T> і Dictionary<T>, працюють передбачувано. Та щойно до однієї й тієї самої колекції одночасно звертаються кілька потоків, виникає знайома біда: гонка даних (race conditions).

Якщо кілька потоків намагаються читати та/або записувати дані у одну й ту саму колекцію без належної синхронізації, можна отримати:

  • Некоректні дані: елемент міг бути видалений одним потоком, поки інший намагався його оновити.
  • Втрата даних: один потік додав елемент, а інший перезаписав його, не знаючи про попередній запис.
  • Винятки: колекція може опинитися у некоректному стані, і ви отримаєте InvalidOperationException (наприклад, "Collection was modified; enumeration operation may not execute.") або навіть NullReferenceException.

Приклад 1: Гонка даних у List<T> (просте додавання)

Два потоки одночасно інкрементують один і той самий елемент списку.

using System.Collections.Generic;
using System.Threading.Tasks; // Для Task.Run

class RaceConditionExample
{
    static List<int> numbers = new List<int> { 0 }; // Список з одним елементом

    static void Main(string[] args)
    {
        Console.WriteLine("Початкове значення: " + numbers[0]); // 0

        // Стартуємо два потоки, кожен інкрементує numbers[0]
        Task task1 = Task.Run(() => IncrementNumbers(500_000));
        Task task2 = Task.Run(() => IncrementNumbers(500_000));

        Task.WaitAll(task1, task2); // Чекаємо завершення обох потоків

        Console.WriteLine("Кінцеве значення: " + numbers[0]); // Очікуємо 1_000_000, але...
        // Результат майже завжди буде меншим за 1_000_000!
    }

    static void IncrementNumbers(int count)
    {
        for (int i = 0; i < count; i++)
        {
            // Ця операція "numbers[0]++" насправді складається з трьох кроків:
            // 1. Прочитати numbers[0]
            // 2. Збільшити значення на 1
            // 3. Записати нове значення назад у numbers[0]
            numbers[0]++; 
        }
    }
}

Чому це гонка? Якщо потік A прочитав numbers[0] (значення 0), а потім потік B прочитав numbers[0] (теж 0) до того, як A встиг записати 1, то обидва потоки збільшать 0 до 1 і запишуть 1. Одне інкрементування втрачається. Операція numbers[0]++ не атомарна.

Приклад 2: InvalidOperationException під час зміни Dictionary

Один потік перебирає словник, інший — змінює його.

using System.Collections.Generic;
using System.Threading; // Для Thread.Sleep

class DictionaryRaceExample
{
    static Dictionary<int, string> users = new Dictionary<int, string>();

    static void Main(string[] args)
    {
        // Ініціалізація словника
        for (int i = 0; i < 5; i++) users.Add(i, $"User {i}");

        // Потік-читач
        Thread readerThread = new Thread(() =>
        {
            try
            {
                foreach (var user in users) // Перебирання словника
                {
                    Console.WriteLine($"Читач: {user.Key} - {user.Value}");
                    Thread.Sleep(10); // Імітація роботи
                }
            }
            catch (InvalidOperationException ex)
            {
                Console.WriteLine($"Читач: ПОМИЛКА! {ex.Message}");
            }
        });

        // Потік-записувач
        Thread writerThread = new Thread(() =>
        {
            Thread.Sleep(5); // Дамо читачеві трохи часу, щоб почати
            for (int i = 5; i < 10; i++)
            {
                users.Add(i, $"New User {i}"); // Додаємо елементи
                Console.WriteLine($"Записувач: Додав User {i}");
                Thread.Sleep(15);
            }
        });

        readerThread.Start();
        writerThread.Start();

        readerThread.Join(); // Чекаємо завершення потоків
        writerThread.Join();
        Console.WriteLine("Приклад завершено.");
    }
}

Чому трапляється помилка? Dictionary<TKey, TValue> (як і List<T>) не призначений для одночасного читання й запису різними потоками без синхронізації. Коли потік-записувач змінює внутрішню структуру, потік-читач продовжує foreach за вже зміненими даними, що призводить до InvalidOperationException.

2. Чому прості блокування (lock) не завжди оптимальні?

Ідея «обгорнути усе у lock» здається простою, але має свої мінуси:

// Поганий приклад: занадто багато блокувань
// (Лише для демонстрації, так робити не варто!)
static object _lock = new object();
static List<int> _sharedList = new List<int>();

void AddItem(int item)
{
    lock (_lock)
    {
        _sharedList.Add(item);
    }
}

int GetItemCount()
{
    lock (_lock)
    {
        return _sharedList.Count;
    }
}
  • Продуктивність (вузьке місце): lock блокує доступ до всієї колекції. Якщо потоків 100, то 99 чекатимуть один, навіть якщо операції напряму не конфліктують.
  • Складність: треба памʼятати про lock у кожному місці використання колекції. Одне забуте місце — і гонка даних повернеться.
  • Взаємоблокування: кілька lock на різних обʼєктах легко призводять до deadlock.
  • Ітератори: foreach не рятує, якщо інший потік модифікує колекцію.

Тому у .NET зʼявилися спеціальні потокобезпечні колекції.

Атомарні операції

Потокобезпечна колекція гарантує коректну роботу за одночасного доступу з кількох потоків без зовнішніх блокувань з боку користувача. Ключ — атомарні операції: дія виконується повністю або не виконується взагалі — інші потоки не бачать «проміжних станів».

  • Додавання, видалення, читання — поводяться так, ніби виконуються по одному.
  • Всередині використовуються низькорівневі техніки: інтерлоковані операції (Interlocked), Compare-And-Swap (CAS), тонкі локи — замість глобального блокування всієї колекції.

3. Огляд System.Collections.Concurrent

Простір імен System.Collections.Concurrent надає набір колекцій, створених з нуля для багатопоточності. Їхня філософія — максимальний паралелізм і мінімум блокувань.

  • Продуктивність: масштабується зі зростанням кількості ядер.
  • Простота: не треба вручну ставити lock навколо кожної операції.
  • Менше помилок: зникає цілий клас проблем із ручною синхронізацією.
  • Оптимізація під конкуренцію: оптимізовані для конкурентного доступу, ефективно працюють за одночасних додавань й видалень.

4. Основні класи

ConcurrentQueue<T> (потокобезпечна черга)

Принцип: FIFO — «першим прийшов, першим пішов». Сценарії: producer–consumer, логування, черги задач.

using System.Collections.Concurrent;

ConcurrentQueue<string> messageQueue = new ConcurrentQueue<string>();

void Producer() => messageQueue.Enqueue("Повідомлення 1");

void Consumer()
{
    if (messageQueue.TryDequeue(out string message))
    {
        Console.WriteLine($"Оброблено: {message}");
    }
    else
    {
        Console.WriteLine("Черга порожня.");
    }
}

ConcurrentStack<T> (потокобезпечний стек)

Принцип: LIFO — «останнім прийшов, першим пішов». Сценарії: історія дій, DFS-обхід, пули обʼєктів.

using System.Collections.Concurrent;

ConcurrentStack<int> historyStack = new ConcurrentStack<int>();

void PushAction(int value) => historyStack.Push(value);

void PopAction()
{
    if (historyStack.TryPop(out int action))
    {
        Console.WriteLine($"Скасовано дію: {action}");
    }
    else
    {
        Console.WriteLine("Стек порожній.");
    }
}

ConcurrentBag<T> (потокобезпечний «мішок»)

Неупорядкована колекція, порядок не гарантується. Оптимізована для сценарію «потік частіше бере те, що сам поклав». Чудово підходить для пулів.

using System.Collections.Concurrent;

ConcurrentBag<System.Guid> objectPool = new ConcurrentBag<System.Guid>();

void AddObject() => objectPool.Add(System.Guid.NewGuid());

void TakeObject()
{
    if (objectPool.TryTake(out System.Guid obj))
    {
        Console.WriteLine($"Взяли обʼєкт: {obj}");
    }
    else
    {
        Console.WriteLine("Пул порожній.");
    }
}

ConcurrentDictionary<TKey, TValue> (потокобезпечний словник)

Підтримує атомарні операції додавання, оновлення та отримання значення за ключем. Чудово підходить для кешів, сесій, лічильників.

using System.Collections.Concurrent;

ConcurrentDictionary<string, int> userScores = new ConcurrentDictionary<string, int>();

void UpdateScore(string user, int score)
{
    // Атомарно додасть, якщо немає, або оновить, якщо є
    userScores.AddOrUpdate(user, score, (key, existingVal) => existingVal + score);
    Console.WriteLine($"Рахунок {user}: {userScores[user]}");
}

void GetScore(string user)
{
    if (userScores.TryGetValue(user, out int score))
    {
        Console.WriteLine($"Поточний рахунок {user}: {score}");
    }
    else
    {
        Console.WriteLine($"Користувача {user} не знайдено.");
    }
}

5. Коли використовувати ці колекції замість звичайних?

  • Застосунок багатопотоковий: за одного потоку звичайні колекції швидші (немає накладних витрат).
  • Одна спільна колекція для кількох потоків: ключова ознака, що варто брати System.Collections.Concurrent.
  • Потрібна висока продуктивність і масштабованість: колекції спроєктовані для мінімального очікування.
  • Хочеться спростити код: без ручних lock-блоків навколо кожної операції.
  • Потрібні атомарні операції: додавання/видалення/отримання не залишать колекцію в неузгодженому стані.

Не використовуйте Concurrent-колекції, коли:

  • Застосунок суто однопотоковий.
  • Потрібна «транзакційність» кількох пов’язаних операцій (може знадобитися зовнішня синхронізація чи інші механізми).
  • Важливий суворий порядок вилучення там, де він не гарантується (наприклад, у ConcurrentBag<T>).
Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ