Bump to v1.7 (Migrate to Hydrogram)

This commit is contained in:
Dr.Caduceus
2024-11-20 15:29:44 +05:30
committed by GitHub
parent f0df2839c0
commit 35e0008d2f
14 changed files with 220 additions and 306 deletions

View File

@@ -1,7 +1,5 @@
<div align="center"><h1>🌐File Stream Bot</h1>
<b>An open-source Python Telegram bot to transmit Telegram files over HTTP.</b>
<a href="https://t.me/DrFileStreamBot"><b>Demo Bot</b></a>
</div><br>
## **📑 INDEX**
@@ -34,7 +32,7 @@ For Linux:
```
sudo apt-get update && sudo apt-get install -y python3.11 git pip
```
For macOS:
For MacOS:
```
brew install python@3.11 git
```
@@ -71,13 +69,13 @@ pip install -r requirements.txt
**The variables provided below should either be completed within the [config.py](https://github.com/TheCaduceus/FileStreamBot/blob/main/bot/config.py) file or configured as environment variables.**
* `API_ID`|`TELEGRAM_API_ID`: API ID of your Telegram account, can be obtained from [My Telegram](https://my.telegram.org). `int`
* `API_HASH`|`TELEGRAM_API_HASH`: API hash of your Telegram account, can be obtained from [My Telegram](https://my.telegram.org). `str`
* `OWNER_ID`: ID of your Telegram account, can be obtained by sending **/info** to [@DrFileStreamBot](https://t.me/DrFileStreamBot). `int`
* `OWNER_ID`: ID of your Telegram account, can be obtained by sending **/info** to [@DumpJsonBot](https://t.me/DumpJsonBot). `int`
* `ALLOWED_USER_IDS`: A list of Telegram account IDs (separated by spaces) that are permitted to use the bot. Leave this field empty to allow anyone to use it. `str`
* `BOT_USERNAME`|`TELEGRAM_BOT_USERNAME`: Username of your Telegram bot, create one using [@BotFather](https://t.me/BotFather). `str`
* `BOT_TOKEN`|`TELEGRAM_BOT_TOKEN`: Telegram API token of your bot, can be obtained from [@BotFather](https://t.me/BotFather). `str`
* `CHANNEL_ID`|`TELEGRAM_CHANNEL_ID`: ID of the channel where bot will forward all files received from users, can be obtained by forwarding any message from channel to [@ShowJsonBot](https://t.me/ShowJsonBot) and then looking from `forward_from_chat` key. `int`
* `CHANNEL_ID`|`TELEGRAM_CHANNEL_ID`: ID of the channel where bot will forward all files received from users, can be obtained by forwarding any message from channel to [@DumpJsonBot](https://t.me/DumpJsonBot) and then looking from `forward_from_chat` key. `int`
* `BOT_WORKERS`: Number of updates bot should process from Telegram at once, by default to 10 updates. `int`
* `SECRET_CODE_LENGTH`: Number of characters that file code should contain, by default to 12 characters. `int`
* `SECRET_CODE_LENGTH`: Number of characters that file code should contain, by default to 24 characters. `int`
* `BASE_URL`: Base URL that bot should use while generating file links, can be FQDN and by default to `127.0.0.1`. `str`
* `BIND_ADDRESS`: Bind address for web server, by default to `0.0.0.0` to run on all possible addresses. `str`
* `PORT`: Port for web server to run on, by default to `8080`. `int`

View File

@@ -1,15 +1,19 @@
from telethon import TelegramClient
from hydrogram import Client
from logging import getLogger
from logging.config import dictConfig
from .config import Telegram, LOGGER_CONFIG_JSON
dictConfig(LOGGER_CONFIG_JSON)
version = 1.6
version = 1.7
logger = getLogger('bot')
TelegramBot = TelegramClient(
session='bot',
api_id=Telegram.API_ID,
api_hash=Telegram.API_HASH
TelegramBot = Client(
name ='bot',
api_id = Telegram.API_ID,
api_hash = Telegram.API_HASH,
bot_token = Telegram.BOT_TOKEN,
plugins = {'root': 'bot/plugins'},
workers = Telegram.BOT_WORKERS,
max_concurrent_transmissions = 1000
)

View File

@@ -1,22 +1,6 @@
from importlib import import_module
from pathlib import Path
from bot import TelegramBot, logger
from bot.config import Telegram
from bot import TelegramBot
from bot.server import server
def load_plugins():
count = 0
for path in Path('bot/plugins').rglob('*.py'):
import_module(f'bot.plugins.{path.stem}')
count += 1
logger.info(f'Loaded {count} {"plugins" if count > 1 else "plugin"}.')
if __name__ == '__main__':
logger.info('initializing...')
TelegramBot.loop.create_task(server.serve())
TelegramBot.start(bot_token=Telegram.BOT_TOKEN)
logger.info('Telegram client is now started.')
logger.info('Loading bot plugins...')
load_plugins()
logger.info('Bot is now ready!')
TelegramBot.run_until_disconnected()
TelegramBot.run()

View File

@@ -1,14 +1,15 @@
from os import environ as env
class Telegram:
API_ID = int(env.get("TELEGRAM_API_ID", 1234))
API_ID = int(env.get("TELEGRAM_API_ID", 12345))
API_HASH = env.get("TELEGRAM_API_HASH", "xyz")
OWNER_ID = int(env.get("OWNER_ID", 1234567890))
OWNER_ID = int(env.get("OWNER_ID", 5530237028))
ALLOWED_USER_IDS = env.get("ALLOWED_USER_IDS", "").split()
BOT_USERNAME = env.get("TELEGRAM_BOT_USERNAME", "BotFather")
BOT_TOKEN = env.get("TELEGRAM_BOT_TOKEN", "1234:abcd")
CHANNEL_ID = int(env.get("TELEGRAM_CHANNEL_ID", -1001234567890))
SECRET_CODE_LENGTH = int(env.get("SECRET_CODE_LENGTH", 12))
BOT_TOKEN = env.get("TELEGRAM_BOT_TOKEN", "1234567:xyz")
BOT_WORKERS = env.get("BOT_WORKERS", 10)
CHANNEL_ID = int(env.get("TELEGRAM_CHANNEL_ID", -100123456789))
SECRET_CODE_LENGTH = int(env.get("SECRET_CODE_LENGTH", 24))
class Server:
BASE_URL = env.get("BASE_URL", "http://127.0.0.1:8080")
@@ -47,6 +48,10 @@ LOGGER_CONFIG_JSON = {
'bot': {
'level': 'INFO',
'handlers': ['file_handler', 'stream_handler']
},
'hydrogram': {
'level': 'INFO',
'handlers': ['file_handler', 'stream_handler']
}
}
}

View File

@@ -1,20 +1,16 @@
from telethon.events import NewMessage, CallbackQuery
from typing import Callable
from hydrogram import Client
from hydrogram.types import Message, CallbackQuery
from typing import Union, Callable
from functools import wraps
from bot.config import Telegram
def verify_user(private: bool = False):
def decorator(func: Callable):
@wraps(func)
async def wrapper(update: NewMessage.Event | CallbackQuery.Event):
if private and not update.is_private:
return
def verify_user(func: Callable):
chat_id = str(update.chat_id)
@wraps(func)
async def decorator(client: Client, update: Union[Message, CallbackQuery]):
chat_id = str(update.from_user.id if update.from_user else update.chat.id)
if not Telegram.ALLOWED_USER_IDS or chat_id in Telegram.ALLOWED_USER_IDS:
return await func(update)
return wrapper
if not Telegram.ALLOWED_USER_IDS or chat_id in Telegram.ALLOWED_USER_IDS:
return await func(client, update)
return decorator

View File

@@ -1,35 +1,17 @@
WelcomeText = \
"""
Hi **%(first_name)s**, send me a file or add me as an admin to any channel to instantly generate file links.
Add me to your channel to instantly generate links for any downloadable media. Once received, I will automatically attach appropriate buttons to the post containing the URL. If you want me to ignore a given post, you can insert `#pass` in the post.
- /start to get this message.
- /info to get user info.
- /log to get bot logs. (admin only!)
Hi **%(first_name)s**, send me a file to instantly generate file links.
"""
UserInfoText = \
PrivacyText = \
"""
**First Name:**
`{sender.first_name}`
**Last Name:**
`{sender.last_name}`
**User ID:**
`{sender.id}`
**Username:**
`@{sender.username}`
This bot securely stores your files to deliver its service.
"""
FileLinksText = \
"""
**Download Link:**
`%(dl_link)s`
**Telegram File:**
`%(tg_link)s`
"""
MediaLinksText = \
@@ -38,8 +20,6 @@ MediaLinksText = \
`%(dl_link)s`
**Stream Link:**
`%(stream_link)s`
**Telegram File:**
`%(tg_link)s`
"""
InvalidQueryText = \
@@ -61,8 +41,3 @@ InvalidPayloadText = \
"""
Invalid payload.
"""
MediaTypeNotSupportedText = \
"""
Sorry, this media type is not supported.
"""

View File

@@ -1,5 +1,4 @@
from telethon.events import NewMessage
from telethon.tl.custom import Message
from hydrogram.types import Message
from datetime import datetime
from mimetypes import guess_type
from bot import TelegramBot
@@ -10,55 +9,47 @@ async def get_message(message_id: int) -> Message | None:
message = None
try:
message = await TelegramBot.get_messages(Telegram.CHANNEL_ID, ids=message_id)
message = await TelegramBot.get_messages(Telegram.CHANNEL_ID, message_ids=message_id)
if message.empty: message = None
except Exception:
pass
return message
async def send_message(message:Message, send_to:int = Telegram.CHANNEL_ID) -> Message:
return await TelegramBot.send_message(entity=send_to, message=message)
async def send_message(msg: Message, send_to: int = Telegram.CHANNEL_ID) -> Message:
return await TelegramBot.send_message(entity=send_to, message=msg)
def filter_files(update: NewMessage.Event | Message):
return bool(
(
update.document
or update.photo
or update.video
or update.video_note
or update.audio
or update.gif
)
and not update.sticker
def get_file_properties(msg: Message):
attributes = (
'document',
'video',
'audio',
'voice',
'photo',
'video_note'
)
for attribute in attributes:
media = getattr(msg, attribute, None)
if media:
file_type = attribute
break
def get_file_properties(message: Message):
file_name = message.file.name
file_size = message.file.size or 0
mime_type = message.file.mime_type
if not media: abort(400, 'Unknown file type.')
file_name = getattr(media, 'file_name', None)
file_size = getattr(media, 'file_size', 0)
if not file_name:
attributes = {
file_format = {
'video': 'mp4',
'audio': 'mp3',
'voice': 'ogg',
'photo': 'jpg',
'video_note': 'mp4'
}
for attribute in attributes:
media = getattr(message, attribute, None)
if media:
file_type, file_format = attribute, attributes[attribute]
break
if not media:
abort(400, 'Invalid media type.')
}.get(file_type)
date = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
file_name = f'{file_type}-{date}.{file_format}'
if not mime_type:
mime_type = guess_type(file_name)[0] or 'application/octet-stream'
mime_type = guess_type(file_name)[0] or 'application/octet-stream'
return file_name, file_size, mime_type

View File

@@ -1,24 +1,31 @@
from telethon.events import CallbackQuery
from hydrogram.types import CallbackQuery
from bot import TelegramBot
from bot.modules.decorators import verify_user
from bot.modules.static import *
from bot.modules.telegram import get_message
@TelegramBot.on(CallbackQuery(pattern=r'^rm_'))
@verify_user(private=True)
async def delete_file(event: CallbackQuery.Event):
query_data = event.query.data.decode().split('_')
@TelegramBot.on_callback_query()
@verify_user
async def manage_callback(bot, q: CallbackQuery):
query = q.data
if len(query_data) != 3:
return await event.answer(InvalidQueryText, alert=True)
if query.startswith('rm_'):
sq = query.split('_')
message = await get_message(int(query_data[1]))
if len(sq) != 3:
return await q.answer(InvalidQueryText, show_alert=True)
if not message:
return await event.answer(MessageNotExist, alert=True)
if query_data[2] != message.raw_text:
return await event.answer(InvalidQueryText, alert=True)
message = await get_message(int(sq[1]))
await message.delete()
if not message:
return await q.answer(MessageNotExist, show_alert=True)
sc = message.caption.split('/')
return await event.answer(LinkRevokedText, alert=True)
if q.from_user.id != int(sc[1]) or sq[2] != sc[0]:
return await q.answer(InvalidQueryText, show_alert=True)
await message.delete()
await q.answer(LinkRevokedText, show_alert=True)
else:
await q.answer(InvalidQueryText, show_alert=True)

View File

@@ -1,28 +1,18 @@
from telethon import Button
from telethon.events import NewMessage
from telethon.tl.custom.message import Message
from hydrogram import filters
from hydrogram.types import Message
from bot import TelegramBot
from bot.config import Telegram
from bot.modules.static import *
from bot.modules.decorators import verify_user
@TelegramBot.on(NewMessage(incoming=True, pattern=r'^/start$'))
@verify_user(private=True)
async def welcome(event: NewMessage.Event | Message):
await event.reply(
message=WelcomeText % {'first_name': event.sender.first_name},
buttons=[
[
Button.url('Add to Channel', f'https://t.me/{Telegram.BOT_USERNAME}?startchannel&admin=post_messages+edit_messages+delete_messages')
]
]
@TelegramBot.on_message(filters.command(['start', 'help']) & filters.private)
@verify_user
async def start_command(_, msg: Message):
await msg.reply(
text=WelcomeText % {'first_name': msg.from_user.first_name},
quote=True
)
@TelegramBot.on(NewMessage(incoming=True, pattern=r'^/info$'))
@verify_user(private=True)
async def user_info(event: Message):
await event.reply(UserInfoText.format(sender=event.sender))
@TelegramBot.on(NewMessage(chats=Telegram.OWNER_ID, incoming=True, pattern=r'^/log$'))
async def send_log(event: NewMessage.Event | Message):
await event.reply(file='event-log.txt')
@TelegramBot.on_message(filters.command('privacy') & filters.private)
@verify_user
async def privacy_command(_, msg: Message):
await msg.reply(text=PrivacyText, quote=True)

View File

@@ -1,24 +1,25 @@
from telethon.events import NewMessage
from telethon.tl.custom import Message
from bot import TelegramBot
from bot.modules.decorators import verify_user
from bot.modules.telegram import get_message, send_message
from bot.modules.static import *
# Currently not in use.
@TelegramBot.on(NewMessage(incoming=True, pattern=r'^/start file_'))
@verify_user(private=True)
async def send_file(event: NewMessage.Event | Message):
payload = event.raw_text.split()[-1].split('_')
# from hydrogram.types import Message
# from bot import TelegramBot
# from bot.modules.decorators import verify_user
# from bot.modules.telegram import get_message, send_message
# from bot.modules.static import *
if len(payload) != 3:
return await event.reply(InvalidPayloadText)
message = await get_message(int(payload[1]))
# async def deeplinks(msg: Message, payload: str):
# if payload.startswith('file_'):
# sp = payload.split('_')
if not message:
return await event.reply(MessageNotExist)
if payload[2] != message.raw_text:
return await event.reply(InvalidPayloadText)
message.raw_text = ''
await send_message(message, send_to=event.chat_id)
# if len(sp) != 3:
# return await msg.reply(InvalidPayloadText, quote=True)
# message = await get_message(int(sp[1]))
# if not message:
# return await msg.reply(MessageNotExist)
# if sp[2] != message.caption:
# return await msg.reply(InvalidPayloadText, quote=True)
# await message.copy(chat_id=msg.from_user.id, caption="")
# else:
# await msg.reply(InvalidPayloadText, quote=True)

View File

@@ -1,95 +1,60 @@
from telethon import Button
from telethon.events import NewMessage
from telethon.errors import MessageAuthorRequiredError, MessageNotModifiedError, MessageIdInvalidError
from telethon.tl.custom import Message
from hydrogram import filters
from hydrogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton
from secrets import token_hex
from bot import TelegramBot
from bot.config import Telegram, Server
from bot.modules.decorators import verify_user
from bot.modules.telegram import send_message, filter_files
from bot.modules.static import *
@TelegramBot.on(NewMessage(incoming=True, func=filter_files))
@verify_user(private=True)
async def user_file_handler(event: NewMessage.Event | Message):
@TelegramBot.on_message(
filters.private
& (
filters.document
| filters.video
| filters.video_note
| filters.audio
| filters.voice
| filters.photo
)
)
@verify_user
async def handle_user_file(_, msg: Message):
sender_id = msg.from_user.id
secret_code = token_hex(Telegram.SECRET_CODE_LENGTH)
event.message.text = f'`{secret_code}`'
message = await send_message(event.message)
message_id = message.id
file = await msg.copy(
chat_id=Telegram.CHANNEL_ID,
caption=f'||{secret_code}/{sender_id}||'
)
file_id = file.id
dl_link = f'{Server.BASE_URL}/dl/{file_id}?code={secret_code}'
dl_link = f'{Server.BASE_URL}/dl/{message_id}?code={secret_code}'
tg_link = f'{Server.BASE_URL}/file/{message_id}?code={secret_code}'
deep_link = f'https://t.me/{Telegram.BOT_USERNAME}?start=file_{message_id}_{secret_code}'
if (event.document and 'video' in event.document.mime_type) or event.video:
stream_link = f'{Server.BASE_URL}/stream/{message_id}?code={secret_code}'
await event.reply(
message= MediaLinksText % {'dl_link': dl_link, 'tg_link': tg_link, 'tg_link': tg_link, 'stream_link': stream_link},
buttons=[
if (msg.document and 'video' in msg.document.mime_type) or msg.video:
stream_link = f'{Server.BASE_URL}/stream/{file_id}?code={secret_code}'
await msg.reply(
text=MediaLinksText % {'dl_link': dl_link, 'stream_link': stream_link},
quote=True,
reply_markup=InlineKeyboardMarkup(
[
Button.url('Download', dl_link),
Button.url('Stream', stream_link)
],
[
Button.url('Get File', deep_link),
Button.inline('Revoke', f'rm_{message_id}_{secret_code}')
]
]
)
else:
await event.reply(
message=FileLinksText % {'dl_link': dl_link, 'tg_link': tg_link},
buttons=[
[
Button.url('Download', dl_link),
Button.url('Get File', deep_link)
],
[
Button.inline('Revoke', f'rm_{message_id}_{secret_code}')
]
]
)
@TelegramBot.on(NewMessage(incoming=True, func=filter_files, forwards=False))
@verify_user()
async def channel_file_handler(event: NewMessage.Event | Message):
if event.raw_text and '#pass' in event.raw_text:
return
secret_code = token_hex(Telegram.SECRET_CODE_LENGTH)
event.message.text = f"`{secret_code}`"
message = await send_message(event.message)
message_id = message.id
dl_link = f"{Server.BASE_URL}/dl/{message_id}?code={secret_code}"
tg_link = f"{Server.BASE_URL}/file/{message_id}?code={secret_code}"
if (event.document and "video" in event.document.mime_type) or event.video:
stream_link = f"{Server.BASE_URL}/stream/{message_id}?code={secret_code}"
try:
await event.edit(
buttons=[
[Button.url("Download", dl_link), Button.url("Stream", stream_link)],
[Button.url("Get File", tg_link)],
[
InlineKeyboardButton('Download', url=dl_link),
InlineKeyboardButton('Stream', url=stream_link)
],
[
InlineKeyboardButton('Revoke', callback_data=f'rm_{file_id}_{secret_code}')
]
]
)
except (
MessageAuthorRequiredError,
MessageIdInvalidError,
MessageNotModifiedError,
):
pass
)
else:
try:
await event.edit(
buttons=[
[Button.url("Download", dl_link), Button.url("Get File", tg_link)]
await msg.reply(
text=FileLinksText % {'dl_link': dl_link},
quote=True,
reply_markup=InlineKeyboardMarkup(
[
[
InlineKeyboardButton('Download', url=dl_link),
InlineKeyboardButton('Revoke', callback_data=f'rm_{file_id}_{secret_code}')
]
]
)
except (
MessageAuthorRequiredError,
MessageIdInvalidError,
MessageNotModifiedError,
):
pass
)

View File

@@ -2,12 +2,12 @@ from quart import Quart
from uvicorn import Server as UvicornServer, Config
from logging import getLogger
from bot.config import Server, LOGGER_CONFIG_JSON
from . import main, error
logger = getLogger('uvicorn')
instance = Quart(__name__)
instance.config['RESPONSE_TIMEOUT'] = None
instance.config['MAX_CONTENT_LENGTH'] = 999999999999999
@instance.before_serving
async def before_serve():

View File

@@ -1,8 +1,9 @@
from quart import Blueprint, Response, request, render_template, redirect
from math import ceil
from re import match as re_match
from .error import abort
from bot import TelegramBot
from bot.config import Telegram, Server
from math import ceil, floor
from bot.modules.telegram import get_message, get_file_properties
bp = Blueprint('main', __name__)
@@ -13,73 +14,70 @@ async def home():
@bp.route('/dl/<int:file_id>')
async def transmit_file(file_id):
file = await get_message(message_id=int(file_id)) or abort(404)
file = await get_message(file_id) or abort(404)
code = request.args.get('code') or abort(401)
range_header = request.headers.get('Range', 0)
range_header = request.headers.get('Range')
if code != file.raw_text:
if code != file.caption.split('/')[0]:
abort(403)
file_name, file_size, mime_type = get_file_properties(file)
start = 0
end = file_size - 1
chunk_size = 1 * 1024 * 1024 # 1 MB
if range_header:
from_bytes, until_bytes = range_header.replace("bytes=", "").split("-")
from_bytes = int(from_bytes)
until_bytes = int(until_bytes) if until_bytes else file_size - 1
else:
from_bytes = 0
until_bytes = file_size - 1
range_match = re_match(r'bytes=(\d+)-(\d*)', range_header)
if range_match:
start = int(range_match.group(1))
end = int(range_match.group(2)) if range_match.group(2) else file_size - 1
if start > end or start >= file_size:
abort(416, 'Requested range not satisfiable')
else:
abort(400, 'Invalid Range header')
if (until_bytes > file_size) or (from_bytes < 0) or (until_bytes < from_bytes):
abort(416, 'Invalid range.')
offset_chunks = start // chunk_size
total_bytes_to_stream = end - start + 1
chunks_to_stream = ceil(total_bytes_to_stream / chunk_size)
chunk_size = 1024 * 1024
until_bytes = min(until_bytes, file_size - 1)
offset = from_bytes - (from_bytes % chunk_size)
first_part_cut = from_bytes - offset
last_part_cut = until_bytes % chunk_size + 1
req_length = until_bytes - from_bytes + 1
part_count = ceil(until_bytes / chunk_size) - floor(offset / chunk_size)
content_length = total_bytes_to_stream
headers = {
"Content-Type": f"{mime_type}",
"Content-Range": f"bytes {from_bytes}-{until_bytes}/{file_size}",
"Content-Length": str(req_length),
"Content-Disposition": f'attachment; filename="{file_name}"',
"Accept-Ranges": "bytes",
}
'Content-Type': mime_type,
'Content-Disposition': f'attachment; filename={file_name}',
'Content-Range': f'bytes {start}-{end}/{file_size}',
'Accept-Ranges': 'bytes',
'Content-Length': str(content_length),
}
status_code = 206 if range_header else 200
async def file_generator():
current_part = 1
async for chunk in TelegramBot.iter_download(file, offset=offset, chunk_size=chunk_size, stride=chunk_size, file_size=file_size):
if not chunk:
break
elif part_count == 1:
yield chunk[first_part_cut:last_part_cut]
elif current_part == 1:
yield chunk[first_part_cut:]
elif current_part == part_count:
yield chunk[:last_part_cut]
else:
yield chunk
async def file_stream():
bytes_streamed = 0
chunk_index = 0
async for chunk in TelegramBot.stream_media(
file,
offset=offset_chunks,
limit=chunks_to_stream,
):
if chunk_index == 0: # Trim the first chunk if necessary
trim_start = start % chunk_size
if trim_start > 0:
chunk = chunk[trim_start:]
current_part += 1
if current_part > part_count:
remaining_bytes = content_length - bytes_streamed
if remaining_bytes <= 0:
break
return Response(file_generator(), headers=headers, status=206 if range_header else 200)
if len(chunk) > remaining_bytes: # Trim the last chunk if necessary
chunk = chunk[:remaining_bytes]
yield chunk
bytes_streamed += len(chunk)
chunk_index += 1
return Response(file_stream(), headers=headers, status=status_code)
@bp.route('/stream/<int:file_id>')
async def stream_file(file_id):
code = request.args.get('code') or abort(401)
return await render_template('player.html', mediaLink=f'{Server.BASE_URL}/dl/{file_id}?code={code}')
@bp.route('/file/<int:file_id>')
async def file_deeplink(file_id):
code = request.args.get('code') or abort(401)
return redirect(f'https://t.me/{Telegram.BOT_USERNAME}?start=file_{file_id}_{code}')

View File

@@ -1,4 +1,4 @@
telethon
cryptg
hydrogram
tgcrypto
quart
uvicorn