# © @AvishkarPatil [ Telegram ]
from WebStreamer.bot import StreamBot
from WebStreamer.vars import Var
from WebStreamer.utils.human_readable import humanbytes
from WebStreamer.utils.database import Database
from pyrogram import filters
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton
from pyrogram.errors import UserNotParticipant
# Module-wide database handle; the handlers below use it to check whether a
# user is already known (is_user_exist) and to register new ones (add_user).
db = Database(Var.DATABASE_URL, Var.SESSION_NAME)
# Welcome page text. The single "{}" placeholder is filled with the user's
# mention via str.format() by the /start handler and the "home" button.
START_TEXT = """
👋 Hᴇʏ,{}\n
I'ᴍ Tᴇʟᴇɢʀᴀᴍ Fɪʟᴇs Sᴛʀᴇᴀᴍɪɴɢ Bᴏᴛ ᴀs ᴡᴇʟʟ Dɪʀᴇᴄᴛ Lɪɴᴋs Gᴇɴᴇʀᴀᴛᴇ\n
Cʟɪᴄᴋ ᴏɴ Hᴇʟᴘ ᴛᴏ ɢᴇᴛ ᴍᴏʀᴇ ɪɴғᴏʀᴍᴀᴛɪᴏɴ\n
𝗪𝗔𝗥𝗡𝗜𝗡𝗚 🚸
🔞 Pʀᴏɴ ᴄᴏɴᴛᴇɴᴛꜱ ʟᴇᴀᴅꜱ ᴛᴏ ᴘᴇʀᴍᴀɴᴇɴᴛ ʙᴀɴ ʏᴏᴜ.\n\n
🍃 Bᴏᴛ Mᴀɪɴᴛᴀɪɴᴇᴅ Bʏ :@AvishkarPatil"""
# Help page text, sent as-is (no placeholders) when the "help" inline button
# is pressed.
HELP_TEXT = """
- Sᴇɴᴅ ᴍᴇ ᴀɴʏ ꜰɪʟᴇ (ᴏʀ) ᴍᴇᴅɪᴀ ꜰʀᴏᴍ ᴛᴇʟᴇɢʀᴀᴍ.
- I ᴡɪʟʟ ᴘʀᴏᴠɪᴅᴇ ᴇxᴛᴇʀɴᴀʟ ᴅɪʀᴇᴄᴛ ᴅᴏᴡɴʟᴏᴀᴅ ʟɪɴᴋ !.
- Aᴅᴅ Mᴇ ɪɴ ʏᴏᴜʀ Cʜᴀɴɴᴇʟ Fᴏʀ Dɪʀᴇᴄᴛ Dᴏᴡɴʟᴏᴀᴅ Lɪɴᴋs Bᴜᴛᴛᴏɴ
- Tʜɪs Pᴇʀᴍᴇᴀɴᴛ Lɪɴᴋ Wɪᴛʜ Fᴀsᴛᴇsᴛ Sᴘᴇᴇᴅ\n
🔸 𝗪𝗔𝗥𝗡𝗜𝗡𝗚 🚸\n
🔞 Pʀᴏɴ ᴄᴏɴᴛᴇɴᴛꜱ ʟᴇᴀᴅꜱ ᴛᴏ ᴘᴇʀᴍᴀɴᴇɴᴛ ʙᴀɴ ʏᴏᴜ.\n
Cᴏɴᴛᴀᴄᴛ ᴅᴇᴠᴇʟᴏᴘᴇʀ (ᴏʀ) ʀᴇᴘᴏʀᴛ ʙᴜɢꜱ : [ ᴄʟɪᴄᴋ ʜᴇʀᴇ ]"""
# About page text (bot name, version, author). Contains no format
# placeholders; used by the /about command and the "about" inline button.
ABOUT_TEXT = """
⚜ Mʏ ɴᴀᴍᴇ : FileStreamX\n
🔸Vᴇʀꜱɪᴏɴ : 3.0.1\n
🔹Sᴏᴜʀᴄᴇ : Cʟɪᴄᴋ Hᴇʀᴇ\n
🔸GitHub : Fᴏʟʟᴏᴡ\n
🔹Dᴇᴠᴇʟᴏᴘᴇʀ : Aᴠɪsʜᴋᴀʀ Pᴀᴛɪʟ\n
🔸Lᴀꜱᴛ ᴜᴘᴅᴀᴛᴇᴅ : [ 01-02-21 ] 05:05 PM"""
# Keyboard attached to the start/home page: a single row offering
# Help, About and Close. Each button's callback data is routed by cb_data.
START_BUTTONS = InlineKeyboardMarkup([
    [
        InlineKeyboardButton(label, callback_data=action)
        for label, action in (
            ('Hᴇʟᴘ', 'help'),
            ('Aʙᴏᴜᴛ', 'about'),
            ('Cʟᴏsᴇ', 'close'),
        )
    ]
])
# Keyboard attached to the help page: a single row offering
# Home, About and Close. Each button's callback data is routed by cb_data.
HELP_BUTTONS = InlineKeyboardMarkup([
    [
        InlineKeyboardButton(label, callback_data=action)
        for label, action in (
            ('Hᴏᴍᴇ', 'home'),
            ('Aʙᴏᴜᴛ', 'about'),
            ('Cʟᴏsᴇ', 'close'),
        )
    ]
])
# Keyboard attached to the about page: a single row offering
# Home, Help and Close. Each button's callback data is routed by cb_data.
ABOUT_BUTTONS = InlineKeyboardMarkup([
    [
        InlineKeyboardButton(label, callback_data=action)
        for label, action in (
            ('Hᴏᴍᴇ', 'home'),
            ('Hᴇʟᴘ', 'help'),
            ('Cʟᴏsᴇ', 'close'),
        )
    ]
])
@StreamBot.on_callback_query()
async def cb_data(bot, update):
    """Switch the inline menu page in response to a button press.

    Callback data "home", "help" and "about" replace the message's text and
    keyboard with the matching page. Any other callback data (the keyboards
    in this module use "close") deletes the message instead.
    """
    pressed = update.data

    # Anything that is not a known page is treated as a request to dismiss.
    if pressed not in ("home", "help", "about"):
        await update.message.delete()
        return

    if pressed == "home":
        # Only the start page is personalised with the user's mention.
        page_text = START_TEXT.format(update.from_user.mention)
        page_markup = START_BUTTONS
    elif pressed == "help":
        page_text, page_markup = HELP_TEXT, HELP_BUTTONS
    else:
        page_text, page_markup = ABOUT_TEXT, ABOUT_BUTTONS

    await update.message.edit_text(
        text=page_text,
        disable_web_page_preview=True,
        reply_markup=page_markup,
    )
