Compare commits

...
This repository has been archived on 2022-06-28. You can view files and clone it, but cannot push or open issues or pull requests.

14 Commits

Author SHA1 Message Date
pluja
48cacb7af0 print cookie 2020-10-17 13:22:15 +02:00
pluja
d14a81acff Fix 2020-10-15 06:57:51 +02:00
pluja
6fee62a491 print(response) 2020-10-12 12:39:19 +02:00
pluja
6c5ce51b26 Ditch python-anticaptcha and use requests 2020-10-12 12:20:42 +02:00
pluja
c1a6c67fea Fix for look dict typo 2020-10-12 09:52:20 +02:00
pluja
438374890d Cookie management 2020-10-12 09:50:41 +02:00
pluja
e028ee929c Warn when solving captcha 2020-10-12 09:11:36 +02:00
pluja
c11eec9555 Fix problem with url var 2020-10-12 08:52:20 +02:00
pluja
4ef4a28e03 Fix requirements.txt 2020-10-12 08:50:03 +02:00
pluja
a678d9a6e1 Merge branch 'yotter-dev' of https://github.com/ytorg/yotter into yotter-dev 2020-10-12 08:47:32 +02:00
pluja
99b9ad5591 New version / CaptchaBypass testing 2020-10-12 08:47:15 +02:00
PLUJA
da47a690e5
Update yotter-config.json 2020-10-12 08:23:21 +02:00
pluja
ceffdcfe24 merge conflict 2020-10-12 08:15:45 +02:00
pluja
c66afd6485 Solve merge conflict 2020-10-12 08:08:52 +02:00
15 changed files with 956 additions and 279 deletions

View File

