Создание и развертывание ретранслятора Telegram каналов, используя Python и Heroku

Создание и развертывание ретранслятора Telegram каналов, используя Python и Heroku

В данной инструкции построим Python приложение для зеркалирования сообщений из Telegram каналов и развернем его на облачной платформе Heroku.

Подготовка

Идея проста — получить сообщение и тут же переотправить или переслать его.

Ввиду ограниченности Bot API, нет возможности добавить бота в канал, где вы не администратор. Поэтому будем использовать клиентский API, заходя на нужные каналы вручную (хотя и этот шаг можно автоматизировать).

По ходу действий нам понадобятся:

  • Аккаунт Telegram. Лучше перестраховаться и завести отдельный аккаунт, благо на сервисах приема SMS его цена составляет ~6₽. Для таких аккаунтов лучше сразу ставить двухфакторную аутентификацию;
  • Созданный Telegram канал, куда будем отправлять сообщения;
  • Установленный интерпретатор Python версии 3. Инструкция по установке;
  • Установленный Git клиент. Инструкция по установке;
  • База данных Postgres;
  • Аккаунт на облачной платформе Heroku. Регистрация.
  • Heroku CLI для взаимодейсвия с платформой Heroku. Установка.

Взаимодействие с Telegram API

Для работы с клиентским API необходимо создать приложение Telegram. Сделать это можно по ссылке. Здесь нас интересуют два значения: api_id и api_hash.

Значения api_id и api_hash Telegram приложения
Значения api_id и api_hash Telegram приложения

Полученные значения api_id и api_hash занесем во вновь созданный файл переменных окружения .env:

API_ID=api_id API_HASH=api_hash

При использовании системы контроля версий, не забудьте добавить файл .env в список исключения.

Создание Python приложения

Создадим папку проекта. Все последующие действия будем делать в ней.

Настроим виртуальное Python окружение для изоляции приложения от других глобальных зависимостей системы:

# Установим пакет виртуального окружения pip install --user virtualenv # Создадим новое виртуальное окружение python -m venv venv

Активируем виртуальное окружение:

# Linux source venv/bin/activate # Windows .\venv\Scripts\activate

Последующая установка новых Python пакетов будет производиться в данном виртуальной окружении.

Авторизация клиента

В качестве библиотеки для взаимодействия с Telegram API будем использовать Telethon:

pip install telethon

Для загрузки значений переменных окружения установим пакет python-dotenv:

pip install python-dotenv

Создадим файл config.py, где будем хранить и загружать переменные окружения в память приложения:

# file: config.py from os import environ from dotenv import load_dotenv # Загрузка значений переменных окружения load_dotenv() API_ID = environ.get('API_ID') API_HASH = environ.get('API_HASH')

Скрипт авторизации, целью которого является получение идентификатора сессии, выглядит следующим образом:

# file: login.py from telethon.sync import TelegramClient from telethon.sessions import StringSession from config import (API_ID, API_HASH) # Авторизация и печать ключа сессии with TelegramClient(StringSession(), API_ID, API_HASH) as client: print(client.session.save())

Запустим его:

python login.py

После ввода номера телефона и кода, пришедшего в приложение Telegram, получаем заветный идентификатор сессии, который добавим в файл .env.

# file: .env API_ID=api_id API_HASH=api_hash SESSION_STRING=sess_string

Теперь, используя вновь полученный идентификатор сессии, авторизуемся для дальнейшего взаимодействия с Telegram API:

# file: mirror.py from telethon.sync import TelegramClient from telethon.sessions import StringSession from config import (API_ID, API_HASH, SESSION_STRING) client = TelegramClient(StringSession(SESSION_STRING), API_ID, API_HASH)

Получение обновлений от Telegram

Присоединившись к каналу, серверы Telegram начинают слать нам соответствующие обновления (новые сообщения, отредактированные сообщения и др.). Остается лишь перенаправить их в наш канал.

Определим идентификаторы канала-источника (SOURCE_CHANNEL) и канала-приемника (TARGET_CHANNEL):

# file: .env API_ID=api_id API_HASH=api_hash SESSION_STRING=sess_string SOURCE_CHANNEL=source_channel TARGET_CHANNEL=target_channel

Каналы имеют обыкновение менять свои ссылки, но что остается неизменным — это их идентификаторы (число с префиксом -100). Узнать идентификатор можно с помощью Telegram бота @messageinformationsbot, переслав ему любое сообщение из интересующего вас канала.

