Коли ваш додаток стикається з довготривалими обчисленнями, такими як обробка зображень, відправка листів або взаємодія з зовнішніми API, блокування основної веб-сесії призводить до уповільнення роботи системи і погіршення користувацького досвіду. Це те саме, якби ви прийшли в кав’ярню, зробили замовлення, і офіціант стояв біля плити, поки ваша страва готується, ігноруючи інших клієнтів.
Celery вирішує цю проблему. Він відділяє виконання таких "важких операцій" у фонові процеси, щоб основний додаток продовжував свою роботу, відповідаючи на запити миттєво.
Celery будується на трьох основних компонентах:
- Продюсер (Producer) — це ваш веб-додаток (наприклад, FastAPI або Django), який створює задачі і відправляє їх у чергу. Тобто воно каже: "Ось задача, хтось, будь ласка, займіться цим".
- Брокер повідомлень (Message Broker) — посередник, через який передаються задачі. Це може бути Redis, RabbitMQ або інший інструмент. Він як поштовий сервіс: приймає задачі від продюсера і передає їх воркерам.
- Воркери (Workers) — фонові процеси, які отримують задачі з черги, виконують їх і (якщо потрібно) повертають результат. Вони як виконавці, які сидять і чекають: "Ну ж бо, дайте роботу!"
Потік роботи
graph TD
A[Клієнт] --> B{"FastAPI / Django
(Producer)"};
B -- 1 Надсилає задачу task.delay() --> C((Broker
Redis / RabbitMQ));
C -- 2 Зберігає задачу --> C;
C -- 3 Віддає задачу --> D[Celery Worker];
D -- 4 Виконує задачу --> D;
D -- 5 Зберігає результат (опц.) --> E((Result
Backend));
B -- 6 Запитує результат (опц.) --> E;
style D fill:#cfc,stroke:#333;
Ваш додаток (B) швидко віддає задачу брокеру (C) і відповідає клієнту. Воркер (D) у цей час незалежно виконує важку роботу.
Створення і реєстрація задач у Celery
Для створення задачі в Celery використовується декоратор @celery.task. Цей декоратор каже вашому додатку: "Гей, Celery, ось задача, яку я хочу виконувати у фоновому режимі!"
Приклад для Django-проєкту:
from celery import shared_task
@shared_task
def add(x, y):
return x + y
Приклад для FastAPI:
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def multiply(a, b):
return a * b
Тепер, замість виконання функції напряму, задача надсилається в чергу, наприклад:
# Django: викликаємо задачу
add.delay(3, 7)
# FastAPI: відправляємо задачу на виконання
multiply.delay(4, 10)
Тут .delay() — це метод, який надсилає задачу в чергу. Він не блокує виконання коду і не чекає результату.
Як Celery обробляє задачі
Коли ви запускаєте воркери Celery, вони підключаються до брокера повідомлень і підписуються на певні черги. Як тільки задача з'являється в черзі, воркер бере її, виконує і повертає результат на збереження (якщо так налаштовано).
Воркери запускаються за допомогою команди:
celery -A your_project worker --loglevel=info
-A your_projectвказує ім'я вашого Python-проєкту або Celery застосунку.workerзапускає процес, який буде обробляти задачі.--loglevel=infoпоказує інформацію про кожну задачу в консолі.
Проста демонстрація виконання задачі
Припустимо, у нас є задача в tasks.py:
from celery import shared_task
@shared_task
def reverse_string(s):
return s[::-1]
Ми викликаємо її з користувацького коду (наприклад, під час обробки HTTP-запиту):
# Django View
from .tasks import reverse_string
def my_view(request):
string_to_reverse = "Celery"
task = reverse_string.delay(string_to_reverse)
return HttpResponse(f"Task ID: {task.id}")
Воркери підключаються до брокера (Redis/RabbitMQ), отримують задачу і виконують її:
- Вони реверсують рядок "Celery" у "yreleC".
- Результат можна зберегти, або почекати виконання (це буде розглянуто далі).
Керування задачами та їх статусами
Коли задача виконана, результат можна зберегти. Celery надає налаштовуваний механізм збереження (backends), наприклад, ви можете використовувати Redis або базу даних.
Приклад підключення backend у конфігураційний файл celery.py:
from celery import Celery
app = Celery(
'my_project',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1', # Збереження результатів
)
Тепер подивитися результат можна так:
result = reverse_string.delay("Celery")
print(result.get(timeout=10)) # Виведе: "yreleC"
Основні стани задач
Celery використовує систему станів для відстеження виконання задач:
- PENDING: задача поставлена в чергу, але ще не оброблена воркером.
- STARTED: воркери почали виконання задачі.
- SUCCESS: задача успішно виконана.
- FAILURE: задача завершилася з помилкою.
- RETRY: задача була відправлена на повтор.
- REVOKED: задача була скасована.
Приклад отримання статусу задачі:
status = result.status
print(f"Status of the task: {status}")
Налаштування збереження результату виконання задач
Іноді результат виконання задачі краще вимкнути (наприклад, коли це не потрібно). Це можна зробити через налаштування CELERY_TASK_IGNORE_RESULT:
app.conf.update(
task_ignore_result=True
)
Потенційні нюанси та типові помилки
При роботі з Celery корисно враховувати кілька моментів. Наприклад, задачі можуть застрягати в статусі PENDING, якщо воркери не підключені або неправильно налаштований брокер. Перевірте підключення воркерів командою celery -A my_project inspect active.
Ще одна популярна помилка — додавання складних об'єктів (таких як файли або несеріалізовувані Python-об'єкти) у задачу. Celery вимагає, щоб вхідні дані задач були серіалізовуваними, наприклад JSON.
Цілі, які ми розглянули сьогодні, є основами для створення і масштабування фонових задач. На наступній лекції ми будемо інтегрувати Celery з використанням RabbitMQ, щоб підсилити нашу асинхронну обробку задач.
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