JavaRush /Курси /Модуль 4: FastAPI /Приклад асинхронної взаємодії FastAPI з Redis

Приклад асинхронної взаємодії FastAPI з Redis

Модуль 4: FastAPI
Рівень 9 , Лекція 6
Відкрита

Сьогодні перейдемо до справжньої магії сучасних веб-додатків — асинхронної взаємодії FastAPI з Redis. У цій лекції ти дізнаєшся, як використовувати бібліотеку aioredis для виконання асинхронних операцій, що дозволить нашому додатку стати ще швидшим і готовим обробляти великі об'єми даних.

FastAPI — асинхронний веб-фреймворк, що робить його ідеальним вибором для обробки великої кількості запитів з мінімальними затримками. Додаємо сюди Redis, який також відмінно підходить для масштабованих застосунків, і отримаємо практично "ракети-носії" для твоїх даних.

Асинхронні операції дозволяють уникнути блокування головного потоку виконання програми. Це потрібно, щоб наш додаток міг продовжувати обслуговувати інші запити, навіть під час виконання важких операцій (наприклад, запису даних у Redis). Наприклад, якщо користувач запросить погоду у його місті, твій додаток не завісне, очікуючи відповіді бази даних.

Встановлення та налаштування aioredis

Спочатку встановимо бібліотеку aioredis. Це асинхронний клієнт для Redis, який легко інтегрується з FastAPI:

pip install aioredis

Створюємо файл redis.py, який міститиме налаштування підключення:


from aioredis import Redis
from redis.asyncio import Redis as AsyncRedis
import aioredis

# Конфігурація підключення до Redis
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0

redis: AsyncRedis = aioredis.from_url(f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}')

Цей файл можна підключати в різних модулях додатку. У ньому ми використовуємо бібліотеку aioredis для створення об'єкта клієнта.


Асинхронне виконання операцій з Redis

Давай реалізуємо приклад взаємодії FastAPI з Redis. Ми будемо кешувати дані запиту з використанням асинхронних операцій.

Створимо ендпоінт /cache-item, який зберігає і повертає дані з Redis.

У файлі main.py додамо таку реалізацію:


from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from redis.exceptions import DataError
from redis import asyncio as aioredis
import asyncio

# Імпортуємо об'єкт Redis з налаштувань
from redis_config import redis

app = FastAPI()

# Модель для даних, які будемо кешувати
class Item(BaseModel):
    key: str
    value: str

@app.post("/cache-item/")
async def set_item(item: Item):
    """
    Ендпоінт для запису даних у Redis
    """
    try:
        # Встановлюємо значення в Redis з часом життя 60 секунд
        await redis.set(item.key, item.value, ex=60)
        return {"status": "success", "message": f"Item {item.key} cached for 60 seconds"}
    except DataError:
        raise HTTPException(status_code=400, detail="Invalid data for caching")

@app.get("/cache-item/{key}")
async def get_item(key: str):
    """
    Ендпоінт для отримання даних з Redis
    """
    value = await redis.get(key)
    if value:
        return {"key": key, "value": value.decode("utf-8")}
    else:
        raise HTTPException(status_code=404, detail="Key not found")

Що тут відбувається?

  1. set_item:
    Ми зберігаємо дані в Redis. Для цього використовується метод redis.set.
    Третій параметр ex=60 вказує, що ключ житиме в Redis рівно 60 секунд.
  2. get_item:
    Ми намагаємось отримати дані з Redis по ключу. Якщо дані існують, вони повертаються в відповіді. Якщо ключ не існує — повертаємо помилку 404.

Асинхронна обробка великих обсягів даних

Для демонстрації роботи з великими масивами даних реалізуємо задачу масової запису в Redis. Уяви, ми зберігаємо записи про тисячі користувачів, оновлюємо їх і формуємо звіти. Redis — ідеальне рішення для такого сценарію.

Давай наведемо приклад. Додамо новий ендпоінт:


@app.post("/bulk-cache/")
async def bulk_cache(data: list[Item]):
    """
    Масова запис даних у Redis
    """
    # Створюємо pipeline для виконання багатьох операцій за один раз
    async with redis.pipeline(transaction=True) as pipe:
        for item in data:
            # Додаємо команди в pipeline
            pipe.set(item.key, item.value)
        # Виконуємо всі команди
        await pipe.execute()
    return {"status": "success", "message": f"Cached {len(data)} items"}

Тут ми використовуємо механізм паралельного виконання операцій за допомогою pipeline. Це дозволяє скоротити мережеві затримки і підвищити продуктивність операцій з Redis.


Приклад використання Pub/Sub (Видавець/Підписник) в Redis

Redis також підтримує модель Pub/Sub для обміну повідомленнями в реальному часі. Наприклад, ти хочеш повідомляти користувачів про зміну даних.

Додамо приклад використання Pub/Sub:


@app.post("/publish/")
async def publish(channel: str, message: str):
    """
    Публікація повідомлення в канал
    """
    await redis.publish(channel, message)
    return {"status": "success", "message": f"Message published to channel {channel}"}

@app.get("/subscribe/{channel}")
async def subscribe(channel: str):
    """
    Підписка на повідомлення з каналу
    """
    pubsub = redis.pubsub()
    await pubsub.subscribe(channel)

    async def reader(channel):
        async for message in channel.listen():
            print(f"Message received: {message}")

    asyncio.create_task(reader(pubsub))
    return {"status": "success", "message": f"Subscribed to channel {channel}"}
  1. Видавець (Publisher):
    • Публікація повідомлень у Redis-канал.
  2. Підписник (Subscriber):
    • Підписка на канал і отримання повідомлень у реальному часі.

Типові помилки та підводні камені

  1. Неправильний вибір даних для кешування:
    якщо дані часто змінюються, їх не варто кешувати, щоб уникнути повернення застарілої інформації.
  2. Перевищення об'єму пам'яті:
    Redis зберігає дані в оперативній пам'яті. Якщо додати забагато даних, застосунок почне "задихатися". Використовуй політики управління витісненням, такі як LRU.
  3. Забули встановити час життя (TTL):
    без часу життя твої дані житимуть вічно в Redis, займаючи пам'ять.

Практичне застосування

Асинхронна взаємодія з Redis — потужний інструмент для оптимізації сучасних застосунків. Наприклад:

  • Кешування часто запитуваних даних (рейтинг товарів, погода).
  • Повідомлення користувачів у реальному часі через Pub/Sub (чат-застосунки).
  • Масова обробка даних у великих системах.

Такий підхід дозволяє будувати високошвидкісні застосунки з мінімальними затримками, навіть при великих об'ємах даних.

Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