Получение новых сообщений

Следуя примеру из документации, добавим обработчик новых сообщений:

# Обработчик новых сообщений @client.on(events.NewMessage) async def handler_new_message(event): try: # event.message содержит информацию о новом сообщении print(event.message) except Exception as e: print(e)

Для каждого нового сообщения в функцию handler_new_message приходит объект типа NewMessage, содержащий поле message с необходимой нам информацией. Т.к. нам необходимы обновления только от определенных каналов, при определении обработчика передадим ему параметр chats, содержащий идентификатор либо список идентификаторов каналов.

# file: mirror.py from telethon.sync import TelegramClient from telethon.sessions import StringSession from config import (API_ID, API_HASH, SESSION_STRING, SOURCE_CHANNEL, TARGET_CHANNEL) client = TelegramClient(StringSession(SESSION_STRING), API_ID, API_HASH) # Обработчик новых сообщений @client.on(events.NewMessage(chats=SOURCE_CHANNEL)) async def handler_new_message(event): try: # отправим сообщение в наш канал await client.send_message(TARGET_CHANNEL, event.message) # либо вместо переотправки можно репостнуть: # await client.forward_messages(TARGET_CHANNEL, event.message) except Exception as e: print(e) if __name__ == '__main__': client.start() client.run_until_disconnected()

Для успешной отправки сообщений в Telegram канал необходимо, чтобы аккаунт, которым авторизовались в приложении, имел на это соответствующие права.

Редактирование сообщений

В случае переотправки пришедших сообщений (использование метода client.send_message), при отредактированном сообщении в канале-источнике, можем отредактировать его и в канале-приемнике.

Для реализации редактирования необходимо знать какое именно сообщение редактировать, поэтому при добавлении нового сообщения, нужно сохранить идентификаторы исходного и копии сообщения. В дальнейшем по идентификатору сообщения из источника сможем определить идентификатор сообщения в нашем канале.

Установим базу данных Postgres, если этого не было сделано ранее. Сама установка не должна вызвать сложностей:

  • Для вашей операционной системы скачать установщик;
  • Запустить его, оставив параметры по умолчанию. Единственное, что стоит запомнить это логин и пароль пользователя базы данных:
Создание и развертывание ретранслятора Telegram каналов, используя Python и Heroku

Далее первоначальное взаимодействие с базой будем производить с помощью утилиты psql:

Утилита psql в Windows
Утилита psql в Windows

Для хранения соответствий идентификаторов определим простейшую таблицу BINDING_ID:

Схема таблицы соответствия идентификаторов сообщений
Схема таблицы соответствия идентификаторов сообщений

Создадим таблицу BINDING_ID в консоли утилиты psql командой:

CREATE TABLE IF NOT EXISTS BINDING_ID ( id serial primary key not null, mirror_message_id bigint not null, message_id bigint not null ) ;
Создание таблицы в утилите psql
Создание таблицы в утилите psql

Взаимодействие с базой данных будем производить с помощью пакета psycopg2:

pip install psycopg2

В файл .env добавим строку подключения к базе данных вида:

# например, postgres://postgres:postgres@localhost:5432/postgres DATABASE_URL=postgres://{имя пользователя}:{пароль}@{хост}/{название базы}

В случае развертывания на Heroku и использования Postgresql дополнения, для приложения в облаке переменная окружения DATABASE_URL устанавливается автоматически. Поэтому при развертывании в облаке DATABASE_URL устанавливать не нужно.

Для работы с базой данных определим следующие базовые функции: добавление соответствия идентификаторов сообщений (insert) и поиск идентификатора сообщения-клона по идентификатору оригинального сообщения (find_by_id):

# database.py import psycopg2 from psycopg2 import extras from psycopg2.extensions import AsIs from config import DATABASE_URL # Добавление соответствия идентификаторов # оригинального и скопированного сообщений def insert(message): connection = psycopg2.connect(DATABASE_URL) cursor = connection.cursor() try: columns = message.keys() values = message.values() sql_insert = 'insert into BINDING_ID (%s) values %s' cursor.execute(sql_insert, (AsIs(','.join(columns)), tuple(values))) connection.commit() except Exception as e: print(e) cursor.close() connection.close() # Поиск значения идентификатора скопированного сообщения # соответствующему идентификатору оригинального сообщения def find_by_id(message_id): try: connection = psycopg2.connect(DATABASE_URL) cursor = connection.cursor( cursor_factory=psycopg2.extras.RealDictCursor) cursor.execute(""" SELECT mirror_message_id FROM BINDING_ID WHERE message_id = %s """, (message_id, )) rows = cursor.fetchone() cursor.close() connection.close() return rows except Exception as e: print(e)

