mirror of
https://github.com/ovosimpatico/CLI-TikTok.git
synced 2026-01-15 11:52:52 -03:00
Merge pull request #2 from gageirwin/main
Replace RSS feed for users with simple webscrape to get all videos
This commit is contained in:
3
.gitignore
vendored
3
.gitignore
vendored
@@ -134,4 +134,5 @@ user_data.json
|
||||
features-missing.txt
|
||||
/video
|
||||
/logs
|
||||
test.py
|
||||
test.py
|
||||
.vscode
|
||||
37
main.py
37
main.py
@@ -1,6 +1,7 @@
|
||||
# Detect and handle launch with arguments
|
||||
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--downloadliked", type=str)
|
||||
parser.add_argument("--downloadcreator", type=str)
|
||||
@@ -13,9 +14,10 @@ parser.add_argument("--streamtrending", type=str)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
import sys
|
||||
|
||||
from log import logtofile as log
|
||||
|
||||
import sys
|
||||
# Introduction and pre-test
|
||||
from src.init import init
|
||||
|
||||
@@ -37,12 +39,13 @@ log("Pre-test complete\n")
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
from src.byuser import getLinks, streamuser
|
||||
from src.byuser import proxitok_scraper, streamuser
|
||||
from src.downloader import downloadtiktoks
|
||||
from src.functions import listas
|
||||
from src.streaming import playback, playbackrandom
|
||||
from src.trending import streamtrending
|
||||
|
||||
|
||||
def main():
|
||||
# Needlessly big code to simply prompt the user which action they want to do
|
||||
log("Main menu started")
|
||||
@@ -81,16 +84,10 @@ def main():
|
||||
## Download creator
|
||||
if downloadquestion == 2:
|
||||
log("The user chose to download videos from a creator")
|
||||
print(
|
||||
"Due to specific limitations of the current data method, downloading by creator will only get the latest 24 videos."
|
||||
)
|
||||
print(
|
||||
"This limitation is being actively researched, any contributions will be welcome."
|
||||
)
|
||||
username = str(input("Enter the tiktok username here: "))
|
||||
log(f"The creator chosen was: @{username}\n")
|
||||
links = getLinks(username)
|
||||
downloadtiktoks(links)
|
||||
links = proxitok_scraper(username)
|
||||
downloadtiktoks(links) # add handling for when zero links are passed
|
||||
sys.exit()
|
||||
|
||||
## Stream
|
||||
@@ -140,12 +137,6 @@ def main():
|
||||
## Stream creator
|
||||
if watchquestion == 2:
|
||||
log("The user chose to stream videos from a creator")
|
||||
print(
|
||||
"Due to specific limitations of the current data method, watching by creator will only get the latest 24 videos."
|
||||
)
|
||||
print(
|
||||
"This limitation is being actively researched, any contributions will be welcome."
|
||||
)
|
||||
username = str(input("Enter the tiktok username here: "))
|
||||
log(f"The creator chosen was: @{username}\n")
|
||||
streamuser(username)
|
||||
@@ -160,7 +151,7 @@ def main():
|
||||
print(
|
||||
"This limitation is being actively researched, any contributions will be welcome."
|
||||
)
|
||||
streamtrending()
|
||||
streamtrending(24) # need to implement asking how many trending videos the user wants
|
||||
sys.exit()
|
||||
|
||||
# Error handling for invalid number (3, 4, 6, 133)
|
||||
@@ -210,8 +201,8 @@ def arguments(args):
|
||||
elif args.downloadcreator:
|
||||
username = args.downloadcreator
|
||||
log(f"The creator chosen was: @{username}\n")
|
||||
links = getLinks(username)
|
||||
downloadtiktoks(links)
|
||||
links = proxitok_scraper(username)
|
||||
downloadtiktoks(links) # add handling for when zero links are passed
|
||||
|
||||
elif args.streamlikedrandom:
|
||||
log("The user chose to stream liked videos in shuffled mode\n")
|
||||
@@ -227,12 +218,6 @@ def arguments(args):
|
||||
|
||||
elif args.streamcreator:
|
||||
log("The user chose to stream videos from a creator")
|
||||
print(
|
||||
"Due to specific limitations of the current data method, watching by creator will only get the latest 24 videos."
|
||||
)
|
||||
print(
|
||||
"This limitation is being actively researched, any contributions will be welcome."
|
||||
)
|
||||
username = args.streamcreator
|
||||
log(f"The creator chosen was: @{username}\n")
|
||||
streamuser(username)
|
||||
@@ -245,7 +230,7 @@ def arguments(args):
|
||||
print(
|
||||
"This limitation is being actively researched, any contributions will be welcome."
|
||||
)
|
||||
streamtrending()
|
||||
streamtrending(24) # need to implement asking how many trending videos the user wants
|
||||
|
||||
if silent:
|
||||
arguments(args)
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
yt_dlp
|
||||
requests
|
||||
atoma
|
||||
distro
|
||||
distro
|
||||
bs4
|
||||
@@ -1,45 +1,67 @@
|
||||
import sys
|
||||
|
||||
import atoma
|
||||
import time
|
||||
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from log import logtofile as log
|
||||
from src.streaming import mpv, getVideoInfo
|
||||
from src.streaming import getVideoInfo, mpv
|
||||
|
||||
def info(username):
    """Download and parse the ProxiTok RSS feed of a TikTok user.

    Args:
        username: TikTok username, interpolated into the feed URL after "@".

    Returns:
        The result of ``atoma.parse_rss_bytes`` applied to the response body.

    The process is terminated with ``sys.exit()`` when the server answers
    404 or when the response body is empty.
    """
    feed_url = f"https://proxitok.pabloferreiro.es/@{username}/rss"
    reply = requests.get(feed_url)

    # A 404 is reported to the user as a probable typo in the username.
    if reply.status_code == 404:
        print("Something went wrong while getting the information. Make sure the username was correctly inserted and try again.")
        log(f"{feed_url} returned a 404 error. The username is likely incorrect.")
        sys.exit()

    payload = reply.content
    # An empty body is reported as a non-existent account.
    if not payload:
        print("The specified account does not exist.")
        log(f"{feed_url} returned no information. The account likely does not exist.")
        sys.exit()

    return atoma.parse_rss_bytes(payload)
