Add files via upload

This commit is contained in:
suhail-c
2024-04-26 17:19:23 +05:30
committed by GitHub
parent 489a6ac802
commit be7edb6aeb
12 changed files with 1532 additions and 0 deletions

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,249 @@
import re
import json
import m3u8
import httpx
from TelegramBot.logging import LOGGER
from TelegramBot import httpx_client
async def get_token():
response = await httpx_client.get("https://music.apple.com/us/album/positions-deluxe-edition/1553944254")
jspath = re.search("crossorigin src=\"(/assets/index.+?\.js)\"", response.text).group(1)
response = await httpx_client.get("https://music.apple.com"+jspath)
tkn = re.search(r"(eyJhbGc.+?)\"", response.text).group(1)
return tkn
async def get_applem_info(link):
apple_rx= re.compile(r"apple\.com\/(\w\w)\/album\/.+\/(\d+|pl\..+)(?:\?i=(\d+))*")
match= apple_rx.search(link)
if not match.group(3):
region, id_, __ = match.groups()
tkn = await get_token()
headers = {'origin': 'https://music.apple.com','Authorization': f'Bearer {tkn}',}
params = {'extend': 'extendedAssetUrls',}
response = await httpx_client.get(f'https://amp-api.music.apple.com/v1/catalog/{region}/albums/{id_}/', params=params, headers=headers)
album= response.json()["data"][0]
id=album["id"]
link=album["attributes"]["url"]
name=album["attributes"]["name"]
artists=album["attributes"]["artistName"]
release_date=album["attributes"]["releaseDate"]
genere=album["attributes"]["genreNames"]
genere=', '.join(genere)
total_tracks=album["attributes"]["trackCount"]
upc=album["attributes"]["upc"]
rating=album["attributes"].get("contentRating", "Clean")
label=album["attributes"]["recordLabel"]
copyright=album["attributes"]["copyright"]
isSingle=album["attributes"]["isSingle"]
isCompilation=album["attributes"]["isCompilation"]
isPrerelease=album["attributes"]["isPrerelease"]
isComplete=album["attributes"]["isComplete"]
audiotraits=album["attributes"]["audioTraits"]
audiotraits=''.join(audiotraits)
isMasteredForItunes=album["attributes"]["isMasteredForItunes"]
try:
editorialNotes=album["attributes"]["editorialNotes"]["short"]
editorialNotes+= '<br><br>' + album["attributes"]["editorialNotes"]["standard"]
except:
editorialNotes="Null"
cover=album["attributes"]["artwork"]["url"].format(w=album["attributes"]["artwork"]["width"], h=album["attributes"]["artwork"]["height"])
title= f"{artists} - {name}"
tracks=album["relationships"]["tracks"]["data"]
text=""
for track in tracks:
track_id=track["id"]
track_url=track["attributes"]["url"]
track_name=track["attributes"]["name"]
track_artists=track["attributes"]["artistName"]
track_duration=track["attributes"]["durationInMillis"]
track_composer=track["attributes"].get("composerName", "")
track_genere=track["attributes"]["genreNames"]
track_genere=', '.join(track_genere)
track_rls_date=track["attributes"].get("releaseDate", "")
track_disc_no=track["attributes"].get("discNumber", "")
track_track_no=track["attributes"]["trackNumber"]
track_rating=track["attributes"].get("contentRating", "Clean")
track_locale=track["attributes"].get("audioLocale", "")
track_isrc=track["attributes"]["isrc"]
track_hasLyrics=track["attributes"].get("hasLyrics", "False")
track_hasTimeSyncedLyrics=track["attributes"].get("hasTimeSyncedLyrics", "False")
track_hasCredits=track["attributes"].get("hasCredits", "False")
track_isVocalAttenuationAllowed=track["attributes"].get("isVocalAttenuationAllowed", "False")
track_isAppleDigitalMaster=track["attributes"].get("isAppleDigitalMaster", "False")
track_isMasteredForItunes=track["attributes"].get("isMasteredForItunes", "False")
try:
track_audiitraits=track["attributes"]["audioTraits"]
track_audiitraits=''.join(track_audiitraits)
except: track_audiitraits=""
try: track_preview=track["attributes"]["previews"][0]["url"]
except: track_preview=""
try: track_cover=track["attributes"]["artwork"]["url"].format(w=track["attributes"]["artwork"]["width"], h=track["attributes"]["artwork"]["height"])
except: track_cover=""
try:
hls=track["attributes"]["extendedAssetUrls"]["enhancedHls"]
playlist = m3u8.parse(m3u8.load(hls).dumps())
streams=""
for stream in playlist['playlists']:
codec= stream['stream_info']['codecs']
audio= stream['stream_info']['audio']
streams+=f"{codec} | {audio}<br>"
except: streams=""
text+=f"""<strong>{track_track_no}. {track_name}</strong>
<pre>ID : {track_id}
URL : <a href="{track_url}">{track_url}</a>
Name : {track_name}
Artist : {track_artists}
Composer : {track_composer}
Duration : {track_duration} ms
Genre : {track_genere}
Release Date : {track_rls_date}
Disc No : {track_disc_no}
Track No : {track_track_no}
Content Rating : {track_rating}
Locale : {track_locale}
ISRC : {track_isrc}
Quality : {track_audiitraits}
Cover : <a href="{track_cover}">{track_cover}</a>
Preview : <a href="{track_preview}">{track_preview}</a>
Has Credits : {track_hasCredits}
Has Lyrics : {track_hasLyrics}
Has Time synced Lyrics : {track_hasTimeSyncedLyrics}
Is Apple Digital Master : {track_isAppleDigitalMaster}
Is Mastered For Itunes : {track_isMasteredForItunes}
Is Vocal Attenuation Allowed : {track_isVocalAttenuationAllowed}
<u>Available Streams</u>
{streams}</pre>
"""
message=f"""<figure><img src="{cover}"></figure>
<h4>Album</h4>
<pre>ID : {id}
URL : <a href="{link}">{link}</a>
Name : {name}
Artist : {artists}
Genre : {genere}
Release Date : {release_date}
Total Tracks : {total_tracks}
UPC : {upc}
Content Rating : {rating}
Label : {label}
Copyright : {copyright}
Quality : {audiotraits}
Cover : <a href="{cover}">{cover}</a>
Is Single : {isSingle}
Is Compilation : {isCompilation}
Is Pre-release : {isPrerelease}
Is Complete : {isComplete}
is Mastered For Itunes : {isMasteredForItunes}
</pre>
<h4>Tracks</h4>
{text}
<h4>Editorial Notes</h4>
{editorialNotes}
"""
return message, title
elif match.group(3):
region, __, id_ = match.groups()
tkn = await get_token()
headers = {'origin': 'https://music.apple.com','Authorization': f'Bearer {tkn}',}
params = {'extend': 'extendedAssetUrls',}
response = await httpx_client.get(f'https://amp-api.music.apple.com/v1/catalog/{region}/songs/{id_}/', params=params, headers=headers)
track= response.json()["data"][0]
track_id=track["id"]
track_url=track["attributes"]["url"]
track_name=track["attributes"]["name"]
track_artists=track["attributes"]["artistName"]
track_album= track["attributes"]["albumName"]
track_duration=track["attributes"]["durationInMillis"]
track_composer=track["attributes"].get("composerName", "")
track_genere=track["attributes"]["genreNames"]
track_genere=', '.join(track_genere)
track_rls_date=track["attributes"].get("releaseDate", "")
track_disc_no=track["attributes"].get("discNumber", "")
track_track_no=track["attributes"]["trackNumber"]
track_rating=track["attributes"].get("contentRating", "Clean")
track_locale=track["attributes"].get("audioLocale", "")
track_isrc=track["attributes"]["isrc"]
track_hasLyrics=track["attributes"].get("hasLyrics", "False")
track_hasTimeSyncedLyrics=track["attributes"].get("hasTimeSyncedLyrics", "False")
track_hasCredits=track["attributes"].get("hasCredits", "False")
track_isVocalAttenuationAllowed=track["attributes"].get("isVocalAttenuationAllowed", "False")
track_isAppleDigitalMaster=track["attributes"].get("isAppleDigitalMaster", "False")
track_isMasteredForItunes=track["attributes"].get("isMasteredForItunes", "False")
try:
track_audiitraits=track["attributes"]["audioTraits"]
track_audiitraits=''.join(track_audiitraits)
except: track_audiitraits=""
try: track_preview=track["attributes"]["previews"][0]["url"]
except: track_preview=""
try: track_cover=track["attributes"]["artwork"]["url"].format(w=track["attributes"]["artwork"]["width"], h=track["attributes"]["artwork"]["height"])
except: track_cover=""
title= f"{track_artists} - {track_name}"
try:
hls=track["attributes"]["extendedAssetUrls"]["enhancedHls"]
playlist = m3u8.parse(m3u8.load(hls).dumps())
streams=""
for stream in playlist['playlists']:
codec= stream['stream_info']['codecs']
audio= stream['stream_info']['audio']
streams+=f"{codec} | {audio}<br>"
except: streams=""
message=f"""<figure><img src="{track_cover}"></figure>
<h4>Track</h4>
<pre>ID : {track_id}
URL : <a href="{track_url}">{track_url}</a>
Name : {track_name}
Album : {track_album}
Artist : {track_artists}
Composer : {track_composer}
Duration : {track_duration} ms
Genre : {track_genere}
Release Date : {track_rls_date}
Disc No : {track_disc_no}
Track No : {track_track_no}
Content Rating : {track_rating}
Locale : {track_locale}
ISRC : {track_isrc}
Quality : {track_audiitraits}
Cover : <a href="{track_cover}">{track_cover}</a>
Preview : <a href="{track_preview}">{track_preview}</a>
Has Credits : {track_hasCredits}
Has Lyrics : {track_hasLyrics}
Has Time synced Lyrics : {track_hasTimeSyncedLyrics}
Is Apple Digital Master : {track_isAppleDigitalMaster}
Is Mastered For Itunes : {track_isMasteredForItunes}
Is Vocal Attenuation Allowed : {track_isVocalAttenuationAllowed}
<u>Available Streams</u>
{streams}</pre>
"""
return message, title

