diff --git a/docker-compose.yml b/docker-compose.yml
index 48aba6a..37bb428 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -9,6 +9,5 @@ services:
       - "5000:5000"
     environment:
       - FLASK_ENV=production
-      - GUNICORN_CMD_ARGS="--workers=3 --timeout=600 --keep-alive=10"
       # - PROXY_URL=https://your-domain.com
     restart: unless-stopped
\ No newline at end of file
diff --git a/run.py b/run.py
index e6f9997..4dc263d 100644
--- a/run.py
+++ b/run.py
@@ -5,7 +5,9 @@ import logging
 import os
 import re
 import socket
+import time
 import urllib.parse
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from functools import lru_cache
 
 import dns.resolver
@@ -90,6 +92,19 @@ def setup_custom_dns():
 setup_custom_dns()
 
+# Create a session with connection pooling for better performance
+session = requests.Session()
+session.mount('http://', requests.adapters.HTTPAdapter(
+    pool_connections=10,
+    pool_maxsize=20,
+    max_retries=3
+))
+session.mount('https://', requests.adapters.HTTPAdapter(
+    pool_connections=10,
+    pool_maxsize=20,
+    max_retries=3
+))
+
 # Common request function with caching for API endpoints
 @lru_cache(maxsize=128)
 def fetch_api_data(url, timeout=10):
@@ -97,23 +112,32 @@ def fetch_api_data(url, timeout=10):
     ua = UserAgent()
     headers = {
         "User-Agent": ua.chrome,
-        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
+        "Accept": "application/json,text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
         "Accept-Language": "en-US,en;q=0.5",
         "Connection": "keep-alive",
+        "Accept-Encoding": "gzip, deflate",
    }
 
     try:
         hostname = urllib.parse.urlparse(url).netloc.split(":")[0]
         logger.info(f"Making request to host: {hostname}")
 
-        response = requests.get(url, headers=headers, timeout=timeout)
+        # Use the pooled session; stream=True defers downloading large bodies
+        response = session.get(url, headers=headers, timeout=timeout, stream=True)
         response.raise_for_status()
 
-        # Try to parse as JSON
+        # Try to parse as JSON, flagging oversized payloads in the logs
         try:
-            return json.loads(response.text)
+            # Check the content length so very large responses are visible in the logs
+            content_length = response.headers.get('Content-Length')
+            if content_length and int(content_length) > 10_000_000:  # > 10MB
+                logger.info(f"Large response detected ({content_length} bytes); parsing may take a while")
+
+            # response.json() still reads the whole body into memory
+            response.encoding = 'utf-8'  # Ensure proper encoding
+            return response.json()
         except json.JSONDecodeError:
-            # Return text if not valid JSON
+            # Fall back to text for non-JSON responses
             return response.text
 
     except requests.exceptions.SSLError:
@@ -310,106 +334,171 @@ def validate_xtream_credentials(url, username, password):
     return data, None, None
 
 
+def fetch_api_endpoint(url_info):
+    """Fetch a single API endpoint - used for concurrent requests"""
+    url, name, timeout = url_info
+    try:
+        logger.info(f"🚀 Fetching {name}...")
+        start_time = time.time()
+        data = fetch_api_data(url, timeout=timeout)
+        end_time = time.time()
+
+        if isinstance(data, list):
+            logger.info(f"✅ Completed {name} in {end_time - start_time:.1f}s - got {len(data)} items")
+        else:
+            logger.info(f"✅ Completed {name} in {end_time - start_time:.1f}s")
+        return name, data
+    except Exception as e:
+        logger.warning(f"❌ Failed to fetch {name}: {e}")
+        return name, None
+
+
 def fetch_categories_and_channels(url, username, password, include_vod=False):
-    """Fetch categories and channels from the Xtream API"""
+    """Fetch categories and channels from the Xtream API using concurrent requests"""
     all_categories = []
     all_streams = []
 
     try:
-        # Fetch live categories and streams
-        live_category_url = f"{url}/player_api.php?username={username}&password={password}&action=get_live_categories"
-        live_categories = fetch_api_data(live_category_url, timeout=60)
+        # Prepare all API endpoints to fetch concurrently
+        api_endpoints = [
+            (f"{url}/player_api.php?username={username}&password={password}&action=get_live_categories",
+             "live_categories", 60),
+            (f"{url}/player_api.php?username={username}&password={password}&action=get_live_streams",
+             "live_streams", 180),
+        ]
+
+        # Add VOD endpoints if requested (WARNING: this is much slower!)
+        if include_vod:
+            logger.warning("⚠️ Including VOD content - this will take significantly longer!")
+            logger.info("💡 For faster loading, use the API without include_vod=true")
+
+            # Only add the lightweight category endpoints here; the massive
+            # stream lists are skipped for categories-only requests
+            api_endpoints.extend([
+                (f"{url}/player_api.php?username={username}&password={password}&action=get_vod_categories",
+                 "vod_categories", 60),
+                (f"{url}/player_api.php?username={username}&password={password}&action=get_series_categories",
+                 "series_categories", 60),
+            ])
+
+            # Only fetch the massive stream lists when they are needed for M3U generation
+            vod_for_m3u = request.endpoint == 'generate_m3u'
+            if vod_for_m3u:
+                logger.warning("🐌 Fetching massive VOD/Series streams for M3U generation...")
+                api_endpoints.extend([
+                    (f"{url}/player_api.php?username={username}&password={password}&action=get_vod_streams",
+                     "vod_streams", 240),
+                    (f"{url}/player_api.php?username={username}&password={password}&action=get_series",
+                     "series", 240),
+                ])
+            else:
+                logger.info("⚡ Skipping massive VOD streams for categories-only request")
+
+        # Fetch all endpoints concurrently using a ThreadPoolExecutor
+        logger.info(f"Starting concurrent fetch of {len(api_endpoints)} API endpoints...")
+        results = {}
+
+        with ThreadPoolExecutor(max_workers=10) as executor:  # enough workers to run every endpoint in parallel
+            # Submit all API calls
+            future_to_name = {executor.submit(fetch_api_endpoint, endpoint): endpoint[1]
+                              for endpoint in api_endpoints}
+
+            # Collect results as they complete
+            for future in as_completed(future_to_name):
+                name, data = future.result()
+                results[name] = data
+
+        logger.info("All concurrent API calls completed!")
+
+        # Process live categories and streams (required)
+        live_categories = results.get("live_categories")
+        live_streams = results.get("live_streams")
 
         if isinstance(live_categories, tuple):  # Error response
             return None, None, live_categories[0], live_categories[1]
+        if isinstance(live_streams, tuple):  # Error response
+            return None, None, live_streams[0], live_streams[1]
 
-        live_channel_url = f"{url}/player_api.php?username={username}&password={password}&action=get_live_streams"
-        live_channels = fetch_api_data(live_channel_url, timeout=180)  # Much longer timeout for large channel lists
-
-        if isinstance(live_channels, tuple):  # Error response
-            return None, None, live_channels[0], live_channels[1]
-
-        if not isinstance(live_categories, list) or not isinstance(live_channels, list):
+        if not isinstance(live_categories, list) or not isinstance(live_streams, list):
             return (
                 None,
                 None,
                 json.dumps(
                     {
                         "error": "Invalid Data Format",
-                        "details": "Live categories or channels data is not in the expected format",
+                        "details": "Live categories or streams data is not in the expected format",
                     }
                 ),
                 500,
             )
 
-        # Add content type to live categories and streams
-        for category in live_categories:
-            category["content_type"] = "live"
-        for stream in live_channels:
-            stream["content_type"] = "live"
+        # Optimized data processing for massive datasets
+        logger.info("Processing live content...")
 
-        all_categories.extend(live_categories)
-        all_streams.extend(live_channels)
+        # Set content_type for live content
+        if live_categories:
+            for category in live_categories:
+                category["content_type"] = "live"
+            all_categories.extend(live_categories)
 
-        # If VOD is requested, fetch VOD content with timeout handling
+        if live_streams:
+            for stream in live_streams:
+                stream["content_type"] = "live"
+            all_streams.extend(live_streams)
+
+        logger.info(f"✅ Added {len(live_categories)} live categories and {len(live_streams)} live streams")
+
+        # Process VOD content if requested and available
         if include_vod:
-            logger.info("Fetching VOD content - this may take longer...")
+            logger.info("Processing VOD content...")
 
-            try:
-                # Fetch VOD categories with timeout
-                vod_category_url = f"{url}/player_api.php?username={username}&password={password}&action=get_vod_categories"
-                vod_categories = fetch_api_data(vod_category_url, timeout=60)
+            # Process VOD categories
+            vod_categories = results.get("vod_categories")
+            if isinstance(vod_categories, list) and vod_categories:
+                for category in vod_categories:
+                    category["content_type"] = "vod"
+                all_categories.extend(vod_categories)
+                logger.info(f"✅ Added {len(vod_categories)} VOD categories")
 
-                if isinstance(vod_categories, list):
-                    # Add content type to VOD categories
-                    for category in vod_categories:
-                        category["content_type"] = "vod"
-                    all_categories.extend(vod_categories)
-                    logger.info(f"Added {len(vod_categories)} VOD categories")
+            # Process series categories first (lightweight)
+            series_categories = results.get("series_categories")
+            if isinstance(series_categories, list) and series_categories:
+                for category in series_categories:
+                    category["content_type"] = "series"
+                all_categories.extend(series_categories)
+                logger.info(f"✅ Added {len(series_categories)} series categories")
 
