Refactor to use f-strings
This commit is contained in:
parent
fe31152a4f
commit
7feef8c07f
@ -27,11 +27,11 @@ class User(UserMixin, db.Model):
|
|||||||
posts = db.relationship('Post', backref='author', lazy='dynamic')
|
posts = db.relationship('Post', backref='author', lazy='dynamic')
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return '<User {}>'.format(self.username)
|
return f'<User {self.username}>'
|
||||||
|
|
||||||
def set_last_seen(self):
|
def set_last_seen(self):
|
||||||
self.last_seen = datetime.utcnow()
|
self.last_seen = datetime.utcnow()
|
||||||
|
|
||||||
def set_admin_user(self):
|
def set_admin_user(self):
|
||||||
self.is_admin = True
|
self.is_admin = True
|
||||||
|
|
||||||
@ -40,7 +40,7 @@ class User(UserMixin, db.Model):
|
|||||||
|
|
||||||
def check_password(self, password):
|
def check_password(self, password):
|
||||||
return check_password_hash(self.password_hash, password)
|
return check_password_hash(self.password_hash, password)
|
||||||
|
|
||||||
def follow(self, user):
|
def follow(self, user):
|
||||||
if not self.is_following(user):
|
if not self.is_following(user):
|
||||||
self.followed.append(user)
|
self.followed.append(user)
|
||||||
@ -52,7 +52,7 @@ class User(UserMixin, db.Model):
|
|||||||
def is_following(self, user):
|
def is_following(self, user):
|
||||||
return self.followed.filter(
|
return self.followed.filter(
|
||||||
followers.c.followed_id == user.id).count() > 0
|
followers.c.followed_id == user.id).count() > 0
|
||||||
|
|
||||||
def following_list(self):
|
def following_list(self):
|
||||||
return self.followed.all()
|
return self.followed.all()
|
||||||
|
|
||||||
@ -62,7 +62,7 @@ class User(UserMixin, db.Model):
|
|||||||
# TWITTER
|
# TWITTER
|
||||||
def twitter_following_list(self):
|
def twitter_following_list(self):
|
||||||
return self.twitterFollowed.all()
|
return self.twitterFollowed.all()
|
||||||
|
|
||||||
def is_following_tw(self, uname):
|
def is_following_tw(self, uname):
|
||||||
temp_cid = twitterFollow.query.filter_by(username = uname).first()
|
temp_cid = twitterFollow.query.filter_by(username = uname).first()
|
||||||
if temp_cid is None:
|
if temp_cid is None:
|
||||||
@ -73,11 +73,11 @@ class User(UserMixin, db.Model):
|
|||||||
if f.username == uname:
|
if f.username == uname:
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# YOUTUBE
|
# YOUTUBE
|
||||||
def youtube_following_list(self):
|
def youtube_following_list(self):
|
||||||
return self.youtubeFollowed.all()
|
return self.youtubeFollowed.all()
|
||||||
|
|
||||||
def is_following_yt(self, cid):
|
def is_following_yt(self, cid):
|
||||||
temp_cid = youtubeFollow.query.filter_by(channelId = cid).first()
|
temp_cid = youtubeFollow.query.filter_by(channelId = cid).first()
|
||||||
if temp_cid is None:
|
if temp_cid is None:
|
||||||
@ -88,7 +88,7 @@ class User(UserMixin, db.Model):
|
|||||||
if f.channelId == cid:
|
if f.channelId == cid:
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
followed = db.relationship(
|
followed = db.relationship(
|
||||||
'User', secondary=followers,
|
'User', secondary=followers,
|
||||||
primaryjoin=(followers.c.follower_id == id),
|
primaryjoin=(followers.c.follower_id == id),
|
||||||
@ -148,23 +148,23 @@ class youtubeFollow(db.Model):
|
|||||||
id = db.Column(db.Integer, primary_key=True)
|
id = db.Column(db.Integer, primary_key=True)
|
||||||
channelId = db.Column(db.String(30), nullable=False)
|
channelId = db.Column(db.String(30), nullable=False)
|
||||||
channelName = db.Column(db.String(100))
|
channelName = db.Column(db.String(100))
|
||||||
followers = db.relationship('User',
|
followers = db.relationship('User',
|
||||||
secondary=channel_association,
|
secondary=channel_association,
|
||||||
back_populates="youtubeFollowed")
|
back_populates="youtubeFollowed")
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return '<youtubeFollow {}>'.format(self.channelName)
|
return f'<youtubeFollow {self.channelName}>'
|
||||||
|
|
||||||
class twitterFollow(db.Model):
|
class twitterFollow(db.Model):
|
||||||
__tablename__ = 'twitterAccount'
|
__tablename__ = 'twitterAccount'
|
||||||
id = db.Column(db.Integer, primary_key=True)
|
id = db.Column(db.Integer, primary_key=True)
|
||||||
username = db.Column(db.String(100), nullable=False)
|
username = db.Column(db.String(100), nullable=False)
|
||||||
followers = db.relationship('User',
|
followers = db.relationship('User',
|
||||||
secondary=twitter_association,
|
secondary=twitter_association,
|
||||||
back_populates="twitterFollowed")
|
back_populates="twitterFollowed")
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return '<twitterFollow {}>'.format(self.username)
|
return f'<twitterFollow {self.username}>'
|
||||||
|
|
||||||
class Post(db.Model):
|
class Post(db.Model):
|
||||||
id = db.Column(db.Integer, primary_key=True)
|
id = db.Column(db.Integer, primary_key=True)
|
||||||
@ -175,5 +175,4 @@ class Post(db.Model):
|
|||||||
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
|
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return '<Post {}>'.format(self.body)
|
return f'<Post {self.body}>'
|
||||||
|
|
||||||
|
@ -91,7 +91,7 @@ def twitter(page=0):
|
|||||||
followList.append(f.username)
|
followList.append(f.username)
|
||||||
posts = []
|
posts = []
|
||||||
|
|
||||||
cache_file = glob.glob("app/cache/{}_*".format(current_user.username))
|
cache_file = glob.glob(f"app/cache/{current_user.username}_*")
|
||||||
if (len(cache_file) > 0):
|
if (len(cache_file) > 0):
|
||||||
time_diff = round(time.time() - os.path.getmtime(cache_file[0]))
|
time_diff = round(time.time() - os.path.getmtime(cache_file[0]))
|
||||||
else:
|
else:
|
||||||
@ -103,20 +103,20 @@ def twitter(page=0):
|
|||||||
for f in cache_file:
|
for f in cache_file:
|
||||||
os.remove(f)
|
os.remove(f)
|
||||||
feed = nitterfeed.get_feed(followList)
|
feed = nitterfeed.get_feed(followList)
|
||||||
cache_file = "{u}_{d}.json".format(u=current_user.username, d=time.strftime("%Y%m%d-%H%M%S"))
|
cache_file = f"{current_user.username}_{time.strftime('%Y%m%d-%H%M%S')}.json"
|
||||||
with open("app/cache/{}".format(cache_file), 'w') as fp:
|
with open(f"app/cache/{cache_file}", 'w') as fp:
|
||||||
json.dump(feed, fp)
|
json.dump(feed, fp)
|
||||||
|
|
||||||
# Else, refresh feed
|
# Else, refresh feed
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
cache_file = glob.glob("app/cache/{}*".format(current_user.username))[0]
|
cache_file = glob.glob(f"app/cache/{current_user.username}*")[0]
|
||||||
with open(cache_file, 'r') as fp:
|
with open(cache_file, 'r') as fp:
|
||||||
feed = json.load(fp)
|
feed = json.load(fp)
|
||||||
except:
|
except:
|
||||||
feed = nitterfeed.get_feed(followList)
|
feed = nitterfeed.get_feed(followList)
|
||||||
cache_file = "{u}_{d}.json".format(u=current_user.username, d=time.strftime("%Y%m%d-%H%M%S"))
|
cache_file = f"{current_user.username}_{time.strftime('%Y%m%d-%H%M%S')}.json"
|
||||||
with open("app/cache/{}".format(cache_file), 'w') as fp:
|
with open(f"app/cache/{cache_file}", 'w') as fp:
|
||||||
json.dump(feed, fp)
|
json.dump(feed, fp)
|
||||||
|
|
||||||
posts.extend(feed)
|
posts.extend(feed)
|
||||||
@ -187,7 +187,7 @@ def follow(username):
|
|||||||
form = EmptyForm()
|
form = EmptyForm()
|
||||||
if form.validate_on_submit():
|
if form.validate_on_submit():
|
||||||
if followTwitterAccount(username):
|
if followTwitterAccount(username):
|
||||||
flash("{} followed!".format(username))
|
flash(f"{username} followed!")
|
||||||
return redirect(request.referrer)
|
return redirect(request.referrer)
|
||||||
|
|
||||||
|
|
||||||
@ -202,7 +202,7 @@ def followTwitterAccount(username):
|
|||||||
db.session.commit()
|
db.session.commit()
|
||||||
return True
|
return True
|
||||||
except:
|
except:
|
||||||
flash("Twitter: Couldn't follow {}. Already followed?".format(username))
|
flash(f"Twitter: Couldn't follow {username}. Already followed?")
|
||||||
return False
|
return False
|
||||||
else:
|
else:
|
||||||
flash("Something went wrong... try again")
|
flash("Something went wrong... try again")
|
||||||
@ -215,7 +215,7 @@ def unfollow(username):
|
|||||||
form = EmptyForm()
|
form = EmptyForm()
|
||||||
if form.validate_on_submit():
|
if form.validate_on_submit():
|
||||||
if twUnfollow(username):
|
if twUnfollow(username):
|
||||||
flash("{} unfollowed!".format(username))
|
flash(f"{username} unfollowed!")
|
||||||
return redirect(request.referrer)
|
return redirect(request.referrer)
|
||||||
|
|
||||||
|
|
||||||
@ -248,7 +248,7 @@ def search():
|
|||||||
if results:
|
if results:
|
||||||
return render_template('search.html', form=form, results=results, config=config)
|
return render_template('search.html', form=form, results=results, config=config)
|
||||||
else:
|
else:
|
||||||
flash("User {} not found...".format(user))
|
flash(f"User {user} not found...")
|
||||||
return redirect(request.referrer)
|
return redirect(request.referrer)
|
||||||
else:
|
else:
|
||||||
return render_template('search.html', form=form, config=config)
|
return render_template('search.html', form=form, config=config)
|
||||||
@ -262,7 +262,7 @@ def u(username, page=1):
|
|||||||
if username == "favicon.ico":
|
if username == "favicon.ico":
|
||||||
return redirect(url_for('static', filename='favicons/favicon.ico'))
|
return redirect(url_for('static', filename='favicons/favicon.ico'))
|
||||||
form = EmptyForm()
|
form = EmptyForm()
|
||||||
avatarPath = "img/avatars/{}.png".format(str(random.randint(1, 12)))
|
avatarPath = f"img/avatars/{str(random.randint(1, 12))}.png"
|
||||||
user = nitteruser.get_user_info(username)
|
user = nitteruser.get_user_info(username)
|
||||||
if not user:
|
if not user:
|
||||||
flash("This user is not on Twitter.")
|
flash("This user is not on Twitter.")
|
||||||
@ -281,7 +281,7 @@ def u(username, page=1):
|
|||||||
prev_page = 0
|
prev_page = 0
|
||||||
else:
|
else:
|
||||||
prev_page = page-1
|
prev_page = page-1
|
||||||
|
|
||||||
if page > 2:
|
if page > 2:
|
||||||
page =2
|
page =2
|
||||||
|
|
||||||
@ -300,7 +300,7 @@ def youtube():
|
|||||||
videos = getYoutubePosts(ids)
|
videos = getYoutubePosts(ids)
|
||||||
if videos:
|
if videos:
|
||||||
videos.sort(key=lambda x: x.date, reverse=True)
|
videos.sort(key=lambda x: x.date, reverse=True)
|
||||||
print("--- {} seconds fetching youtube feed---".format(time.time() - start_time))
|
print(f"--- {time.time() - start_time} seconds fetching youtube feed---")
|
||||||
return render_template('youtube.html', title="Yotter | Youtube", videos=videos, followCount=followCount,
|
return render_template('youtube.html', title="Yotter | Youtube", videos=videos, followCount=followCount,
|
||||||
config=config)
|
config=config)
|
||||||
|
|
||||||
@ -337,22 +337,21 @@ def ytsearch():
|
|||||||
filters = {"time": 0, "type": 0, "duration": 0}
|
filters = {"time": 0, "type": 0, "duration": 0}
|
||||||
results = yts.search_by_terms(query, page, autocorrect, sort, filters)
|
results = yts.search_by_terms(query, page, autocorrect, sort, filters)
|
||||||
|
|
||||||
next_page = "/ytsearch?q={q}&s={s}&p={p}".format(q=query, s=sort, p=int(page) + 1)
|
next_page = f"/ytsearch?q={query}&s={sort}&p={int(page)+1}"
|
||||||
if int(page) == 1:
|
if int(page) == 1:
|
||||||
prev_page = "/ytsearch?q={q}&s={s}&p={p}".format(q=query, s=sort, p=1)
|
prev_page = f"/ytsearch?q={query}&s={sort}&p={1}"
|
||||||
else:
|
else:
|
||||||
prev_page = "/ytsearch?q={q}&s={s}&p={p}".format(q=query, s=sort, p=int(page) - 1)
|
prev_page = f"/ytsearch?q={query}&s={sort}&p={int(page)-1}"
|
||||||
|
|
||||||
for video in results['videos']:
|
for video in results['videos']:
|
||||||
hostname = urllib.parse.urlparse(video['videoThumb']).netloc
|
hostname = urllib.parse.urlparse(video['videoThumb']).netloc
|
||||||
video['videoThumb'] = video['videoThumb'].replace("https://{}".format(hostname), "") + "&host=" + hostname
|
video['videoThumb'] = video['videoThumb'].replace(f"https://{hostname}", "") + "&host=" + hostname
|
||||||
|
|
||||||
for channel in results['channels']:
|
for channel in results['channels']:
|
||||||
if config['isInstance']:
|
if config['isInstance']:
|
||||||
channel['thumbnail'] = channel['thumbnail'].replace("~", "/")
|
channel['thumbnail'] = channel['thumbnail'].replace("~", "/")
|
||||||
hostName = urllib.parse.urlparse(channel['thumbnail']).netloc
|
hostName = urllib.parse.urlparse(channel['thumbnail']).netloc
|
||||||
channel['thumbnail'] = channel['thumbnail'].replace("https://{}".format(hostName),
|
channel['thumbnail'] = channel['thumbnail'].replace(f"https://{hostName}", "") + "?host=" + hostName
|
||||||
"") + "?host=" + hostName
|
|
||||||
return render_template('ytsearch.html', form=form, btform=button_form, results=results,
|
return render_template('ytsearch.html', form=form, btform=button_form, results=results,
|
||||||
restricted=config['restrictPublicUsage'], config=config, npage=next_page,
|
restricted=config['restrictPublicUsage'], config=config, npage=next_page,
|
||||||
ppage=prev_page)
|
ppage=prev_page)
|
||||||
@ -380,7 +379,7 @@ def followYoutubeChannel(channelId):
|
|||||||
follow.followers.append(current_user)
|
follow.followers.append(current_user)
|
||||||
db.session.add(follow)
|
db.session.add(follow)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
flash("{} followed!".format(channelData['channel_name']))
|
flash(f"{channelData['channel_name']} followed!")
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
@ -388,8 +387,8 @@ def followYoutubeChannel(channelId):
|
|||||||
print(e)
|
print(e)
|
||||||
return False
|
return False
|
||||||
except KeyError as ke:
|
except KeyError as ke:
|
||||||
print("KeyError: {}:'{}' could not be found".format(ke, channelId))
|
print(f"KeyError: {ke}:'{channelId}' could not be found")
|
||||||
flash("Youtube: ChannelId '{}' is not valid".format(channelId))
|
flash(f"Youtube: ChannelId '{channelId}' is not valid")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
@ -410,7 +409,7 @@ def unfollowYoutubeChannel(channelId):
|
|||||||
if channel:
|
if channel:
|
||||||
db.session.delete(channel)
|
db.session.delete(channel)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
flash("{} unfollowed!".format(name))
|
flash(f"{name} unfollowed!")
|
||||||
except:
|
except:
|
||||||
flash("There was an error unfollowing the user. Try again.")
|
flash("There was an error unfollowing the user. Try again.")
|
||||||
|
|
||||||
@ -435,22 +434,22 @@ def channel(id):
|
|||||||
for video in data['items']:
|
for video in data['items']:
|
||||||
if config['isInstance']:
|
if config['isInstance']:
|
||||||
hostName = urllib.parse.urlparse(video['thumbnail'][1:]).netloc
|
hostName = urllib.parse.urlparse(video['thumbnail'][1:]).netloc
|
||||||
video['thumbnail'] = video['thumbnail'].replace("https://{}".format(hostName), "")[1:].replace("hqdefault",
|
video['thumbnail'] = video['thumbnail'].replace(f"https://{hostName}", "")[1:].replace("hqdefault",
|
||||||
"mqdefault") + "&host=" + hostName
|
"mqdefault") + "&host=" + hostName
|
||||||
else:
|
else:
|
||||||
video['thumbnail'] = video['thumbnail'].replace('/', '~')
|
video['thumbnail'] = video['thumbnail'].replace('/', '~')
|
||||||
|
|
||||||
if config['isInstance']:
|
if config['isInstance']:
|
||||||
hostName = urllib.parse.urlparse(data['avatar'][1:]).netloc
|
hostName = urllib.parse.urlparse(data['avatar'][1:]).netloc
|
||||||
data['avatar'] = data['avatar'].replace("https://{}".format(hostName), "")[1:] + "?host=" + hostName
|
data['avatar'] = data['avatar'].replace(f"https://{hostName}", "")[1:] + "?host=" + hostName
|
||||||
else:
|
else:
|
||||||
data['avatar'] = data['avatar'].replace('/', '~')
|
data['avatar'] = data['avatar'].replace('/', '~')
|
||||||
|
|
||||||
next_page = "/channel/{q}?s={s}&p={p}".format(q=id, s=sort, p=int(page) + 1)
|
next_page = f"/channel/{id}?s={sort}&p={int(page)+1}"
|
||||||
if int(page) == 1:
|
if int(page) == 1:
|
||||||
prev_page = "/channel/{q}?s={s}&p={p}".format(q=id, s=sort, p=1)
|
prev_page = f"/channel/{id}?s={sort}&p={1}"
|
||||||
else:
|
else:
|
||||||
prev_page = "/channel/{q}?s={s}&p={p}".format(q=id, s=sort, p=int(page) - 1)
|
prev_page = f"/channel/{id}?s={sort}&p={int(page)-1}"
|
||||||
|
|
||||||
return render_template('channel.html', form=form, btform=button_form, data=data,
|
return render_template('channel.html', form=form, btform=button_form, data=data,
|
||||||
restricted=config['restrictPublicUsage'], config=config, next_page=next_page,
|
restricted=config['restrictPublicUsage'], config=config, next_page=next_page,
|
||||||
@ -488,11 +487,11 @@ def watch():
|
|||||||
if info['error'] == False:
|
if info['error'] == False:
|
||||||
for format in info['formats']:
|
for format in info['formats']:
|
||||||
hostName = urllib.parse.urlparse(format['url']).netloc
|
hostName = urllib.parse.urlparse(format['url']).netloc
|
||||||
format['url'] = format['url'].replace("https://{}".format(hostName), "") + "&host=" + hostName
|
format['url'] = format['url'].replace(f"https://{hostName}", "") + "&host=" + hostName
|
||||||
|
|
||||||
for format in info['audio_formats']:
|
for format in info['audio_formats']:
|
||||||
hostName = urllib.parse.urlparse(format['url']).netloc
|
hostName = urllib.parse.urlparse(format['url']).netloc
|
||||||
format['url'] = format['url'].replace("https://{}".format(hostName), "") + "&host=" + hostName
|
format['url'] = format['url'].replace(f"https://{hostName}", "") + "&host=" + hostName
|
||||||
|
|
||||||
# Markup description
|
# Markup description
|
||||||
try:
|
try:
|
||||||
@ -804,7 +803,7 @@ def status():
|
|||||||
|
|
||||||
@app.route('/error/<errno>')
|
@app.route('/error/<errno>')
|
||||||
def error(errno):
|
def error(errno):
|
||||||
return render_template('{}.html'.format(str(errno)), config=config)
|
return render_template(f'{str(errno)}.html', config=config)
|
||||||
|
|
||||||
|
|
||||||
def getTimeDiff(t):
|
def getTimeDiff(t):
|
||||||
@ -812,24 +811,26 @@ def getTimeDiff(t):
|
|||||||
|
|
||||||
if diff.days == 0:
|
if diff.days == 0:
|
||||||
if diff.seconds > 3599:
|
if diff.seconds > 3599:
|
||||||
timeString = "{}h".format(int((diff.seconds / 60) / 60))
|
num = int((diff.seconds / 60) / 60)
|
||||||
|
timeString = f"{num}h"
|
||||||
else:
|
else:
|
||||||
timeString = "{}m".format(int(diff.seconds / 60))
|
num = int(diff.seconds / 60)
|
||||||
|
timeString = f"{num}m"
|
||||||
else:
|
else:
|
||||||
timeString = "{}d".format(diff.days)
|
timeString = f"{diff.days}d"
|
||||||
return timeString
|
return timeString
|
||||||
|
|
||||||
|
|
||||||
def isTwitterUser(username):
|
def isTwitterUser(username):
|
||||||
response = requests.get('{instance}{user}/rss'.format(instance=NITTERINSTANCE, user=username))
|
response = requests.get(f'{NITTERINSTANCE}{username}/rss')
|
||||||
if response.status_code == 404:
|
if response.status_code == 404:
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def twitterUserSearch(terms):
|
def twitterUserSearch(terms):
|
||||||
response = urllib.request.urlopen(
|
url = f'{NITTERINSTANCE}search?f=users&q={urllib.parse.quote(terms)}'
|
||||||
'{instance}search?f=users&q={user}'.format(instance=NITTERINSTANCE, user=urllib.parse.quote(terms))).read()
|
response = urllib.request.urlopen(url).read()
|
||||||
html = BeautifulSoup(str(response), "lxml")
|
html = BeautifulSoup(str(response), "lxml")
|
||||||
|
|
||||||
results = []
|
results = []
|
||||||
@ -843,14 +844,14 @@ def twitterUserSearch(terms):
|
|||||||
'unicode_escape').encode('latin_1').decode('utf8'),
|
'unicode_escape').encode('latin_1').decode('utf8'),
|
||||||
"username": item.find('a', attrs={'class': 'username'}).getText().encode('latin_1').decode(
|
"username": item.find('a', attrs={'class': 'username'}).getText().encode('latin_1').decode(
|
||||||
'unicode_escape').encode('latin_1').decode('utf8'),
|
'unicode_escape').encode('latin_1').decode('utf8'),
|
||||||
'avatar': "{i}{s}".format(i=NITTERINSTANCE, s=item.find('img', attrs={'class': 'avatar'})['src'][1:])
|
'avatar': NITTERINSTANCE + item.find('img', attrs={'class': 'avatar'})['src'][1:],
|
||||||
}
|
}
|
||||||
results.append(user)
|
results.append(user)
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
|
||||||
def getTwitterUserInfo(username):
|
def getTwitterUserInfo(username):
|
||||||
response = urllib.request.urlopen('{instance}{user}'.format(instance=NITTERINSTANCE, user=username)).read()
|
response = urllib.request.urlopen('{NITTERINSTANCE}{username}').read()
|
||||||
# rssFeed = feedparser.parse(response.content)
|
# rssFeed = feedparser.parse(response.content)
|
||||||
|
|
||||||
html = BeautifulSoup(str(response), "lxml")
|
html = BeautifulSoup(str(response), "lxml")
|
||||||
@ -881,9 +882,7 @@ def getTwitterUserInfo(username):
|
|||||||
"followers": numerize.numerize(
|
"followers": numerize.numerize(
|
||||||
int(html.find_all('span', attrs={'class': 'profile-stat-num'})[2].string.replace(",", ""))),
|
int(html.find_all('span', attrs={'class': 'profile-stat-num'})[2].string.replace(",", ""))),
|
||||||
"likes": html.find_all('span', attrs={'class': 'profile-stat-num'})[3].string,
|
"likes": html.find_all('span', attrs={'class': 'profile-stat-num'})[3].string,
|
||||||
"profilePic": "{instance}{pic}".format(instance=NITTERINSTANCE,
|
"profilePic": NITTERINSTANCE + html.find('a', attrs={'class': 'profile-card-avatar'})['href'][1:],
|
||||||
pic=html.find('a', attrs={'class': 'profile-card-avatar'})['href'][
|
|
||||||
1:])
|
|
||||||
}
|
}
|
||||||
return user
|
return user
|
||||||
|
|
||||||
@ -891,7 +890,7 @@ def getTwitterUserInfo(username):
|
|||||||
def getFeed(urls):
|
def getFeed(urls):
|
||||||
feedPosts = []
|
feedPosts = []
|
||||||
with FuturesSession() as session:
|
with FuturesSession() as session:
|
||||||
futures = [session.get('{instance}{user}'.format(instance=NITTERINSTANCE, user=u.username)) for u in urls]
|
futures = [session.get(f'{NITTERINSTANCE}{u.username}') for u in urls]
|
||||||
for future in as_completed(futures):
|
for future in as_completed(futures):
|
||||||
res= future.result().content
|
res= future.result().content
|
||||||
html = BeautifulSoup(res, "html.parser")
|
html = BeautifulSoup(res, "html.parser")
|
||||||
@ -960,7 +959,7 @@ def getPosts(account):
|
|||||||
feedPosts = []
|
feedPosts = []
|
||||||
|
|
||||||
# Gather profile info.
|
# Gather profile info.
|
||||||
rssFeed = urllib.request.urlopen('{instance}{user}'.format(instance=NITTERINSTANCE, user=account)).read()
|
rssFeed = urllib.request.urlopen(f'{NITTERINSTANCE}{account}').read()
|
||||||
# Gather feedPosts
|
# Gather feedPosts
|
||||||
res = rssFeed.decode('utf-8')
|
res = rssFeed.decode('utf-8')
|
||||||
html = BeautifulSoup(res, "html.parser")
|
html = BeautifulSoup(res, "html.parser")
|
||||||
@ -1018,8 +1017,7 @@ def getPosts(account):
|
|||||||
def getYoutubePosts(ids):
|
def getYoutubePosts(ids):
|
||||||
videos = []
|
videos = []
|
||||||
with FuturesSession() as session:
|
with FuturesSession() as session:
|
||||||
futures = [session.get('https://www.youtube.com/feeds/videos.xml?channel_id={id}'.format(id=id.channelId)) for
|
futures = [session.get(f'https://www.youtube.com/feeds/videos.xml?channel_id={id.channelId}') for id in ids]
|
||||||
id in ids]
|
|
||||||
for future in as_completed(futures):
|
for future in as_completed(futures):
|
||||||
resp = future.result()
|
resp = future.result()
|
||||||
rssFeed = feedparser.parse(resp.content)
|
rssFeed = feedparser.parse(resp.content)
|
||||||
@ -1050,7 +1048,7 @@ def getYoutubePosts(ids):
|
|||||||
video.timeStamp = getTimeDiff(vid.published_parsed)
|
video.timeStamp = getTimeDiff(vid.published_parsed)
|
||||||
except:
|
except:
|
||||||
if time != 0:
|
if time != 0:
|
||||||
video.timeStamp = "{} days".format(str(time.days))
|
video.timeStamp = f"{str(time.days)} days"
|
||||||
else:
|
else:
|
||||||
video.timeStamp = "Unknown"
|
video.timeStamp = "Unknown"
|
||||||
|
|
||||||
@ -1061,7 +1059,7 @@ def getYoutubePosts(ids):
|
|||||||
video.videoTitle = vid.title
|
video.videoTitle = vid.title
|
||||||
if config['isInstance']:
|
if config['isInstance']:
|
||||||
hostName = urllib.parse.urlparse(vid.media_thumbnail[0]['url']).netloc
|
hostName = urllib.parse.urlparse(vid.media_thumbnail[0]['url']).netloc
|
||||||
video.videoThumb = vid.media_thumbnail[0]['url'].replace("https://{}".format(hostName), "").replace(
|
video.videoThumb = vid.media_thumbnail[0]['url'].replace(f"https://{hostName}", "").replace(
|
||||||
"hqdefault", "mqdefault") + "?host=" + hostName
|
"hqdefault", "mqdefault") + "?host=" + hostName
|
||||||
else:
|
else:
|
||||||
video.videoThumb = vid.media_thumbnail[0]['url'].replace('/', '~')
|
video.videoThumb = vid.media_thumbnail[0]['url'].replace('/', '~')
|
||||||
@ -1070,4 +1068,4 @@ def getYoutubePosts(ids):
|
|||||||
video.description = re.sub(r'^https?:\/\/.*[\r\n]*', '', video.description[0:120] + "...",
|
video.description = re.sub(r'^https?:\/\/.*[\r\n]*', '', video.description[0:120] + "...",
|
||||||
flags=re.MULTILINE)
|
flags=re.MULTILINE)
|
||||||
videos.append(video)
|
videos.append(video)
|
||||||
return videos
|
return videos
|
||||||
|
@ -22,13 +22,13 @@ def get_feed(usernames, daysMaxOld=10, includeRT=True):
|
|||||||
'''
|
'''
|
||||||
feedTweets = []
|
feedTweets = []
|
||||||
with FuturesSession() as session:
|
with FuturesSession() as session:
|
||||||
futures = [session.get('{instance}{user}'.format(instance=config['nitterInstance'], user=u)) for u in usernames]
|
futures = [session.get(f'{config["nitterInstance"]}{u}') for u in usernames]
|
||||||
for future in as_completed(futures):
|
for future in as_completed(futures):
|
||||||
res = future.result().content.decode('utf-8')
|
res = future.result().content.decode('utf-8')
|
||||||
html = BeautifulSoup(res, "html.parser")
|
html = BeautifulSoup(res, "html.parser")
|
||||||
feedPosts = user.get_feed_tweets(html)
|
feedPosts = user.get_feed_tweets(html)
|
||||||
feedTweets.append(feedPosts)
|
feedTweets.append(feedPosts)
|
||||||
|
|
||||||
userFeed = []
|
userFeed = []
|
||||||
for feed in feedTweets:
|
for feed in feedTweets:
|
||||||
if not includeRT:
|
if not includeRT:
|
||||||
@ -46,4 +46,4 @@ def get_feed(usernames, daysMaxOld=10, includeRT=True):
|
|||||||
userFeed.sort(key=lambda item:item['timeStamp'], reverse=True)
|
userFeed.sort(key=lambda item:item['timeStamp'], reverse=True)
|
||||||
except:
|
except:
|
||||||
return userFeed
|
return userFeed
|
||||||
return userFeed
|
return userFeed
|
||||||
|
@ -19,7 +19,7 @@ config = json.load(open('yotter-config.json'))
|
|||||||
config['nitterInstance']
|
config['nitterInstance']
|
||||||
|
|
||||||
def get_user_info(username):
|
def get_user_info(username):
|
||||||
response = urllib.request.urlopen('{instance}{user}'.format(instance=config['nitterInstance'], user=username)).read()
|
response = urllib.request.urlopen(f'{config["nitterInstance"]}{username}').read()
|
||||||
#rssFeed = feedparser.parse(response.content)
|
#rssFeed = feedparser.parse(response.content)
|
||||||
|
|
||||||
html = BeautifulSoup(str(response), "lxml")
|
html = BeautifulSoup(str(response), "lxml")
|
||||||
@ -32,7 +32,7 @@ def get_user_info(username):
|
|||||||
fullName = html.find('a', attrs={'class':'profile-card-fullname'}).getText().encode('latin1').decode('unicode_escape').encode('latin1').decode('utf8')
|
fullName = html.find('a', attrs={'class':'profile-card-fullname'}).getText().encode('latin1').decode('unicode_escape').encode('latin1').decode('utf8')
|
||||||
else:
|
else:
|
||||||
fullName = None
|
fullName = None
|
||||||
|
|
||||||
if html.find('div', attrs={'class':'profile-bio'}):
|
if html.find('div', attrs={'class':'profile-bio'}):
|
||||||
profileBio = html.find('div', attrs={'class':'profile-bio'}).getText().encode('latin1').decode('unicode_escape').encode('latin1').decode('utf8')
|
profileBio = html.find('div', attrs={'class':'profile-bio'}).getText().encode('latin1').decode('unicode_escape').encode('latin1').decode('utf8')
|
||||||
else:
|
else:
|
||||||
@ -46,12 +46,12 @@ def get_user_info(username):
|
|||||||
"following":html.find_all('span', attrs={'class':'profile-stat-num'})[1].string,
|
"following":html.find_all('span', attrs={'class':'profile-stat-num'})[1].string,
|
||||||
"followers":numerize.numerize(int(html.find_all('span', attrs={'class':'profile-stat-num'})[2].string.replace(",",""))),
|
"followers":numerize.numerize(int(html.find_all('span', attrs={'class':'profile-stat-num'})[2].string.replace(",",""))),
|
||||||
"likes":html.find_all('span', attrs={'class':'profile-stat-num'})[3].string,
|
"likes":html.find_all('span', attrs={'class':'profile-stat-num'})[3].string,
|
||||||
"profilePic":"{instance}{pic}".format(instance=config['nitterInstance'], pic=html.find('a', attrs={'class':'profile-card-avatar'})['href'][1:])
|
"profilePic":config['nitterInstance'] + html.find('a', attrs={'class':'profile-card-avatar'})['href'][1:],
|
||||||
}
|
}
|
||||||
return user
|
return user
|
||||||
|
|
||||||
def get_tweets(user, page=1):
|
def get_tweets(user, page=1):
|
||||||
feed = urllib.request.urlopen('{instance}{user}'.format(instance=config['nitterInstance'], user=user)).read()
|
feed = urllib.request.urlopen(f'{config["nitterInstance"]}{user}').read()
|
||||||
#Gather feedPosts
|
#Gather feedPosts
|
||||||
res = feed.decode('utf-8')
|
res = feed.decode('utf-8')
|
||||||
html = BeautifulSoup(res, "html.parser")
|
html = BeautifulSoup(res, "html.parser")
|
||||||
@ -59,8 +59,9 @@ def get_tweets(user, page=1):
|
|||||||
|
|
||||||
if page == 2:
|
if page == 2:
|
||||||
nextPage = html.find('div', attrs={'class':'show-more'}).find('a')['href']
|
nextPage = html.find('div', attrs={'class':'show-more'}).find('a')['href']
|
||||||
print('{instance}{user}{page}'.format(instance=config['nitterInstance'], user=user, page=nextPage))
|
url = f'{config["nitterInstance"]}{user}{nextPage}'
|
||||||
feed = urllib.request.urlopen('{instance}{user}{page}'.format(instance=config['nitterInstance'], user=user, page=nextPage)).read()
|
print(url)
|
||||||
|
feed = urllib.request.urlopen(url).read()
|
||||||
res = feed.decode('utf-8')
|
res = feed.decode('utf-8')
|
||||||
html = BeautifulSoup(res, "html.parser")
|
html = BeautifulSoup(res, "html.parser")
|
||||||
feedPosts = get_feed_tweets(html)
|
feedPosts = get_feed_tweets(html)
|
||||||
@ -96,17 +97,17 @@ def get_feed_tweets(html):
|
|||||||
tweet['timeStamp'] = str(datetime.datetime.strptime(date_time_str, '%d/%m/%Y %H:%M:%S'))
|
tweet['timeStamp'] = str(datetime.datetime.strptime(date_time_str, '%d/%m/%Y %H:%M:%S'))
|
||||||
tweet['date'] = post.find('span', attrs={'class':'tweet-date'}).find('a').text
|
tweet['date'] = post.find('span', attrs={'class':'tweet-date'}).find('a').text
|
||||||
tweet['content'] = Markup(yotterify(post.find('div', attrs={'class':'tweet-content'}).decode_contents().replace("\n", "<br>")))
|
tweet['content'] = Markup(yotterify(post.find('div', attrs={'class':'tweet-content'}).decode_contents().replace("\n", "<br>")))
|
||||||
|
|
||||||
if post.find('div', attrs={'class':'retweet-header'}):
|
if post.find('div', attrs={'class':'retweet-header'}):
|
||||||
tweet['username'] = post.find('div', attrs={'class':'retweet-header'}).find('div', attrs={'class':'icon-container'}).text
|
tweet['username'] = post.find('div', attrs={'class':'retweet-header'}).find('div', attrs={'class':'icon-container'}).text
|
||||||
tweet['isRT'] = True
|
tweet['isRT'] = True
|
||||||
else:
|
else:
|
||||||
tweet['username'] = tweet['op']
|
tweet['username'] = tweet['op']
|
||||||
tweet['isRT'] = False
|
tweet['isRT'] = False
|
||||||
|
|
||||||
tweet['profilePic'] = config['nitterInstance']+post.find('a', attrs={'class':'tweet-avatar'}).find('img')['src'][1:]
|
tweet['profilePic'] = config['nitterInstance']+post.find('a', attrs={'class':'tweet-avatar'}).find('img')['src'][1:]
|
||||||
tweet['url'] = config['nitterInstance'] + post.find('a', attrs={'class':'tweet-link'})['href'][1:]
|
tweet['url'] = config['nitterInstance'] + post.find('a', attrs={'class':'tweet-link'})['href'][1:]
|
||||||
|
|
||||||
# Is quoting another tweet
|
# Is quoting another tweet
|
||||||
if post.find('div', attrs={'class':'quote'}):
|
if post.find('div', attrs={'class':'quote'}):
|
||||||
tweet['isReply'] = True
|
tweet['isReply'] = True
|
||||||
@ -123,7 +124,7 @@ def get_feed_tweets(html):
|
|||||||
tweet['replyingTweetContent'] = Markup(quote.find('div', attrs={'class':'quote-text'}).replace("\n", "<br>"))
|
tweet['replyingTweetContent'] = Markup(quote.find('div', attrs={'class':'quote-text'}).replace("\n", "<br>"))
|
||||||
except:
|
except:
|
||||||
tweet['replyingTweetContent'] = Markup(quote.find('div', attrs={'class':'quote-text'}))
|
tweet['replyingTweetContent'] = Markup(quote.find('div', attrs={'class':'quote-text'}))
|
||||||
|
|
||||||
if quote.find('a', attrs={'class':'still-image'}):
|
if quote.find('a', attrs={'class':'still-image'}):
|
||||||
tweet['replyAttachedImages'] = []
|
tweet['replyAttachedImages'] = []
|
||||||
images = quote.find_all('a', attrs={'class':'still-image'})
|
images = quote.find_all('a', attrs={'class':'still-image'})
|
||||||
@ -135,7 +136,7 @@ def get_feed_tweets(html):
|
|||||||
post.find('div', attrs={'class':'quote'}).decompose()
|
post.find('div', attrs={'class':'quote'}).decompose()
|
||||||
else:
|
else:
|
||||||
tweet['isReply'] = False
|
tweet['isReply'] = False
|
||||||
|
|
||||||
# Has attatchments
|
# Has attatchments
|
||||||
if post.find('div', attrs={'class':'attachments'}):
|
if post.find('div', attrs={'class':'attachments'}):
|
||||||
# Images
|
# Images
|
||||||
@ -167,8 +168,8 @@ def get_feed_tweets(html):
|
|||||||
elif 'heart' in str(stat):
|
elif 'heart' in str(stat):
|
||||||
tweet['likes'] = stat.find('div',attrs={'class':'icon-container'}).text
|
tweet['likes'] = stat.find('div',attrs={'class':'icon-container'}).text
|
||||||
else:
|
else:
|
||||||
tweet['quotes'] = stat.find('div',attrs={'class':'icon-container'}).text
|
tweet['quotes'] = stat.find('div',attrs={'class':'icon-container'}).text
|
||||||
feedPosts.append(tweet)
|
feedPosts.append(tweet)
|
||||||
else:
|
else:
|
||||||
return {"emptyFeed": True}
|
return {"emptyFeed": True}
|
||||||
return feedPosts
|
return feedPosts
|
||||||
|
@ -258,5 +258,3 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
|
|||||||
parameters_dictionary = request.args,
|
parameters_dictionary = request.args,
|
||||||
**info
|
**info
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -155,13 +155,13 @@ def get_info_grid_video_item(item, channel=None):
|
|||||||
'timeStamp':published,
|
'timeStamp':published,
|
||||||
'duration':duration,
|
'duration':duration,
|
||||||
'channelName':channel['username'],
|
'channelName':channel['username'],
|
||||||
'authorUrl':"/channel/{}".format(channel['channelId']),
|
'authorUrl':f"/channel/{channel['channelId']}",
|
||||||
'channelId':channel['channelId'],
|
'channelId':channel['channelId'],
|
||||||
'id':item['videoId'],
|
'id':item['videoId'],
|
||||||
'videoUrl':"/watch?v={}".format(item['videoId']),
|
'videoUrl':f"/watch?v={item['videoId']}",
|
||||||
'isLive':isLive,
|
'isLive':isLive,
|
||||||
'isUpcoming':isUpcoming,
|
'isUpcoming':isUpcoming,
|
||||||
'videoThumb':item['thumbnail']['thumbnails'][0]['url']
|
'videoThumb':item['thumbnail']['thumbnails'][0]['url'],
|
||||||
}
|
}
|
||||||
return video
|
return video
|
||||||
|
|
||||||
@ -172,18 +172,18 @@ def get_author_info_from_channel(content):
|
|||||||
channel = {
|
channel = {
|
||||||
"channelId": cmd['channelId'],
|
"channelId": cmd['channelId'],
|
||||||
"username": cmd['title'],
|
"username": cmd['title'],
|
||||||
"thumbnail": "https:{}".format(cmd['avatar']['thumbnails'][0]['url'].replace("/", "~")),
|
"thumbnail": f"https:{cmd['avatar']['thumbnails'][0]['url'].replace('/', '~')}",
|
||||||
"description":description,
|
"description":description,
|
||||||
"suscribers": cmd['subscriberCountText']['runs'][0]['text'].split(" ")[0],
|
"suscribers": cmd['subscriberCountText']['runs'][0]['text'].split(" ")[0],
|
||||||
"banner": cmd['banner']['thumbnails'][0]['url']
|
"banner": cmd['banner']['thumbnails'][0]['url'],
|
||||||
}
|
}
|
||||||
return channel
|
return channel
|
||||||
|
|
||||||
def get_channel_info(channelId, videos=True, page=1, sort=3):
|
def get_channel_info(channelId, videos=True, page=1, sort=3):
|
||||||
if id_or_username(channelId) == "channel":
|
if id_or_username(channelId) == "channel":
|
||||||
videos = []
|
videos = []
|
||||||
ciUrl = "https://www.youtube.com/channel/{}".format(channelId)
|
ciUrl = f"https://www.youtube.com/channel/{channelId}"
|
||||||
mainUrl = "https://www.youtube.com/browse_ajax?ctoken={}".format(channel_ctoken_desktop(channelId, page, sort, "videos"))
|
mainUrl = f"https://www.youtube.com/browse_ajax?ctoken={channel_ctoken_desktop(channelId, page, sort, 'videos')}"
|
||||||
content = json.loads(requests.get(mainUrl, headers=headers).text)
|
content = json.loads(requests.get(mainUrl, headers=headers).text)
|
||||||
req = requests.get(ciUrl, headers=headers).text
|
req = requests.get(ciUrl, headers=headers).text
|
||||||
|
|
||||||
@ -210,4 +210,4 @@ def get_channel_info(channelId, videos=True, page=1, sort=3):
|
|||||||
return {"channel":authorInfo}
|
return {"channel":authorInfo}
|
||||||
|
|
||||||
else:
|
else:
|
||||||
baseUrl = "https://www.youtube.com/user/{}".format(channelId)
|
baseUrl = f"https://www.youtube.com/user/{channelId}"
|
||||||
|
@ -21,7 +21,7 @@ from youtube.util import concat_or_none
|
|||||||
def make_comment_ctoken(video_id, sort=0, offset=0, lc='', secret_key=''):
|
def make_comment_ctoken(video_id, sort=0, offset=0, lc='', secret_key=''):
|
||||||
video_id = proto.as_bytes(video_id)
|
video_id = proto.as_bytes(video_id)
|
||||||
secret_key = proto.as_bytes(secret_key)
|
secret_key = proto.as_bytes(secret_key)
|
||||||
|
|
||||||
|
|
||||||
page_info = proto.string(4,video_id) + proto.uint(6, sort)
|
page_info = proto.string(4,video_id) + proto.uint(6, sort)
|
||||||
offset_information = proto.nested(4, page_info) + proto.uint(5, offset)
|
offset_information = proto.nested(4, page_info) + proto.uint(5, offset)
|
||||||
@ -35,11 +35,11 @@ def make_comment_ctoken(video_id, sort=0, offset=0, lc='', secret_key=''):
|
|||||||
result = proto.nested(2, page_params) + proto.uint(3,6) + proto.nested(6, offset_information)
|
result = proto.nested(2, page_params) + proto.uint(3,6) + proto.nested(6, offset_information)
|
||||||
return base64.urlsafe_b64encode(result).decode('ascii')
|
return base64.urlsafe_b64encode(result).decode('ascii')
|
||||||
|
|
||||||
def comment_replies_ctoken(video_id, comment_id, max_results=500):
|
def comment_replies_ctoken(video_id, comment_id, max_results=500):
|
||||||
|
|
||||||
params = proto.string(2, comment_id) + proto.uint(9, max_results)
|
params = proto.string(2, comment_id) + proto.uint(9, max_results)
|
||||||
params = proto.nested(3, params)
|
params = proto.nested(3, params)
|
||||||
|
|
||||||
result = proto.nested(2, proto.string(2, video_id)) + proto.uint(3,6) + proto.nested(6, params)
|
result = proto.nested(2, proto.string(2, video_id)) + proto.uint(3,6) + proto.nested(6, params)
|
||||||
return base64.urlsafe_b64encode(result).decode('ascii')
|
return base64.urlsafe_b64encode(result).decode('ascii')
|
||||||
|
|
||||||
|
@ -14,15 +14,15 @@ import flask
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
def playlist_ctoken(playlist_id, offset):
|
def playlist_ctoken(playlist_id, offset):
|
||||||
|
|
||||||
offset = proto.uint(1, offset)
|
offset = proto.uint(1, offset)
|
||||||
# this is just obfuscation as far as I can tell. It doesn't even follow protobuf
|
# this is just obfuscation as far as I can tell. It doesn't even follow protobuf
|
||||||
offset = b'PT:' + proto.unpadded_b64encode(offset)
|
offset = b'PT:' + proto.unpadded_b64encode(offset)
|
||||||
offset = proto.string(15, offset)
|
offset = proto.string(15, offset)
|
||||||
|
|
||||||
continuation_info = proto.string( 3, proto.percent_b64encode(offset) )
|
continuation_info = proto.string( 3, proto.percent_b64encode(offset) )
|
||||||
|
|
||||||
playlist_id = proto.string(2, 'VL' + playlist_id )
|
playlist_id = proto.string(2, 'VL' + playlist_id )
|
||||||
pointless_nest = proto.string(80226972, playlist_id + continuation_info)
|
pointless_nest = proto.string(80226972, playlist_id + continuation_info)
|
||||||
|
|
||||||
@ -51,7 +51,7 @@ def playlist_first_page(playlist_id, report_text = "Retrieved playlist"):
|
|||||||
content = json.loads(util.uppercase_escape(content.decode('utf-8')))
|
content = json.loads(util.uppercase_escape(content.decode('utf-8')))
|
||||||
|
|
||||||
return content
|
return content
|
||||||
|
|
||||||
|
|
||||||
#https://m.youtube.com/playlist?itct=CBMQybcCIhMIptj9xJaJ2wIV2JKcCh3Idwu-&ctoken=4qmFsgI2EiRWTFBMT3kwajlBdmxWWlB0bzZJa2pLZnB1MFNjeC0tN1BHVEMaDmVnWlFWRHBEUWxFJTNE&pbj=1
|
#https://m.youtube.com/playlist?itct=CBMQybcCIhMIptj9xJaJ2wIV2JKcCh3Idwu-&ctoken=4qmFsgI2EiRWTFBMT3kwajlBdmxWWlB0bzZJa2pLZnB1MFNjeC0tN1BHVEMaDmVnWlFWRHBEUWxFJTNE&pbj=1
|
||||||
def get_videos(playlist_id, page):
|
def get_videos(playlist_id, page):
|
||||||
|
@ -5,13 +5,13 @@ import io
|
|||||||
def byte(n):
|
def byte(n):
|
||||||
return bytes((n,))
|
return bytes((n,))
|
||||||
|
|
||||||
|
|
||||||
def varint_encode(offset):
|
def varint_encode(offset):
|
||||||
'''In this encoding system, for each 8-bit byte, the first bit is 1 if there are more bytes, and 0 is this is the last one.
|
'''In this encoding system, for each 8-bit byte, the first bit is 1 if there are more bytes, and 0 is this is the last one.
|
||||||
The next 7 bits are data. These 7-bit sections represent the data in Little endian order. For example, suppose the data is
|
The next 7 bits are data. These 7-bit sections represent the data in Little endian order. For example, suppose the data is
|
||||||
aaaaaaabbbbbbbccccccc (each of these sections is 7 bits). It will be encoded as:
|
aaaaaaabbbbbbbccccccc (each of these sections is 7 bits). It will be encoded as:
|
||||||
1ccccccc 1bbbbbbb 0aaaaaaa
|
1ccccccc 1bbbbbbb 0aaaaaaa
|
||||||
|
|
||||||
This encoding is used in youtube parameters to encode offsets and to encode the length for length-prefixed data.
|
This encoding is used in youtube parameters to encode offsets and to encode the length for length-prefixed data.
|
||||||
See https://developers.google.com/protocol-buffers/docs/encoding#varints for more info.'''
|
See https://developers.google.com/protocol-buffers/docs/encoding#varints for more info.'''
|
||||||
needed_bytes = ceil(offset.bit_length()/7) or 1 # (0).bit_length() returns 0, but we need 1 in that case.
|
needed_bytes = ceil(offset.bit_length()/7) or 1 # (0).bit_length() returns 0, but we need 1 in that case.
|
||||||
@ -20,20 +20,20 @@ def varint_encode(offset):
|
|||||||
encoded_bytes[i] = (offset & 127) | 128 # 7 least significant bits
|
encoded_bytes[i] = (offset & 127) | 128 # 7 least significant bits
|
||||||
offset = offset >> 7
|
offset = offset >> 7
|
||||||
encoded_bytes[-1] = offset & 127 # leave first bit as zero for last byte
|
encoded_bytes[-1] = offset & 127 # leave first bit as zero for last byte
|
||||||
|
|
||||||
return bytes(encoded_bytes)
|
return bytes(encoded_bytes)
|
||||||
|
|
||||||
|
|
||||||
def varint_decode(encoded):
|
def varint_decode(encoded):
|
||||||
decoded = 0
|
decoded = 0
|
||||||
for i, byte in enumerate(encoded):
|
for i, byte in enumerate(encoded):
|
||||||
decoded |= (byte & 127) << 7*i
|
decoded |= (byte & 127) << 7*i
|
||||||
|
|
||||||
if not (byte & 128):
|
if not (byte & 128):
|
||||||
break
|
break
|
||||||
return decoded
|
return decoded
|
||||||
|
|
||||||
|
|
||||||
def string(field_number, data):
|
def string(field_number, data):
|
||||||
data = as_bytes(data)
|
data = as_bytes(data)
|
||||||
return _proto_field(2, field_number, varint_encode(len(data)) + data)
|
return _proto_field(2, field_number, varint_encode(len(data)) + data)
|
||||||
@ -41,20 +41,20 @@ nested = string
|
|||||||
|
|
||||||
def uint(field_number, value):
|
def uint(field_number, value):
|
||||||
return _proto_field(0, field_number, varint_encode(value))
|
return _proto_field(0, field_number, varint_encode(value))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def _proto_field(wire_type, field_number, data):
|
def _proto_field(wire_type, field_number, data):
|
||||||
''' See https://developers.google.com/protocol-buffers/docs/encoding#structure '''
|
''' See https://developers.google.com/protocol-buffers/docs/encoding#structure '''
|
||||||
return varint_encode( (field_number << 3) | wire_type) + data
|
return varint_encode( (field_number << 3) | wire_type) + data
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def percent_b64encode(data):
|
def percent_b64encode(data):
|
||||||
return base64.urlsafe_b64encode(data).replace(b'=', b'%3D')
|
return base64.urlsafe_b64encode(data).replace(b'=', b'%3D')
|
||||||
|
|
||||||
|
|
||||||
def unpadded_b64encode(data):
|
def unpadded_b64encode(data):
|
||||||
return base64.urlsafe_b64encode(data).replace(b'=', b'')
|
return base64.urlsafe_b64encode(data).replace(b'=', b'')
|
||||||
|
|
||||||
@ -81,7 +81,7 @@ def read_varint(data):
|
|||||||
i += 1
|
i += 1
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
def read_group(data, end_sequence):
|
def read_group(data, end_sequence):
|
||||||
start = data.tell()
|
start = data.tell()
|
||||||
index = data.original.find(end_sequence, start)
|
index = data.original.find(end_sequence, start)
|
||||||
@ -101,7 +101,7 @@ def read_protobuf(data):
|
|||||||
break
|
break
|
||||||
wire_type = tag & 7
|
wire_type = tag & 7
|
||||||
field_number = tag >> 3
|
field_number = tag >> 3
|
||||||
|
|
||||||
if wire_type == 0:
|
if wire_type == 0:
|
||||||
value = read_varint(data)
|
value = read_varint(data)
|
||||||
elif wire_type == 1:
|
elif wire_type == 1:
|
||||||
|
@ -61,7 +61,7 @@ def get_channel_renderer_item_info(item):
|
|||||||
suscribers = item['subscriberCountText']['simpleText'].split(" ")[0]
|
suscribers = item['subscriberCountText']['simpleText'].split(" ")[0]
|
||||||
except:
|
except:
|
||||||
suscribers = "?"
|
suscribers = "?"
|
||||||
|
|
||||||
try:
|
try:
|
||||||
description = utils.get_description_snippet_text(item['descriptionSnippet']['runs'])
|
description = utils.get_description_snippet_text(item['descriptionSnippet']['runs'])
|
||||||
except KeyError:
|
except KeyError:
|
||||||
@ -159,10 +159,9 @@ def get_video_renderer_item_info(item):
|
|||||||
'authorUrl':"/channel/{}".format(item['ownerText']['runs'][0]['navigationEndpoint']['browseEndpoint']['browseId']),
|
'authorUrl':"/channel/{}".format(item['ownerText']['runs'][0]['navigationEndpoint']['browseEndpoint']['browseId']),
|
||||||
'channelId':item['ownerText']['runs'][0]['navigationEndpoint']['browseEndpoint']['browseId'],
|
'channelId':item['ownerText']['runs'][0]['navigationEndpoint']['browseEndpoint']['browseId'],
|
||||||
'id':item['videoId'],
|
'id':item['videoId'],
|
||||||
'videoUrl':"/watch?v={}".format(item['videoId']),
|
'videoUrl':f"/watch?v={item['videoId']}",
|
||||||
'isLive':isLive,
|
'isLive':isLive,
|
||||||
'isUpcoming':isUpcoming,
|
'isUpcoming':isUpcoming,
|
||||||
'videoThumb':item['thumbnail']['thumbnails'][0]['url']
|
'videoThumb':item['thumbnail']['thumbnails'][0]['url'],
|
||||||
}
|
}
|
||||||
return video
|
return video
|
||||||
|
|
||||||
|
@ -304,7 +304,7 @@ def video_id(url):
|
|||||||
# default, sddefault, mqdefault, hqdefault, hq720
|
# default, sddefault, mqdefault, hqdefault, hq720
|
||||||
def get_thumbnail_url(video_id):
|
def get_thumbnail_url(video_id):
|
||||||
return "/i.ytimg.com/vi/" + video_id + "/mqdefault.jpg"
|
return "/i.ytimg.com/vi/" + video_id + "/mqdefault.jpg"
|
||||||
|
|
||||||
def seconds_to_timestamp(seconds):
|
def seconds_to_timestamp(seconds):
|
||||||
seconds = int(seconds)
|
seconds = int(seconds)
|
||||||
hours, seconds = divmod(seconds,3600)
|
hours, seconds = divmod(seconds,3600)
|
||||||
@ -394,4 +394,3 @@ def check_gevent_exceptions(*tasks):
|
|||||||
for task in tasks:
|
for task in tasks:
|
||||||
if task.exception:
|
if task.exception:
|
||||||
raise task.exception
|
raise task.exception
|
||||||
|
|
||||||
|
@ -29,7 +29,7 @@ def parse_comment(raw_comment):
|
|||||||
cmnt = {}
|
cmnt = {}
|
||||||
imgHostName = urllib.parse.urlparse(raw_comment['author_avatar'][1:]).netloc
|
imgHostName = urllib.parse.urlparse(raw_comment['author_avatar'][1:]).netloc
|
||||||
cmnt['author'] = raw_comment['author']
|
cmnt['author'] = raw_comment['author']
|
||||||
cmnt['thumbnail'] = raw_comment['author_avatar'].replace("https://{}".format(imgHostName),"")[1:] + "?host=" + imgHostName
|
cmnt['thumbnail'] = raw_comment['author_avatar'].replace(f"https://{imgHostName}","")[1:] + "?host=" + imgHostName
|
||||||
|
|
||||||
print(cmnt['thumbnail'])
|
print(cmnt['thumbnail'])
|
||||||
cmnt['channel'] = raw_comment['author_url']
|
cmnt['channel'] = raw_comment['author_url']
|
||||||
@ -58,4 +58,4 @@ def post_process_comments_info(comments_info):
|
|||||||
comments = []
|
comments = []
|
||||||
for comment in comments_info['comments']:
|
for comment in comments_info['comments']:
|
||||||
comments.append(parse_comment(comment))
|
comments.append(parse_comment(comment))
|
||||||
return comments
|
return comments
|
||||||
|
@ -32,7 +32,7 @@ def get_info(url):
|
|||||||
video['subtitles'] = info['subtitles']
|
video['subtitles'] = info['subtitles']
|
||||||
video['duration'] = info['duration']
|
video['duration'] = info['duration']
|
||||||
video['view_count'] = info['view_count']
|
video['view_count'] = info['view_count']
|
||||||
|
|
||||||
if(info['like_count'] is None):
|
if(info['like_count'] is None):
|
||||||
video['like_count'] = 0
|
video['like_count'] = 0
|
||||||
else:
|
else:
|
||||||
@ -75,4 +75,4 @@ def get_video_formats(formats, audio=False):
|
|||||||
if audio:
|
if audio:
|
||||||
return audio_formats
|
return audio_formats
|
||||||
else:
|
else:
|
||||||
return best_formats
|
return best_formats
|
||||||
|
@ -266,5 +266,3 @@ def format_bytes(bytes):
|
|||||||
suffix = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'][exponent]
|
suffix = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'][exponent]
|
||||||
converted = float(bytes) / float(1024 ** exponent)
|
converted = float(bytes) / float(1024 ** exponent)
|
||||||
return '%.2f%s' % (converted, suffix)
|
return '%.2f%s' % (converted, suffix)
|
||||||
|
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user