JavaRush /Курсы /Модуль 4: FastAPI /Создание асинхронных запросов к MongoDB через FastAPI

Создание асинхронных запросов к MongoDB через FastAPI

Модуль 4: FastAPI
8 уровень , 5 лекция
Открыта

Сегодня мы полностью сосредоточимся на создании асинхронных запросов, обсудим их значимость для высоконагруженных систем, узнаем, как работает event loop, и напишем несколько практических примеров. Поехали!

Представьте, вы заказали доставку из магазина через приложение. В синхронном сценарии — вы сидите и смотрите в экран телефона, пока курьер везёт заказ. Никуда не уходите, ни на что не отвлекаетесь — просто ждёте.

В асинхронном варианте — вы сделали заказ и пошли дальше заниматься своими делами: работаете, смотрите фильм, готовите ужин. Когда курьер подъехал — вам приходит уведомление, и вы выходите за покупками. То есть вы не теряете время, пока что-то где-то обрабатывается. Ваш "главный поток" остаётся свободным и может делать другие важные вещи.

Вот это и есть асинхронность: не блокировать процесс ожидания, а освобождать систему для других задач, пока результат где-то формируется.

В терминах серверов: асинхронность = меньше простаивания, больше клиентов одновременно.

Асинхронность делает наши приложения более отзывчивыми и способными обслуживать большее количество клиентов одновременно. Это достигается за счёт использования встроенного в Python механизма event loop и ключевых слов async/await. Подробнее про event loop можно почитать в документации Python.

Асинхронное программирование в Python

Асинхронное программирование основано на event loop — цикле, который планирует выполнение задач. В Python асинхронность достигается с помощью ключевых слов async и await. Давайте углубимся в базовые понятия.


import asyncio

async def say_hello():
    print("Привет, мир!")
    await asyncio.sleep(1)
    print("Прошёл 1 секунда")

async def main():
    await say_hello()

# Запуск event loop
asyncio.run(main())

Здесь async делает функцию асинхронной, а await используется для обозначения точек, где выполнение может быть приостановлено.

Важный момент: вы не можете вызывать асинхронную функцию напрямую, её всегда нужно запускать через event loop — напрямую или косвенно.


Асинхронные маршруты в FastAPI для MongoDB

В FastAPI поддержка асинхронности встроена "из коробки". Асинхронные маршруты позволяют нам писать API, которые эффективно работают с базами данных, например, MongoDB, используя асинхронные библиотеки вроде motor.

Шаг 1: настройка библиотеки Motor

Если вы пропустили настройку motor в предыдущей лекции, не беда. Установим её:


pip install motor

Подключаем motor и создаём соединение:


from motor.motor_asyncio import AsyncIOMotorClient

MONGO_DETAILS = "mongodb://localhost:27017"
client = AsyncIOMotorClient(MONGO_DETAILS)
database = client.my_database
collection = database.get_collection("my_collection")

Теперь у нас есть коллекция my_collection, в которую мы можем вставлять и извлекать данные. Никто не устоит перед её мощью!

Шаг 2: асинхронное добавление данных

Создаём маршрут для добавления записей в MongoDB:


from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class Item(BaseModel):
    name: str
    description: str

@app.post("/items/")
async def create_item(item: Item):
    new_item = await collection.insert_one(item.dict())
    created_item = await collection.find_one({"_id": new_item.inserted_id})
    return created_item

Что здесь происходит:

  1. Мы объявили модель Item через Pydantic — это удобно и позволяет валидировать входящие данные.
  2. Используем асинхронный метод insert_one для добавления новой записи в коллекцию.
  3. Метод find_one возвращает добавленный документ по его ID.

Шаг 3: асинхронное чтение данных

Давайте добавим маршрут для чтения всех записей:


@app.get("/items/")
async def get_items():
    items = []
    async for item in collection.find():
        items.append(item)
    return items

Особенность MongoDB в том, что find() возвращает асинхронный курсор. Поэтому мы используем async for для итерирования по результатам.

Шаг 4: обновление записей

Обновлять данные тоже несложно:


@app.put("/items/{item_id}")
async def update_item(item_id: str, item: Item):
    update_result = await collection.update_one(
        {"_id": item_id}, {"$set": item.dict()}
    )
    if update_result.modified_count == 1:
        updated_item = await collection.find_one({"_id": item_id})
        if updated_item:
            return updated_item
    return {"error": "Не удалось обновить элемент"}

Мы ищем документ по полю _id, обновляем его и возвращаем новое значение, если обновление успешно.

Шаг 5: удаление записей

Удаление проще простого:


@app.delete("/items/{item_id}")
async def delete_item(item_id: str):
    delete_result = await collection.delete_one({"_id": item_id})
    if delete_result.deleted_count == 1:
        return {"message": "Элемент успешно удалён"}
    return {"error": "Элемент не найден"}

Здесь мы используем delete_one для удаления записи. Если элементов с таким ID не найдено, возвращается соответствующая ошибка.


3. Практический пример: Создаём простое API

Мы построим API для управления списком книг. Оно будет поддерживать операции: добавление, чтение, обновление и удаление книг.

Добавим модель для книги:


class Book(BaseModel):
    title: str
    author: str
    summary: str

Объединим наш предыдущий код в одном приложении с маршрутом для каждой операции.


@app.post("/books/")
async def create_book(book: Book):
    new_book = await collection.insert_one(book.dict())
    return await collection.find_one({"_id": new_book.inserted_id})


@app.get("/books/")
async def list_books():
    books = []
    async for book in collection.find():
        books.append(book)
    return books


@app.put("/books/{book_id}")
async def update_book(book_id: str, book: Book):
    result = await collection.update_one({"_id": book_id}, {"$set": book.dict()})
    if result.modified_count == 1:
        return await collection.find_one({"_id": book_id})
    return {"error": "Книга не найдена или не обновлена"}


@app.delete("/books/{book_id}")
async def delete_book(book_id: str):
    result = await collection.delete_one({"_id": book_id})
    if result.deleted_count == 1:
        return {"message": "Книга успешно удалена"}
    return {"error": "Книга не найдена"}

Типичные ошибки

  1. Не использовать асинхронные методы Motor: использование обычных блокирующих методов может привести к "зависанию" приложения. Будьте внимательны и используйте только асинхронные методы (async def и await).
  2. Неправильная структура MongoDB: MongoDB позволяет хранить произвольные данные, но если вы забудете согласовать структуру записей, отладка может стать кошмаром.
  3. Отсутствие индексов: без индексации ваши запросы могут стать медленными, особенно при росте числа записей.

Реальное применение

Асинхронные запросы широко применяются в высоконагруженных приложениях: от API для социальных сетей до систем аналитики. MongoDB отлично подходит для хранения данных в формате JSON, а FastAPI позволяет использовать её возможности максимально эффективно.

1
Задача
Модуль 4: FastAPI, 8 уровень, 5 лекция
Недоступна
Создание асинхронного маршрута для добавления данных в MongoDB
Создание асинхронного маршрута для добавления данных в MongoDB
1
Задача
Модуль 4: FastAPI, 8 уровень, 5 лекция
Недоступна
Асинхронное чтение всех записей из MongoDB
Асинхронное чтение всех записей из MongoDB
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