View File

@@ -0,0 +1,80 @@
from pyrogram.types import (InlineKeyboardButton, KeyboardButton, WebAppInfo)
START_CAPTION = """
Hi there, I'm an unofficial Odesli Bot! With support for all major streaming platforms, simply share a link to your favorite tune and I'll provide you with links to that same song on all your other preferred platforms.
Powered by odesli.co & songwhip.com
"""
HELP_CAPTION = """
🔗 **--Find Music Links--**
You can send a track or album link directly to the bot. Alternatively, reply to a message containing a link with the /odesli command. or send /odesli command followed by the link.
🔍 **--Inline Search Mode--**
In any chat, type "@tgodeslibot <link>" to retrieve music links using inline mode.
To search for an album by query and get music links, use "@tgodeslibot .sa <album_name>" (replace <album_name> with the actual album name).
For track searches, employ the command "@tgodeslibot .st <track_name>" (replace <track_name> with the actual track name).
**--Get track/album Info--**
Utilize the /info command with a link to retrieve comprehensive information about a track or album. Presently, this supports Spotify, Apple Music and JioSaavn links.
"""
URL_ERROR = "__There's a problem with that URL. Either we don't support this music service or the URL is malformed. Try one from a different music service.__"
RATE_TXT = "__If you like this bot, you can rate me [HERE](https://t.me/botsarchive/2726) ❤__"
INFO = "**Odesli Bot** \n\nAutomated, on-demand smart links for songs, albums, podcasts and more. For artists, for fans, for free."
INFO_M = "**Find Music Links** \n\n**Usage:** `@tgodeslibot [Type a track name, or paste any music link]`"
INFO_ST = "**Find Music Links - Track Search** \n\n**Usage:** `@tgodeslibot .st [Type a track name]`"
INFO_SA = "**Find Music Links - Album Search** \n\n**Usage:** `@tgodeslibot .sa [Type an album name]`"
START_BUTTON = [
[
InlineKeyboardButton('Help', callback_data='HELP_BUTTON')
],
[
InlineKeyboardButton('Add me to Group', url='https://t.me/tgodeslibot?startgroup=true')
],
[
InlineKeyboardButton('Inline Search', switch_inline_query_current_chat=""),
InlineKeyboardButton('Give Feedback', url='https://t.me/dmviabot')
]
]
START_BUTTON_G = [
[
InlineKeyboardButton('Help', url='https://t.me/tgodeslibot?start=help')
],
[
InlineKeyboardButton('Inline Search', switch_inline_query_current_chat=""),
InlineKeyboardButton('Give Feedback', url='https://t.me/dmviabot')
]
]
SEARCH_BUTTON = [
[
InlineKeyboardButton('Track Search', switch_inline_query_current_chat=".st ")
],
[
InlineKeyboardButton('Album Search', switch_inline_query_current_chat=".sa ")
]
]
GOBACK_BUTTON = [[InlineKeyboardButton("🔙 Go Back", callback_data="START_BUTTON")]]
RBUTTON = [
[
KeyboardButton('Odesli', web_app=WebAppInfo(url="https://odesli.co")),
KeyboardButton('Songwhip', web_app=WebAppInfo(url="https://songwhip.com/create"))
]
]

