Наша цель – создать систему, которая:
- Подключается к Telegram через Telethon для мониторинга чатов.
- Позволяет управлять уведомлениями через API, используя FastAPI.
- Уведомляет о событиях (например, новых сообщениях) через Telegram или другие каналы (например, email).
Эту систему можно использовать для мониторинга чатов сотрудников, получения уведомлений о важных событиях или даже для аналитики активности пользователей.
Разработка системы уведомлений
Архитектура системы
Для начала разберём общий план:
- Telethon: Подключается к Telegram и отслеживает события, такие как новые сообщения или упоминания.
- FastAPI: Отвечает за взаимодействие с пользователем через HTTP API. Через API мы можем:
- Настраивать, какие события отслеживать.
- Подключать пользователей для получения уведомлений.
- SQLAlchemy (или любая другая база данных): Хранение данных и настроек.
- Асинхронные задачи: Для обработки событий в реальном времени.
Архитектурная схема:
+-------------------------------+
| Пользователь (HTTP-клиент) |
+-------------------------------+
|
(API-запросы)
|
v
+-------------------------------+
| FastAPI |
+-------------------------------+
|
v
+-------------------------------+
| Telethon Client |
+-------------------------------+
|
(Телеграм-события)
|
v
+-------------------------------+
| База данных |
+-------------------------------+
|
(хранение настроек)
|
v
+-------------------------------+
| Уведомления |
+-------------------------------+
Настройка проекта
Создадим структуру проекта для лёгкой организации кода:
project/
├── main.py # Точка входа в приложение FastAPI
├── bot/
│ ├── client.py # Конфигурация Telethon клиента
│ ├── handlers.py # Обработчики событий Telethon
│ ├── tasks.py # Асинхронные задачи
├── api/
│ ├── routes.py # Роуты для API FastAPI
│ ├── models.py # Pydantic-модели для API
│ ├── db.py # Подключение к базе данных
├── requirements.txt
Начнём с установки необходимых библиотек:
pip install fastapi uvicorn telethon sqlalchemy aiosqlite
Настройка Telethon клиента
Создадим файл bot/client.py, где настроим Telethon-клиент:
from telethon import TelegramClient
# Ваши данные для подключения к Telegram API
API_ID = "your_api_id"
API_HASH = "your_api_hash"
SESSION_NAME = "session_name"
# Создаём Telethon-клиент
client = TelegramClient(SESSION_NAME, API_ID, API_HASH)
# Запуск клиента
async def start_client():
await client.start()
print("Telethon клиент запущен!")
Код выше запускает клиента Telegram, используя ваше API ID и Hash, которые можно получить здесь.
Обработка событий Telegram
Теперь добавим обработчики событий в bot/handlers.py. Например, обработка новых сообщений:
from telethon import events
from .client import client
@client.on(events.NewMessage) # Отслеживаем новые сообщения
async def new_message_handler(event):
print(f"Новое сообщение в чате {event.chat_id}: {event.text}")
# Здесь можно добавить уведомление или другие действия
Настройка FastAPI и маршрутов
Перейдём к созданию API в файле api/routes.py:
from fastapi import APIRouter
router = APIRouter()
@router.get("/")
async def read_root():
return {"message": "Система уведомлений работает!"}
@router.post("/add_chat/")
async def add_chat(chat_id: int):
# Добавляем чат в список мониторинга
# Здесь можно сохранить chat_id в базе данных
return {"message": f"Чат {chat_id} добавлен в мониторинг."}
Подключим эти маршруты в корневом файле main.py:
from fastapi import FastAPI
from bot.client import start_client
from api.routes import router
app = FastAPI()
app.include_router(router)
# Запуск Telethon клиента при запуске FastAPI
@app.on_event("startup")
async def startup_event():
await start_client()
Запускаем FastAPI:
uvicorn main:app --reload
Теперь ваш API доступен, и все события Telegram обрабатываются.
Хранение данных в базе
Используем SQLAlchemy для хранения данных (например, уведомления о новых сообщениях). В файле api/db.py:
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker, declarative_base
DATABASE_URL = "sqlite+aiosqlite:///./test.db"
Base = declarative_base()
engine = create_async_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine, class_=AsyncSession)
async def init_db():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
Создадим таблицу для хранения настроек чатов:
from sqlalchemy import Column, Integer, String
from .db import Base
class Chat(Base):
__tablename__ = "chats"
id = Column(Integer, primary_key=True, index=True)
chat_id = Column(String, unique=True, index=True)
notification_type = Column(String) # Например, "email" или "telegram"
Уведомления
Теперь реализуем отправку уведомлений. Например, уведомление в Telegram:
from bot.client import client
async def send_notification(chat_id, message):
await client.send_message(chat_id, message)
Логика взаимодействия
Объединим всё: при получении нового сообщения в Telethon вызываем функцию записи в базу или отправки уведомлений. В bot/handlers.py:
from .notifications import send_notification
@client.on(events.NewMessage)
async def new_message_handler(event):
chat_id = event.chat_id
text = event.message.message
print(f"Новое сообщение в чате {chat_id}: {text}")
# Отправляем уведомление (например, через Телеграм)
await send_notification(chat_id, f"Новое сообщение: {text}")
Тестирование системы
Тестирование:
- Запускаем FastAPI (
uvicorn main:app --reload). - Мониторим сообщения в чате.
- Проверяем отправку уведомлений.
Жизнь без багов — это утопия, но теперь ваши уведомления всегда будут работать!
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