@ -1,4 +1,3 @@
import datetime
import glob
import json
@ -28,10 +27,10 @@ from youtube_search import YoutubeSearch
from app import app, db
from app.forms import LoginForm, RegistrationForm, EmptyForm, SearchForm, ChannelForm
from app.models import User, twitterPost, ytPost, Post, youtubeFollow, twitterFollow
from youtube import comments, utils
from youtube import comments, utils, channel as ytch, search as yts
from youtube import watch as ytwatch
#########################################
from youtube_data import search as yts
#########################################
@ -326,6 +325,10 @@ def ytsearch():
else:
prev_page = "/ytsearch?q={q}&s={s}&p={p}".format(q=query, s=sort, p=int(page) - 1)
for video in results['videos']:
hostname = urllib.parse.urlparse(video['videoThumb']).netloc
video['videoThumb'] = video['videoThumb'].replace("https://{}".format(hostname), "") + "&host=" + hostname
for channel in results['channels']:
if config['nginxVideoStream']:
channel['thumbnail'] = channel['thumbnail'].replace("~", "/")
@ -342,8 +345,6 @@ def ytsearch():
@app.route('/ytfollow/<channelId>', methods=['POST'])
@login_required
def ytfollow(channelId):
form = EmptyForm()
if form.validate_on_submit():
r = followYoutubeChannel(channelId)
return redirect(request.referrer)
@ -376,8 +377,6 @@ def followYoutubeChannel(channelId):
@app.route('/ytunfollow/<channelId>', methods=['POST'])
@login_required
def ytunfollow(channelId):
form = EmptyForm()
if form.validate_on_submit():
unfollowYoutubeChannel(channelId)
return redirect(request.referrer)
@ -404,27 +403,38 @@ def unfollowYoutubeChannel(channelId):
def channel(id):
form = ChannelForm()
button_form = EmptyForm()
data = requests.get('https://www.youtube.com/feeds/videos.xml?channel_id={id}'.format(id=id))
data = feedparser.parse(data.content)
channelData = YoutubeSearch.channelInfo(id)
page = request.args.get('p', None)
sort = request.args.get('s', None)
if page is None:
page = 1
if sort is None:
sort = 3
for video in channelData[1]:
data = ytch.get_channel_tab_info(id, page, sort)
for video in data['items']:
if config['nginxVideoStream']:
hostName = urllib.parse.urlparse(video['videoThumb']).netloc
video['videoThumb'] = video['videoThumb'].replace("https://{}".format(hostName), "").replace("hqdefault",
hostName = urllib.parse.urlparse(video['thumbnail'][1:]).netloc
video['thumbnail'] = video['thumbnail'].replace("https://{}".format(hostName), "")[1:].replace("hqdefault",
"mqdefault") + "&host=" + hostName
else:
video['videoThumb'] = video['videoThumb'].replace('/', '~')
if config['nginxVideoStream']:
hostName = urllib.parse.urlparse(channelData[0]['avatar']).netloc
channelData[0]['avatar'] = channelData[0]['avatar'].replace("https://{}".format(hostName),
"") + "?host=" + hostName
else:
channelData[0]['avatar'] = channelData[0]['avatar'].replace('/', '~')
video['thumbnail'] = video['thumbnail'].replace('/', '~')
return render_template('channel.html', form=form, btform=button_form, channel=channelData[0], videos=channelData[1],
restricted=config['restrictPublicUsage'], config=config)
if config['nginxVideoStream']:
hostName = urllib.parse.urlparse(data['avatar'][1:]).netloc
data['avatar'] = data['avatar'].replace("https://{}".format(hostName), "")[1:] + "?host=" + hostName
else:
data['avatar'] = data['avatar'].replace('/', '~')
next_page = "/channel/{q}?s={s}&p={p}".format(q=id, s=sort, p=int(page) + 1)
if int(page) == 1:
prev_page = "/channel/{q}?s={s}&p={p}".format(q=id, s=sort, p=1)
else:
prev_page = "/channel/{q}?s={s}&p={p}".format(q=id, s=sort, p=int(page) - 1)
return render_template('channel.html', form=form, btform=button_form, data=data,
restricted=config['restrictPublicUsage'], config=config, next_page=next_page, prev_page=prev_page)
def get_best_urls(urls):
@ -454,27 +464,48 @@ def get_live_urls(urls):
def watch():
id = request.args.get('v', None)
info = ytwatch.extract_info(id, False, playlist_id=None, index=None)
# Use nginx
best_formats = ["22", "18", "34", "35", "36", "37", "38", "43", "44", "45", "46"]
if info == 'Captcha':
return render_template('captcha.html', origin=request.referrer)
retry = 3
while retry != 0 and info['playability_error'] == 'Could not find player':
info=ytwatch.extract_info(id, False, playlist_id=None, index=None)
retry -= 1
vsources = ytwatch.get_video_sources(info, False)
# Retry 3 times if no sources are available.
retry = 3
while retry != 0 and len(vsources) == 0:
vsources = ytwatch.get_video_sources(info, False)
retry -= 1
for source in vsources:
hostName = urllib.parse.urlparse(source['src']).netloc
source['src'] = source['src'].replace("https://{}".format(hostName), "") + "&host=" + hostName
# Parse video formats
for v_format in info['formats']:
hostName = urllib.parse.urlparse(v_format['url']).netloc
v_format['url'] = v_format['url'].replace("https://{}".format(hostName), "") + "&host=" + hostName
if v_format['audio_bitrate'] is not None and v_format['vcodec'] is not None:
v_format['video_valid'] = True
elif v_format['audio_bitrate'] is not None and v_format['vcodec'] is None:
if v_format['audio_bitrate'] is not None and v_format['vcodec'] is None:
v_format['audio_valid'] = True
# Markup description
try:
info['description'] = Markup(bleach.linkify(info['description'].replace("\n", "<br>")))
except AttributeError or TypeError:
print(info['description'])
# Get comments
videocomments = comments.video_comments(id, sort=0, offset=0, lc='', secret_key='')
videocomments = utils.post_process_comments_info(videocomments)
if videocomments is not None:
videocomments.sort(key=lambda x: x['likes'], reverse=True)
info['rating'] = str((info['like_count']/(info['like_count']+info['dislike_count']))*100)[0:4]
return render_template("video.html", info=info, title='{}'.format(info['title']), config=config, videocomments=videocomments)
# Calculate rating %
info['rating'] = str((info['like_count'] / (info['like_count'] + info['dislike_count'])) * 100)[0:4]
return render_template("video.html", info=info, title='{}'.format(info['title']), config=config,
videocomments=videocomments, vsources=vsources)
def markupString(string):
@ -723,20 +754,17 @@ def register():
return render_template('register.html', title='Register', registrations=REGISTRATIONS, form=form, config=config)
@app.route('/registrations_status/icon')
def registrations_status_icon():
@app.route('/status')
def status():
count = db.session.query(User).count()
if count >= config['maxInstanceUsers'] or config['maxInstanceUsers'] == 0:
return redirect(url_for('static', filename='img/close.png'))
filen = url_for('static', filename='img/close.png')
caniregister = False
else:
return redirect(url_for('static', filename='img/open.png'))
@app.route('/registrations_status/text')
def registrations_status_text():
count = db.session.query(User).count()
return "{c}/{t}".format(c=count, t=config['maxInstanceUsers'])
filen = url_for('static', filename='img/open.png')
caniregister = True
return render_template('status.html', title='STATUS', count=count, max=config['maxInstanceUsers'], file=filen, cani=caniregister)
@app.route('/error/<errno>')
def error(errno):

View File

@ -1,46 +1,35 @@
<div class="card">
<div class="image">
{%if config.nginxVideoStream%}
<img alt="Thumbnail" src="{{video.videoThumb}}">
{%else%}
<img alt="Thumbnail" src="/img/{{video.videoThumb.replace('/', '~')}}">
{%endif%}
</div>
<div class="ui card">
<a class="image" href="{{url_for('watch', v=video.id, _method='GET')}}">
<img src="https://yotter.xyz{{video.videoThumb}}">
</a>
<div class="content">
{% if video.views == "Livestream" %}
<a class="video-title break-word" href="#">{{video.videoTitle}}</a>
{% else %}
<a class="video-title break-word" href="{{url_for('watch', v=video.id, _method='GET')}}">{{video.videoTitle}}</a>
{% endif %}
<a class="header" href="{{url_for('watch', v=video.id, _method='GET')}}">{{video.videoTitle}}</a>
<div class="meta">
<a class="break-word" href="{{url_for('channel', id=video.channelId)}}">{{video.channelName}}</a>
</div>
<div class="description break-word">
{{video.description}}
</div>
</div>
<div class="extra content">
{% if video.isLive == "Livestream" or video.isLive %}
<span class="right floated">
<span class="left floated like">
<i class="red circle icon"></i>
{{video.views}}
{{video.views}}
</span>
{% else %}
<span class="right floated">
<span class="left floated like">
<i class="eye icon"></i>
{{video.views}}
{{video.views}}
</span>
{% endif %}
{% if video.timeStamp == "Scheduled" or video.isUpcoming %}
<span class="right floated">
<span class="right floated star">
<i class="blue clock icon"></i>
{{video.timeStamp}}
{{video.timeStamp}}
</span>
{% else %}
<span class="right floated">
<span class="right floated star">
<i class="clock icon"></i>
{{video.timeStamp}}
{{video.timeStamp}}
</span>
{% endif %}
<span>

View File

@ -0,0 +1,17 @@
{% extends "base.html" %}
{% block content %}
<div class="ui text container center aligned centered">
<div class="ui icon negative message">
<i class="meh outline icon"></i>
<div class="content">
<div class="header">
Ahh... Here we go again!
</div>
<p>Google is asking to solve a Captcha. As we don't want you to do it, we'll do it for you. <b> Please, try again in a few seconds.</b></p>
<a href="{{origin}}"> Click here to reload </a>
</div>
</div>
</div>
{%endblock%}

View File

@ -1,58 +1,94 @@
{% extends "base.html" %}
{% block content %}
<div class="blue ui centered card">
<div class="content">
<div class="center aligned author">
{%if config.nginxVideoStream%}
<img alt="Thumbnail" src="{{channel.avatar}}">
{%else%}
<img alt="Thumbnail" src="/img/{{channel.avatar.replace('/', '~')}}">
{%endif%}
<div class="ui center aligned text container">
<div class="ui centered vertical segment">
<h2 class="ui header">
<img src="{{data.avatar}}" class="ui circular image">
{{data.channel_name}}
</h2>
</div>
<div class="center aligned header"><a href="">{{channel.name}}</a></div>
<div class="center aligned description">
<div class="statistic">
<div class="ui vertical segment">
<p>{{data.short_description}}</p>
</div>
<div class="ui vertical segment">
<div class="ui tiny statistic">
<div class="value">
<i class="users icon"></i>{{channel.subCount}}
{%if data.approx_suscriber_count == None%}
<i class="user icon"></i> ?
{%else%}
<i class="user icon"></i> {{data.approx_subscriber_count}}
{%endif%}
</div>
<div class="label">
Followers
</div>
</div>
</div>
</div>
{% if restricted or current_user.is_authenticated %}
<div class="center aligned extra content">
{% if not current_user.is_following_yt(channel.id) %}
<p>
<form action="{{ url_for('ytfollow', channelId=channel.id) }}" method="post">
{{ btform.hidden_tag() }}
{{ btform.submit(value='Follow') }}
{% if restricted or current_user.is_authenticated %}
{% if not current_user.is_following_yt(data.channel_id) %}
<form action="{{ url_for('ytfollow', channelId=data.channel_id) }}" method="post">
<button type="submit" value="Submit" class="ui red button">
<i class="user icon"></i>
Suscribe
</button>
</form>
</p>
{% else %}
<p>
<form action="{{ url_for('ytunfollow', channelId=channel.id) }}" method="post">
{{ btform.hidden_tag() }}
{{ btform.submit(value='Unfollow') }}
<form action="{{ url_for('ytunfollow', channelId=data.channel_id) }}" method="post">
<button type="submit" value="Submit" class="ui red active button">
<i class="user icon"></i>
Unsuscribe
</button>
</form>
</p>
{% endif %}
</div>
{% endif %}
</div>
{%endif%}
{%endif%}
</div>
</div>
<br>
<br>
{% if not videos %}
{% if data['error'] != None %}
{% include '_empty_feed.html' %}
{% else %}
<div class="ui centered cards">
{% for video in videos %}
{% include '_video_item.html' %}
{% for video in data['items'] %}
<div class="ui card">
<a class="image" href="{{url_for('watch', v=video.id, _method='GET')}}">
<img src="https://yotter.xyz{{video.thumbnail}}">
</a>
<div class="content">
<a class="header" href="{{url_for('watch', v=video.id, _method='GET')}}">{{video.title}}</a>
<div class="meta">
<a class="break-word" href="{{url_for('channel', id=video.channel_id)}}">{{data.channel_name}}</a>
</div>
</div>
<div class="extra content">
<span class="left floated like">
<i class="eye icon"></i>
{{video.approx_view_count}}
</span>
{%if video.duration == "PREMIERING NOW" or video.duration == "LIVE"%}
<span class="right floated star">
<i class="red circle icon"></i>
LIVE
</span>
{%else%}
<span class="right floated star">
<i class="clock icon"></i>
{{video.time_published}}
</span>
{%endif%}
</div>
</div>
{% endfor %}
</div>
{% endif %}
<br>
<div class="ui center aligned text container">
<a href="{{prev_page}}"> <button class="ui left attached button"><i class="angle red left icon"></i></button> </a>
<a href="{{next_page}}"> <button class="right attached ui button"><i class="angle red right icon"></i></button></a>
</div>
<br>
{% endblock %}

46
app/templates/status.html Normal file
View File

@ -0,0 +1,46 @@
{% extends "base.html" %}
{% block content %}
<div class="ui text container center aligned centered">
<div class="ui placeholder segment">
<div class="ui two column stackable center aligned grid">
<div class="ui vertical divider">
{%if cani%}
:)
{%else%}
:(
{%endif%}
</div>
<div class="middle aligned row">
<div class="column">
<h3 class="ui header"> Capacity </h3>
<div class="ui icon header">
{%if cani%}
<i class="green users icon"></i>
{%else%}
<i class="red users icon"></i>
{%endif%}
{{count}}/{{max}}
</div>
</div>
<div class="column">
<div class="ui icon header">
<i class="user circle outline icon"></i>
Can I register?
</div>
{%if cani%}
<a href="/register"><div class="ui green button">
Yes!
</div></a>
{%else%}
<a href="#!"><div class="ui disabled red button">
It's full!
</div></a>
{%endif%}
</div>
</div>
</div>
</div>
</div>
{%endblock%}

View File

@ -34,20 +34,18 @@
</div>
{%else%}
<div class="video-js-responsive-container vjs-hd">
<video class="video-js vjs-default-skin"
<video-js autofocus class="video-js vjs-default-skin"
data-setup='{ "playbackRates": [0.5, 0.75, 1, 1.25,1.5, 1.75, 2] }'
width="1080"
controls
buffered
preload="none">
{% if config.nginxVideoStream %}
{% for format in info.formats %}
{% if format.video_valid %}
<source src="{{format.url}}" type="video/{{format.ext}}">
{% endif %}
{% for source in vsources %}
<source src="{{source.src}}" type="{{source.type}}">
{% endfor %}
{% endif %}
</video>
</video-js>
</div>
{%endif%}
@ -99,7 +97,6 @@
<script src="{{ url_for('static',filename='video.min.js') }}"></script>
{% if info.live %}
<p>Active</p>
<script src="{{ url_for('static',filename='videojs-http-streaming.min.js')}}"></script>
<script>
var player = videojs('live');

View File

@ -2,9 +2,11 @@ alembic==1.4.3
astroid==2.4.2
async-timeout==3.0.1
attrs==20.2.0
beautifulsoup4==4.9.3
beautifulsoup4==4.9.2
bleach==3.2.1
Brotli==1.0.9
bs4==0.0.1
cachetools==4.1.1
certifi==2020.6.20
chardet==3.0.4
click==7.1.2
@ -38,6 +40,8 @@ packaging==20.4
pylint==2.6.0
PyMySQL==0.10.1
pyparsing==2.4.7
PySocks==1.7.1
python-anticaptcha==0.7.1
python-dateutil==2.8.1
python-dotenv==0.14.0
python-editor==1.0.4

75
tw_data/feed.py Normal file
View File

@ -0,0 +1,75 @@
from requests_futures.sessions import FuturesSession
from werkzeug.datastructures import Headers
from flask import Markup
from concurrent.futures import as_completed
from numerize import numerize
from bs4 import BeautifulSoup
from re import findall
import time, datetime
import requests
import bleach
import urllib
import json
import re
NITTERINSTANCE = "https://nitter.net/"
def get_feed(usernames, maxOld):
'''
Returns feed tweets given a set of usernames
'''
feedTweets = []
with FuturesSession() as session:
futures = [session.get('{instance}{user}'.format(instance=NITTERINSTANCE, user=u)) for u in usernames]
for future in as_completed(futures):
res = future.result().content.decode('utf-8')
html = BeautifulSoup(res, "html.parser")
userFeed = html.find_all('div', attrs={'class':'timeline-item'})
if userFeed != []:
for post in userFeed[:-1]:
tweet = {}
date_time_str = post.find('span', attrs={'class':'tweet-date'}).find('a')['title'].replace(",","")
time = datetime.datetime.now() - datetime.datetime.strptime(date_time_str, '%d/%m/%Y %H:%M:%S')
if time.days >= maxOld:
continue
if post.find('div', attrs={'class':'pinned'}):
if post.find('div', attrs={'class':'pinned'}).find('span', attrs={'icon-pin'}):
continue
tweet['originalPoster'] = post.find('a', attrs={'class':'username'}).text
tweet['twitterName'] = post.find('a', attrs={'class':'fullname'}).text
tweet['timeStamp'] = datetime.datetime.strptime(date_time_str, '%d/%m/%Y %H:%M:%S')
tweet['date'] = post.find('span', attrs={'class':'tweet-date'}).find('a').text
tweet['content'] = Markup(post.find('div', attrs={'class':'tweet-content'}))
if post.find('div', attrs={'class':'retweet-header'}):
tweet['username'] = post.find('div', attrs={'class':'retweet-header'}).find('div', attrs={'class':'icon-container'}).text
tweet['isRT'] = True
else:
tweet['username'] = tweet['originalPoster']
tweet['isRT'] = False
tweet['profilePic'] = NITTERINSTANCE+post.find('a', attrs={'class':'tweet-avatar'}).find('img')['src'][1:]
url = NITTERINSTANCE + post.find('a', attrs={'class':'tweet-link'})['href'][1:]
if post.find('div', attrs={'class':'quote'}):
tweet['isReply'] = True
tweet['quote'] = post.find('div', attrs={'class':'quote'})
if tweet['quote'].find('div', attrs={'class':'quote-text'}):
tweet['replyingTweetContent'] = Markup(tweet['quote'].find('div', attrs={'class':'quote-text'}))
if tweet['quote'].find('a', attrs={'class':'still-image'}):
tweet['replyAttachedImg'] = NITTERINSTANCE+tweet['quote'].find('a', attrs={'class':'still-image'})['href'][1:]
if tweet['quote'].find('div', attrs={'class':'unavailable-quote'}):
tweet['replyingUser']="Unavailable"
else:
tweet['replyingUser']=tweet['quote'].find('a', attrs={'class':'username'}).text
post.find('div', attrs={'class':'quote'}).decompose()
if post.find('div', attrs={'class':'attachments'}):
if not post.find(class_='quote'):
if post.find('div', attrs={'class':'attachments'}).find('a', attrs={'class':'still-image'}):
attachedImg = NITTERINSTANCE + post.find('div', attrs={'class':'attachments'}).find('a')['href'][1:]
feedTweets.append(tweet)
return feedTweets

116
tw_data/user.py Normal file
View File

@ -0,0 +1,116 @@
from flask import Markup
from requests_futures.sessions import FuturesSession
from werkzeug.datastructures import Headers
from concurrent.futures import as_completed
from numerize import numerize
from bs4 import BeautifulSoup
from re import findall
import time, datetime
import requests
import bleach
import urllib
import json
import re
##########################
#### Config variables ####
##########################
NITTERINSTANCE = 'https://nitter.net/'
def get_uer_info(username):
response = urllib.request.urlopen('{instance}{user}'.format(instance=NITTERINSTANCE, user=username)).read()
#rssFeed = feedparser.parse(response.content)
html = BeautifulSoup(str(response), "lxml")
if html.body.find('div', attrs={'class':'error-panel'}):
return False
else:
html = html.body.find('div', attrs={'class':'profile-card'})
if html.find('a', attrs={'class':'profile-card-fullname'}):
fullName = html.find('a', attrs={'class':'profile-card-fullname'}).getText().encode('latin1').decode('unicode_escape').encode('latin1').decode('utf8')
else:
fullName = None
if html.find('div', attrs={'class':'profile-bio'}):
profileBio = html.find('div', attrs={'class':'profile-bio'}).getText().encode('latin1').decode('unicode_escape').encode('latin1').decode('utf8')
else:
profileBio = None
user = {
"profileFullName":fullName,
"profileUsername":html.find('a', attrs={'class':'profile-card-username'}).string.encode('latin_1').decode('unicode_escape').encode('latin_1').decode('utf8'),
"profileBio":profileBio,
"tweets":html.find_all('span', attrs={'class':'profile-stat-num'})[0].string,
"following":html.find_all('span', attrs={'class':'profile-stat-num'})[1].string,
"followers":numerize.numerize(int(html.find_all('span', attrs={'class':'profile-stat-num'})[2].string.replace(",",""))),
"likes":html.find_all('span', attrs={'class':'profile-stat-num'})[3].string,
"profilePic":"{instance}{pic}".format(instance=NITTERINSTANCE, pic=html.find('a', attrs={'class':'profile-card-avatar'})['href'][1:])
}
return user
def get_tweets(user, page=1):
feed = urllib.request.urlopen('{instance}{user}'.format(instance=NITTERINSTANCE, user=user)).read()
#Gather feedPosts
res = feed.decode('utf-8')
html = BeautifulSoup(res, "html.parser")
feedPosts = get_feed_tweets(html)
if page == 2:
nextPage = html.find('div', attrs={'class':'show-more'}).find('a')['href']
print('{instance}{user}{page}'.format(instance=NITTERINSTANCE, user=user, page=nextPage))
feed = urllib.request.urlopen('{instance}{user}{page}'.format(instance=NITTERINSTANCE, user=user, page=nextPage)).read()
res = feed.decode('utf-8')
html = BeautifulSoup(res, "html.parser")
feedPosts = get_feed_tweets(html)
return feedPosts
def get_feed_tweets(html):
feedPosts = []
userFeed = html.find_all('div', attrs={'class':'timeline-item'})
if userFeed != []:
for post in userFeed[:-1]:
if 'show-more' in str(post):
continue
date_time_str = post.find('span', attrs={'class':'tweet-date'}).find('a')['title'].replace(",","")
if post.find('div', attrs={'class':'pinned'}):
if post.find('div', attrs={'class':'pinned'}).find('span', attrs={'icon-pin'}):
continue
tweet = {}
tweet['op'] = post.find('a', attrs={'class':'username'}).text
tweet['twitterName'] = post.find('a', attrs={'class':'fullname'}).text
tweet['timeStamp'] = str(datetime.datetime.strptime(date_time_str, '%d/%m/%Y %H:%M:%S'))
tweet['date'] = post.find('span', attrs={'class':'tweet-date'}).find('a').text
tweet['content'] = Markup(post.find('div', attrs={'class':'tweet-content'}).decode_contents())
if post.find('div', attrs={'class':'retweet-header'}):
tweet['username'] = post.find('div', attrs={'class':'retweet-header'}).find('div', attrs={'class':'icon-container'}).text
tweet['isRT'] = True
else:
tweet['username'] = tweet['op']
tweet['isRT'] = False
tweet['profilePic'] = NITTERINSTANCE+post.find('a', attrs={'class':'tweet-avatar'}).find('img')['src'][1:]
tweet['url'] = NITTERINSTANCE + post.find('a', attrs={'class':'tweet-link'})['href'][1:]
if post.find('div', attrs={'class':'quote'}):
tweet['isReply'] = True
quote = post.find('div', attrs={'class':'quote'})
if quote.find('div', attrs={'class':'quote-text'}):
tweet['replyingTweetContent'] = Markup(quote.find('div', attrs={'class':'quote-text'}))
if quote.find('a', attrs={'class':'still-image'}):
tweet['replyAttachedImg'] = NITTERINSTANCE+quote.find('a', attrs={'class':'still-image'})['href'][1:]
tweet['replyingUser']=quote.find('a', attrs={'class':'username'}).text
post.find('div', attrs={'class':'quote'}).decompose()
if post.find('div', attrs={'class':'attachments'}):
if not post.find(class_='quote'):
if post.find('div', attrs={'class':'attachments'}).find('a', attrs={'class':'still-image'}):
tweet['attachedImg'] = NITTERINSTANCE + post.find('div', attrs={'class':'attachments'}).find('a')['href'][1:]
feedPosts.append(tweet)
else:
return {"emptyFeed": True}
return feedPosts

View File

@ -11,5 +11,6 @@
"admin_message":"Message from the admin text",
"admin_user":"admin_username",
"max_old_user_days": 60,
"donate_url": ""
"donate_url": "",
"anticaptcha":""
}