View File

@@ -0,0 +1,96 @@
import asyncio
from functools import wraps
from cachetools import TTLCache
from typing import Callable, Union
from pyrogram import Client
from pyrogram.types import CallbackQuery, Message
from TelegramBot import loop
from TelegramBot.helpers.functions import isAdmin
from TelegramBot.helpers.ratelimiter import RateLimiter
ratelimit = RateLimiter()
# storing spammy user in cache for 1minute before allowing them to use commands again.
warned_users = TTLCache(maxsize=128, ttl=60)
warning_message = "Spam detected! ignoring your all requests for few minutes."
def ratelimiter(func: Callable) -> Callable:
"""
Restricts user's from spamming commands or pressing buttons multiple times
using leaky bucket algorithm and pyrate_limiter.
"""
@wraps(func)
async def decorator(client: Client, update: Union[Message, CallbackQuery]):
userid = update.from_user.id
is_limited = await ratelimit.acquire(userid)
if is_limited and userid not in warned_users:
if isinstance(update, Message):
await update.reply_text(warning_message)
warned_users[userid] = 1
return
elif isinstance(update, CallbackQuery):
await update.answer(warning_message, show_alert=True)
warned_users[userid] = 1
return
elif is_limited and userid in warned_users: pass
else: return await func(client, update)
return decorator
def admin_commands(func: Callable) -> Callable:
"""
Restricts user's from using group admin commands.
"""
@wraps(func)
async def decorator(client: Client, message: Message):
if await isAdmin(message):
return await func(client, message)
return decorator
def errors(func: Callable) -> Callable:
"""
Try and catch error of any function.
"""
@wraps(func)
async def decorator(client, message, *args,**kwargs):
try:
return await func(client, message, *args, **kwargs)
except Exception as error:
await message.reply(f"{type(error).__name__}: {error}")
return decorator
# ====================================================================================
# SOME MORE USEFUL DECORATORS
def run_sync_in_thread(func: Callable) -> Callable:
"""
A decorator for running a synchronous long running function asynchronously in a separate thread,
without blocking the main event loop which make bot unresponsive.
To use this decorator, apply it to any synchronous function, then you can then call that function to anywhere
in your program and can use it along with await keyword. This will allow the function to be run asynchronously,
and avoid blocking of the main event loop.
"""
@wraps(func)
async def wrapper(*args, **kwargs):
return await loop.run_in_executor(None, func, *args, **kwargs)
return wrapper

View File

@@ -0,0 +1,20 @@
"""
Creating custom filters
https://docs.pyrogram.org/topics/create-filters
"""
from pyrogram import filters
from pyrogram.types import Message
from TelegramBot.config import SUDO_USERID, OWNER_USERID
def dev_users(_, __, message: Message) -> bool:
return message.from_user.id in OWNER_USERID if message.from_user else False
def sudo_users(_, __, message: Message) -> bool:
return message.from_user.id in SUDO_USERID if message.from_user else False
dev_cmd = filters.create(dev_users)
sudo_cmd = filters.create(sudo_users)

View File

@@ -0,0 +1,83 @@
import httpx
from tenacity import retry, stop_after_attempt, wait_fixed
from pyrogram.enums import ChatMemberStatus, ChatType
from pyrogram.types import Message
from TelegramBot.config import SUDO_USERID
from typing import Union
async def isAdmin(message: Message) -> bool:
"""
Return True if the message is from owner or admin of the group or sudo of the bot.
"""
if not message.from_user:
return
if message.chat.type not in [ChatType.SUPERGROUP, ChatType.CHANNEL]:
return
user_id = message.from_user.id
if user_id in SUDO_USERID:
return True
check_status = await message.chat.get_member(user_id)
return check_status.status in [ChatMemberStatus.OWNER,ChatMemberStatus.ADMINISTRATOR]
def get_readable_time(seconds: int) -> str:
"""
Return a human-readable time format
"""
result = ""
(days, remainder) = divmod(seconds, 86400)
days = int(days)
if days != 0:
result += f"{days}d "
(hours, remainder) = divmod(remainder, 3600)
hours = int(hours)
if hours != 0:
result += f"{hours}h "
(minutes, seconds) = divmod(remainder, 60)
minutes = int(minutes)
if minutes != 0:
result += f"{minutes}m "
seconds = int(seconds)
result += f"{seconds}s "
return result
def get_readable_bytes(size: str) -> str:
"""
Return a human readable file size from bytes.
"""
dict_power_n = {0: "", 1: "Ki", 2: "Mi", 3: "Gi", 4: "Ti"}
if not size:
return ""
power = 2**10
raised_to_pow = 0
while size > power:
size /= power
raised_to_pow += 1
return f"{str(round(size, 2))} {dict_power_n[raised_to_pow]}B"
@retry(
stop=stop_after_attempt(2), # Retry 3 times at most
wait=wait_fixed(1), # Wait 1 second between retries
)
async def perform_request(url, params):
async with httpx.AsyncClient() as client:
response = await client.get(url, params=params, timeout=20)
response.raise_for_status() # Raise an exception for non-2xx status codes
return response

View File

