JavaRush /Курсы /C# SELF /Введение в Concurrent

Введение в Concurrent-коллекции

C# SELF
58 уровень , 0 лекция
Открыта

1. Предыстория проблемы

В однопоточном приложении коллекции, такие как List<T>, Dictionary<T>, работают предсказуемо. Но как только к одной и той же коллекции начинают обращаться несколько потоков одновременно, возникает уже знакомая проблема: гонка данных (race conditions).

Если несколько потоков пытаются читать и/или записывать данные в одну и ту же коллекцию без должной синхронизации, вы можете получить:

  • Некорректные данные: элемент мог быть удалён одним потоком, пока другой пытался его обновить.
  • Потеря данных: один поток добавил элемент, а другой перезаписал его, не зная о предыдущей записи.
  • Исключения: коллекция может оказаться в невалидном состоянии, и вы получите InvalidOperationException (например, "Collection was modified; enumeration operation may not execute.") или даже NullReferenceException.

Пример 1: Гонка данных в List<T> (простое сложение)

Два потока одновременно инкрементируют один и тот же элемент списка.

using System.Collections.Generic;
using System.Threading.Tasks; // Для Task.Run

class RaceConditionExample
{
    static List<int> numbers = new List<int> { 0 }; // Список с одним элементом

    static void Main(string[] args)
    {
        Console.WriteLine("Начальное значение: " + numbers[0]); // 0

        // Запускаем два потока, каждый из которых инкрементирует numbers[0]
        Task task1 = Task.Run(() => IncrementNumbers(500_000));
        Task task2 = Task.Run(() => IncrementNumbers(500_000));

        Task.WaitAll(task1, task2); // Ждем завершения обоих потоков

        Console.WriteLine("Конечное значение: " + numbers[0]); // Ожидаем 1_000_000, но...
        // Результат почти всегда будет меньше 1_000_000!
    }

    static void IncrementNumbers(int count)
    {
        for (int i = 0; i < count; i++)
        {
            // Эта операция "numbers[0]++" на самом деле состоит из 3-х шагов:
            // 1. Прочитать numbers[0]
            // 2. Увеличить значение на 1
            // 3. Записать новое значение обратно в numbers[0]
            numbers[0]++; 
        }
    }
}

Почему это гонка? Если поток A прочитал numbers[0] (значение 0), а затем поток B прочитал numbers[0] (тоже 0) до того, как A успел записать 1, то оба потока увеличат 0 до 1 и запишут 1. Одно инкрементирование теряется. Операция numbers[0]++ не атомарна.

Пример 2: InvalidOperationException при изменении Dictionary

Один поток итерирует словарь, другой — изменяет его.

using System.Collections.Generic;
using System.Threading; // Для Thread.Sleep

class DictionaryRaceExample
{
    static Dictionary<int, string> users = new Dictionary<int, string>();

    static void Main(string[] args)
    {
        // Инициализация словаря
        for (int i = 0; i < 5; i++) users.Add(i, $"User {i}");

        // Поток-читатель
        Thread readerThread = new Thread(() =>
        {
            try
            {
                foreach (var user in users) // Итерация по словарю
                {
                    Console.WriteLine($"Читатель: {user.Key} - {user.Value}");
                    Thread.Sleep(10); // Имитация работы
                }
            }
            catch (InvalidOperationException ex)
            {
                Console.WriteLine($"Читатель: ОШИБКА! {ex.Message}");
            }
        });

        // Поток-писатель
        Thread writerThread = new Thread(() =>
        {
            Thread.Sleep(5); // Даем читателю немного времени начать
            for (int i = 5; i < 10; i++)
            {
                users.Add(i, $"New User {i}"); // Добавляем элементы
                Console.WriteLine($"Писатель: Добавил User {i}");
                Thread.Sleep(15);
            }
        });

        readerThread.Start();
        writerThread.Start();

        readerThread.Join(); // Ждем завершения потоков
        writerThread.Join();
        Console.WriteLine("Пример завершен.");
    }
}

Почему происходит ошибка? Dictionary<TKey, TValue> (как и List<T>) не предназначен для одновременного чтения и записи разными потоками без синхронизации. Когда поток-писатель меняет внутреннюю структуру, поток-читатель продолжает foreach по уже изменённым данным, что приводит к InvalidOperationException.

2. Почему простые блокировки (lock) не всегда оптимальны?

Идея «обернуть всё в lock» выглядит просто, но имеет свои минусы:

// Плохой пример: слишком много блокировки
// (Только для демонстрации, так делать не стоит!)
static object _lock = new object();
static List<int> _sharedList = new List<int>();

void AddItem(int item)
{
    lock (_lock)
    {
        _sharedList.Add(item);
    }
}

int GetItemCount()
{
    lock (_lock)
    {
        return _sharedList.Count;
    }
}
  • Производительность (bottleneck): lock блокирует доступ ко всей коллекции. При 100 потоках 99 будут ждать одного, даже если операции не конфликтуют напрямую.
  • Сложность: нужно помнить о lock в каждом месте использования коллекции. Одно забытое место — и гонка данных вернулась.
  • Взаимоблокировки: несколько lock на разных объектах легко приводят к deadlock.
  • Итераторы: foreach не спасает, если другой поток модифицирует коллекцию.

