У Celery завдання — це функції, які ви анотуєте спеціальним декоратором @celery.task.
Ось приклад найпростішого завдання, яке повертає суму двох чисел:
# tasks.py
from celery import Celery
app = Celery('example_app', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
У цьому прикладі:
- Ми імпортували Celery і створили екземпляр додатка через
Celery('example_app'), вказавши брокера повідомлень (Redis). - Ми визначили функцію
add(x, y)і зареєстрували її як завдання за допомогою декоратора@app.task.
Не забувайте структурувати проєкт! Зазвичай створюється окремий файл tasks.py, де й розміщуються функції завдань.
Хороша структура = менше головного болю.
Виконання завдань
Щоб відправити завдання на виконання (тобто поставити її в чергу Celery), використовуйте метод .delay():
# Відправляємо завдання на виконання
result = add.delay(4, 6)
# Результат виконання *можна* отримати через result.get() (якщо потрібно)
print(result.get()) # Вивід: 10
Зверніть увагу, що виклик add.delay() не блокує виконання коду. Завдання створюється і відправляється в чергу, а Celery її обробляє незалежно.
Виконання завдань з пріоритетами
Celery дозволяє виставляти пріоритет завданням, щоб важливі оброблялися швидше. Ось як це робиться:
- Налаштовуємо пріоритети черг у конфігурації Celery:
# celeryconfig.py task_queues = { # Вказуємо ім'я черги й рівні пріоритету 'default': {'exchange': 'default', 'routing_key': 'default', 'priority_levels': 10}, } - Передаємо пріоритет при виклику завдання:
# Завдання з високим пріоритетом add.apply_async((5, 7), priority=9)Тут
priority=9вказує на високий пріоритет (чим більше число, тим вищий пріоритет).
Робота з результатами завдань
Коли наші завдання виконуються, їхні результати можна зберігати, обробляти або ігнорувати.
Celery надає механізм збереження результатів через backend (наприклад, Redis). Ось приклад:
Увімкнемо збереження результатів через Redis:
# celeryconfig.py
result_backend = 'redis://localhost:6379/1'
У цьому випадку результати виконання завдань зберігаються в Redis (у базі 1). Тепер можна отримати результат виконання:
result = add.delay(10, 20)
# Чекаємо виконання й отримуємо результат
print(result.get(timeout=10)) # Вивід: 30
Виклик result.get() блокує виконання коду до завершення завдання, що суперечить принципу асинхронності.
Використовуйте цей підхід тільки якщо це необхідно.
Практичний приклад: відправлення email
Давайте реалізуємо класичний сценарій: відправлення підтвердження про реєстрацію користувача.
Створимо завдання, яке відправляє email асинхронно.
- Переконаємось, що в нас встановлено SMTP-сервер. Для тестування можна використати MailHog.
- Створимо завдання в
tasks.py:# tasks.py from celery import Celery from time import sleep app = Celery('email_app', broker='redis://localhost:6379/0') @app.task def send_email(email, subject, body): """ Приклад емулювання відправлення листа (замість реального SMTP попрацюємо з 'print') """ sleep(2) # Імітуємо затримку відправлення print(f"Повідомлення відправлено: {email}, {subject}, {body}") return f"Email відправлено для {email}"Завдання
send_emailімітує відправлення листа. - Викликаємо завдання з нашого додатка:
# Десь у вашому коді (наприклад, у view або endpoint) from tasks import send_email # Відправляємо email у фоновому завданні result = send_email.delay( email="user@example.com", subject="Ласкаво просимо!", body="Дякуємо за реєстрацію на нашому сайті!" ) # Можна також отримати результат, якщо це потрібно print(result.get()) # Вивід: "Email відправлено для user@example.com"Цей приклад показує, як просто зв'язати бізнес-логіку додатка з Celery, розвантаживши головний процес. Асинхронність FTW (for the win)!
Ключові помилки при роботі з Celery
- Неправильна конфігурація брокера повідомлень. Якщо ви забули вказати правильну адресу брокера (
broker_url), завдання просто «зникнуть» у небутті. Завжди перевіряйте конфігурацію. - Очікування
result.get()для тривалих завдань. Мета Celery — асинхронність. Не блокуй додаток цим викликом, якщо це не вкрай необхідно. - Відсутність моніторингу. Без інструментів на кшталт Flower або логів важко помітити, якщо щось пішло не так.
Ускладнюємо завдання
- Давайте додамо завдання для обчислення факторіалу, щоб продемонструвати обчислення:
@app.task def calculate_factorial(n): result = 1 for i in range(1, n + 1): result *= i sleep(0.1) # Сповільнюємо для імітації довгої операції return result - Використаємо її:
# Запускаємо завдання factorial_result = calculate_factorial.delay(10) # Працюємо далі паралельно... (чекаємо, але не блокуємо!) print("Фонова задача запущена!")
Тепер ваш Celery вміє робити корисні штуки: від надсилання листів до обчислень.
У наступних лекціях ви зможете покращити цей функціонал, додавши моніторинг і планування завдань за допомогою Celery Beat.
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