@@ -0,0 +1,187 @@
import httpx, json
from TelegramBot import httpx_client
async def jiosaavn(link):
null = None
if "/album/" in link:
params = {'link': f"{link}"}
response = await httpx_client.get('https://jiosaavn-api-gray.vercel.app/albums', params=params, timeout=60)
if response.status_code == httpx.codes.OK:
result = response.json()
album = result.get("data")
album_id= album["id"]
album_link= album["url"]
album_name= album["name"]
album_year= album["year"]
album_releasedate= album["releaseDate"]
tracks_count= album["songCount"]
album_artists= album["artists"]
album_partists= album["primaryArtists"]
album_fartists= album["featuredArtists"]
title= f'{album_artists} - {album_name}'
images= album.get("image")
for image in images:
if image['quality'] == '50x50': small= image['link']
if image['quality'] == '150x150': medium= image['link']
if image['quality'] == '500x500': large= image['link']
album_tracks=album.get("songs")
count = 0
text= ""
for track in album_tracks:
count += 1
track_id= track["id"]
track_link= track["url"]
track_name= track["name"]
track_duration= track["duration"]
track_lang= track["language"]
has_explicit= track["explicitContent"]
has_lyrics= track["hasLyrics"]
track_playcount= track["playCount"]
track_label= track["label"]
copyright= track["copyright"]
track_partists= track["primaryArtists"]
track_fartists= track["featuredArtists"]
down_links= track["downloadUrl"]
for down_link in down_links:
if down_link['quality'] == '12kbps': sortahighish= down_link['link']
if down_link['quality'] == '48kbps': lesshigh= down_link['link']
if down_link['quality'] == '96kbps': prettyhigh= down_link['link']
if down_link['quality'] == '160kbps': reallyhigh= down_link['link']
if down_link['quality'] == '320kbps': superduperhigh= down_link['link']
text += f"""
<strong>{count}. {track_name}</strong>
<pre>ID : {track_id}
URL : <a href="{track_link}">{track_link}</a>
Name : {track_name}
Duration : {track_duration} s
Language : {track_lang}
Explicit : {has_explicit}
Lyrics : {has_lyrics}
Play Count : {track_playcount}
Label : {track_label}
Copyright : {copyright}
Primary Artists : {track_partists}
Featured Artists : {track_fartists}
<u>Streaming URL</u>
12 kbps : <a href="{sortahighish}"<a>{sortahighish}</a>
48 kbps : <a href="{lesshigh}"<a>{lesshigh}</a>
96 kbps : <a href="{prettyhigh}"<a>{prettyhigh}</a>
160 kbps : <a href="{reallyhigh}"<a>{reallyhigh}</a>
320 kbps : <a href="{superduperhigh}"<a>{superduperhigh}</a>
</pre>
"""
message=f"""<figure><img src="{large}"></figure>
<h4>Album</h4>
<pre>ID : {album_id}
URL : <a href="{album_link}"<a>{album_link}</a>
Name : {album_name}
Artists : {album_artists}
Primary Artists : {album_partists}
Featured Artists : {album_fartists}
Year : {album_year}
Release Date : {album_releasedate}
Tracks Count : {tracks_count}
</pre>
<h4>Cover</h4>
<pre> 50 x 50 : <a href="{small}"<a>{small}</a>
150 x 150 : <a href="{medium}"<a>{medium}</a>
500 x 500 : <a href="{large}"<a>{large}</a>
</pre>
<h4>Tracks</h4>
{text}
"""
return message, title
elif "/song/" in link:
params = {'link': f"{link}"}
response = await httpx_client.get('https://jiosaavn-api-gray.vercel.app/songs', params=params, timeout=60)
if response.status_code == httpx.codes.OK:
result = response.json()
track = result.get("data")[0]
track_id= track["id"]
track_link= track["url"]
track_name= track["name"]
track_year= track["year"]
track_releasedate= track["releaseDate"]
track_duration= track["duration"]
track_lang= track["language"]
has_explicit= track["explicitContent"]
has_lyrics= track["hasLyrics"]
track_playcount= track["playCount"]
track_label= track["label"]
copyright= track["copyright"]
track_partists= track["primaryArtists"]
track_fartists= track["featuredArtists"]
title= f'{track_partists} - {track_name}'
track_album_name= track["album"]["name"]
track_album_id = track["album"]["id"]
track_album_link= track["album"]["url"]
images= track.get("image")
for image in images:
if image['quality'] == '50x50': small= image['link']
if image['quality'] == '150x150': medium= image['link']
if image['quality'] == '500x500': large= image['link']
down_links= track.get("downloadUrl")
for down_link in down_links:
if down_link['quality'] == '12kbps': sortahighish= down_link['link']
if down_link['quality'] == '48kbps': lesshigh= down_link['link']
if down_link['quality'] == '96kbps': prettyhigh= down_link['link']
if down_link['quality'] == '160kbps': reallyhigh= down_link['link']
if down_link['quality'] == '320kbps': superduperhigh= down_link['link']
message= f"""<figure><img src="{large}"></figure>
<h4>Track</h4>
<pre>ID : {track_id}
URL : <a href="{track_link}">{track_link}</a>
Name : {track_name}
Primary Artists : {track_partists}
Featured Artists : {track_fartists}
Year : {track_year}
Release Date : {track_releasedate}
Duration : {track_duration} s
Language : {track_lang}
Explicit : {has_explicit}
Lyrics : {has_lyrics}
Play Count : {track_playcount}
Label : {track_label}
Copyright : {copyright}
</pre>
<h4>From Album</h4>
<pre>Name : {track_album_name}
ID : {track_album_id}
URL : <a href="{track_album_link}"<a>{track_album_link}</a>
</pre>
<h4>Cover</h4>
<pre> 50 x 50 : <a href="{small}"<a>{small}</a>
150 x 150 : <a href="{medium}"<a>{medium}</a>
500 x 500 : <a href="{large}"<a>{large}</a>
</pre>
<h4>Streaming URL</h4>
<pre>12 kbps : <a href="{sortahighish}"<a>{sortahighish}</a>
48 kbps : <a href="{lesshigh}"<a>{lesshigh}</a>
96 kbps : <a href="{prettyhigh}"<a>{prettyhigh}</a>
160 kbps : <a href="{reallyhigh}"<a>{reallyhigh}</a>
320 kbps : <a href="{superduperhigh}"<a>{superduperhigh}</a>
</pre>
"""
return message, title

View File