View File

@ -1,20 +1,16 @@
import base64
from youtube import util, yt_data_extract, local_playlist, subscriptions
from youtube import yt_app
import urllib
import json
from string import Template
import youtube.proto as proto
import html
import math
import gevent
import re
import cachetools.func
import traceback
import urllib
import cachetools.func
import flask
from flask import request
import gevent
import youtube.proto as proto
from youtube import util, yt_data_extract
headers_desktop = (
('Accept', '*/*'),
@ -109,7 +105,7 @@ def channel_ctoken_v1(channel_id, page, sort, tab, view=1):
return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1, print_status=True):
def get_channel_tab_info(channel_id, page="1", sort=3, tab='videos', view=1, print_status=True):
message = 'Got channel tab' if print_status else None
if int(sort) == 2 and int(page) > 1:
@ -128,7 +124,11 @@ def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1, print_st
headers_desktop + generic_cookie,
debug_name='channel_tab', report_text=message)
return content
info = yt_data_extract.extract_channel_info(json.loads(content), tab)
if info['error'] is not None:
return False
post_process_channel_info(info)
return info
# cache entries expire after 30 minutes
@cachetools.func.ttl_cache(maxsize=128, ttl=30*60)
@ -259,23 +259,4 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
**info
)
@yt_app.route('/channel/<channel_id>/')
@yt_app.route('/channel/<channel_id>/<tab>')
def get_channel_page(channel_id, tab='videos'):
return get_channel_page_general_url('https://www.youtube.com/channel/' + channel_id, tab, request, channel_id)
@yt_app.route('/user/<username>/')
@yt_app.route('/user/<username>/<tab>')
def get_user_page(username, tab='videos'):
return get_channel_page_general_url('https://www.youtube.com/user/' + username, tab, request)
@yt_app.route('/c/<custom>/')
@yt_app.route('/c/<custom>/<tab>')
def get_custom_c_page(custom, tab='videos'):
return get_channel_page_general_url('https://www.youtube.com/c/' + custom, tab, request)
@yt_app.route('/<custom>')
@yt_app.route('/<custom>/<tab>')
def get_toplevel_custom_page(custom, tab='videos'):
return get_channel_page_general_url('https://www.youtube.com/' + custom, tab, request)

