Files
FileStreamBot/FileStream/bot/plugins/start.py
2024-01-06 15:16:14 +05:30

143 lines
5.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import math
from FileStream import __version__
from FileStream.bot import FileStream
from FileStream.server.exceptions import FIleNotFound
from FileStream.utils.bot_utils import gen_linkx, verify_user
from FileStream.config import Telegram
from FileStream.utils.database import Database
from FileStream.utils.translation import LANG, BUTTON
from pyrogram import filters, Client
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, Message
from pyrogram.enums.parse_mode import ParseMode
db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
@FileStream.on_message(filters.command('start') & filters.private)
async def start(bot: Client, message: Message):
if not await verify_user(bot, message):
return
usr_cmd = message.text.split("_")[-1]
if usr_cmd == "/start":
if Telegram.START_PIC:
await message.reply_photo(
photo=Telegram.START_PIC,
caption=LANG.START_TEXT.format(message.from_user.mention, FileStream.username),
parse_mode=ParseMode.HTML,
reply_markup=BUTTON.START_BUTTONS
)
else:
await message.reply_text(
text=LANG.START_TEXT.format(message.from_user.mention, FileStream.username),
parse_mode=ParseMode.HTML,
disable_web_page_preview=True,
reply_markup=BUTTON.START_BUTTONS
)
else:
if "stream_" in message.text:
try:
file_check = await db.get_file(usr_cmd)
file_id = str(file_check['_id'])
if file_id == usr_cmd:
reply_markup, stream_text = await gen_linkx(m=message, _id=file_id,
name=[FileStream.username, FileStream.fname])
await message.reply_text(
text=stream_text,
parse_mode=ParseMode.HTML,
disable_web_page_preview=True,
reply_markup=reply_markup,
quote=True
)
except FIleNotFound as e:
await message.reply_text("File Not Found")
except Exception as e:
await message.reply_text("Something Went Wrong")
logging.error(e)
elif "file_" in message.text:
try:
file_check = await db.get_file(usr_cmd)
db_id = str(file_check['_id'])
file_id = file_check['file_id']
file_name = file_check['file_name']
if db_id == usr_cmd:
await message.reply_cached_media(file_id=file_id, caption=f'**{file_name}**')
except FIleNotFound as e:
await message.reply_text("**File Not Found**")
except Exception as e:
await message.reply_text("Something Went Wrong")
logging.error(e)
else:
await message.reply_text(f"**Invalid Command**")
@FileStream.on_message(filters.private & filters.command(["about"]))
async def start(bot, message):
if not await verify_user(bot, message):
return
if Telegram.START_PIC:
await message.reply_photo(
photo=Telegram.START_PIC,
caption=LANG.ABOUT_TEXT.format(FileStream.fname, __version__),
parse_mode=ParseMode.HTML,
reply_markup=BUTTON.ABOUT_BUTTONS
)
else:
await message.reply_text(
text=LANG.ABOUT_TEXT.format(FileStream.fname, __version__),
disable_web_page_preview=True,
reply_markup=BUTTON.ABOUT_BUTTONS
)
@FileStream.on_message((filters.command('help')) & filters.private)
async def help_handler(bot, message):
if not await verify_user(bot, message):
return
if Telegram.START_PIC:
await message.reply_photo(
photo=Telegram.START_PIC,
caption=LANG.HELP_TEXT.format(Telegram.OWNER_ID),
parse_mode=ParseMode.HTML,
reply_markup=BUTTON.HELP_BUTTONS
)
else:
await message.reply_text(
text=LANG.HELP_TEXT.format(Telegram.OWNER_ID),
parse_mode=ParseMode.HTML,
disable_web_page_preview=True,
reply_markup=BUTTON.HELP_BUTTONS
)
# ---------------------------------------------------------------------------------------------------
@FileStream.on_message(filters.command('files') & filters.private)
async def my_files(bot: Client, message: Message):
if not await verify_user(bot, message):
return
user_files, total_files = await db.find_files(message.from_user.id, [1, 10])
file_list = []
async for x in user_files:
file_list.append([InlineKeyboardButton(x["file_name"], callback_data=f"myfile_{x['_id']}_{1}")])
if total_files > 10:
file_list.append(
[
InlineKeyboardButton("", callback_data="N/A"),
InlineKeyboardButton(f"1/{math.ceil(total_files / 10)}", callback_data="N/A"),
InlineKeyboardButton("", callback_data="userfiles_2")
],
)
if not file_list:
file_list.append(
[InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")],
)
file_list.append([InlineKeyboardButton("ʟsᴇ", callback_data="close")])
await message.reply_photo(photo=Telegram.FILE_PIC,
caption="Total files: {}".format(total_files),
reply_markup=InlineKeyboardMarkup(file_list))