@@ -0,0 +1,52 @@
from httpx import AsyncClient
from bs4 import BeautifulSoup
from telegraph.aio import Telegraph
async def katbin_paste(text: str) -> str:
"""
paste the text in katb.in website.
"""
katbin_url = "https://katb.in"
client = AsyncClient()
response = await client.get(katbin_url)
soup = BeautifulSoup(response.content, "html.parser")
csrf_token = soup.find("input", {"name": "_csrf_token"}).get("value")
try:
paste_post = await client.post(
katbin_url,
data={"_csrf_token": csrf_token, "paste[content]": text},
follow_redirects=False)
output_url = f"{katbin_url}{paste_post.headers['location']}"
await client.aclose()
return output_url
except:
return "something went wrong while pasting text in katb.in."
async def telegraph_paste(content: str, title="TelegramBot") -> str:
"""
paste the text in telegra.ph (graph.org) website (text should follow proper html tags).
"""
telegraph = Telegraph(domain="graph.org")
await telegraph.create_account(short_name="odesli")
html_content = content.replace("\n", "<br>")
response = await telegraph.create_page(title=title, html_content=html_content, author_name="Odesli", author_url="https://t.me/tgodeslibot")
response = response["url"]
return response
async def telegraph_image_paste(filepath: str) -> str:
"""
paste the image in telegra.ph (graph.org) website.
"""
telegraph = Telegraph(domain="graph.org")
try:
image_url = await telegraph.upload_file(filepath)
return "https://graph.org/" + image_url[0]["src"]
except Exception as error:
return "something went wrong while posting image."

View File

@@ -0,0 +1,45 @@
from typing import Union
from pyrate_limiter import (BucketFullException, Duration, Limiter,
MemoryListBucket, RequestRate)
class RateLimiter:
"""
Implement rate limit logic using leaky bucket
algorithm, via pyrate_limiter.
(https://pypi.org/project/pyrate-limiter/)
"""
def __init__(self) -> None:
# 2 requests per seconds
self.second_rate = RequestRate(2, Duration.SECOND)
# 17 requests per minute.
self.minute_rate = RequestRate(17, Duration.MINUTE)
# 1000 requests per hour
self.hourly_rate = RequestRate(1000, Duration.HOUR)
# 10000 requests per day
self.daily_rate = RequestRate(10000, Duration.DAY)
self.limiter = Limiter(
self.minute_rate,
self.hourly_rate,
self.daily_rate,
bucket_class=MemoryListBucket,
)
async def acquire(self, userid: Union[int, str]) -> bool:
"""
Acquire rate limit per userid and return True / False
based on userid ratelimit status.
"""
try:
self.limiter.try_acquire(userid)
return False
except BucketFullException:
return True

View File

@@ -0,0 +1,23 @@
import re
from shazamio import Shazam, Serialize
# fetch apple music link from shazam link
async def slink_to_alink(slink):
shazam = Shazam()
match = re.search(r"track\/([0-9]+)", slink)
if match:
track_id = match.group(1)
about_track = await shazam.track_about(track_id=track_id)
serialized = Serialize.track(data=about_track)
return serialized.apple_music_url
else: return None
# search shazam for tracks by providing query
async def shazam_search(search_query):
try:
shazam = Shazam()
result = await shazam.search_track(query=search_query, limit=50)
hits = result.get("tracks").get("hits")
return hits
except Exception as e: raise e

View File

