mirror of
https://github.com/ovosimpatico/xtream2m3u.git
synced 2026-01-15 16:32:55 -03:00
Add image proxy
This commit is contained in:
162
run.py
162
run.py
@@ -30,7 +30,11 @@ def resolve_dns(hostname):
|
|||||||
continue
|
continue
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def curl_request(url):
|
def curl_request(url, binary=False):
|
||||||
|
"""
|
||||||
|
Make a request with DNS fallback and custom headers
|
||||||
|
binary: If True, return raw bytes instead of text (for images)
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
ua = UserAgent()
|
ua = UserAgent()
|
||||||
headers = {
|
headers = {
|
||||||
@@ -57,18 +61,18 @@ def curl_request(url):
|
|||||||
try:
|
try:
|
||||||
response = requests.get(url, headers=headers)
|
response = requests.get(url, headers=headers)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
return response.text
|
return response.content if binary else response.text
|
||||||
except requests.RequestException:
|
except requests.RequestException:
|
||||||
# If original URL fails, try with IP
|
# If original URL fails, try with IP
|
||||||
headers['Host'] = hostname # Keep original hostname in Host header
|
headers['Host'] = hostname # Keep original hostname in Host header
|
||||||
response = requests.get(ip_url, headers=headers)
|
response = requests.get(ip_url, headers=headers)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
return response.text
|
return response.content if binary else response.text
|
||||||
|
|
||||||
# If DNS resolution fails or no hostname, try original URL
|
# If DNS resolution fails or no hostname, try original URL
|
||||||
response = requests.get(url, headers=headers)
|
response = requests.get(url, headers=headers)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
return response.text
|
return response.content if binary else response.text
|
||||||
|
|
||||||
except SSLError:
|
except SSLError:
|
||||||
return {'error': 'SSL Error', 'details': 'Failed to verify SSL certificate'}, 503
|
return {'error': 'SSL Error', 'details': 'Failed to verify SSL certificate'}, 503
|
||||||
@@ -76,6 +80,146 @@ def curl_request(url):
|
|||||||
print(f"RequestException: {e}")
|
print(f"RequestException: {e}")
|
||||||
return {'error': 'Request Exception', 'details': str(e)}, 503
|
return {'error': 'Request Exception', 'details': str(e)}, 503
|
||||||
|
|
||||||
|
def encode_image_url(url):
|
||||||
|
"""Encode the image URL to be used in the proxy endpoint"""
|
||||||
|
if not url:
|
||||||
|
return ''
|
||||||
|
return urllib.parse.quote(url, safe='')
|
||||||
|
|
||||||
|
@app.route('/image-proxy/<path:image_url>')
|
||||||
|
def proxy_image(image_url):
|
||||||
|
"""Proxy endpoint for images to avoid CORS issues"""
|
||||||
|
try:
|
||||||
|
# Decode the URL
|
||||||
|
original_url = urllib.parse.unquote(image_url)
|
||||||
|
|
||||||
|
# Fetch the image using our existing curl_request function with binary=True
|
||||||
|
response = curl_request(original_url, binary=True)
|
||||||
|
|
||||||
|
if isinstance(response, tuple): # Error response
|
||||||
|
return Response('', mimetype='image/png')
|
||||||
|
|
||||||
|
# Return the image with appropriate headers
|
||||||
|
return Response(
|
||||||
|
response,
|
||||||
|
mimetype='image/*',
|
||||||
|
headers={
|
||||||
|
'Cache-Control': 'public, max-age=31536000',
|
||||||
|
'Access-Control-Allow-Origin': '*'
|
||||||
|
}
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Image proxy error: {str(e)}")
|
||||||
|
return Response('', mimetype='image/png')
|
||||||
|
|
||||||
|
@app.route('/xmltv', methods=['GET'])
|
||||||
|
def generate_xmltv():
|
||||||
|
# Get parameters from the URL
|
||||||
|
url = request.args.get('url')
|
||||||
|
username = request.args.get('username')
|
||||||
|
password = request.args.get('password')
|
||||||
|
unwanted_groups = request.args.get('unwanted_groups', '')
|
||||||
|
|
||||||
|
if not url or not username or not password:
|
||||||
|
return json.dumps({
|
||||||
|
'error': 'Missing Parameters',
|
||||||
|
'details': 'Required parameters: url, username, and password'
|
||||||
|
}), 400, {'Content-Type': 'application/json'}
|
||||||
|
|
||||||
|
# Convert unwanted groups into a list
|
||||||
|
unwanted_groups = [group.strip() for group in unwanted_groups.split(',')] if unwanted_groups else []
|
||||||
|
|
||||||
|
# Verify credentials first
|
||||||
|
mainurl_response = curl_request(f'{url}/player_api.php?username={username}&password={password}')
|
||||||
|
if isinstance(mainurl_response, tuple):
|
||||||
|
return json.dumps(mainurl_response[0]), mainurl_response[1], {'Content-Type': 'application/json'}
|
||||||
|
|
||||||
|
# If credentials are valid, fetch the XMLTV data directly
|
||||||
|
base_url = url.rstrip('/') # Remove trailing slash if present
|
||||||
|
xmltv_response = curl_request(f'{base_url}/xmltv.php?username={username}&password={password}')
|
||||||
|
|
||||||
|
# Get the current host URL for the proxy
|
||||||
|
host_url = request.host_url.rstrip('/')
|
||||||
|
|
||||||
|
if isinstance(xmltv_response, tuple): # Check if it's an error response
|
||||||
|
return json.dumps(xmltv_response[0]), xmltv_response[1], {'Content-Type': 'application/json'}
|
||||||
|
|
||||||
|
# Replace image URLs in the XMLTV content
|
||||||
|
if not isinstance(xmltv_response, tuple):
|
||||||
|
import re
|
||||||
|
|
||||||
|
def replace_icon_url(match):
|
||||||
|
original_url = match.group(1)
|
||||||
|
proxied_url = f"{host_url}/image-proxy/{encode_image_url(original_url)}"
|
||||||
|
return f'<icon src="{proxied_url}"'
|
||||||
|
|
||||||
|
# Replace icon URLs in the XML
|
||||||
|
xmltv_response = re.sub(
|
||||||
|
r'<icon src="([^"]+)"',
|
||||||
|
replace_icon_url,
|
||||||
|
xmltv_response
|
||||||
|
)
|
||||||
|
|
||||||
|
# If unwanted_groups is specified, we need to filter the XML
|
||||||
|
if unwanted_groups:
|
||||||
|
try:
|
||||||
|
# Fetch categories and channels to get the mapping
|
||||||
|
category_response = curl_request(f'{url}/player_api.php?username={username}&password={password}&action=get_live_categories')
|
||||||
|
livechannel_response = curl_request(f'{url}/player_api.php?username={username}&password={password}&action=get_live_streams')
|
||||||
|
|
||||||
|
if not isinstance(category_response, tuple) and not isinstance(livechannel_response, tuple):
|
||||||
|
categories = json.loads(category_response)
|
||||||
|
channels = json.loads(livechannel_response)
|
||||||
|
|
||||||
|
# Create category mapping
|
||||||
|
category_names = {cat['category_id']: cat['category_name'] for cat in categories}
|
||||||
|
|
||||||
|
# Create set of channel IDs to exclude
|
||||||
|
excluded_channels = {
|
||||||
|
str(channel['stream_id'])
|
||||||
|
for channel in channels
|
||||||
|
if channel['stream_type'] == 'live'
|
||||||
|
and any(
|
||||||
|
unwanted_group.lower() in category_names.get(channel['category_id'], '').lower()
|
||||||
|
for unwanted_group in unwanted_groups
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
if excluded_channels:
|
||||||
|
# Simple XML filtering using string operations
|
||||||
|
filtered_lines = []
|
||||||
|
current_channel = None
|
||||||
|
skip_current = False
|
||||||
|
|
||||||
|
for line in xmltv_response.split('\n'):
|
||||||
|
if '<channel id="' in line:
|
||||||
|
current_channel = line.split('"')[1]
|
||||||
|
skip_current = current_channel in excluded_channels
|
||||||
|
|
||||||
|
if not skip_current:
|
||||||
|
if '<programme ' in line:
|
||||||
|
channel_id = line.split('channel="')[1].split('"')[0]
|
||||||
|
skip_current = channel_id in excluded_channels
|
||||||
|
|
||||||
|
if not skip_current:
|
||||||
|
filtered_lines.append(line)
|
||||||
|
|
||||||
|
if '</channel>' in line or '</programme>' in line:
|
||||||
|
skip_current = False
|
||||||
|
|
||||||
|
xmltv_response = '\n'.join(filtered_lines)
|
||||||
|
|
||||||
|
except (json.JSONDecodeError, IndexError, KeyError):
|
||||||
|
# If filtering fails, return unfiltered XMLTV
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Return the modified XMLTV data
|
||||||
|
return Response(
|
||||||
|
xmltv_response,
|
||||||
|
mimetype='application/xml',
|
||||||
|
headers={"Content-Disposition": "attachment; filename=guide.xml"}
|
||||||
|
)
|
||||||
|
|
||||||
@app.route('/m3u', methods=['GET'])
|
@app.route('/m3u', methods=['GET'])
|
||||||
def generate_m3u():
|
def generate_m3u():
|
||||||
# Get parameters from the URL
|
# Get parameters from the URL
|
||||||
@@ -161,15 +305,19 @@ def generate_m3u():
|
|||||||
|
|
||||||
categoryname = {cat['category_id']: cat['category_name'] for cat in categoryraw}
|
categoryname = {cat['category_id']: cat['category_name'] for cat in categoryraw}
|
||||||
|
|
||||||
|
# Get the current host URL for the proxy
|
||||||
|
host_url = request.host_url.rstrip('/')
|
||||||
|
|
||||||
# Generate M3U playlist
|
# Generate M3U playlist
|
||||||
m3u_playlist = "#EXTM3U\n"
|
m3u_playlist = "#EXTM3U\n"
|
||||||
for channel in livechannelraw:
|
for channel in livechannelraw:
|
||||||
if channel['stream_type'] == 'live':
|
if channel['stream_type'] == 'live':
|
||||||
# Use a default category name if category_id is None
|
|
||||||
group_title = categoryname.get(channel["category_id"], "Uncategorized")
|
group_title = categoryname.get(channel["category_id"], "Uncategorized")
|
||||||
# Skip this channel if its group is in the unwanted list
|
|
||||||
if not any(unwanted_group.lower() in group_title.lower() for unwanted_group in unwanted_groups):
|
if not any(unwanted_group.lower() in group_title.lower() for unwanted_group in unwanted_groups):
|
||||||
logo_url = channel.get('stream_icon', '')
|
# Proxy the logo URL
|
||||||
|
original_logo = channel.get('stream_icon', '')
|
||||||
|
logo_url = f"{host_url}/image-proxy/{encode_image_url(original_logo)}" if original_logo else ''
|
||||||
|
|
||||||
m3u_playlist += f'#EXTINF:0 tvg-name="{channel["name"]}" group-title="{group_title}" tvg-logo="{logo_url}",{channel["name"]}\n'
|
m3u_playlist += f'#EXTINF:0 tvg-name="{channel["name"]}" group-title="{group_title}" tvg-logo="{logo_url}",{channel["name"]}\n'
|
||||||
m3u_playlist += f'{fullurl}{channel["stream_id"]}.ts\n'
|
m3u_playlist += f'{fullurl}{channel["stream_id"]}.ts\n'
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user