В функцию обработчика новых сообщений добавим сохранение идентификаторов сообщений в базе данных:

# file: mirror.py from telethon.sessions import StringSession from telethon.sync import TelegramClient import database from config import (API_HASH, API_ID, SOURCE_CHANNEL, SESSION_STRING, TARGET_CHANNEL) client = TelegramClient(StringSession(SESSION_STRING), API_ID, API_HASH) # Обработчик новых сообщений @client.on(events.NewMessage(chats=SOURCE_CHANNEL)) async def handler_new_message(event): try: # Отправим пришедшее нам сообщение и сохраним его ид в базе mirror_message_id = await client.send_message(TARGET_CHANNEL, event.message) database.insert({ 'message_id': event.message.id, 'mirror_message_id': mirror_message_id }) except Exception as e: print(e)

Тогда редактирование сообщений будет выглядеть следующим образом:

# file: mirror.py ... # Обработчик отредактированных сообщений @client.on(events.MessageEdited(chats=SOURCE_CHANNEL)) async def handler_edit_message(event): try: # попытаемся найти в базе данных сообщение-клон message_to_edit = database.find_by_id(event.message.id) if message_to_edit is None: return # если такое нашлось -- отредактируем его id_message_to_edit = message_to_edit['mirror_message_id'] await client.edit_message(TARGET_CHANNEL, id_message_to_edit, event.message.message) except Exception as e: print(e) if __name__ == '__main__': client.start() client.run_until_disconnected()

Ознакомиться с текущим вариантом проекта можно по ссылке ниже:

Для случая зеркалирования «один к одному» у нас все готово и можно переходить к развертыванию.

Зеркалирование «много к одному»

Когда каналов-источников и каналов, в которые необходимо пересылать сообщения, больше чем один, необходимо задать их взаимное соответствие.

Задавать карту соответствий прямо в коде — не вариант, поэтому для значения новой переменной окружения (CHANNELS_MAPPING) введем специальный формат записи:

Формат строковой записи соответствий каналов
Формат строковой записи соответствий каналов

Теперь вместо ранее заданных переменных SOURCE_CHANNEL и TARGET_CHANNEL имеем CHANNELS_MAPPING:

# file: .env API_ID=api_id API_HASH=api_hash SESSION_STRING=sess_string DATABASE_URL=database_url CHANNELS_MAPPING=[-1001, -1002]:-1003;[-1004]:-1005;-1006:-1007

Для дальнейшей работы из строкового значения CHANNELS_MAPPING нужно получить представление в виде словаря (dict). Сделать это можно, разобрав строку по составляющим ее элементам с помощью регулярных выражений (модуль re) либо с помощью парсера грамматик (модуль pyparsing). В данном случае воспользуемся вторым вариантом.

Установим модуль pyparsing:

pip install pyparsing

Так как входная строка для разбора состоит из отдельных элементов, для каждого такого элемента определим свою грамматику:

  • Основной элемент — идентификатор канала, состоящий из префикса -100 и последующих цифр:
PREFIX = '-100' # Word(nums) - слово, состоящее из цифр # Соединяем в одну строку (Combine()) CHANNEL_ID = Combine(PREFIX + Word(nums))
  • Список каналов:
# [-100...,-100...] ARRAY_CHANNELS = Suppress("[") \ + CHANNEL_ID + ZeroOrMore(Suppress(",") + CHANNEL_ID) \ + Suppress("]")

Полное выражение:

# ((ARRAY|ID):ID;)+ ALL = LineStart() \ + OneOrMore((ARRAY_CHANNELS | CHANNEL_ID).setResultsName('source*') \ + Suppress(":") \ + CHANNEL_ID.setResultsName('target*') + Suppress(";")) \ + LineEnd()

Подробнее о модуле pyparsing можно узнать в документации.