213
youtube/channels.py Normal file
View File

@ -0,0 +1,213 @@
from youtube import proto
from flask import Markup as mk
import requests
import base64
import json
import re
# From: https://github.com/user234683/youtube-local/blob/master/youtube/channel.py
# SORT:
# videos:
# Popular - 1
# Oldest - 2
# Newest - 3
# playlists:
# Oldest - 2
# Newest - 3
# Last video added - 4
# view:
# grid: 0 or 1
# list: 2
headers = {
'Host': 'www.youtube.com',
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)',
'Accept': '*/*',
'Accept-Language': 'en-US,en;q=0.5',
'X-YouTube-Client-Name': '1',
'X-YouTube-Client-Version': '2.20180418',
}
real_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=8XihrAcN1l4'),)
generic_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=ST1Ti53r4fU'),)
def channel_ctoken_desktop(channel_id, page, sort, tab, view=1):
# see https://github.com/iv-org/invidious/issues/1319#issuecomment-671732646
# page > 1 doesn't work when sorting by oldest
offset = 30*(int(page) - 1)
schema_number = {
3: 6307666885028338688,
2: 17254859483345278706,
1: 16570086088270825023,
}[int(sort)]
page_token = proto.string(61, proto.unpadded_b64encode(proto.string(1,
proto.uint(1, schema_number) + proto.string(2,
proto.string(1, proto.unpadded_b64encode(proto.uint(1,offset)))
)
)))
tab = proto.string(2, tab )
sort = proto.uint(3, int(sort))
#page = proto.string(15, str(page) )
shelf_view = proto.uint(4, 0)
view = proto.uint(6, int(view))
continuation_info = proto.string(3,
proto.percent_b64encode(tab + sort + shelf_view + view + page_token)
)
channel_id = proto.string(2, channel_id )
pointless_nest = proto.string(80226972, channel_id + continuation_info)
return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
def channel_ctoken_mobile(channel_id, page, sort, tab, view=1):
tab = proto.string(2, tab )
sort = proto.uint(3, int(sort))
page = proto.string(15, str(page) )
# example with shelves in videos tab: https://www.youtube.com/channel/UCNL1ZadSjHpjm4q9j2sVtOA/videos
shelf_view = proto.uint(4, 0)
view = proto.uint(6, int(view))
continuation_info = proto.string( 3, proto.percent_b64encode(tab + view + sort + shelf_view + page) )
channel_id = proto.string(2, channel_id )
pointless_nest = proto.string(80226972, channel_id + continuation_info)
return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
def id_or_username(string):
cidRegex = "^UC.{22}$"
if re.match(cidRegex, string):
return "channel"
else:
return "user"
def get_channel_videos_tab(content):
tabs = content['contents']['twoColumnBrowseResultsRenderer']['tabs']
for tab in tabs:
if tab['title'] != "Videos":
continue
else:
return tab
def get_video_items_from_tab(tab):
items = []
for item in tab:
try:
if item['gridVideoRenderer']:
items.append(item)
else:
continue
except KeyError:
continue
return items
def get_info_grid_video_item(item, channel=None):
item = item['gridVideoRenderer']
thumbnailOverlays = item['thumbnailOverlays']
published = ""
views = ""
isLive = False
isUpcoming = False
try:
if 'UPCOMING' in str(thumbnailOverlays):
start_time = item['upcomingEventData']['startTime']
isUpcoming = True
views = "-"
published = "Scheduled"
except KeyError:
isUpcoming = False
try:
if 'LIVE' in str(thumbnailOverlays):
isLive = True
try:
views = item['viewCountText']['simpleText']
except:
views = "Live"
try:
duration = item['lengthText']['simpleText']
except:
duration = "-"
if published != "Scheduled":
try:
published = item['publishedTimeText']['simpleText']
except KeyError:
published = "None"
except KeyError:
isUpcoming = False
isLive = False
if not isUpcoming and not isLive:
views = item['viewCountText']['simpleText']
published = item['publishedTimeText']['simpleText']
try:
duration = item['lengthText']['simpleText']
except:
duration = "?"
video = {
'videoTitle':item['title']['runs'][0]['text'],
'description':"",
'views':views,
'timeStamp':published,
'duration':duration,
'channelName':channel['username'],
'authorUrl':"/channel/{}".format(channel['channelId']),
'channelId':channel['channelId'],
'id':item['videoId'],
'videoUrl':"/watch?v={}".format(item['videoId']),
'isLive':isLive,
'isUpcoming':isUpcoming,
'videoThumb':item['thumbnail']['thumbnails'][0]['url']
}
return video
def get_author_info_from_channel(content):
hmd = content['metadata']['channelMetadataRenderer']
cmd = content['header']['c4TabbedHeaderRenderer']
description = mk(hmd['description'])
channel = {
"channelId": cmd['channelId'],
"username": cmd['title'],
"thumbnail": "https:{}".format(cmd['avatar']['thumbnails'][0]['url'].replace("/", "~")),
"description":description,
"suscribers": cmd['subscriberCountText']['runs'][0]['text'].split(" ")[0],
"banner": cmd['banner']['thumbnails'][0]['url']
}
return channel
def get_channel_info(channelId, videos=True, page=1, sort=3):
if id_or_username(channelId) == "channel":
videos = []
ciUrl = "https://www.youtube.com/channel/{}".format(channelId)
mainUrl = "https://www.youtube.com/browse_ajax?ctoken={}".format(channel_ctoken_desktop(channelId, page, sort, "videos"))
content = json.loads(requests.get(mainUrl, headers=headers).text)
req = requests.get(ciUrl, headers=headers).text
start = (
req.index('window["ytInitialData"]')
+ len('window["ytInitialData"]')
+ 3
)
end = req.index("};", start) + 1
jsonIni = req[start:end]
data = json.loads(jsonIni)
#videosTab = get_channel_videos_tab(content)
authorInfo = get_author_info_from_channel(data)
if videos:
gridVideoItemList = get_video_items_from_tab(content[1]['response']['continuationContents']['gridContinuation']['items'])
for video in gridVideoItemList:
vid = get_info_grid_video_item(video, authorInfo)
videos.append(vid)
print({"channel":authorInfo, "videos":videos})
return {"channel":authorInfo, "videos":videos}
else:
return {"channel":authorInfo}
else:
baseUrl = "https://www.youtube.com/user/{}".format(channelId)