@StreamBot.on_message(filters.command('start') & filters.private & ~filters.edited)
async def start(b, m):
    """Handle ``/start`` in private chats.

    Steps:
      1. Register the sender in the database if unseen and announce the new
         user in ``Var.BIN_CHANNEL``.
      2. Take the text after the last ``_`` of the message as the payload.
         If it parses as an integer it is treated as a message id in
         ``Var.BIN_CHANNEL`` and a download link for that message is sent;
         otherwise (plain ``/start`` or any non-numeric payload) the welcome
         page is sent.
      3. In both cases, when ``Var.UPDATES_CHANNEL`` is configured (not the
         string ``"None"``), the user must be a non-banned member of that
         channel; otherwise an explanatory message is sent and the handler
         returns early.

    Args:
        b: The bot client that received the message.
        m: The incoming ``/start`` message.
    """
    # First contact: persist the user and notify the bin channel.
    if not await db.is_user_exist(m.from_user.id):
        await db.add_user(m.from_user.id)
        await b.send_message(
            Var.BIN_CHANNEL,
            f"**Nᴇᴡ Usᴇʀ Jᴏɪɴᴇᴅ:** \n\n__Mʏ Nᴇᴡ Fʀɪᴇɴᴅ__ [{m.from_user.first_name}](tg://user?id={m.from_user.id}) __Sᴛᴀʀᴛᴇᴅ Yᴏᴜʀ Bᴏᴛ !!__"
        )

    # Payload is whatever follows the last underscore. A bare "/start" (or any
    # other non-numeric payload) is not a file request, so fall back to the
    # welcome page instead of letting int() raise an unhandled ValueError.
    usr_cmd = (m.text or "").split("_")[-1]
    try:
        requested_id = int(usr_cmd)
    except ValueError:
        requested_id = None

    if requested_id is None:
        if Var.UPDATES_CHANNEL != "None":
            try:
                user = await b.get_chat_member(Var.UPDATES_CHANNEL, m.chat.id)
                if user.status == "kicked":
                    await b.send_message(
                        chat_id=m.chat.id,
                        text="__Sᴏʀʀʏ Sɪʀ, Yᴏᴜ ᴀʀᴇ Bᴀɴɴᴇᴅ ᴛᴏ ᴜsᴇ ᴍᴇ. Cᴏɴᴛᴀᴄᴛ ᴛʜᴇ Dᴇᴠᴇʟᴏᴘᴇʀ__\n\n @AvishkarPatil **Tʜᴇʏ Wɪʟʟ Hᴇʟᴘ Yᴏᴜ**",
                        parse_mode="markdown",
                        disable_web_page_preview=True
                    )
                    return
            except UserNotParticipant:
                await b.send_message(
                    chat_id=m.chat.id,
                    text="Jᴏɪɴ ᴍʏ ᴜᴘᴅᴀᴛᴇ ᴄʜᴀɴɴᴇʟ ᴛᴏ ᴜsᴇ ᴍᴇ 🔐",
                    reply_markup=InlineKeyboardMarkup(
                        [[
                            InlineKeyboardButton("Jᴏɪɴ ɴᴏᴡ 🔓", url=f"https://t.me/{Var.UPDATES_CHANNEL}")
                        ]]
                    ),
                    parse_mode="HTML"
                )
                return
            except Exception:
                # Best-effort: any other failure of the membership lookup is
                # reported to the user rather than propagated.
                await b.send_message(
                    chat_id=m.chat.id,
                    text="Sᴏᴍᴇᴛʜɪɴɢ ᴡʀᴏɴɢ ᴄᴏɴᴛᴀᴄᴛ ᴍʏ ᴅᴇᴠᴇʟᴏᴘᴇʀ [ ᴄʟɪᴄᴋ ʜᴇʀᴇ ]",
                    parse_mode="HTML",
                    disable_web_page_preview=True)
                return
        await m.reply_text(
            text=START_TEXT.format(m.from_user.mention),
            parse_mode="HTML",
            disable_web_page_preview=True,
            reply_markup=START_BUTTONS
        )
    else:
        if Var.UPDATES_CHANNEL != "None":
            try:
                user = await b.get_chat_member(Var.UPDATES_CHANNEL, m.chat.id)
                if user.status == "kicked":
                    await b.send_message(
                        chat_id=m.chat.id,
                        text="**Sᴏʀʀʏ Sɪʀ, Yᴏᴜ ᴀʀᴇ Bᴀɴɴᴇᴅ ᴛᴏ ᴜsᴇ ᴍᴇ. Qᴜɪᴄᴋʟʏ ᴄᴏɴᴛᴀᴄᴛ** @Avishkarpatil",
                        parse_mode="markdown",
                        disable_web_page_preview=True
                    )
                    return
            except UserNotParticipant:
                await b.send_message(
                    chat_id=m.chat.id,
                    text="**Pʟᴇᴀsᴇ Jᴏɪɴ Mʏ Uᴘᴅᴀᴛᴇs Cʜᴀɴɴᴇʟ ᴛᴏ ᴜsᴇ ᴛʜɪs Bᴏᴛ**!\n\n**Dᴜᴇ ᴛᴏ Oᴠᴇʀʟᴏᴀᴅ, Oɴʟʏ Cʜᴀɴɴᴇʟ Sᴜʙsᴄʀɪʙᴇʀs ᴄᴀɴ ᴜsᴇ ᴛʜᴇ Bᴏᴛ**!",
                    reply_markup=InlineKeyboardMarkup(
                        [[
                            InlineKeyboardButton("🤖 Jᴏɪɴ Uᴘᴅᴀᴛᴇs Cʜᴀɴɴᴇʟ", url=f"https://t.me/{Var.UPDATES_CHANNEL}")
                        ]]
                    ),
                    parse_mode="markdown"
                )
                return
            except Exception:
                # Best-effort: any other failure of the membership lookup is
                # reported to the user rather than propagated.
                await b.send_message(
                    chat_id=m.chat.id,
                    text="**Sᴏᴍᴇᴛʜɪɴɢ ᴡᴇɴᴛ Wʀᴏɴɢ. Cᴏɴᴛᴀᴄᴛ ᴍᴇ** [Aᴠɪsʜᴋᴀʀ Pᴀᴛɪʟ](https://t.me/Avishkarpatil).",
                    parse_mode="markdown",
                    disable_web_page_preview=True)
                return

        get_msg = await b.get_messages(chat_id=Var.BIN_CHANNEL, message_ids=requested_id)

        # Pick the first media kind present, in the order video, document,
        # audio; name and size stay None when the message carries none.
        media = get_msg.video or get_msg.document or get_msg.audio
        if media:
            file_name = f"{media.file_name}"
            file_size = f"{humanbytes(media.file_size)}"
        else:
            file_name = None
            file_size = None

        # HTTPS without a port on Heroku / NO_PORT setups, otherwise plain
        # HTTP with the configured port.
        if Var.ON_HEROKU or Var.NO_PORT:
            stream_link = "https://{}/{}".format(Var.FQDN, get_msg.message_id)
        else:
            stream_link = "http://{}:{}/{}".format(Var.FQDN, Var.PORT, get_msg.message_id)

        msg_text ="""
𝗬𝗼𝘂𝗿 𝗟𝗶𝗻𝗸 𝗚𝗲𝗻𝗲𝗿𝗮𝘁𝗲𝗱 !
📂 Fɪʟᴇ ɴᴀᴍᴇ : {}
📦 Fɪʟᴇ ꜱɪᴢᴇ : {}
📥 Dᴏᴡɴʟᴏᴀᴅ : {}
🚸 Nᴏᴛᴇ : Lɪɴᴋ ᴇxᴘɪʀᴇᴅ ɪɴ 24 ʜᴏᴜʀꜱ
🍃 Bᴏᴛ Mᴀɪɴᴛᴀɪɴᴇᴅ Bʏ : @AvishkarPatil
"""
        await m.reply_text(
            text=msg_text.format(file_name, file_size, stream_link),
            parse_mode="HTML",
            reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("Dᴏᴡɴʟᴏᴀᴅ ɴᴏᴡ 📥", url=stream_link)]])
        )