|
||||
|
||||
|
||||
def getLinks(username):
    """Return the TikTok video URLs listed in a user's RSS feed.

    Args:
        username: TikTok username; passed to ``info`` and used in each URL.

    Returns:
        A list with one ``https://www.tiktok.com/@<username>/video/<guid>``
        string per feed item, in feed order.
    """
    prefix = f"https://www.tiktok.com/@{username}/video/"
    return [prefix + entry.guid for entry in info(username).items]
|
||||
|
||||
def streamuser(username):
    """Play every video of a TikTok user, one after another.

    The links come from ``proxitok_scraper``; each one is resolved with
    ``getVideoInfo`` and the result is handed to ``mpv``. Every played link
    is written to the log once.

    Args:
        username: TikTok username whose videos should be streamed.

    Returns:
        None. Returns early, after informing the user, when no links were
        found.
    """
    # Single source of links: the scraper supersedes the old RSS-based
    # getLinks() call, so the feed is not fetched a second time.
    links = proxitok_scraper(username)

    if not links:
        error_msg = "The link list is empty. The specified account is likely private or has no published videos"
        log(error_msg)
        print("This account is private or has no published videos.")
        return

    for link in links:
        url = getVideoInfo(link)
        mpv(url)
        log(f"Video {link} was played.")
|
||||
|
||||
|
||||
def proxitok_scraper(username: str) -> list[str]:
    """Collect the "Original" links of a user's posts from ProxiTok.

    Requests the user's profile page, gathers the link behind every post's
    "Original" label, then follows the "Next" button until it is disabled
    or absent.

    Args:
        username: TikTok username, interpolated into the URL after "@".

    Returns:
        The links collected so far. The list is empty or partial when a
        request fails with a non-retryable status or a page has no posts.
    """
    print("Obtaining URLs - this can take a while with users with many posts.")
    session = requests.Session()
    direct_links = []
    next_href = ""
    rate_limit = 0
    while True:
        url = f"https://proxitok.pussthecat.org/@{username}{next_href}"
        response = session.get(url)

        if response.status_code in (429, 403):
            # may want to adjust this ratio
            # Linear back-off: each throttled attempt waits 30 seconds
            # longer than the previous one, then the same URL is retried.
            rate_limit += 1
            sleep_time = 30 * rate_limit
            print(f"{response.status_code} {response.reason} sleeping for {sleep_time}")
            time.sleep(sleep_time)
            continue

        if not response.ok:
            error_msg = f"{response.status_code} {response.reason} getting {url}"
            log(error_msg)
            print(error_msg)
            return direct_links

        soup = BeautifulSoup(response.text, "html.parser")

        posts = soup.find_all("article", class_="media")

        if not posts:
            error_msg = "No posts found. The specified account is likely private or has no published videos"
            log(error_msg)
            print(f"@{username} is private or has no videos.")
            return direct_links

        for post in posts:
            # `string=` is the current name of the deprecated `text=` filter.
            original_link = post.find("span", string="Original")

            if not original_link:
                continue

            # The link is read from the element two levels above the label;
            # skip the post instead of crashing when it carries no href.
            anchor = original_link.parent.parent
            href = anchor.get("href") if anchor is not None else None
            if href:
                direct_links.append(href)

        next_button = soup.find("a", class_="button", string="Next")
        # find() returns None when the page has no "Next" button; treat that,
        # a disabled button, or a button without a target as the last page.
        if (
            next_button is None
            or next_button.has_attr("disabled")
            or not next_button.has_attr("href")
        ):
            return direct_links
        next_href = next_button["href"]