View File

@ -1,38 +1,10 @@
from youtube import proto
from youtube import utils
from flask import Markup
import urllib.parse
import requests
import base64
import json
import urllib
import flask
from flask import request
from werkzeug.exceptions import abort
from youtube import util, yt_data_extract, proto
from youtube import yt_app
# Sort: 1
# Upload date: 2
# View count: 3
# Rating: 1
# Relevance: 0
# Offset: 9
# Filters: 2
# Upload date: 1
# Type: 2
# Duration: 3
features = {
'4k': 14,
'hd': 4,
'hdr': 25,
'subtitles': 5,
'creative_commons': 6,
'3d': 7,
'live': 8,
'purchased': 9,
'360': 15,
'location': 23,
}
def page_number_to_sp_parameter(page, autocorrect, sort, filters):
offset = (int(page) - 1)*20 # 20 results per page
@ -41,8 +13,8 @@ def page_number_to_sp_parameter(page, autocorrect, sort, filters):
result = proto.uint(1, sort) + filters_enc + autocorrect + proto.uint(9, offset) + proto.string(61, b'')
return base64.urlsafe_b64encode(result).decode('ascii')
def get_search_json(query, page, autocorrect, sort, filters):
url = "https://www.youtube.com/results?search_query=" + urllib.parse.quote_plus(query)
def search_by_terms(search_terms, page, autocorrect, sort, filters):
url = "https://www.youtube.com/results?search_query=" + urllib.parse.quote_plus(search_terms)
headers = {
'Host': 'www.youtube.com',
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)',
@ -52,54 +24,145 @@ def get_search_json(query, page, autocorrect, sort, filters):
'X-YouTube-Client-Version': '2.20180418',
}
url += "&pbj=1&sp=" + page_number_to_sp_parameter(page, autocorrect, sort, filters).replace("=", "%3D")
content = util.fetch_url(url, headers=headers, report_text="Got search results", debug_name='search_results')
content = requests.get(url, headers=headers).text
info = json.loads(content)
return info
videos = get_videos_from_search(info)
channels = get_channels_from_search(info)
results = {
"videos": videos,
"channels": channels
}
return results
@yt_app.route('/search')
def get_search_page():
if len(request.args) == 0:
return flask.render_template('base.html', title="Search")
def get_channels_from_search(search):
results = []
search = search[1]['response']
primaryContents = search['contents']['twoColumnSearchResultsRenderer']['primaryContents']
contents = primaryContents['sectionListRenderer']['contents']
if 'query' not in request.args:
abort(400)
for content in contents:
try:
items = content['itemSectionRenderer']['contents']
except:
continue
query = request.args.get("query")
page = request.args.get("page", "1")
autocorrect = int(request.args.get("autocorrect", "1"))
sort = int(request.args.get("sort", "0"))
filters = {}
filters['time'] = int(request.args.get("time", "0"))
filters['type'] = int(request.args.get("type", "0"))
filters['duration'] = int(request.args.get("duration", "0"))
polymer_json = get_search_json(query, page, autocorrect, sort, filters)
for item in items:
try:
item['channelRenderer']
channel = get_channel_renderer_item_info(item['channelRenderer'])
results.append(channel)
except KeyError:
continue
return results
search_info = yt_data_extract.extract_search_info(polymer_json)
if search_info['error']:
return flask.render_template('error.html', error_message = search_info['error'])
def get_channel_renderer_item_info(item):
try:
suscribers = item['subscriberCountText']['simpleText'].split(" ")[0]
except:
suscribers = "?"
for extract_item_info in search_info['items']:
util.prefix_urls(extract_item_info)
util.add_extra_html_info(extract_item_info)
try:
description = utils.get_description_snippet_text(item['descriptionSnippet']['runs'])
except KeyError:
description = ""
corrections = search_info['corrections']
if corrections['type'] == 'did_you_mean':
corrected_query_string = request.args.to_dict(flat=False)
corrected_query_string['query'] = [corrections['corrected_query']]
corrections['corrected_query_url'] = util.URL_ORIGIN + '/search?' + urllib.parse.urlencode(corrected_query_string, doseq=True)
elif corrections['type'] == 'showing_results_for':
no_autocorrect_query_string = request.args.to_dict(flat=False)
no_autocorrect_query_string['autocorrect'] = ['0']
no_autocorrect_query_url = util.URL_ORIGIN + '/search?' + urllib.parse.urlencode(no_autocorrect_query_string, doseq=True)
corrections['original_query_url'] = no_autocorrect_query_url
try:
channel = {
"channelId": item['channelId'],
"username": item['title']['simpleText'],
"thumbnail": "https:{}".format(item['thumbnail']['thumbnails'][0]['url'].replace("/", "~")),
"description": Markup(str(description)),
"suscribers": suscribers,
"videos": item['videoCountText']['runs'][0]['text']
}
except KeyError:
channel = {
"channelId": item['channelId'],
"username": item['title']['simpleText'],
"avatar": item['thumbnail']['thumbnails'][0]['url'],
"suscribers": suscribers
}
return channel
def get_videos_from_search(search):
latest = []
results = []
search = search[1]['response']
primaryContents = search['contents']['twoColumnSearchResultsRenderer']['primaryContents']
contents = primaryContents['sectionListRenderer']['contents']
for content in contents:
try:
items = content['itemSectionRenderer']['contents']
except:
continue
for item in items:
try:
item['videoRenderer']
video = get_video_renderer_item_info(item['videoRenderer'])
results.append(video)
except KeyError:
continue
# Sometimes Youtube will return an empty query. Try again.
return results
def get_video_renderer_item_info(item):
published = ""
views = ""
isLive = False
isUpcoming = False
thumbnailOverlays = item['thumbnailOverlays']
try:
if 'UPCOMING' in str(thumbnailOverlays):
start_time = item['upcomingEventData']['startTime']
isUpcoming = True
views = "-"
published = "Scheduled"
except KeyError:
isUpcoming = False
try:
if 'LIVE' in str(thumbnailOverlays):
isLive = True
try:
views = item['viewCountText']['simpleText']
except:
views = "Live"
try:
duration = item['lengthText']['simpleText']
except:
duration = "-"
if published != "Scheduled":
try:
published = item['publishedTimeText']['simpleText']
except KeyError:
published = "None"
except:
isUpcoming = False
isLive = False
if not isUpcoming and not isLive:
views = item['viewCountText']['simpleText']
published = item['publishedTimeText']['simpleText']
duration = item['lengthText']['simpleText']
video = {
'videoTitle':item['title']['runs'][0]['text'],
'description':Markup(str(utils.get_description_snippet_text(item['descriptionSnippet']['runs']))),
'views':views,
'timeStamp':published,
'duration':duration,
'channelName':item['ownerText']['runs'][0]['text'],
'authorUrl':"/channel/{}".format(item['ownerText']['runs'][0]['navigationEndpoint']['browseEndpoint']['browseId']),
'channelId':item['ownerText']['runs'][0]['navigationEndpoint']['browseEndpoint']['browseId'],
'id':item['videoId'],
'videoUrl':"/watch?v={}".format(item['videoId']),
'isLive':isLive,
'isUpcoming':isUpcoming,
'videoThumb':item['thumbnail']['thumbnails'][0]['url']
}
return video
return flask.render_template('search.html',
header_playlist_names = local_playlist.get_playlist_names(),
query = query,
estimated_results = search_info['estimated_results'],
estimated_pages = search_info['estimated_pages'],
corrections = search_info['corrections'],
results = search_info['items'],
parameters_dictionary = request.args,
)