@StreamBot.on_message(filters.private & filters.command(["about"]) & ~filters.edited)
async def start(bot, update):
    """Reply to ``/about`` in private chats with the about page.

    Edited messages are ignored, matching the other command handlers in this
    module.

    NOTE(review): this function reuses the name ``start`` of the /start
    handler defined earlier in the module. Presumably harmless because the
    decorator registers each handler when it is applied, but worth renaming
    once callers are confirmed.

    Args:
        bot: The bot client that received the message.
        update: The incoming ``/about`` message.
    """
    # ABOUT_TEXT has no format placeholders, so it is sent verbatim, the same
    # way the "about" inline button sends it.
    await update.reply_text(
        text=ABOUT_TEXT,
        disable_web_page_preview=True,
        reply_markup=ABOUT_BUTTONS
    )
@StreamBot.on_message(filters.command('help') & filters.private & ~filters.edited)
async def help_handler(bot, message):
    """Reply to ``/help`` in private chats with the help page.

    Registers the sender in the database if unseen (announcing the new user in
    ``Var.BIN_CHANNEL``), then, when an updates channel is configured, requires
    the user to be a non-banned member of it before the help page is sent.

    Args:
        bot: The bot client that received the message.
        message: The incoming ``/help`` message.
    """
    # First contact: persist the user and notify the bin channel.
    if not await db.is_user_exist(message.from_user.id):
        await db.add_user(message.from_user.id)
        await bot.send_message(
            Var.BIN_CHANNEL,
            f"**Nᴇᴡ Usᴇʀ Jᴏɪɴᴇᴅ **\n\n__Mʏ Nᴇᴡ Fʀɪᴇɴᴅ__ [{message.from_user.first_name}](tg://user?id={message.from_user.id}) __Started Your Bot !!__"
        )

    # The /start handler treats the string "None" as "no updates channel";
    # accept that sentinel here too (as well as a real None) so an unset
    # channel skips the membership gate instead of failing the lookup.
    if Var.UPDATES_CHANNEL not in (None, "None"):
        try:
            user = await bot.get_chat_member(Var.UPDATES_CHANNEL, message.chat.id)
            if user.status == "kicked":
                await bot.send_message(
                    chat_id=message.chat.id,
                    text="Sᴏʀʀʏ Sɪʀ, Yᴏᴜ ᴀʀᴇ Bᴀɴɴᴇᴅ ᴛᴏ ᴜsᴇ ᴍᴇ. Cᴏɴᴛᴀᴄᴛ ᴛʜᴇ Dᴇᴠᴇʟᴏᴘᴇʀ",
                    parse_mode="HTML",
                    disable_web_page_preview=True
                )
                return
        except UserNotParticipant:
            await bot.send_message(
                chat_id=message.chat.id,
                text="**Pʟᴇᴀsᴇ Jᴏɪɴ Mʏ Uᴘᴅᴀᴛᴇs Cʜᴀɴɴᴇʟ ᴛᴏ ᴜsᴇ ᴛʜɪs Bᴏᴛ!**\n\n__Dᴜᴇ ᴛᴏ Oᴠᴇʀʟᴏᴀᴅ, Oɴʟʏ Cʜᴀɴɴᴇʟ Sᴜʙsᴄʀɪʙᴇʀs ᴄᴀɴ ᴜsᴇ ᴛʜᴇ Bᴏᴛ!__",
                reply_markup=InlineKeyboardMarkup(
                    [[
                        InlineKeyboardButton("🤖 Jᴏɪɴ Uᴘᴅᴀᴛᴇs Cʜᴀɴɴᴇʟ", url=f"https://t.me/{Var.UPDATES_CHANNEL}")
                    ]]
                ),
                parse_mode="markdown"
            )
            return
        except Exception:
            # Best-effort: any other failure of the membership lookup is
            # reported to the user rather than propagated.
            await bot.send_message(
                chat_id=message.chat.id,
                text="__Sᴏᴍᴇᴛʜɪɴɢ ᴡᴇɴᴛ Wʀᴏɴɢ. Cᴏɴᴛᴀᴄᴛ ᴍᴇ__ [Aᴠɪsʜᴋᴀʀ Pᴀᴛɪʟ](https://t.me/Avishkarpatil).",
                parse_mode="markdown",
                disable_web_page_preview=True)
            return

    # Send the help text itself; the keyboard object belongs only in
    # reply_markup, not in the text argument.
    await message.reply_text(
        text=HELP_TEXT,
        parse_mode="HTML",
        disable_web_page_preview=True,
        reply_markup=HELP_BUTTONS
    )