JavaRush /Курсы /Модуль 4: FastAPI /Оптимизация производительности FastAPI с использованием R...

Оптимизация производительности FastAPI с использованием Redis

Модуль 4: FastAPI
9 уровень , 9 лекция
Открыта

Как кэширование помогает ускорить работу FastAPI? Представьте, что ваш FastAPI-приложение — это ресторан. Каждый раз, когда клиент заказывает блюдо, кухня (ваша база данных) должна приготовить его с нуля. Это требует времени и ресурсов. Redis, в свою очередь, выступает в роли супер-шустрых официантов, которые сразу подают блюдо клиентам, если оно уже готово и горячо.

Преимущества Redis для оптимизации FastAPI:

  1. Мгновенный доступ к данным: Redis хранит данные в оперативной памяти, что делает его значительно быстрее традиционных баз данных.
  2. Снижение нагрузки на основную базу данных: кэширование частых запросов освобождает вашу базу данных от выполнения одинаковых операций.
  3. Поддержка TTL: механизм Time-To-Live позволяет кэшу автоматически очищаться от устаревших данных.
  4. Гибкость в настройке политик кэширования: например, можно применять стратегию «вытеснения» редкоиспользуемых данных.

Интеграция Redis для полной оптимизации

Давайте разберём основные компоненты оптимизации FastAPI с Redis, соединим все ранее изученные техники и применим их в реальном проекте.

1. Установка окружения

Для начала убедитесь, что у вас установлен Redis и подключена библиотека aioredis в ваш FastAPI проект:

pip install aioredis

Если Redis ещё не запущен, сделайте это через Docker:

docker run -d --name redis-server -p 6379:6379 redis

2. Настройка подключения

Создаём подключение к Redis в нашем FastAPI проекте в отдельном модуле redis_config.py:


import aioredis

# Пул соединений Redis
async def get_redis_pool():
    return await aioredis.from_url(
        "redis://localhost",
        encoding="utf-8",
        decode_responses=True
    )

Теперь настраиваем жизненный цикл Redis приложения: подключение при запуске и отключение при завершении работы приложения:


from fastapi import FastAPI
from redis_config import get_redis_pool

app = FastAPI()
redis_client = None

@app.on_event("startup")
async def startup():
    global redis_client
    redis_client = await get_redis_pool()

@app.on_event("shutdown")
async def shutdown():
    if redis_client:
        await redis_client.close()

3. Кэширование сложного вычислительного запроса

Допустим, в нашем приложении есть запрос, который выполняет тяжёлую аналитическую операцию над данными. Реализуем кэширование для него:

Модель:


from pydantic import BaseModel

class Product(BaseModel):
    id: int
    name: str
    price: float

Логика маршрута:


from fastapi import Depends
import json  # Для сериализации объектов

@app.get("/products/{product_id}")
async def get_product(product_id: int, redis=Depends(lambda: redis_client)):
    # Проверяем кэш
    cached_product = await redis.get(f"product:{product_id}")
    if cached_product:
        # Если данные уже есть в кэше, возвращаем их
        return json.loads(cached_product)

    # Если нет данных в кэше, выполняем "тяжёлые вычисления"
    product = {
        "id": product_id,
        "name": f"Product {product_id}",
        "price": 99.99,
    }

    # Сохраняем результат в кэш на 60 секунд
    await redis.setex(f"product:{product_id}", 60, json.dumps(product))
    return product

Что здесь происходит?

  1. При получении запроса к API сначала проверяется, существует ли объект в кэше.
  2. Если объект найден, то возвращается мгновенно, без обращения к базе данных.
  3. Если нет, данные обрабатываются (или запрашиваются из БД), а результат записывается в Redis с TTL в 60 секунд.

Кэширование списка (или страницы данных)

Задача: кэшировать результаты пагинации данных, чтобы не генерировать их каждый раз. Рассмотрим пример с постами блога:


@app.get("/posts")
async def get_posts(page: int = 1, redis=Depends(lambda: redis_client)):
    cache_key = f"posts:page:{page}"

    # Проверяем кэш
    cached_posts = await redis.get(cache_key)
    if cached_posts:
        return json.loads(cached_posts)

    # Если данных в кэше нет, имитируем "тяжёлую" операцию
    posts = [{"id": i, "title": f"Post {i}"} for i in range((page-1)*10, page*10)]

    # Сохраняем результат в кэш на 120 секунд
    await redis.setex(cache_key, 120, json.dumps(posts))
    return posts

Кэширование зависимости на уровне маршрута

FastAPI поддерживает кэширование через зависимости. Например, мы можем вынести кэширование в отдельную функцию:


from fastapi import Depends, HTTPException
from typing import Callable

async def cache_dependency(key: str, operation: Callable, ttl: int = 60):
    cached_value = await redis_client.get(key)
    if cached_value:
        return json.loads(cached_value)
    
    result = await operation()
    await redis_client.setex(key, ttl, json.dumps(result))
    return result

@app.get("/expensive-data")
async def expensive_operation(data=Depends(lambda: cache_dependency(
    key="expensive_data_cache",
    operation=lambda: mock_database_fetch(),
    ttl=300
))):
    return data

async def mock_database_fetch():
    # Эмуляция запроса к базе данных
    return {"data": "This is heavy data processing"}

Мониторинг и настройка Redis

Redis предоставляет команды для мониторинга производительности:

  • INFO — получение общей информации о сервере.
  • MONITOR — отображение всех выполняемых команд в реальном времени.
  • MEMORY USAGE key — проверка использования памяти для конкретного ключа.

Поддерживайте оптимальную настройку Redis, следя за ограничениями оперативной памяти и выбирая подходящую политику вытеснения данных, например, LRU (Least Recently Used).


Визуализация улучшений производительности

Представим простой сценарий с измерением времени отклика FastAPI:


import time

start_time = time.time()
response = await client.get("/products/1")
cached_time = time.time() - start_time
print(f"Response Time (with Redis cache): {cached_time:.5f} seconds")

Эксперимент с и без кэширования покажет драматическое снижение времени обработки при использовании Redis. Для нагрузочного тестирования можно использовать инструменты, такие как Apache Benchmark (ab) или wrk.


Реализация автоматического инвалидирования кэша

Когда данные обновляются, нужно удалять устаревшие элементы из кэша. Допустим, в нашем приложении обновляется информация о продукте:


@app.put("/products/{product_id}")
async def update_product(product_id: int, product: Product, redis=Depends(lambda: redis_client)):
    # Логика обновления данных (например, в БД)

    # Удаляем устаревший кэш
    await redis.delete(f"product:{product_id}")

    return {"message": "Product updated successfully"}

Заключительная практика

Ваше задание: интегрируйте Redis в своё приложение FastAPI, протестируйте его на реальной нагрузке, а затем замерьте время отклика до и после внедрения кэширования. Обязательно поделитесь результатами в Slack группы — давайте посмотрим, чьё приложение становится ракетой!

Документация Redis

Документация aioredis

1
Задача
Модуль 4: FastAPI, 9 уровень, 9 лекция
Недоступна
Кэширование данных о пользователях
Кэширование данных о пользователях
1
Задача
Модуль 4: FastAPI, 9 уровень, 9 лекция
Недоступна
Реализация кэширования поискового запроса
Реализация кэширования поискового запроса
3
Опрос
Кэширование сессий пользователей с Redis, 9 уровень, 9 лекция
Недоступен
Кэширование сессий пользователей с Redis
Кэширование сессий пользователей с Redis
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