@@ -0,0 +1,502 @@
import json
import httpx
import uuid
import asyncio
from pyrogram import Client, enums
from pyrogram.types import (Message, CallbackQuery, InlineQueryResultArticle, InlineQueryResultPhoto, InputTextMessageContent,
InlineKeyboardMarkup, InlineKeyboardButton, InlineQuery)
from TelegramBot import bot, odesli
from TelegramBot.logging import LOGGER
from TelegramBot.database.sqdb import *
from TelegramBot.helpers.constants import *
from TelegramBot.helpers.shazam import *
from TelegramBot.helpers.spotify import *
from TelegramBot.helpers.odesli.entity.song.SongResult import SongResult
from TelegramBot.helpers.odesli.entity.album.AlbumResult import AlbumResult
async def fetch_links(client: Client, message: Message, song_link):
reply = await message.reply_text("__Fetching links, please wait... (This may take a minute)__", quote=True)
await bot.send_chat_action(message.chat.id, enums.ChatAction.TYPING)
try: result = await odesli.getByUrl(song_link)
except Exception as e:
LOGGER(__name__).error(e)
try: r = await songwhip_full(song_link)
except:
if message.chat.type == enums.ChatType.PRIVATE:
await reply.edit_text(URL_ERROR)
await bot.send_chat_action(message.chat.id, enums.ChatAction.CANCEL)
return
else:
await reply.delete()
await bot.send_chat_action(message.chat.id, enums.ChatAction.CANCEL)
return
else:
await reply.edit_text(r, disable_web_page_preview=True)
if message.chat.type == enums.ChatType.PRIVATE:
await message.reply_text(RATE_TXT, disable_web_page_preview=True)
return
if isinstance(result, SongResult):
entity = result.song
providers = result.songsByProvider
try:
spotify= providers['spotify'].linksByPlatform['spotify']
preview= await get_spotify_preview(spotify)
except: preview= None
r = '**' + entity.title + "** by **" + entity.artistName + '** \n\n[Odesli](' + result.songLink + ')'
if preview: r = f"[\u2061]({preview})" + r
elif isinstance(result, AlbumResult):
preview= None
entity = result.album
providers = result.albumsByProvider
r = '**' + entity.title + "** by **" + entity.artistName + '** \n\n[Odesli](' + result.albumLink + ')'
else:
await reply.edit_text(URL_ERROR)
return
op = []
for provider in providers:
if provider == 'youtube':
r += (" | [YouTube](" + providers['youtube'].linksByPlatform['youtube'] + ') | [YT Music](' +
providers['youtube'].linksByPlatform['youtubeMusic'] + ')')
op.append("youtube")
continue
elif provider == 'itunes':
r += (" | [Apple Music](" + providers['itunes'].linksByPlatform['appleMusic'] + ')')
op.append("itunes")
continue
elif provider == 'amazon':
r += (' | [Amazon Music](' +
providers['amazon'].linksByPlatform['amazonMusic'] + ')')
op.append("amazon")
continue
else:
r += ' | [' + (provider.title() + '](' + providers[provider].linksByPlatform[provider] + ')')
op.append(provider)
t = r
t += "\n\n__Checking songwhip.com for additional links, please wait...__"
reply = await reply.edit_text(t, disable_web_page_preview=True)
await bot.send_chat_action(message.chat.id, enums.ChatAction.CANCEL)
try: r = await songwhip(song_link, r, op)
finally:
if preview: await reply.edit_text(f"{r}\n\n<pre>♪ Preview</pre>")
else: await reply.edit_text(r, disable_web_page_preview=True)
#if message.chat.type == enums.ChatType.PRIVATE:
#await message.reply_text(RATE_TXT, disable_web_page_preview=True)
async def songwhip(song_link, r, op):
async with httpx.AsyncClient() as client:
data = {'country': 'IN', 'url': song_link}
response = await client.post('https://songwhip.com/api/songwhip/create', json=data, timeout=60)
response.raise_for_status()
if response.status_code == httpx.codes.OK:
try:
x = response.json()
tmp = "https://songwhip.com/" + x.get("data").get("item").get("url")
r += f" | [Songwhip]({tmp})"
except: pass
try:
tmp = x.get("data").get("item").get("links").get("qobuz")[0].get("link")
r += f" | [Qobuz]({tmp})"
except: pass
try:
tmp = x.get("data").get("item").get("links").get("jioSaavn")[0].get("link")
r += f" | [JioSaavn]({tmp})"
except: pass
try:
tmp = x.get("data").get("item").get("links").get("gaana")[0].get("link")
r += f" | [Gaana]({tmp})"
except: pass
if 'spotify' not in op:
try:
tmp = x.get("data").get("item").get("links").get("spotify")[0].get("link")
r += f" | [Spotify]({tmp})"
except: pass
if 'deezer' not in op:
try:
tmp = x.get("data").get("item").get("links").get("deezer")[0].get("link")
r += f" | [Deezer]({tmp})"
except: pass
if 'tidal' not in op:
try:
tmp = x.get("data").get("item").get("links").get("tidal")[0].get("link")
r += f" | [Tidal]({tmp})"
except: pass
if 'amazon' not in op:
try:
tmp = x.get("data").get("item").get("links").get("amazonMusic")[0].get("link")
r += f" | [Amazon Music]({tmp})"
except: pass
if 'itunes' not in op:
try:
tmp = x.get("data").get("item").get("links").get("itunes")[0].get("link")
tmp = tmp.replace("{country}", "gb", 1)
r += f" | [Apple Music]({tmp})"
except: pass
if 'napster' not in op:
try:
tmp = x.get("data").get("item").get("links").get("napster")[0].get("link")
r += f" | [Napster]({tmp})"
except: pass
if 'linemusic' not in op:
try:
tmp = x.get("data").get("item").get("links").get("lineMusic")[0].get("link")
r += f" | [Line Music]({tmp})"
except: pass
if 'youtube' not in op:
try:
tmp = x.get("data").get("item").get("links").get("youtube")[0].get("link")
tmp1 = x.get("data").get("item").get("links").get("youtubeMusic")[0].get("link")
r += f" | [Youtube]({tmp}) | [Youtube Music]({tmp1})"
except: pass
return r
else: return r
async def get_inline_result_link(client: Client, inline_query: InlineQuery, song_link):
try: result = await odesli.getByUrl(song_link)
except Exception as e:
LOGGER(__name__).error(e)
await inline_query.answer(
switch_pm_text="Odesli - On demand smart links",
switch_pm_parameter="help",
results=[
InlineQueryResultArticle(
title="Error",
description="There's a problem with that URL. Try one from a different music service.",
thumb_url="https://telegra.ph/file/bd6b2365a405a8b73b033.png",
thumb_height=512,
thumb_width=512,
input_message_content=InputTextMessageContent(INFO),
reply_markup=InlineKeyboardMarkup(
[
[
InlineKeyboardButton("Open Bot", url="https://t.me/tgodeslibot")
]
]
)
)
]
)
return
if isinstance(result, SongResult):
entity = result.song
title = entity.title
artist = entity.artistName
thumb = entity.thumbnailUrl
elif isinstance(result, AlbumResult):
entity = result.album
title = entity.title
artist = entity.artistName
thumb = entity.thumbnailUrl
r = '**' + title + '** by **' + artist + '**'
await inline_query.answer(
switch_pm_text="Odesli - On demand smart links",
switch_pm_parameter="help",
results=[
InlineQueryResultPhoto(
title=title,
description=artist,
thumb_url=thumb,
photo_url=thumb,
caption=r,
reply_markup=InlineKeyboardMarkup(
[
[InlineKeyboardButton("🌐 Fetching links...", callback_data="LOADING_BUTTON")]
]
)
)
]
)
async def get_inline_result_query(client: Client, inline_query: InlineQuery):
query = inline_query.query.strip()
try: hits = await shazam_search(query)
except Exception as e:
LOGGER(__name__).error(e)
await inline_query.answer(
switch_pm_text="Odesli - On demand smart links",
switch_pm_parameter="help",
results=[
InlineQueryResultArticle(
title="No results found",
description="No results found for your query. Try different query or paste link of desired song",
thumb_url="https://telegra.ph/file/bd6b2365a405a8b73b033.png",
input_message_content=InputTextMessageContent(INFO),
reply_markup=InlineKeyboardMarkup(
[
[
InlineKeyboardButton("Open Bot", url="https://t.me/tgodeslibot")
]
]
)
)
]
)
return
results=[]
await create_database()
for i in hits:
id = uuid.uuid4()
title = i.get("heading").get("title")
artist = i.get("heading").get("subtitle")
thumb = i.get("images").get("default")
if thumb is None: thumb = "https://graph.org/file/a6145bf65a88feefaf6ac.jpg"
slink = i.get("url")
try: link = i.get("stores").get("apple").get("actions")[0].get("uri")
except: link = slink
caption= '**' + title + '** by **'+ artist+ '**'
await store_values(str(id), link)
results.append(
InlineQueryResultPhoto(
title=title,
description=artist,
thumb_url=thumb,
photo_url=thumb,
caption=caption,
id=id,
reply_markup=InlineKeyboardMarkup(
[
[InlineKeyboardButton("🌐 Fetching links...", callback_data="LOADING_BUTTON")]
]
)
)
)
if results: await inline_query.answer(results, switch_pm_text="Odesli - On demand smart links", switch_pm_parameter="help")
async def get_inline_result_spotify(client: Client, inline_query: InlineQuery, query, type):
try: albums, tracks = await spotify_search(query)
except Exception as e:
LOGGER(__name__).error(e)
await inline_query.answer(
switch_pm_text="Odesli - On demand smart links",
switch_pm_parameter="help",
results=[
InlineQueryResultArticle(
title="No results found",
description="No results found for your query. Try different query or paste link of desired song",
thumb_url="https://telegra.ph/file/bd6b2365a405a8b73b033.png",
input_message_content=InputTextMessageContent(INFO),
reply_markup=InlineKeyboardMarkup(
[
[
InlineKeyboardButton("Open Bot", url="https://t.me/tgodeslibot")
]
]
)
)
]
)
return
results=[]
await create_database()
if type=='a':
for album in albums:
id = uuid.uuid4()
album_name = album.get("name")
album_artists = album.get("artists")
if len(album_artists) > 1:
artist_names = []
for artist in album_artists:
artist_names.append(artist.get("name"))
album_artists = ", ".join(artist_names)
else:
album_artists = album.get("artists")[0].get("name")
total_tracks = album.get("total_tracks")
album_thumb = album.get("images")[0].get("url")
album_url = album.get("external_urls").get("spotify")
release_date = album.get("release_date")
caption= f"**{album_name}** by **{album_artists}**"
description= f"{str(release_date)}{str(total_tracks)} Songs • {album_artists}"
await store_values(str(id), album_url)
results.append(
InlineQueryResultPhoto(
title=album_name,
description=description,
thumb_url=album_thumb,
photo_url=album_thumb,
caption=caption,
id=id,
reply_markup=InlineKeyboardMarkup(
[
[InlineKeyboardButton("🌐 Fetching links...", callback_data="LOADING_BUTTON")]
]
)
)
)
if results: await inline_query.answer(results, switch_pm_text="Odesli - On demand smart links", switch_pm_parameter="help")
elif type=='t':
for track in tracks:
id = uuid.uuid4()
track_name = track.get("name")
track_artists = track.get("artists")
if len(track_artists) > 1:
artist_names = []
for artist in track_artists:
artist_names.append(artist.get("name"))
track_artists = ", ".join(artist_names)
else:
track_artists = track.get("artists")[0].get("name")
track_album_name = track.get("album").get("name")
track_thumb = track.get("album").get("images")[0].get("url")
track_preview = track.get("preview_url")
track_url = track.get("external_urls").get("spotify")
release_date = track.get("album").get("release_date")
description= f"{track_artists}{track_album_name}{release_date}"
caption= f"**{track_name}** by **{track_artists}**"
await store_values(str(id), track_url)
results.append(
InlineQueryResultPhoto(
title=track_name,
description=description,
thumb_url=track_thumb,
photo_url=track_thumb,
caption=caption,
id=id,
reply_markup=InlineKeyboardMarkup(
[
[InlineKeyboardButton("🌐 Fetching links...", callback_data="LOADING_BUTTON")]
]
)
)
)
if results: await inline_query.answer(results, switch_pm_text="Odesli - On demand smart links", switch_pm_parameter="help")
async def chosen_result_handler(client, query, message_id):
try: result = await odesli.getByUrl(query)
except Exception as e:
LOGGER(__name__).error(e)
try: r = await songwhip_full(query)
except:
await client.edit_inline_text(message_id, URL_ERROR)
return
else:
await client.edit_inline_text(message_id, r)
return
if isinstance(result, SongResult):
entity = result.song
title = entity.title
artist = entity.artistName
thumb = entity.thumbnailUrl
providers = result.songsByProvider
r = '**' + entity.title + "** by **" + entity.artistName + '** \n\n[Odesli](' + result.songLink + ')'
elif isinstance(result, AlbumResult):
entity = result.album
title = entity.title
artist = entity.artistName
thumb = entity.thumbnailUrl
providers = result.albumsByProvider
r = '**' + entity.title + "** by **" + entity.artistName + '** \n\n[Odesli](' + result.albumLink + ')'
op = []
for provider in providers:
if provider == 'youtube':
r += (" | [YouTube](" + providers['youtube'].linksByPlatform['youtube'] + ') | [YT Music](' +
providers['youtube'].linksByPlatform['youtubeMusic'] + ')')
op.append("youtube")
continue
if provider == 'itunes':
r += (" | [Apple Music](" + providers['itunes'].linksByPlatform['appleMusic'] + ')')
op.append("itunes")
continue
if provider == 'amazon':
r += (' | [Amazon Music](' +
providers['amazon'].linksByPlatform['amazonMusic'] + ')')
op.append("amazon")
continue
else:
r += ' | [' + (provider.title() + '](' + providers[provider].linksByPlatform[provider] + ')')
op.append(provider)
await client.edit_inline_text(message_id, r)
try:
r = await songwhip(query, r, op)
await client.edit_inline_text(message_id, r)
except:
return
async def songwhip_full(song_link):
async with httpx.AsyncClient() as client:
data = {'country': 'IN', 'url': song_link}
response = await client.post('https://songwhip.com/api/songwhip/create', json=data, timeout=60)
response.raise_for_status()
if response.status_code == httpx.codes.OK:
try:
x = response.json()
name = x.get("data").get("item").get("name")
artist = x.get("data").get("item").get("artists")[0].get("name")
r = f"**{name}** by **{artist}** \n\n"
except: pass
try:
tmp = "https://songwhip.com/" + x.get("data").get("item").get("url")
r += f"[Songwhip]({tmp})"
except: pass
try:
tmp = x.get("data").get("item").get("links").get("qobuz")[0].get("link")
r += f" | [Qobuz]({tmp})"
except: pass
try:
tmp = x.get("data").get("item").get("links").get("spotify")[0].get("link")
r += f" | [Spotify]({tmp})"
except: pass
try:
tmp = x.get("data").get("item").get("links").get("deezer")[0].get("link")
r += f" | [Deezer]({tmp})"
except: pass
try:
tmp = x.get("data").get("item").get("links").get("tidal")[0].get("link")
r += f" | [Tidal]({tmp})"
except: pass
try:
tmp = x.get("data").get("item").get("links").get("amazonMusic")[0].get("link")
r += f" | [Amazon Music]({tmp})"
except: pass
try:
tmp = x.get("data").get("item").get("links").get("itunes")[0].get("link")
tmp = tmp.replace("{country}", "gb", 1)
r += f" | [Apple Music]({tmp})"
except: pass
try:
tmp = x.get("data").get("item").get("links").get("napster")[0].get("link")
r += f" | [Napster]({tmp})"
except: pass
try:
tmp = x.get("data").get("item").get("links").get("lineMusic")[0].get("link")
r += f" | [Line Music]({tmp})"
except: pass
try:
tmp = x.get("data").get("item").get("links").get("youtube")[0].get("link")
tmp1 = x.get("data").get("item").get("links").get("youtubeMusic")[0].get("link")
r += f" | [Youtube]({tmp}) | [Youtube Music]({tmp1})"
except: pass
try:
tmp = x.get("data").get("item").get("links").get("jioSaavn")[0].get("link")
r += f" | [JioSaavn]({tmp})"
except: pass
try:
tmp = x.get("data").get("item").get("links").get("gaana")[0].get("link")
r += f" | [Gaana]({tmp})"
except: pass
return r