После составления грамматики, формируем результат разбора строки в виде словаря, где ключи — исходные каналы, значения — список целевых каналов (функция parse_string):

# file: utils.py from pyparsing import (Combine, LineEnd, LineStart, OneOrMore, ParseException, Suppress, Word, ZeroOrMore, nums) PREFIX = "-100" # id1 = -100\d+ CHANNEL_ID = Combine(PREFIX + Word(nums)) # [id1, id2] ARRAY_CHANNELS = Suppress("[") \ + CHANNEL_ID + ZeroOrMore(Suppress(",") + CHANNEL_ID) \ + Suppress("]") # ((ARRAY|ID):ID;)+ ALL = LineStart() \ + OneOrMore((ARRAY_CHANNELS | CHANNEL_ID).setResultsName('source*') \ + Suppress(":") \ + CHANNEL_ID.setResultsName('target*') + Suppress(";")) \ + LineEnd() def parse_string(string): # {source1:[target1, ...]} channels_mapping = {} try: # Результат разбора res = ALL.parseString(string).asDict() # Цикл по кол-ву найденных отображений каналов for idx in range(len(res['source'])): if isinstance(res['source'][idx], list): for item in res['source'][idx]: channels_mapping.setdefault( int(item), []).append(int(res['target'][idx])) else: channels_mapping.setdefault( int(res['source'][idx]), []).append(int(res['target'][idx])) except ParseException as e: print('Wrong string:', e) except Exception as e: print(e) return channels_mapping

Инициализация переменной CHANNELS_MAPPING:

# file: config.py ... from utils import parse_string CHANNELS_MAPPING = parse_string(environ.get('CHANNELS_MAPPING', '')) # Для фильтра по каналам-источникам SOURCE_CHANNELS = list(CHANNELS_MAPPING.keys())

Расширим таблицу BINDING_ID двумя столбцами: идентификатор канала-зеркала (mirror_channel_id) и идентификатор канала-источника (channel_id):

ALTER TABLE BINDING_ID ADD COLUMN mirror_channel_id bigint not null, ADD COLUMN channel_id bigint not null ;
Добавление столбцов в таблицу BINDING_ID в утилите psql
Добавление столбцов в таблицу BINDING_ID в утилите psql

И обновим функцию поиска сообщений-клонов в базе данных (find_by_id):

# file: database.py ... # message_id -- идентификатор оригинального сообщения # channel_id -- идентификатор оригинального канала # По значениям message_id и channel_id найдем # идентификаторы канала-приемника (mirror_channel_id) # и сообщения клона (mirror_message_id) def find_by_id(message_id, channel_id): try: connection = psycopg2.connect(DATABASE_URL) cursor = connection.cursor( cursor_factory=psycopg2.extras.RealDictCursor) cursor.execute(""" SELECT mirror_channel_id, mirror_message_id FROM BINDING_ID WHERE message_id = %s AND channel_id = %s """, (message_id, channel_id, )) rows = cursor.fetchone() cursor.close() connection.close() return rows except Exception as e: print(e)

Тогда добавление новых сообщений будет выглядеть следующим образом:

# file: mirror.py from telethon.sessions import StringSession from telethon.sync import TelegramClient import time import database from config import (API_HASH, API_ID, SESSION_STRING, CHANNELS_MAPPING, SOURCE_CHANNELS) client = TelegramClient(StringSession(SESSION_STRING), API_ID, API_HASH) # Обработчик новых сообщений @client.on(events.NewMessage(chats=SOURCE_CHANNELS)) async def handler_new_message(event): try: # По идентификатору канала-источника найдем # каналы-приемники mirror_channel = CHANNELS_MAPPING.get(event.chat_id) if mirror_channel is None and len(mirror_channel) < 1: return # Отправим сообщение в каждый канал-приемник # и сохраним в базе идентификаторы for c in mirror_channel: mirror_message_id = await client.send_message(c, event.message) database.insert({ 'mirror_message_id': mirror_message_id, 'message_id': event.message.id, 'mirror_channel_id': c, 'channel_id': event.chat_id }) except Exception as e: print(e)

Аналогично редактирование:

# file: mirror.py ... # Обработчик отредактированных сообщений @client.on(events.MessageEdited(chats=SOURCE_CHANNELS)) async def handler_edit_message(event): try: # По идентификатору канала-источника и идентификатору # сообщения найдем каналы-приемники и идентификаторы сообщений-клонов messages_to_edit = database.find_by_id(event.message.id, event.chat_id) if messages_to_edit is not None and len(messages_to_edit) < 1: return # Отредактируем их for message in messages_to_edit: await client.edit_message(message['mirror_channel_id'], message['mirror_message_id'], event.message.message) except Exception as e: print(e)

На вариант с множественным зеркалированием можно посмотреть по ссылке ниже:

Развертывание

Для развертывания приложения Heroku необходимо сделать следующее:

1. Зарегистрироваться на Heroku, если это не было сделано ранее;

2. Для бесплатного использования — увеличить число бесплатных часов работы до 1000;

3. Создать Heroku Procfile:

# file: Procfile run: python mirror.py

Procfile — файл описания команд, которые будут выполняться в приложении Heroku.

4. Создать локальный git репозиторий и сделать первый коммит, если это не было сделано ранее;

heroku create {НАЗВАНИЕ_ПРИЛОЖЕНИЯ}
Создание нового приложения в Heroku CLI
Создание нового приложения в Heroku CLI

6. Добавить Postgres расширение к приложению Heroku:

# hobby-dev здесь наименование тарифного плана (0$/месяц) # для него существует ограничение в 10000 строк # Для большего стоит выбрать другой тарифный план либо регулярно чистить таблицу heroku addons:create heroku-postgresql:hobby-dev
Добавление Postgres расширения к приложению Heroku
Добавление Postgres расширения к приложению Heroku

7. Скопировать данные из локальной базы данных в удаленную базу данных Heroku:

# например, PGUSER=postgres PGPASSWORD=postgres heroku pg:push postgres DATABASE_URL PGUSER={ПОЛЬЗОВАТЕЛЬ_ЛОКАЛЬНОЙ_БАЗЫ} PGPASSWORD={ПАРОЛЬ} heroku pg:push {ЛОКАЛЬНАЯ_БАЗА} DATABASE_URL # Пример для Windows: SET PGUSER=postgres&& SET PGPASSWORD=postgres&& heroku pg:push postgres DATABASE_URL

Копирование данных может закончиться ошибкой, если таблица была пустой, но присоединившись к удаленной базе командой:

heroku pg:psql

Убедимся, что таблица была перенесена:

Успешный перенос локальной базы данных
Успешный перенос локальной базы данных

8. Добавить удаленный git репозиторий Heroku:

heroku git:remote -a {НАЗВАНИЕ_ПРИЛОЖЕНИЯ}​

9. Установить переменные окружения для приложения Heroku из файла .env с исключением переменной DATABASE_URL (установится автоматически):

# Установка переменных окружения значениями из .env файла heroku config:set $(cat .env | sed '/^DATABASE_URL/d; /#[[:print:]]*$/d')

Для пользователей Windows команду выше можно выполнить в Git Bash либо воспользоваться методами из статьи.

10. Создать файл requirements.txt со всеми зависимостями приложения:

pip freeze > requirements.txt

11. Загрузить приложение на Heroku:

git push heroku master

12. Запустить приложение:

heroku ps:scale run=1

Итог

В итоге получили инструмент зеркалирования Telegram каналов, конфигурируемый с помощью переменных окружения и работающий удаленно в облаке Heroku.

Готовое решение с развертыванием на Heroku в один клик:

99
8 комментариев

Полезная штука. Тут чувак сделал наподобие, только управление редиректами сделано через бота https://github.com/rumble-key/feed-bot-telegram

1

классный гайд, можешь написать в личку - есть подобная задача, вдруг возможно сотрудничество с тобой!

Спасибо! Настроил себе все)

Подскажите, а почему позникает ошибка "Cant adapt type message"?

Отличная работа! А вы не думали добавить фильтрацию измененных/удаленных сообщений?
Пример: добавляем новые сообщения в базу, и мониторим их. Если такое сообщение будет изменено/удалено - только тогда отправляем в "зеркальный" канал

По ссылке в конце статьи находится обновленный проект, где можно добиться такого поведения изменив реплицирующие методы в EventProcessor'e ( https://github.com/khoben/telemirror/blob/a555136d3844381016916794ff8c78b9879eebf6/telemirror/mirroring.py#L16 ): на событие NewMessage только пишем в БД без отправки, на остальные события по наличию в БД отправляем сообщение.