-                    # Only fetch VOD streams if categories were successful
-                    vod_streams_url = f"{url}/player_api.php?username={username}&password={password}&action=get_vod_streams"
-                    vod_streams = fetch_api_data(vod_streams_url, timeout=240)  # Very long timeout for massive VOD libraries
+            # Only process the massive stream lists if they were actually fetched
+            vod_streams = results.get("vod_streams")
+            if isinstance(vod_streams, list) and vod_streams:
+                logger.info(f"🔥 Processing {len(vod_streams)} VOD streams (this is the slow part)...")
 
-                    if isinstance(vod_streams, list):
-                        # Add content type to VOD streams
-                        for stream in vod_streams:
-                            stream["content_type"] = "vod"
-                        all_streams.extend(vod_streams)
-                        logger.info(f"Added {len(vod_streams)} VOD streams")
+                # Work in batches so progress is visible on huge lists
+                batch_size = 5000
+                for i in range(0, len(vod_streams), batch_size):
+                    batch = vod_streams[i:i + batch_size]
+                    for stream in batch:
+                        stream["content_type"] = "vod"
+                    if i + batch_size < len(vod_streams):
+                        logger.info(f"  Processed {i + batch_size}/{len(vod_streams)} VOD streams...")
 
-            except Exception as e:
-                logger.warning(f"Failed to fetch VOD content: {e}")
-                # Continue without VOD content rather than failing completely
+                all_streams.extend(vod_streams)
+                logger.info(f"✅ Added {len(vod_streams)} VOD streams")
 
-            try:
-                # Fetch series categories with timeout
-                series_category_url = (
-                    f"{url}/player_api.php?username={username}&password={password}&action=get_series_categories"
-                )
-                series_categories = fetch_api_data(series_category_url, timeout=60)
+            # Process series (these lists can also be huge!)
+            series = results.get("series")
+            if isinstance(series, list) and series:
+                logger.info(f"🔥 Processing {len(series)} series (this is also slow)...")
 
-                if isinstance(series_categories, list):
-                    # Add content type to series categories
-                    for category in series_categories:
-                        category["content_type"] = "series"
-                    all_categories.extend(series_categories)
-                    logger.info(f"Added {len(series_categories)} series categories")
+                # Work in batches so progress is visible on huge lists
+                batch_size = 5000
+                for i in range(0, len(series), batch_size):
+                    batch = series[i:i + batch_size]
+                    for show in batch:
+                        show["content_type"] = "series"
+                    if i + batch_size < len(series):
+                        logger.info(f"  Processed {i + batch_size}/{len(series)} series...")
 
-                    # Only fetch series if categories were successful
-                    series_url = f"{url}/player_api.php?username={username}&password={password}&action=get_series"
-                    series = fetch_api_data(series_url, timeout=240)  # Very long timeout for massive series libraries
-
-                    if isinstance(series, list):
-                        # Add content type to series
-                        for show in series:
-                            show["content_type"] = "series"
-                        all_streams.extend(series)
-                        logger.info(f"Added {len(series)} series")
-
-            except Exception as e:
-                logger.warning(f"Failed to fetch series content: {e}")
-                # Continue without series content rather than failing completely
+                all_streams.extend(series)
+                logger.info(f"✅ Added {len(series)} series")
 
     except Exception as e:
         logger.error(f"Critical error fetching API data: {e}")
@@ -425,7 +514,7 @@ def fetch_categories_and_channels(url, username, password, include_vod=False):
                 500,
             )
 
-    logger.info(f"Successfully fetched {len(all_categories)} total categories and {len(all_streams)} total streams")
+    logger.info(f"🚀 Concurrent fetch complete: {len(all_categories)} total categories and {len(all_streams)} total streams")
 
     return all_categories, all_streams, None, None
 
@@ -437,8 +526,9 @@ def get_categories():
     if error:
         return error
 
-    # Check for VOD parameter - default to false to avoid timeouts
+    # Check for VOD parameter - default to false to avoid timeouts (VOD lists are massive and slow)
     include_vod = request.args.get("include_vod", "false").lower() == "true"
+    logger.info(f"VOD content requested: {include_vod}")
 
     # Validate credentials
     user_data, error_json, error_code = validate_xtream_credentials(url, username, password)
@@ -511,6 +601,12 @@ def generate_m3u():
     no_stream_proxy = request.args.get("nostreamproxy", "").lower() == "true"
     include_vod = request.args.get("include_vod", "false").lower() == "true"  # Default to false to avoid timeouts
 
+    # For M3U generation, warn about the VOD performance impact
+    if include_vod:
+        logger.warning("⚠️ M3U generation with VOD enabled - expect 2-5 minute generation time!")
+    else:
+        logger.info("⚡ M3U generation for live content only - should be fast!")
+
     # Log filter parameters
     logger.info(
         f"Filter parameters - wanted_groups: {wanted_groups}, unwanted_groups: {unwanted_groups}, include_vod: {include_vod}"