View File

@ -1,9 +1,13 @@
import gzip
import requests
from bs4 import BeautifulSoup
from youtube import yt_data_extract
try:
import brotli
have_brotli = True
except ImportError:
have_brotli = False
@ -15,7 +19,7 @@ import json
import gevent
import gevent.queue
import gevent.lock
from python_anticaptcha import AnticaptchaClient, NoCaptchaTaskProxylessTask
# The trouble with the requests library: It ships its own certificate bundle via certifi
# instead of using the system certificate store, meaning self-signed certificates
# configured by the user will not work. Some draconian networks block TLS unless a corporate
@ -51,13 +55,12 @@ import urllib3.contrib.socks
URL_ORIGIN = "/https://www.youtube.com"
connection_pool = urllib3.PoolManager(cert_reqs = 'CERT_REQUIRED')
connection_pool = urllib3.PoolManager(cert_reqs='CERT_REQUIRED')
def get_pool(use_tor):
return connection_pool
class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler):
'''Separate cookiejars for receiving and sending'''
def __init__(self, cookiejar_send=None, cookiejar_receive=None):
self.cookiejar_send = cookiejar_send
self.cookiejar_receive = cookiejar_receive
@ -75,6 +78,7 @@ class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler):
https_request = http_request
https_response = http_response
class FetchError(Exception):
def __init__(self, code, reason='', ip=None):
Exception.__init__(self, 'HTTP error during request: ' + code + ' ' + reason)
@ -82,6 +86,7 @@ class FetchError(Exception):
self.reason = reason
self.ip = ip
def decode_content(content, encoding_header):
encodings = encoding_header.replace(' ', '').split(',')
for encoding in reversed(encodings):
@ -93,6 +98,68 @@ def decode_content(content, encoding_header):
content = gzip.decompress(content)
return content
def bypass_captcha(session, response, url, cookies):
print("vvv COOKIES DICT vvv")
inputs = {}
html = BeautifulSoup(str(response.text), "lxml")
# If there's a captcha and we need to solve it...
if html.body.find('div', attrs={'class': 'g-recaptcha'}):
# Get the captcha form
form = html.body.find('form', attrs={"action": "/das_captcha"})
# Set up form inputs for request
for _input in form.find_all('input'):
try:
print(_input["name"] + " -> " + _input["value"])
inputs[_input["name"]] = _input["value"]
except KeyError:
continue
print("\n vvv Form inputs created vvv ")
print(inputs)
# Get CAPTCHA keys
site_key = html.body.find('div', attrs={'class': 'g-recaptcha'})['data-sitekey']
s_value = html.body.find('input', attrs={'name': 'session_token'})['value']
# Get anti-captcha API key from config
config = json.load(open('yotter-config.json'))
# Generate anti-captcha request payload
body = {'clientKey': config['anticaptcha']}
task = {'type': "NoCaptchaTaskProxyless",
'websiteURL': url,
'websiteKey': site_key,
'recaptchaDataSValue': s_value}
body['task'] = task
# Create the task.
response = requests.post("https://api.anti-captcha.com/createTask", json=body).json()
task_id = response["taskId"]
print("Task was created: {}. Waiting...".format(task_id))
# Wait until task is completed
body = {"clientKey": config['anticaptcha'], "taskId": task_id}
response = requests.post("https://api.anti-captcha.com/getTaskResult", json=body).json()
ready = response["status"] == "ready"
while not ready:
print(response['status'])
response = requests.post("https://api.anti-captcha.com/getTaskResult", json=body).json()
ready = response["status"] == "ready"
inputs['g-recaptcha-response'] = response['solution']['gRecaptchaResponse']
print(response)
# Print POST request headers
yt_rq = requests.post("https://youtube.com/das_captcha", data=inputs,
headers={"Content-Type": "application/x-www-form-urlencoded",
"Accept-Language": "en-US,en;q=0.5",
"User-Agent":'Mozilla/5.0 (Windows NT 10.0; rv:78.0) Gecko/20100101 Firefox/78.0',
"Referer": "https://www.youtube.com/das_captcha",
"Origin": "https://www.youtube.com"}, cookies=session.cookies).headers
print(yt_rq['Cookie'])
def fetch_url_response(url, headers=(), timeout=15, data=None,
cookiejar_send=None, cookiejar_receive=None,
use_tor=True, max_redirects=None):
@ -126,10 +193,9 @@ def fetch_url_response(url, headers=(), timeout=15, data=None,
if cookiejar_send is not None or cookiejar_receive is not None: # Use urllib
req = urllib.request.Request(url, data=data, headers=headers)
cookie_processor = HTTPAsymmetricCookieProcessor(cookiejar_send=cookiejar_send, cookiejar_receive=cookiejar_receive)
cookie_processor = HTTPAsymmetricCookieProcessor(cookiejar_send=cookiejar_send,
cookiejar_receive=cookiejar_receive)
opener = urllib.request.build_opener(cookie_processor)
response = opener.open(req, timeout=timeout)
cleanup_func = (lambda r: None)
@ -138,18 +204,34 @@ def fetch_url_response(url, headers=(), timeout=15, data=None,
# (in connectionpool.py in urllib3)
# According to the documentation for urlopen, a redirect counts as a
# retry. So there are 3 redirects max by default.
session = requests.Session()
print("Starting python GET request to "+url+"...")
response = session.get(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:78.0) Gecko/20100101 Firefox/78.0', "Accept-Language": "en-US,en;q=0.5"})
# Strings that appear when there's a Captcha.
string_de = "Fülle das folgende Feld aus, um YouTube weiter zu nutzen."
string_en = "To continue with your YouTube experience, please fill out the form below."
# If there's a captcha, bypass it.
if string_de in response.text or string_en in response.text:
bypass_captcha(session, response, url, session.cookies)
return "Captcha", "Captcha"
if max_redirects:
retries = urllib3.Retry(3+max_redirects, redirect=max_redirects)
retries = urllib3.Retry(3 + max_redirects, redirect=max_redirects)
else:
retries = urllib3.Retry(3)
pool = get_pool(use_tor)
pool = connection_pool
response = pool.request(method, url, headers=headers,
timeout=timeout, preload_content=False,
decode_content=False, retries=retries)
cleanup_func = (lambda r: r.release_conn())
return response, cleanup_func
def fetch_url(url, headers=(), timeout=15, report_text=None, data=None,
cookiejar_send=None, cookiejar_receive=None, use_tor=True,
debug_name=None):
@ -159,11 +241,13 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None,
url, headers, timeout=timeout,
cookiejar_send=cookiejar_send, cookiejar_receive=cookiejar_receive,
use_tor=use_tor)
response_time = time.time()
print(response)
if response == "Captcha":
return "Captcha"
response_time = time.time()
content = response.read()
read_finish = time.time()
cleanup_func(response) # release_connection for urllib3
if (response.status == 429
@ -178,12 +262,14 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None,
raise FetchError(str(response.status), reason=response.reason, ip=None)
if report_text:
print(report_text, ' Latency:', round(response_time - start_time,3), ' Read time:', round(read_finish - response_time,3))
print(report_text, ' Latency:', round(response_time - start_time, 3), ' Read time:',
round(read_finish - response_time, 3))
content = decode_content(content, response.getheader('Content-Encoding', default='identity'))
return content
def head(url, use_tor=False, report_text=None, max_redirects=10):
pool = get_pool(use_tor)
pool = connection_pool
start_time = time.time()
# default: Retry.DEFAULT = Retry(3)
@ -191,24 +277,21 @@ def head(url, use_tor=False, report_text=None, max_redirects=10):
# According to the documentation for urlopen, a redirect counts as a retry
# So there are 3 redirects max by default. Let's change that
# to 10 since googlevideo redirects a lot.
retries = urllib3.Retry(3+max_redirects, redirect=max_redirects,
retries = urllib3.Retry(3 + max_redirects, redirect=max_redirects,
raise_on_redirect=False)
headers = {'User-Agent': 'Python-urllib'}
response = pool.request('HEAD', url, headers=headers, retries=retries)
if report_text:
print(report_text, ' Latency:', round(time.time() - start_time,3))
print(report_text, ' Latency:', round(time.time() - start_time, 3))
return response
mobile_user_agent = 'Mozilla/5.0 (Linux; Android 7.0; Redmi Note 4 Build/NRD90M) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Mobile Safari/537.36'
mobile_ua = (('User-Agent', mobile_user_agent),)
desktop_user_agent = 'Mozilla/5.0 (Windows NT 6.1; rv:52.0) Gecko/20100101 Firefox/52.0'
desktop_ua = (('User-Agent', desktop_user_agent),)
class RateLimitedQueue(gevent.queue.Queue):
''' Does initial_burst (def. 30) at first, then alternates between waiting waiting_period (def. 5) seconds and doing subsequent_bursts (def. 10) queries. After 5 seconds with nothing left in the queue, resets rate limiting. '''
@ -225,7 +308,6 @@ class RateLimitedQueue(gevent.queue.Queue):
self.empty_start = 0
gevent.queue.Queue.__init__(self)
def get(self):
self.lock.acquire() # blocks if another greenlet currently has the lock
if self.count_since_last_wait >= self.subsequent_bursts and self.surpassed_initial:
@ -257,7 +339,6 @@ class RateLimitedQueue(gevent.queue.Queue):
return item
def download_thumbnail(save_directory, video_id):
url = "https://i.ytimg.com/vi/" + video_id + "/mqdefault.jpg"
save_location = os.path.join(save_directory, video_id + ".jpg")
@ -269,26 +350,23 @@ def download_thumbnail(save_directory, video_id):
try:
f = open(save_location, 'wb')
except FileNotFoundError:
os.makedirs(save_directory, exist_ok = True)
os.makedirs(save_directory, exist_ok=True)
f = open(save_location, 'wb')
f.write(thumbnail)
f.close()
return True
def download_thumbnails(save_directory, ids):
if not isinstance(ids, (list, tuple)):
ids = list(ids)
# only do 5 at a time
# do the n where n is divisible by 5
i = -1
for i in range(0, int(len(ids)/5) - 1 ):
gevent.joinall([gevent.spawn(download_thumbnail, save_directory, ids[j]) for j in range(i*5, i*5 + 5)])
for i in range(0, int(len(ids) / 5) - 1):
gevent.joinall([gevent.spawn(download_thumbnail, save_directory, ids[j]) for j in range(i * 5, i * 5 + 5)])
# do the remainders (< 5)
gevent.joinall([gevent.spawn(download_thumbnail, save_directory, ids[j]) for j in range(i*5 + 5, len(ids))])
gevent.joinall([gevent.spawn(download_thumbnail, save_directory, ids[j]) for j in range(i * 5 + 5, len(ids))])
def dict_add(*dicts):
@ -296,6 +374,7 @@ def dict_add(*dicts):
dicts[0].update(dictionary)
return dicts[0]
def video_id(url):
url_parts = urllib.parse.urlparse(url)
return urllib.parse.parse_qs(url_parts.query)['v'][0]
@ -305,10 +384,11 @@ def video_id(url):
def get_thumbnail_url(video_id):
return "/i.ytimg.com/vi/" + video_id + "/mqdefault.jpg"
def seconds_to_timestamp(seconds):
seconds = int(seconds)
hours, seconds = divmod(seconds,3600)
minutes, seconds = divmod(seconds,60)
hours, seconds = divmod(seconds, 3600)
minutes, seconds = divmod(seconds, 60)
if hours != 0:
timestamp = str(hours) + ":"
timestamp += str(minutes).zfill(2) # zfill pads with zeros
@ -319,31 +399,32 @@ def seconds_to_timestamp(seconds):
return timestamp
def update_query_string(query_string, items):
parameters = urllib.parse.parse_qs(query_string)
parameters.update(items)
return urllib.parse.urlencode(parameters, doseq=True)
def uppercase_escape(s):
return re.sub(
r'\\U([0-9a-fA-F]{8})',
lambda m: chr(int(m.group(1), base=16)), s)
def prefix_url(url):
if url is None:
return None
url = url.lstrip('/') # some urls have // before them, which has a special meaning
return '/' + url
def left_remove(string, substring):
'''removes substring from the start of string, if present'''
if string.startswith(substring):
return string[len(substring):]
return string
def concat_or_none(*strings):
'''Concatenates strings. Returns None if any of the arguments are None'''
result = ''
@ -365,6 +446,7 @@ def prefix_urls(item):
except KeyError:
pass
def add_extra_html_info(item):
if item['type'] == 'video':
item['url'] = (URL_ORIGIN + '/watch?v=' + item['id']) if item.get('id') else None
@ -383,6 +465,7 @@ def add_extra_html_info(item):
elif item['type'] == 'channel':
item['url'] = (URL_ORIGIN + "/channel/" + item['id']) if item.get('id') else None
def parse_info_prepare_for_html(renderer, additional_info={}):
item = yt_data_extract.extract_item_info(renderer, additional_info)
prefix_urls(item)
@ -390,8 +473,8 @@ def parse_info_prepare_for_html(renderer, additional_info={}):
return item
def check_gevent_exceptions(*tasks):
for task in tasks:
if task.exception:
raise task.exception

View File

@ -8,11 +8,11 @@ from youtube import util, yt_data_extract
def get_video_sources(info, tor_bypass=False):
video_sources = []
max_resolution = "720"
max_resolution = 1080
for fmt in info['formats']:
if not all(fmt[attr] for attr in ('quality', 'width', 'ext', 'url')):
continue
if fmt['acodec'] and fmt['vcodec'] and fmt['height'] <= max_resolution:
if fmt['acodec'] and fmt['vcodec'] and (fmt['height'] <= max_resolution):
video_sources.append({
'src': fmt['url'],
'type': 'video/' + fmt['ext'],
@ -123,6 +123,24 @@ def get_subtitle_sources(info):
return sources
def decrypt_signatures(info):
'''return error string, or False if no errors'''
if not yt_data_extract.requires_decryption(info):
return False
if not info['player_name']:
return 'Could not find player name'
if not info['base_js']:
return 'Failed to find base.js'
player_name = info['player_name']
base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text='Fetched player ' + player_name)
base_js = base_js.decode('utf-8')
err = yt_data_extract.extract_decryption_function(info, base_js)
if err:
return err
err = yt_data_extract.decrypt_signatures(info)
return err
def get_ordered_music_list_attributes(music_list):
# get the set of attributes which are used by atleast 1 track
@ -146,14 +164,18 @@ headers = (
('X-YouTube-Client-Version', '2.20180830'),
) + util.mobile_ua
def extract_info(video_id, use_invidious, playlist_id=None, index=None):
# bpctr=9999999999 will bypass are-you-sure dialogs for controversial
# videos
# bpctr=9999999999 will bypass are-you-sure dialogs for controversial videos
url = 'https://m.youtube.com/watch?v=' + video_id + '&pbj=1&bpctr=9999999999'
if playlist_id:
url += '&list=' + playlist_id
if index:
url += '&index=' + index
polymer_json = util.fetch_url(url, headers=headers, debug_name='watch')
# If there's a captcha... Return word Captcha
if polymer_json == 'Captcha':
return 'Captcha'
polymer_json = polymer_json.decode('utf-8')
# TODO: Decide whether this should be done in yt_data_extract.extract_watch_info
try:
@ -173,6 +195,12 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
url = 'https://www.youtube.com/get_video_info?' + urllib.parse.urlencode(data)
video_info_page = util.fetch_url(url, debug_name='get_video_info', report_text='Fetched age restriction bypass page').decode('utf-8')
yt_data_extract.update_with_age_restricted_info(info, video_info_page)
# signature decryption
decryption_error = decrypt_signatures(info)
if decryption_error:
decryption_error = 'Error decrypting url signatures: ' + decryption_error
info['playability_error'] = decryption_error
# check if urls ready (non-live format) in former livestream
# urls not ready if all of them have no filesize
if info['was_live']: