mirror of
https://github.com/avipatilpro/FileStreamBot.git
synced 2026-01-15 14:22:53 -03:00
262 lines
12 KiB
Python
262 lines
12 KiB
Python
import urllib.parse
|
||
from WebStreamer.bot import StreamBot
|
||
from WebStreamer.vars import Var
|
||
from WebStreamer.utils.human_readable import humanbytes
|
||
from WebStreamer.utils.database import Database
|
||
from pyrogram import filters
|
||
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton
|
||
from pyrogram.errors import UserNotParticipant
|
||
from pyrogram.enums.parse_mode import ParseMode
|
||
|
||
db = Database(Var.DATABASE_URL, Var.SESSION_NAME)
|
||
|
||
START_TEXT = """
|
||
<i>👋 Hᴇʏ,</i>{}\n
|
||
<i>I'ᴍ Tᴇʟᴇɢʀᴀᴍ Fɪʟᴇs Sᴛʀᴇᴀᴍɪɴɢ Bᴏᴛ ᴀs ᴡᴇʟʟ Dɪʀᴇᴄᴛ Lɪɴᴋs Gᴇɴᴇʀᴀᴛᴇ</i>\n
|
||
<i>Cʟɪᴄᴋ ᴏɴ Hᴇʟᴘ ᴛᴏ ɢᴇᴛ ᴍᴏʀᴇ ɪɴғᴏʀᴍᴀᴛɪᴏɴ</i>\n
|
||
<i><u>𝗪𝗔𝗥𝗡𝗜𝗡𝗚 🚸</u></i>
|
||
<b>🔞 Pʀᴏɴ ᴄᴏɴᴛᴇɴᴛꜱ ʟᴇᴀᴅꜱ ᴛᴏ ᴘᴇʀᴍᴀɴᴇɴᴛ ʙᴀɴ ʏᴏᴜ.</b>\n\n
|
||
<i><b>🍃 Bᴏᴛ Mᴀɪɴᴛᴀɪɴᴇᴅ Bʏ :</b>@AvishkarPatil</i>"""
|
||
|
||
HELP_TEXT = """
|
||
<i>- Sᴇɴᴅ ᴍᴇ ᴀɴʏ ꜰɪʟᴇ (ᴏʀ) ᴍᴇᴅɪᴀ ꜰʀᴏᴍ ᴛᴇʟᴇɢʀᴀᴍ.</i>
|
||
<i>- I ᴡɪʟʟ ᴘʀᴏᴠɪᴅᴇ ᴇxᴛᴇʀɴᴀʟ ᴅɪʀᴇᴄᴛ ᴅᴏᴡɴʟᴏᴀᴅ ʟɪɴᴋ !.</i>
|
||
<i>- Aᴅᴅ Mᴇ ɪɴ ʏᴏᴜʀ Cʜᴀɴɴᴇʟ Fᴏʀ Dɪʀᴇᴄᴛ Dᴏᴡɴʟᴏᴀᴅ Lɪɴᴋs Bᴜᴛᴛᴏɴ</i>
|
||
<i>- Tʜɪs Pᴇʀᴍᴇᴀɴᴛ Lɪɴᴋ Wɪᴛʜ Fᴀsᴛᴇsᴛ Sᴘᴇᴇᴅ</i>\n
|
||
<u>🔸 𝗪𝗔𝗥𝗡𝗜𝗡𝗚 🚸</u>\n
|
||
<b>🔞 Pʀᴏɴ ᴄᴏɴᴛᴇɴᴛꜱ ʟᴇᴀᴅꜱ ᴛᴏ ᴘᴇʀᴍᴀɴᴇɴᴛ ʙᴀɴ ʏᴏᴜ.</b>\n
|
||
<i>Cᴏɴᴛᴀᴄᴛ ᴅᴇᴠᴇʟᴏᴘᴇʀ (ᴏʀ) ʀᴇᴘᴏʀᴛ ʙᴜɢꜱ</i> <b>: <a href='https://t.me/Avishkarpatil'>[ ᴄʟɪᴄᴋ ʜᴇʀᴇ ]</a></b>"""
|
||
|
||
ABOUT_TEXT = """
|
||
<b>⚜ Mʏ ɴᴀᴍᴇ : FileStreamX</b>\n
|
||
<b>🔸Vᴇʀꜱɪᴏɴ : <a href='https://telegram.me/AvishkarPatil'>3.0.1</a></b>\n
|
||
<b>🔹Sᴏᴜʀᴄᴇ : <a href='https://github.com/avipatilpro/FileStreamBot'>Cʟɪᴄᴋ Hᴇʀᴇ</a></b>\n
|
||
<b>🔸GitHub : <a href='https://GitHub.com/avipatilpro'>Fᴏʟʟᴏᴡ</a></b>\n
|
||
<b>🔹Dᴇᴠᴇʟᴏᴘᴇʀ : <a href='https://telegram.me/Avishkarpatil'>Aᴠɪsʜᴋᴀʀ Pᴀᴛɪʟ</a></b>\n
|
||
<b>🔸Lᴀꜱᴛ ᴜᴘᴅᴀᴛᴇᴅ : <a href='https://telegram.me/AvishkarPatil'>[ 26 - ᴊᴜɴᴇ - 2022 ] 03:35 ᴀᴍ</a></b>"""
|
||
|
||
START_BUTTONS = InlineKeyboardMarkup(
|
||
[[
|
||
InlineKeyboardButton('Hᴇʟᴘ', callback_data='help'),
|
||
InlineKeyboardButton('Aʙᴏᴜᴛ', callback_data='about'),
|
||
InlineKeyboardButton('Cʟᴏsᴇ', callback_data='close')
|
||
]]
|
||
)
|
||
HELP_BUTTONS = InlineKeyboardMarkup(
|
||
[[
|
||
InlineKeyboardButton('Hᴏᴍᴇ', callback_data='home'),
|
||
InlineKeyboardButton('Aʙᴏᴜᴛ', callback_data='about'),
|
||
InlineKeyboardButton('Cʟᴏsᴇ', callback_data='close')
|
||
]]
|
||
)
|
||
ABOUT_BUTTONS = InlineKeyboardMarkup(
|
||
[[
|
||
InlineKeyboardButton('Hᴏᴍᴇ', callback_data='home'),
|
||
InlineKeyboardButton('Hᴇʟᴘ', callback_data='help'),
|
||
InlineKeyboardButton('Cʟᴏsᴇ', callback_data='close')
|
||
]]
|
||
)
|
||
|
||
@StreamBot.on_callback_query()
|
||
async def cb_data(bot, update):
|
||
if update.data == "home":
|
||
await update.message.edit_text(
|
||
text=START_TEXT.format(update.from_user.mention),
|
||
disable_web_page_preview=True,
|
||
reply_markup=START_BUTTONS
|
||
)
|
||
elif update.data == "help":
|
||
await update.message.edit_text(
|
||
text=HELP_TEXT,
|
||
disable_web_page_preview=True,
|
||
reply_markup=HELP_BUTTONS
|
||
)
|
||
elif update.data == "about":
|
||
await update.message.edit_text(
|
||
text=ABOUT_TEXT,
|
||
disable_web_page_preview=True,
|
||
reply_markup=ABOUT_BUTTONS
|
||
)
|
||
else:
|
||
await update.message.delete()
|
||
|
||
def get_media_file_size(m):
|
||
media = m.video or m.audio or m.document
|
||
if media and media.file_size:
|
||
return media.file_size
|
||
else:
|
||
return None
|
||
|
||
|
||
def get_media_file_name(m):
|
||
media = m.video or m.document or m.audio
|
||
if media and media.file_name:
|
||
return urllib.parse.quote_plus(media.file_name)
|
||
else:
|
||
return None
|
||
|
||
|
||
@StreamBot.on_message(filters.command('start') & filters.private)
|
||
async def start(b, m):
|
||
if not await db.is_user_exist(m.from_user.id):
|
||
await db.add_user(m.from_user.id)
|
||
await b.send_message(
|
||
Var.BIN_CHANNEL,
|
||
f"**Nᴇᴡ Usᴇʀ Jᴏɪɴᴇᴅ:** \n\n__Mʏ Nᴇᴡ Fʀɪᴇɴᴅ__ [{m.from_user.first_name}](tg://user?id={m.from_user.id}) __Sᴛᴀʀᴛᴇᴅ Yᴏᴜʀ Bᴏᴛ !!__"
|
||
)
|
||
usr_cmd = m.text.split("_")[-1]
|
||
if usr_cmd == "/start":
|
||
if Var.UPDATES_CHANNEL != "None":
|
||
try:
|
||
user = await b.get_chat_member(Var.UPDATES_CHANNEL, m.chat.id)
|
||
if user.status == "kicked":
|
||
await b.send_message(
|
||
chat_id=m.chat.id,
|
||
text="__Sᴏʀʀʏ Sɪʀ, Yᴏᴜ ᴀʀᴇ Bᴀɴɴᴇᴅ ᴛᴏ ᴜsᴇ ᴍᴇ. Cᴏɴᴛᴀᴄᴛ ᴛʜᴇ Dᴇᴠᴇʟᴏᴘᴇʀ__\n\n @AvishkarPatil **Tʜᴇʏ Wɪʟʟ Hᴇʟᴘ Yᴏᴜ**",
|
||
parse_mode=ParseMode.MARKDOWN,
|
||
disable_web_page_preview=True
|
||
)
|
||
return
|
||
except UserNotParticipant:
|
||
await b.send_message(
|
||
chat_id=m.chat.id,
|
||
text="<i>Jᴏɪɴ ᴍʏ ᴜᴘᴅᴀᴛᴇ ᴄʜᴀɴɴᴇʟ ᴛᴏ ᴜsᴇ ᴍᴇ 🔐</i>",
|
||
reply_markup=InlineKeyboardMarkup(
|
||
[[
|
||
InlineKeyboardButton("Jᴏɪɴ ɴᴏᴡ 🔓", url=f"https://t.me/{Var.UPDATES_CHANNEL}")
|
||
]]
|
||
),
|
||
parse_mode=ParseMode.HTML
|
||
)
|
||
return
|
||
except Exception:
|
||
await b.send_message(
|
||
chat_id=m.chat.id,
|
||
text="<i>Sᴏᴍᴇᴛʜɪɴɢ ᴡʀᴏɴɢ ᴄᴏɴᴛᴀᴄᴛ ᴍʏ ᴅᴇᴠᴇʟᴏᴘᴇʀ</i> <b><a href='http://t.me/Avishkarpatil'>[ ᴄʟɪᴄᴋ ʜᴇʀᴇ ]</a></b>",
|
||
parse_mode=ParseMode.HTML,
|
||
disable_web_page_preview=True)
|
||
return
|
||
await m.reply_text(
|
||
text=START_TEXT.format(m.from_user.mention),
|
||
parse_mode=ParseMode.HTML,
|
||
disable_web_page_preview=True,
|
||
reply_markup=START_BUTTONS
|
||
)
|
||
|
||
|
||
else:
|
||
if Var.UPDATES_CHANNEL != "None":
|
||
try:
|
||
user = await b.get_chat_member(Var.UPDATES_CHANNEL, m.chat.id)
|
||
if user.status == "kicked":
|
||
await b.send_message(
|
||
chat_id=m.chat.id,
|
||
text="**Sᴏʀʀʏ Sɪʀ, Yᴏᴜ ᴀʀᴇ Bᴀɴɴᴇᴅ ᴛᴏ ᴜsᴇ ᴍᴇ. Qᴜɪᴄᴋʟʏ ᴄᴏɴᴛᴀᴄᴛ** @Avishkarpatil",
|
||
parse_mode=ParseMode.MARKDOWN,
|
||
disable_web_page_preview=True
|
||
)
|
||
return
|
||
except UserNotParticipant:
|
||
await b.send_message(
|
||
chat_id=m.chat.id,
|
||
text="**Pʟᴇᴀsᴇ Jᴏɪɴ Mʏ Uᴘᴅᴀᴛᴇs Cʜᴀɴɴᴇʟ ᴛᴏ ᴜsᴇ ᴛʜɪs Bᴏᴛ**!\n\n**Dᴜᴇ ᴛᴏ Oᴠᴇʀʟᴏᴀᴅ, Oɴʟʏ Cʜᴀɴɴᴇʟ Sᴜʙsᴄʀɪʙᴇʀs ᴄᴀɴ ᴜsᴇ ᴛʜᴇ Bᴏᴛ**!",
|
||
reply_markup=InlineKeyboardMarkup(
|
||
[[
|
||
InlineKeyboardButton("🤖 Jᴏɪɴ Uᴘᴅᴀᴛᴇs Cʜᴀɴɴᴇʟ", url=f"https://t.me/{Var.UPDATES_CHANNEL}")],
|
||
[InlineKeyboardButton("🔄 Refresh / Try Again", url=f"https://t.me/{(await b.get_me()).username}?start=AvishkarPatil_{usr_cmd}")
|
||
|
||
]]
|
||
),
|
||
parse_mode=ParseMode.MARKDOWN
|
||
)
|
||
return
|
||
except Exception:
|
||
await b.send_message(
|
||
chat_id=m.chat.id,
|
||
text="**Sᴏᴍᴇᴛʜɪɴɢ ᴡᴇɴᴛ Wʀᴏɴɢ. Cᴏɴᴛᴀᴄᴛ ᴍᴇ** [Aᴠɪsʜᴋᴀʀ Pᴀᴛɪʟ](https://t.me/Avishkarpatil).",
|
||
parse_mode=ParseMode.MARKDOWN,
|
||
disable_web_page_preview=True)
|
||
return
|
||
|
||
get_msg = await b.get_messages(chat_id=Var.BIN_CHANNEL, message_ids=int(usr_cmd))
|
||
file_name = get_media_file_name(get_msg)
|
||
file_size = humanbytes(get_media_file_size(get_msg))
|
||
|
||
stream_link = "https://{}/{}/{}".format(Var.FQDN, get_msg.id, file_name) if Var.ON_HEROKU or Var.NO_PORT else \
|
||
"http://{}:{}/{}/{}".format(Var.FQDN,
|
||
Var.PORT,
|
||
get_msg.id,
|
||
file_name)
|
||
|
||
msg_text ="""
|
||
<i><u>𝗬𝗼𝘂𝗿 𝗟𝗶𝗻𝗸 𝗚𝗲𝗻𝗲𝗿𝗮𝘁𝗲𝗱 !</u></i>\n
|
||
<b>📂 Fɪʟᴇ ɴᴀᴍᴇ :</b> <i>{}</i>\n
|
||
<b>📦 Fɪʟᴇ ꜱɪᴢᴇ :</b> <i>{}</i>\n
|
||
<b>📥 Dᴏᴡɴʟᴏᴀᴅ :</b> <i>{}</i>\n
|
||
<b>🚸 Nᴏᴛᴇ : Lɪɴᴋ ᴇxᴘɪʀᴇᴅ ɪɴ 24 ʜᴏᴜʀꜱ</b>\n
|
||
<i>🍃 Bᴏᴛ Mᴀɪɴᴛᴀɪɴᴇᴅ Bʏ :</i> <b>@AvishkarPatil</b>
|
||
"""
|
||
|
||
await m.reply_text(
|
||
text=msg_text.format(file_name, file_size, stream_link),
|
||
parse_mode=ParseMode.HTML,
|
||
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("Dᴏᴡɴʟᴏᴀᴅ ɴᴏᴡ 📥", url=stream_link)]])
|
||
)
|
||
|
||
|
||
|
||
@StreamBot.on_message(filters.private & filters.command(["about"]))
|
||
async def start(bot, update):
|
||
await update.reply_text(
|
||
text=ABOUT_TEXT.format(update.from_user.mention),
|
||
disable_web_page_preview=True,
|
||
reply_markup=ABOUT_BUTTONS
|
||
)
|
||
|
||
|
||
@StreamBot.on_message(filters.command('help') & filters.private)
|
||
async def help_handler(bot, message):
|
||
if not await db.is_user_exist(message.from_user.id):
|
||
await db.add_user(message.from_user.id)
|
||
await bot.send_message(
|
||
Var.BIN_CHANNEL,
|
||
f"**Nᴇᴡ Usᴇʀ Jᴏɪɴᴇᴅ **\n\n__Mʏ Nᴇᴡ Fʀɪᴇɴᴅ__ [{message.from_user.first_name}](tg://user?id={message.from_user.id}) __Started Your Bot !!__"
|
||
)
|
||
if Var.UPDATES_CHANNEL is not None:
|
||
try:
|
||
user = await bot.get_chat_member(Var.UPDATES_CHANNEL, message.chat.id)
|
||
if user.status == "kicked":
|
||
await bot.send_message(
|
||
chat_id=message.chat.id,
|
||
text="<i>Sᴏʀʀʏ Sɪʀ, Yᴏᴜ ᴀʀᴇ Bᴀɴɴᴇᴅ ᴛᴏ ᴜsᴇ ᴍᴇ. Cᴏɴᴛᴀᴄᴛ ᴛʜᴇ Dᴇᴠᴇʟᴏᴘᴇʀ</i>",
|
||
parse_mode=ParseMode.HTML,
|
||
disable_web_page_preview=True
|
||
)
|
||
return
|
||
except UserNotParticipant:
|
||
await bot.send_message(
|
||
chat_id=message.chat.id,
|
||
text="**Pʟᴇᴀsᴇ Jᴏɪɴ Mʏ Uᴘᴅᴀᴛᴇs Cʜᴀɴɴᴇʟ ᴛᴏ ᴜsᴇ ᴛʜɪs Bᴏᴛ!**\n\n__Dᴜᴇ ᴛᴏ Oᴠᴇʀʟᴏᴀᴅ, Oɴʟʏ Cʜᴀɴɴᴇʟ Sᴜʙsᴄʀɪʙᴇʀs ᴄᴀɴ ᴜsᴇ ᴛʜᴇ Bᴏᴛ!__",
|
||
reply_markup=InlineKeyboardMarkup(
|
||
[[
|
||
InlineKeyboardButton("🤖 Jᴏɪɴ Uᴘᴅᴀᴛᴇs Cʜᴀɴɴᴇʟ", url=f"https://t.me/{Var.UPDATES_CHANNEL}")
|
||
]]
|
||
),
|
||
parse_mode=ParseMode.MARKDOWN
|
||
)
|
||
return
|
||
except Exception:
|
||
await bot.send_message(
|
||
chat_id=message.chat.id,
|
||
text="__Sᴏᴍᴇᴛʜɪɴɢ ᴡᴇɴᴛ Wʀᴏɴɢ. Cᴏɴᴛᴀᴄᴛ ᴍᴇ__ [Aᴠɪsʜᴋᴀʀ Pᴀᴛɪʟ](https://t.me/Avishkarpatil).",
|
||
parse_mode=ParseMode.MARKDOWN,
|
||
disable_web_page_preview=True)
|
||
return
|
||
await message.reply_text(
|
||
text=HELP_TEXT,
|
||
parse_mode=ParseMode.HTML,
|
||
disable_web_page_preview=True,
|
||
reply_markup=HELP_BUTTONS
|
||
)
|
||
|