Поэтому в .NET были введены специальные потокобезопасные коллекции.

Атомарные операции

Потокобезопасная коллекция — гарантирует корректную работу при одновременном доступе из нескольких потоков без внешних блокировок пользователем. Ключ — атомарные операции: действие выполняется целиком либо не выполняется вовсе — другие потоки не видят «полусостояний».

  • Добавление, удаление, чтение — ведут себя так, будто выполняются по одному.
  • Внутри используются низкоуровневые техники: интерлокированные операции (Interlocked), Compare-And-Swap (CAS), тонкие локи — вместо глобальной блокировки всей коллекции.

3. Обзор System.Collections.Concurrent

Пространство имён System.Collections.Concurrent предоставляет набор коллекций, созданных с нуля для многопоточности. Их философия — максимальный параллелизм и минимум блокировок.

  • Производительность: масштабируются с ростом числа ядер.
  • Простота: не нужно вручную ставить lock вокруг каждой операции.
  • Меньше ошибок: уходит класс проблем с ручной синхронизацией.
  • Оптимизация под конкуренцию: эффективно работают при одновременных добавлениях/удалениях.

4. Основные классы

ConcurrentQueue<T> (потокобезопасная очередь)

Принцип: FIFO — «первый пришёл, первый ушёл». Сценарии: producer–consumer, логирование, очереди задач.

using System.Collections.Concurrent;

ConcurrentQueue<string> messageQueue = new ConcurrentQueue<string>();

void Producer() => messageQueue.Enqueue("Сообщение 1");

void Consumer()
{
    if (messageQueue.TryDequeue(out string message))
    {
        Console.WriteLine($"Обработано: {message}");
    }
    else
    {
        Console.WriteLine("Очередь пуста.");
    }
}

ConcurrentStack<T> (потокобезопасный стек)

Принцип: LIFO — «последний пришёл, первый ушёл». Сценарии: история действий, DFS-обход, пулы объектов.

using System.Collections.Concurrent;

ConcurrentStack<int> historyStack = new ConcurrentStack<int>();

void PushAction(int value) => historyStack.Push(value);

void PopAction()
{
    if (historyStack.TryPop(out int action))
    {
        Console.WriteLine($"Отменено действие: {action}");
    }
    else
    {
        Console.WriteLine("Стек пуст.");
    }
}

ConcurrentBag<T> (потокобезопасный «мешок»)

Неупорядоченная коллекция, порядок не гарантируется. Оптимизирована под сценарий «поток чаще берёт то, что сам положил». Отлично подходит для пулов.

using System.Collections.Concurrent;

ConcurrentBag<System.Guid> objectPool = new ConcurrentBag<System.Guid>();

void AddObject() => objectPool.Add(System.Guid.NewGuid());

void TakeObject()
{
    if (objectPool.TryTake(out System.Guid obj))
    {
        Console.WriteLine($"Взяли объект: {obj}");
    }
    else
    {
        Console.WriteLine("Пул пуст.");
    }
}

ConcurrentDictionary<TKey, TValue> (потокобезопасный словарь)

Поддерживает атомарные операции добавления, обновления и получения значения по ключу. Отлично для кэшей, сессий, счётчиков.

using System.Collections.Concurrent;

ConcurrentDictionary<string, int> userScores = new ConcurrentDictionary<string, int>();

void UpdateScore(string user, int score)
{
    // Атомарно добавит, если нет, или обновит, если есть
    userScores.AddOrUpdate(user, score, (key, existingVal) => existingVal + score);
    Console.WriteLine($"Счет {user}: {userScores[user]}");
}

void GetScore(string user)
{
    if (userScores.TryGetValue(user, out int score))
    {
        Console.WriteLine($"Текущий счет {user}: {score}");
    }
    else
    {
        Console.WriteLine($"Пользователь {user} не найден.");
    }
}

5. Когда использовать эти коллекции вместо обычных?

  • Приложение многопоточное: при одном потоке обычные коллекции быстрее (нет накладных расходов).
  • Одна общая коллекция для нескольких потоков: ключевой признак к использованию System.Collections.Concurrent.
  • Нужна высокая производительность и масштабирование: коллекции спроектированы под минимум ожиданий.
  • Хочется упростить код: без ручных lock-блоков вокруг каждой операции.
  • Нужны атомарные операции: добавление/удаление/получение не оставят коллекцию в неконсистентном состоянии.

Не используйте Concurrent-коллекции, когда:

  • Приложение строго однопоточное.
  • Нужна «транзакционность» нескольких связанных операций (возможно, потребуется внешняя синхронизация или другие механизмы).
  • Важен строгий порядок извлечения там, где он не гарантируется (например, в ConcurrentBag<T>).
2
Задача
C# SELF, 58 уровень, 0 лекция
Недоступна
Добавление элементов в ConcurrentQueue
Добавление элементов в ConcurrentQueue
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