|
||||
|
||||
@@ -1,44 +1,13 @@
|
||||
import sys
|
||||
|
||||
import atoma
|
||||
import time
|
||||
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from log import logtofile as log
|
||||
|
||||
|
||||
def getInfoTrending():
    """Download and parse the ProxiTok trending RSS feed.

    Returns:
        The result of ``atoma.parse_rss_bytes`` applied to the response body.

    The process is terminated with ``sys.exit()`` when the server answers
    404 or when the response body is empty.
    """
    trending_url = "https://proxitok.pabloferreiro.es/trending/rss"
    reply = requests.get(trending_url)

    if reply.status_code == 404:
        log(f"{trending_url} returned a 404 error. This is likely a server-side issue")
        print(
            """Something went wrong while getting the trending information.
This is likely an issue with your internet connection or with the API."""
        )
        sys.exit()

    # An empty bytes body means there is nothing to parse.
    if reply.content == b"":
        print(
            "Something went wrong while parsing the trending information. If it persists, report this issue on Discord or Github."
        )
        log(f"{trending_url} returned an empty response. This is likely a server-side issue")
        sys.exit()

    return atoma.parse_rss_bytes(reply.content)
|
||||
|
||||
|
||||
def getLinksTrending():
    """Return TikTok URLs for every item in the trending RSS feed.

    Each feed item's ``link`` is split on "/" and its 4th and 6th segments
    are combined into ``https://www.tiktok.com/<segment 3>/video/<segment 5>``.

    Returns:
        A list of rebuilt URLs, in feed order.
    """
    linklist = []
    for entry in getInfoTrending().items:
        # Split once and reuse the pieces for both lookups.
        segments = entry.link.split("/")
        linklist.append("https://www.tiktok.com/" + segments[3] + "/video/" + segments[5])
    return linklist
|
||||
|
||||
|
||||
def streamtrending():
|
||||
links = getLinksTrending()
|
||||
def streamtrending(amount:int = 24):
|
||||
links = proxitok_trending(amount)
|
||||
|
||||
if len(links) == 0:
|
||||
print(
|
||||
@@ -49,4 +18,57 @@ def streamtrending():
|
||||
|
||||
for i in range(len(links)):
|
||||
mpv(links[i])
|
||||
log(f"{links[i]} was played")
|
||||
log(f"{links[i]} was played")
|
||||
|
||||
|
||||
def proxitok_trending(amount: int = 24) -> list[str]:
    """Collect up to ``amount`` unique trending video links from ProxiTok.

    Requests the trending page repeatedly, following the "Next" button, and
    gathers the link behind every post's "Original" label until enough
    unique links are found.

    Args:
        amount: Number of unique links wanted. Values of zero or less yield
            an empty list without any request being made.

    Returns:
        At most ``amount`` links. Fewer are returned when a request fails
        with a non-retryable status, a page has no posts, or there is no
        "Next" button to follow.
    """
    print("Obtaining URLs - this can take a while when requesting many posts.")
    direct_links = []
    # The target can never be reached for a non-positive amount, which
    # previously kept the loop running; answer immediately instead.
    if amount <= 0:
        return direct_links

    session = requests.Session()
    next_href = ""
    rate_limit = 0
    while True:
        # The "next" page url is always the same but loads different trending videos each time
        url = f"https://proxitok.pussthecat.org/trending{next_href}"
        response = session.get(url)
        print(url)

        if response.status_code in (429, 403):
            # may want to adjust this ratio
            # Linear back-off: each throttled attempt waits 30 seconds
            # longer than the previous one, then the same URL is retried.
            rate_limit += 1
            sleep_time = 30 * rate_limit
            print(f"{response.status_code} {response.reason} sleeping for {sleep_time}")
            time.sleep(sleep_time)
            continue

        if not response.ok:
            error_msg = f"{response.status_code} {response.reason} getting {url}"
            log(error_msg)
            print(error_msg)
            return direct_links

        soup = BeautifulSoup(response.text, "html.parser")

        posts = soup.find_all("article", class_="media")

        if not posts:
            error_msg = "No posts found for trending."
            log(error_msg)
            print(error_msg)
            return direct_links

        for post in posts:
            # `string=` is the current name of the deprecated `text=` filter.
            original_link = post.find("span", string="Original")

            if not original_link:
                continue

            # The link is read from the element two levels above the label;
            # skip the post instead of crashing when it carries no href.
            anchor = original_link.parent.parent
            direct_link = anchor.get("href") if anchor is not None else None
            if not direct_link:
                continue

            # stops duplicate videos from being added to the list
            if direct_link not in direct_links:
                direct_links.append(direct_link)
            if len(direct_links) >= amount:
                return direct_links

        next_button = soup.find("a", class_="button", string="Next")
        # find() returns None when the page has no "Next" button; without a
        # target to follow, return what has been collected.
        if next_button is None or not next_button.has_attr("href"):
            return direct_links
        next_href = next_button["href"]
|
||||
Reference in New Issue
Block a user