View File

@@ -0,0 +1,194 @@
import httpx
import json
import asyncio
import math
import aiofiles
import os
import re
from TelegramBot import spotify, httpx_client
# search spotify for tracks, albums
async def spotify_search(query):
try:
results = spotify.search(q=query, limit=50, offset=0, type="track,album", market="IN")
albums = results.get("albums").get("items")
tracks = results.get("tracks").get("items")
return albums, tracks
except Exception as e: raise e
async def get_spotify_data(url):
if "spotify.link" in url:
async with httpx.AsyncClient() as client:
url = await client.get(url)
url = url.headers['Location']
if "/track/" in url:
result = spotify.track(url)
track = result
track_album= result["album"]
id= track["id"]
uri= track["uri"]
url=track["external_urls"]["spotify"]
name=track["name"]
artists = track["artists"]
artist = []
for i in artists:
artist.append(i["name"])
artists = ', '.join(artist)
title= f'{artists} - {name}'
duration= track["duration_ms"]
explicit = track["explicit"]
disc_number = track["disc_number"]
track_number=track["track_number"]
isrc = track["external_ids"]["isrc"]
preview_url=track["preview_url"]
markets=track["available_markets"]
markets = ', '.join(markets)
album_id=track_album["id"]
album_uri=track_album["uri"]
album_url=track_album["external_urls"]["spotify"]
album_name= track_album["name"]
album_type = track_album["album_type"]
album_artists = track_album["artists"]
album_artist = []
for i in album_artists:
album_artist.append(i["name"])
album_artists = ','.join(album_artist)
album_release_date = track_album["release_date"]
total_tracks= track_album["total_tracks"]
images = track_album["images"]
for image in images:
if image["height"] == 640: large = image["url"]
if image["height"] == 300: medium= image["url"]
if image["height"] == 64: small = image["url"]
text=f"""<figure><img src="{large}"></figure>
<h4>Track</h4><pre>ID : {id}
URI : {uri}
URL : <a href="{url}"<a>{url}</a>
Name : {name}
Artists : {artists}
Duration : {duration} ms
Explicit : {explicit}
Disc No : {disc_number}
Track No : {track_number}
ISRC : {isrc}
Preview : <a href="{preview_url}"<a>{preview_url}</a>
</pre>
<h4>From Album</h4>
<pre>ID : {album_id}
URI : {album_uri}
URL : <a href="{album_url}"<a>{album_url}</a>
Name : {album_name}
Type : {album_type}
Artists : {album_artists}
Release Date : {album_release_date}
Total Tracks : {total_tracks}
</pre>
<h4>Cover</h4><pre> 64 x 64 : <a href="{small}"<a>{small}</a>
300 x 300 : <a href="{medium}"<a>{medium}</a>
600 x 600 : <a href="{large}"<a>{large}</a>
</pre>
<h4>Available Markets</h4>
{markets}
"""
return text, title
elif "/album/" in url:
result = spotify.album(url)
album= result
album_tracks=result["tracks"]["items"]
album_id=album["id"]
album_uri=album["uri"]
album_url=album["external_urls"]["spotify"]
album_name= album["name"]
album_type = album["album_type"]
album_artists = album["artists"]
artist = []
for i in album_artists:
artist.append(i["name"])
album_artists= ', '.join(artist)
title= f'{album_artists} - {album_name}'
markets = album["available_markets"]
markets = ', '.join(markets)
album_release_date = album["release_date"]
label = album["label"]
upc = album["external_ids"]["upc"]
total_tracks= album["total_tracks"]
images = album["images"]
for image in images:
if image["height"] == 640: large = image["url"]
if image["height"] == 300: medium= image["url"]
if image["height"] == 64: small = image["url"]
count=0
t=""
for track in album_tracks:
count+=1
track_id= track["id"]
track_uri= track["uri"]
track_url=track["external_urls"]["spotify"]
track_name=track["name"]
track_artists = track["artists"]
track_artist = []
for i in track_artists:
track_artist.append(i["name"])
track_artists = ','.join(track_artist)
track_duration= track["duration_ms"]
explicit = track["explicit"]
disc_number = track["disc_number"]
track_number=track["track_number"]
preview_url=track["preview_url"]
t+=f"""
<strong>{count}. {track_name}</strong>
<pre>ID : {track_id}
URI : {track_uri}
URL : <a href="{track_url}"<a>{track_url}</a>
Name : {track_name}
Artists : {track_artists}
Duration : {track_duration} ms
Explicit : {explicit}
Disc No : {disc_number}
Track No : {track_number}
Preview : <a href="{preview_url}"<a>{preview_url}</a>
</pre>
"""
text=f"""<figure><img src="{large}"></figure>
<h4>Album</h4><pre>ID : {album_id}
URI : {album_uri}
URL : <a href="{album_url}"<a>{album_url}</a>
Name : {album_name}
Type : {album_type}
Artists : {album_artists}
Release Date : {album_release_date}
Label : {label}
Total Tracks : {total_tracks}
UPC : {upc}
</pre>
<h4>Cover</h4><pre> 64 x 64 : <a href="{small}"<a>{small}</a>
300 x 300 : <a href="{medium}"<a>{medium}</a>
600 x 600 : <a href="{large}"<a>{large}</a>
</pre>
<h4>Tracks</h4>
{t}
<h4>Available Markets</h4>
{markets}
"""
return text, title
async def get_spotify_preview(url):
try:
result = spotify.track(url)
preview=result["preview_url"]
except:
preview=None
return preview