New version / CaptchaBypass testing

This commit is contained in:
pluja 2020-10-12 08:47:15 +02:00
parent ceffdcfe24
commit 99b9ad5591
10 changed files with 603 additions and 248 deletions

View File

@ -1,4 +1,3 @@
import datetime
import glob
import json
@ -30,8 +29,8 @@ from app.forms import LoginForm, RegistrationForm, EmptyForm, SearchForm, Channe
from app.models import User, twitterPost, ytPost, Post, youtubeFollow, twitterFollow
from youtube import comments, utils, channel as ytch, search as yts
from youtube import watch as ytwatch
#########################################
from youtube_data import search as yts
#########################################
@ -326,6 +325,10 @@ def ytsearch():
else:
prev_page = "/ytsearch?q={q}&s={s}&p={p}".format(q=query, s=sort, p=int(page) - 1)
for video in results['videos']:
hostname = urllib.parse.urlparse(video['videoThumb']).netloc
video['videoThumb'] = video['videoThumb'].replace("https://{}".format(hostname), "") + "&host=" + hostname
for channel in results['channels']:
if config['nginxVideoStream']:
channel['thumbnail'] = channel['thumbnail'].replace("~", "/")
@ -342,8 +345,6 @@ def ytsearch():
@app.route('/ytfollow/<channelId>', methods=['POST'])
@login_required
def ytfollow(channelId):
form = EmptyForm()
if form.validate_on_submit():
r = followYoutubeChannel(channelId)
return redirect(request.referrer)
@ -376,8 +377,6 @@ def followYoutubeChannel(channelId):
@app.route('/ytunfollow/<channelId>', methods=['POST'])
@login_required
def ytunfollow(channelId):
form = EmptyForm()
if form.validate_on_submit():
unfollowYoutubeChannel(channelId)
return redirect(request.referrer)
@ -404,27 +403,38 @@ def unfollowYoutubeChannel(channelId):
def channel(id):
form = ChannelForm()
button_form = EmptyForm()
data = requests.get('https://www.youtube.com/feeds/videos.xml?channel_id={id}'.format(id=id))
data = feedparser.parse(data.content)
channelData = YoutubeSearch.channelInfo(id)
page = request.args.get('p', None)
sort = request.args.get('s', None)
if page is None:
page = 1
if sort is None:
sort = 3
for video in channelData[1]:
data = ytch.get_channel_tab_info(id, page, sort)
for video in data['items']:
if config['nginxVideoStream']:
hostName = urllib.parse.urlparse(video['videoThumb']).netloc
video['videoThumb'] = video['videoThumb'].replace("https://{}".format(hostName), "").replace("hqdefault",
hostName = urllib.parse.urlparse(video['thumbnail'][1:]).netloc
video['thumbnail'] = video['thumbnail'].replace("https://{}".format(hostName), "")[1:].replace("hqdefault",
"mqdefault") + "&host=" + hostName
else:
video['videoThumb'] = video['videoThumb'].replace('/', '~')
if config['nginxVideoStream']:
hostName = urllib.parse.urlparse(channelData[0]['avatar']).netloc
channelData[0]['avatar'] = channelData[0]['avatar'].replace("https://{}".format(hostName),
"") + "?host=" + hostName
else:
channelData[0]['avatar'] = channelData[0]['avatar'].replace('/', '~')
video['thumbnail'] = video['thumbnail'].replace('/', '~')
return render_template('channel.html', form=form, btform=button_form, channel=channelData[0], videos=channelData[1],
restricted=config['restrictPublicUsage'], config=config)
if config['nginxVideoStream']:
hostName = urllib.parse.urlparse(data['avatar'][1:]).netloc
data['avatar'] = data['avatar'].replace("https://{}".format(hostName), "")[1:] + "?host=" + hostName
else:
data['avatar'] = data['avatar'].replace('/', '~')
next_page = "/channel/{q}?s={s}&p={p}".format(q=id, s=sort, p=int(page) + 1)
if int(page) == 1:
prev_page = "/channel/{q}?s={s}&p={p}".format(q=id, s=sort, p=1)
else:
prev_page = "/channel/{q}?s={s}&p={p}".format(q=id, s=sort, p=int(page) - 1)
return render_template('channel.html', form=form, btform=button_form, data=data,
restricted=config['restrictPublicUsage'], config=config, next_page=next_page, prev_page=prev_page)
def get_best_urls(urls):
@ -454,18 +464,13 @@ def get_live_urls(urls):
def watch():
id = request.args.get('v', None)
info = ytwatch.extract_info(id, False, playlist_id=None, index=None)
<<<<<<< Updated upstream
<<<<<<< Updated upstream
<<<<<<< Updated upstream
# Use nginx
best_formats = ["22", "18", "34", "35", "36", "37", "38", "43", "44", "45", "46"]
=======
=======
>>>>>>> Stashed changes
=======
>>>>>>> Stashed changes
vsources = ytwatch.get_video_sources(info, False)
retry = 3
while retry != 0 and info['playability_error'] == 'Could not find player':
info=ytwatch.extract_info(id, False, playlist_id=None, index=None)
retry -= 1
vsources = ytwatch.get_video_sources(info, False)
# Retry 3 times if no sources are available.
retry = 3
while retry != 0 and len(vsources) == 0:
@ -477,26 +482,29 @@ def watch():
source['src'] = source['src'].replace("https://{}".format(hostName), "") + "&host=" + hostName
# Parse video formats
>>>>>>> Stashed changes
for v_format in info['formats']:
hostName = urllib.parse.urlparse(v_format['url']).netloc
v_format['url'] = v_format['url'].replace("https://{}".format(hostName), "") + "&host=" + hostName
if v_format['audio_bitrate'] is not None and v_format['vcodec'] is not None:
v_format['video_valid'] = True
elif v_format['audio_bitrate'] is not None and v_format['vcodec'] is None:
if v_format['audio_bitrate'] is not None and v_format['vcodec'] is None:
v_format['audio_valid'] = True
# Markup description
try:
info['description'] = Markup(bleach.linkify(info['description'].replace("\n", "<br>")))
except AttributeError or TypeError:
print(info['description'])
# Get comments
videocomments = comments.video_comments(id, sort=0, offset=0, lc='', secret_key='')
videocomments = utils.post_process_comments_info(videocomments)
if videocomments is not None:
videocomments.sort(key=lambda x: x['likes'], reverse=True)
# Calculate rating %
info['rating'] = str((info['like_count'] / (info['like_count'] + info['dislike_count'])) * 100)[0:4]
return render_template("video.html", info=info, title='{}'.format(info['title']), config=config, videocomments=videocomments)
return render_template("video.html", info=info, title='{}'.format(info['title']), config=config,
videocomments=videocomments, vsources=vsources)
def markupString(string):
@ -745,15 +753,17 @@ def register():
return render_template('register.html', title='Register', registrations=REGISTRATIONS, form=form, config=config)
@app.route('/registrations_status/icon')
def registrations_status_icon():
@app.route('/status')
def status():
count = db.session.query(User).count()
if count >= config['maxInstanceUsers'] or config['maxInstanceUsers'] == 0:
return redirect(url_for('static', filename='img/close.png'))
filen = url_for('static', filename='img/close.png')
caniregister = False
else:
filen = url_for('static', filename='img/open.png')
caniregister = True
return render_template('status.html', title='STATUS', count=count, max=config['maxInstanceUsers'], file=filen, cani=caniregister)
@app.route('/error/<errno>')
def error(errno):

View File

@ -1,46 +1,35 @@
<div class="card">
<div class="image">
{%if config.nginxVideoStream%}
<img alt="Thumbnail" src="{{video.videoThumb}}">
{%else%}
<img alt="Thumbnail" src="/img/{{video.videoThumb.replace('/', '~')}}">
{%endif%}
</div>
<div class="ui card">
<a class="image" href="{{url_for('watch', v=video.id, _method='GET')}}">
<img src="https://yotter.xyz{{video.videoThumb}}">
</a>
<div class="content">
{% if video.views == "Livestream" %}
<a class="video-title break-word" href="#">{{video.videoTitle}}</a>
{% else %}
<a class="video-title break-word" href="{{url_for('watch', v=video.id, _method='GET')}}">{{video.videoTitle}}</a>
{% endif %}
<a class="header" href="{{url_for('watch', v=video.id, _method='GET')}}">{{video.videoTitle}}</a>
<div class="meta">
<a class="break-word" href="{{url_for('channel', id=video.channelId)}}">{{video.channelName}}</a>
</div>
<div class="description break-word">
{{video.description}}
</div>
</div>
<div class="extra content">
{% if video.isLive == "Livestream" or video.isLive %}
<span class="right floated">
<span class="left floated like">
<i class="red circle icon"></i>
{{video.views}}
{{video.views}}
</span>
{% else %}
<span class="right floated">
<span class="left floated like">
<i class="eye icon"></i>
{{video.views}}
{{video.views}}
</span>
{% endif %}
{% if video.timeStamp == "Scheduled" or video.isUpcoming %}
<span class="right floated">
<span class="right floated star">
<i class="blue clock icon"></i>
{{video.timeStamp}}
{{video.timeStamp}}
</span>
{% else %}
<span class="right floated">
<span class="right floated star">
<i class="clock icon"></i>
{{video.timeStamp}}
{{video.timeStamp}}
</span>
{% endif %}
<span>

View File

@ -1,58 +1,94 @@
{% extends "base.html" %}
{% block content %}
<div class="blue ui centered card">
<div class="content">
<div class="center aligned author">
{%if config.nginxVideoStream%}
<img alt="Thumbnail" src="{{channel.avatar}}">
{%else%}
<img alt="Thumbnail" src="/img/{{channel.avatar.replace('/', '~')}}">
{%endif%}
<div class="ui center aligned text container">
<div class="ui centered vertical segment">
<h2 class="ui header">
<img src="{{data.avatar}}" class="ui circular image">
{{data.channel_name}}
</h2>
</div>
<div class="center aligned header"><a href="">{{channel.name}}</a></div>
<div class="center aligned description">
<div class="statistic">
<div class="ui vertical segment">
<p>{{data.short_description}}</p>
</div>
<div class="ui vertical segment">
<div class="ui tiny statistic">
<div class="value">
<i class="users icon"></i>{{channel.subCount}}
{%if data.approx_suscriber_count == None%}
<i class="user icon"></i> ?
{%else%}
<i class="user icon"></i> {{data.approx_subscriber_count}}
{%endif%}
</div>
<div class="label">
Followers
</div>
</div>
</div>
</div>
{% if restricted or current_user.is_authenticated %}
<div class="center aligned extra content">
{% if not current_user.is_following_yt(channel.id) %}
<p>
<form action="{{ url_for('ytfollow', channelId=channel.id) }}" method="post">
{{ btform.hidden_tag() }}
{{ btform.submit(value='Follow') }}
{% if not current_user.is_following_yt(data.channel_id) %}
<form action="{{ url_for('ytfollow', channelId=data.channel_id) }}" method="post">
<button type="submit" value="Submit" class="ui red button">
<i class="user icon"></i>
Suscribe
</button>
</form>
</p>
{% else %}
<p>
<form action="{{ url_for('ytunfollow', channelId=channel.id) }}" method="post">
{{ btform.hidden_tag() }}
{{ btform.submit(value='Unfollow') }}
<form action="{{ url_for('ytunfollow', channelId=data.channel_id) }}" method="post">
<button type="submit" value="Submit" class="ui red active button">
<i class="user icon"></i>
Unsuscribe
</button>
</form>
</p>
{%endif%}
</div>
{%endif%}
</div>
</div>
<br>
<br>
{% if not videos %}
{% if data['error'] != None %}
{% include '_empty_feed.html' %}
{% else %}
<div class="ui centered cards">
{% for video in videos %}
{% include '_video_item.html' %}
{% for video in data['items'] %}
<div class="ui card">
<a class="image" href="{{url_for('watch', v=video.id, _method='GET')}}">
<img src="https://yotter.xyz{{video.thumbnail}}">
</a>
<div class="content">
<a class="header" href="{{url_for('watch', v=video.id, _method='GET')}}">{{video.title}}</a>
<div class="meta">
<a class="break-word" href="{{url_for('channel', id=video.channel_id)}}">{{data.channel_name}}</a>
</div>
</div>
<div class="extra content">
<span class="left floated like">
<i class="eye icon"></i>
{{video.approx_view_count}}
</span>
{%if video.duration == "PREMIERING NOW" or video.duration == "LIVE"%}
<span class="right floated star">
<i class="red circle icon"></i>
LIVE
</span>
{%else%}
<span class="right floated star">
<i class="clock icon"></i>
{{video.time_published}}
</span>
{%endif%}
</div>
</div>
{% endfor %}
</div>
{% endif %}
<br>
<div class="ui center aligned text container">
<a href="{{prev_page}}"> <button class="ui left attached button"><i class="angle red left icon"></i></button> </a>
<a href="{{next_page}}"> <button class="right attached ui button"><i class="angle red right icon"></i></button></a>
</div>
<br>
{% endblock %}

46
app/templates/status.html Normal file
View File

@ -0,0 +1,46 @@
{% extends "base.html" %}
{% block content %}
<div class="ui text container center aligned centered">
<div class="ui placeholder segment">
<div class="ui two column stackable center aligned grid">
<div class="ui vertical divider">
{%if cani%}
:)
{%else%}
:(
{%endif%}
</div>
<div class="middle aligned row">
<div class="column">
<h3 class="ui header"> Capacity </h3>
<div class="ui icon header">
{%if cani%}
<i class="green users icon"></i>
{%else%}
<i class="red users icon"></i>
{%endif%}
{{count}}/{{max}}
</div>
</div>
<div class="column">
<div class="ui icon header">
<i class="user circle outline icon"></i>
Can I register?
</div>
{%if cani%}
<a href="/register"><div class="ui green button">
Yes!
</div></a>
{%else%}
<a href="#!"><div class="ui disabled red button">
It's full!
</div></a>
{%endif%}
</div>
</div>
</div>
</div>
</div>
{%endblock%}

View File

@ -34,20 +34,18 @@
</div>
{%else%}
<div class="video-js-responsive-container vjs-hd">
<video class="video-js vjs-default-skin"
<video-js autofocus class="video-js vjs-default-skin"
data-setup='{ "playbackRates": [0.5, 0.75, 1, 1.25,1.5, 1.75, 2] }'
width="1080"
controls
buffered
preload="none">
{% if config.nginxVideoStream %}
{% for format in info.formats %}
{% if format.video_valid %}
<source src="{{format.url}}" type="video/{{format.ext}}">
{% endif %}
{% for source in vsources %}
<source src="{{source.src}}" type="{{source.type}}">
{% endfor %}
{% endif %}
</video>
</video-js>
</div>
{%endif%}
@ -99,7 +97,6 @@
<script src="{{ url_for('static',filename='video.min.js') }}"></script>
{% if info.live %}
<p>Active</p>
<script src="{{ url_for('static',filename='videojs-http-streaming.min.js')}}"></script>
<script>
var player = videojs('live');

View File

@ -1,20 +1,16 @@
import base64
from youtube import util, yt_data_extract, local_playlist, subscriptions
from youtube import yt_app
import urllib
import json
from string import Template
import youtube.proto as proto
import html
import math
import gevent
import re
import cachetools.func
import traceback
import urllib
import cachetools.func
import flask
from flask import request
import gevent
import youtube.proto as proto
from youtube import util, yt_data_extract
headers_desktop = (
('Accept', '*/*'),
@ -109,7 +105,7 @@ def channel_ctoken_v1(channel_id, page, sort, tab, view=1):
return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1, print_status=True):
def get_channel_tab_info(channel_id, page="1", sort=3, tab='videos', view=1, print_status=True):
message = 'Got channel tab' if print_status else None
if int(sort) == 2 and int(page) > 1:
@ -128,7 +124,11 @@ def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1, print_st
headers_desktop + generic_cookie,
debug_name='channel_tab', report_text=message)
return content
info = yt_data_extract.extract_channel_info(json.loads(content), tab)
if info['error'] is not None:
return False
post_process_channel_info(info)
return info
# cache entries expire after 30 minutes
@cachetools.func.ttl_cache(maxsize=128, ttl=30*60)
@ -259,23 +259,4 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None):
**info
)
@yt_app.route('/channel/<channel_id>/')
@yt_app.route('/channel/<channel_id>/<tab>')
def get_channel_page(channel_id, tab='videos'):
return get_channel_page_general_url('https://www.youtube.com/channel/' + channel_id, tab, request, channel_id)
@yt_app.route('/user/<username>/')
@yt_app.route('/user/<username>/<tab>')
def get_user_page(username, tab='videos'):
return get_channel_page_general_url('https://www.youtube.com/user/' + username, tab, request)
@yt_app.route('/c/<custom>/')
@yt_app.route('/c/<custom>/<tab>')
def get_custom_c_page(custom, tab='videos'):
return get_channel_page_general_url('https://www.youtube.com/c/' + custom, tab, request)
@yt_app.route('/<custom>')
@yt_app.route('/<custom>/<tab>')
def get_toplevel_custom_page(custom, tab='videos'):
return get_channel_page_general_url('https://www.youtube.com/' + custom, tab, request)

213
youtube/channels.py Normal file
View File

@ -0,0 +1,213 @@
from youtube import proto
from flask import Markup as mk
import requests
import base64
import json
import re
# From: https://github.com/user234683/youtube-local/blob/master/youtube/channel.py
# SORT:
# videos:
# Popular - 1
# Oldest - 2
# Newest - 3
# playlists:
# Oldest - 2
# Newest - 3
# Last video added - 4
# view:
# grid: 0 or 1
# list: 2
headers = {
'Host': 'www.youtube.com',
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)',
'Accept': '*/*',
'Accept-Language': 'en-US,en;q=0.5',
'X-YouTube-Client-Name': '1',
'X-YouTube-Client-Version': '2.20180418',
}
real_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=8XihrAcN1l4'),)
generic_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=ST1Ti53r4fU'),)
def channel_ctoken_desktop(channel_id, page, sort, tab, view=1):
# see https://github.com/iv-org/invidious/issues/1319#issuecomment-671732646
# page > 1 doesn't work when sorting by oldest
offset = 30*(int(page) - 1)
schema_number = {
3: 6307666885028338688,
2: 17254859483345278706,
1: 16570086088270825023,
}[int(sort)]
page_token = proto.string(61, proto.unpadded_b64encode(proto.string(1,
proto.uint(1, schema_number) + proto.string(2,
proto.string(1, proto.unpadded_b64encode(proto.uint(1,offset)))
)
)))
tab = proto.string(2, tab )
sort = proto.uint(3, int(sort))
#page = proto.string(15, str(page) )
shelf_view = proto.uint(4, 0)
view = proto.uint(6, int(view))
continuation_info = proto.string(3,
proto.percent_b64encode(tab + sort + shelf_view + view + page_token)
)
channel_id = proto.string(2, channel_id )
pointless_nest = proto.string(80226972, channel_id + continuation_info)
return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
def channel_ctoken_mobile(channel_id, page, sort, tab, view=1):
tab = proto.string(2, tab )
sort = proto.uint(3, int(sort))
page = proto.string(15, str(page) )
# example with shelves in videos tab: https://www.youtube.com/channel/UCNL1ZadSjHpjm4q9j2sVtOA/videos
shelf_view = proto.uint(4, 0)
view = proto.uint(6, int(view))
continuation_info = proto.string( 3, proto.percent_b64encode(tab + view + sort + shelf_view + page) )
channel_id = proto.string(2, channel_id )
pointless_nest = proto.string(80226972, channel_id + continuation_info)
return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
def id_or_username(string):
cidRegex = "^UC.{22}$"
if re.match(cidRegex, string):
return "channel"
else:
return "user"
def get_channel_videos_tab(content):
tabs = content['contents']['twoColumnBrowseResultsRenderer']['tabs']
for tab in tabs:
if tab['title'] != "Videos":
continue
else:
return tab
def get_video_items_from_tab(tab):
items = []
for item in tab:
try:
if item['gridVideoRenderer']:
items.append(item)
else:
continue
except KeyError:
continue
return items
def get_info_grid_video_item(item, channel=None):
item = item['gridVideoRenderer']
thumbnailOverlays = item['thumbnailOverlays']
published = ""
views = ""
isLive = False
isUpcoming = False
try:
if 'UPCOMING' in str(thumbnailOverlays):
start_time = item['upcomingEventData']['startTime']
isUpcoming = True
views = "-"
published = "Scheduled"
except KeyError:
isUpcoming = False
try:
if 'LIVE' in str(thumbnailOverlays):
isLive = True
try:
views = item['viewCountText']['simpleText']
except:
views = "Live"
try:
duration = item['lengthText']['simpleText']
except:
duration = "-"
if published != "Scheduled":
try:
published = item['publishedTimeText']['simpleText']
except KeyError:
published = "None"
except KeyError:
isUpcoming = False
isLive = False
if not isUpcoming and not isLive:
views = item['viewCountText']['simpleText']
published = item['publishedTimeText']['simpleText']
try:
duration = item['lengthText']['simpleText']
except:
duration = "?"
video = {
'videoTitle':item['title']['runs'][0]['text'],
'description':"",
'views':views,
'timeStamp':published,
'duration':duration,
'channelName':channel['username'],
'authorUrl':"/channel/{}".format(channel['channelId']),
'channelId':channel['channelId'],
'id':item['videoId'],
'videoUrl':"/watch?v={}".format(item['videoId']),
'isLive':isLive,
'isUpcoming':isUpcoming,
'videoThumb':item['thumbnail']['thumbnails'][0]['url']
}
return video
def get_author_info_from_channel(content):
hmd = content['metadata']['channelMetadataRenderer']
cmd = content['header']['c4TabbedHeaderRenderer']
description = mk(hmd['description'])
channel = {
"channelId": cmd['channelId'],
"username": cmd['title'],
"thumbnail": "https:{}".format(cmd['avatar']['thumbnails'][0]['url'].replace("/", "~")),
"description":description,
"suscribers": cmd['subscriberCountText']['runs'][0]['text'].split(" ")[0],
"banner": cmd['banner']['thumbnails'][0]['url']
}
return channel
def get_channel_info(channelId, videos=True, page=1, sort=3):
if id_or_username(channelId) == "channel":
videos = []
ciUrl = "https://www.youtube.com/channel/{}".format(channelId)
mainUrl = "https://www.youtube.com/browse_ajax?ctoken={}".format(channel_ctoken_desktop(channelId, page, sort, "videos"))
content = json.loads(requests.get(mainUrl, headers=headers).text)
req = requests.get(ciUrl, headers=headers).text
start = (
req.index('window["ytInitialData"]')
+ len('window["ytInitialData"]')
+ 3
)
end = req.index("};", start) + 1
jsonIni = req[start:end]
data = json.loads(jsonIni)
#videosTab = get_channel_videos_tab(content)
authorInfo = get_author_info_from_channel(data)
if videos:
gridVideoItemList = get_video_items_from_tab(content[1]['response']['continuationContents']['gridContinuation']['items'])
for video in gridVideoItemList:
vid = get_info_grid_video_item(video, authorInfo)
videos.append(vid)
print({"channel":authorInfo, "videos":videos})
return {"channel":authorInfo, "videos":videos}
else:
return {"channel":authorInfo}
else:
baseUrl = "https://www.youtube.com/user/{}".format(channelId)

View File

@ -1,38 +1,10 @@
from youtube import proto
from youtube import utils
from flask import Markup
import urllib.parse
import requests
import base64
import json
import urllib
import flask
from flask import request
from werkzeug.exceptions import abort
from youtube import util, yt_data_extract, proto
from youtube import yt_app
# Sort: 1
# Upload date: 2
# View count: 3
# Rating: 1
# Relevance: 0
# Offset: 9
# Filters: 2
# Upload date: 1
# Type: 2
# Duration: 3
features = {
'4k': 14,
'hd': 4,
'hdr': 25,
'subtitles': 5,
'creative_commons': 6,
'3d': 7,
'live': 8,
'purchased': 9,
'360': 15,
'location': 23,
}
def page_number_to_sp_parameter(page, autocorrect, sort, filters):
offset = (int(page) - 1)*20 # 20 results per page
@ -41,8 +13,8 @@ def page_number_to_sp_parameter(page, autocorrect, sort, filters):
result = proto.uint(1, sort) + filters_enc + autocorrect + proto.uint(9, offset) + proto.string(61, b'')
return base64.urlsafe_b64encode(result).decode('ascii')
def get_search_json(query, page, autocorrect, sort, filters):
url = "https://www.youtube.com/results?search_query=" + urllib.parse.quote_plus(query)
def search_by_terms(search_terms, page, autocorrect, sort, filters):
url = "https://www.youtube.com/results?search_query=" + urllib.parse.quote_plus(search_terms)
headers = {
'Host': 'www.youtube.com',
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)',
@ -52,54 +24,145 @@ def get_search_json(query, page, autocorrect, sort, filters):
'X-YouTube-Client-Version': '2.20180418',
}
url += "&pbj=1&sp=" + page_number_to_sp_parameter(page, autocorrect, sort, filters).replace("=", "%3D")
content = util.fetch_url(url, headers=headers, report_text="Got search results", debug_name='search_results')
content = requests.get(url, headers=headers).text
info = json.loads(content)
return info
videos = get_videos_from_search(info)
channels = get_channels_from_search(info)
results = {
"videos": videos,
"channels": channels
}
return results
@yt_app.route('/search')
def get_search_page():
if len(request.args) == 0:
return flask.render_template('base.html', title="Search")
def get_channels_from_search(search):
results = []
search = search[1]['response']
primaryContents = search['contents']['twoColumnSearchResultsRenderer']['primaryContents']
contents = primaryContents['sectionListRenderer']['contents']
if 'query' not in request.args:
abort(400)
for content in contents:
try:
items = content['itemSectionRenderer']['contents']
except:
continue
query = request.args.get("query")
page = request.args.get("page", "1")
autocorrect = int(request.args.get("autocorrect", "1"))
sort = int(request.args.get("sort", "0"))
filters = {}
filters['time'] = int(request.args.get("time", "0"))
filters['type'] = int(request.args.get("type", "0"))
filters['duration'] = int(request.args.get("duration", "0"))
polymer_json = get_search_json(query, page, autocorrect, sort, filters)
for item in items:
try:
item['channelRenderer']
channel = get_channel_renderer_item_info(item['channelRenderer'])
results.append(channel)
except KeyError:
continue
return results
search_info = yt_data_extract.extract_search_info(polymer_json)
if search_info['error']:
return flask.render_template('error.html', error_message = search_info['error'])
def get_channel_renderer_item_info(item):
try:
suscribers = item['subscriberCountText']['simpleText'].split(" ")[0]
except:
suscribers = "?"
for extract_item_info in search_info['items']:
util.prefix_urls(extract_item_info)
util.add_extra_html_info(extract_item_info)
try:
description = utils.get_description_snippet_text(item['descriptionSnippet']['runs'])
except KeyError:
description = ""
corrections = search_info['corrections']
if corrections['type'] == 'did_you_mean':
corrected_query_string = request.args.to_dict(flat=False)
corrected_query_string['query'] = [corrections['corrected_query']]
corrections['corrected_query_url'] = util.URL_ORIGIN + '/search?' + urllib.parse.urlencode(corrected_query_string, doseq=True)
elif corrections['type'] == 'showing_results_for':
no_autocorrect_query_string = request.args.to_dict(flat=False)
no_autocorrect_query_string['autocorrect'] = ['0']
no_autocorrect_query_url = util.URL_ORIGIN + '/search?' + urllib.parse.urlencode(no_autocorrect_query_string, doseq=True)
corrections['original_query_url'] = no_autocorrect_query_url
try:
channel = {
"channelId": item['channelId'],
"username": item['title']['simpleText'],
"thumbnail": "https:{}".format(item['thumbnail']['thumbnails'][0]['url'].replace("/", "~")),
"description": Markup(str(description)),
"suscribers": suscribers,
"videos": item['videoCountText']['runs'][0]['text']
}
except KeyError:
channel = {
"channelId": item['channelId'],
"username": item['title']['simpleText'],
"avatar": item['thumbnail']['thumbnails'][0]['url'],
"suscribers": suscribers
}
return channel
def get_videos_from_search(search):
latest = []
results = []
search = search[1]['response']
primaryContents = search['contents']['twoColumnSearchResultsRenderer']['primaryContents']
contents = primaryContents['sectionListRenderer']['contents']
for content in contents:
try:
items = content['itemSectionRenderer']['contents']
except:
continue
for item in items:
try:
item['videoRenderer']
video = get_video_renderer_item_info(item['videoRenderer'])
results.append(video)
except KeyError:
continue
# Sometimes Youtube will return an empty query. Try again.
return results
def get_video_renderer_item_info(item):
published = ""
views = ""
isLive = False
isUpcoming = False
thumbnailOverlays = item['thumbnailOverlays']
try:
if 'UPCOMING' in str(thumbnailOverlays):
start_time = item['upcomingEventData']['startTime']
isUpcoming = True
views = "-"
published = "Scheduled"
except KeyError:
isUpcoming = False
try:
if 'LIVE' in str(thumbnailOverlays):
isLive = True
try:
views = item['viewCountText']['simpleText']
except:
views = "Live"
try:
duration = item['lengthText']['simpleText']
except:
duration = "-"
if published != "Scheduled":
try:
published = item['publishedTimeText']['simpleText']
except KeyError:
published = "None"
except:
isUpcoming = False
isLive = False
if not isUpcoming and not isLive:
views = item['viewCountText']['simpleText']
published = item['publishedTimeText']['simpleText']
duration = item['lengthText']['simpleText']
video = {
'videoTitle':item['title']['runs'][0]['text'],
'description':Markup(str(utils.get_description_snippet_text(item['descriptionSnippet']['runs']))),
'views':views,
'timeStamp':published,
'duration':duration,
'channelName':item['ownerText']['runs'][0]['text'],
'authorUrl':"/channel/{}".format(item['ownerText']['runs'][0]['navigationEndpoint']['browseEndpoint']['browseId']),
'channelId':item['ownerText']['runs'][0]['navigationEndpoint']['browseEndpoint']['browseId'],
'id':item['videoId'],
'videoUrl':"/watch?v={}".format(item['videoId']),
'isLive':isLive,
'isUpcoming':isUpcoming,
'videoThumb':item['thumbnail']['thumbnails'][0]['url']
}
return video
return flask.render_template('search.html',
header_playlist_names = local_playlist.get_playlist_names(),
query = query,
estimated_results = search_info['estimated_results'],
estimated_pages = search_info['estimated_pages'],
corrections = search_info['corrections'],
results = search_info['items'],
parameters_dictionary = request.args,
)

View File

@ -99,12 +99,7 @@ def decode_content(content, encoding_header):
return content
def bypass_captcha():
session = requests.Session()
url = "https://youtube.com/watch?v=CvFH_6DNRCY&gl=US&hl=en&has_verified=1&bpctr=9999999999"
print("Starting python GET request...")
response = session.get(url)
print("GET successful!")
def bypass_captcha(session, response):
print("vvv COOKIES DICT vvv")
cookies = session.cookies.get_dict()
print(cookies)
@ -195,15 +190,16 @@ def fetch_url_response(url, headers=(), timeout=15, data=None,
# (in connectionpool.py in urllib3)
# According to the documentation for urlopen, a redirect counts as a
# retry. So there are 3 redirects max by default.
print("Testing for CAPTCHA python GET request...")
r = requests.get(url)
print("GET successful!")
html = BeautifulSoup(str(r.text), "lxml")
# If there's a captcha and we need to solve it...
if html.body.find('div', attrs={'class': 'g-recaptcha'}):
print("ReCaptcha detected! Trying to bypass it.")
bypass_captcha()
session = requests.Session()
print("Starting python GET request...")
response = session.get(url)
# Strings that appear when there's a Captcha.
string_de = "Fülle das folgende Feld aus, um YouTube weiter zu nutzen."
string_en = "To continue with your YouTube experience, please fill out the form below."
# If there's a captcha, bypass it.
if string_de in response.text or string_en in response.text:
bypass_captcha(session, response)
if max_redirects:
retries = urllib3.Retry(3 + max_redirects, redirect=max_redirects)
@ -464,3 +460,4 @@ def check_gevent_exceptions(*tasks):
for task in tasks:
if task.exception:
raise task.exception

View File

@ -8,11 +8,11 @@ from youtube import util, yt_data_extract
def get_video_sources(info, tor_bypass=False):
video_sources = []
max_resolution = "720"
max_resolution = 1080
for fmt in info['formats']:
if not all(fmt[attr] for attr in ('quality', 'width', 'ext', 'url')):
continue
if fmt['acodec'] and fmt['vcodec'] and fmt['height'] <= max_resolution:
if fmt['acodec'] and fmt['vcodec'] and (fmt['height'] <= max_resolution):
video_sources.append({
'src': fmt['url'],
'type': 'video/' + fmt['ext'],
@ -123,6 +123,24 @@ def get_subtitle_sources(info):
return sources
def decrypt_signatures(info):
'''return error string, or False if no errors'''
if not yt_data_extract.requires_decryption(info):
return False
if not info['player_name']:
return 'Could not find player name'
if not info['base_js']:
return 'Failed to find base.js'
player_name = info['player_name']
base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text='Fetched player ' + player_name)
base_js = base_js.decode('utf-8')
err = yt_data_extract.extract_decryption_function(info, base_js)
if err:
return err
err = yt_data_extract.decrypt_signatures(info)
return err
def get_ordered_music_list_attributes(music_list):
# get the set of attributes which are used by atleast 1 track
@ -146,9 +164,8 @@ headers = (
('X-YouTube-Client-Version', '2.20180830'),
) + util.mobile_ua
def extract_info(video_id, use_invidious, playlist_id=None, index=None):
# bpctr=9999999999 will bypass are-you-sure dialogs for controversial
# videos
url = 'https://m.youtube.com/watch?v=' + video_id + '&gl=US&hl=en&has_verified=1&pbj=1&bpctr=9999999999'
# bpctr=9999999999 will bypass are-you-sure dialogs for controversial videos
url = 'https://m.youtube.com/watch?v=' + video_id + '&pbj=1&bpctr=9999999999'
if playlist_id:
url += '&list=' + playlist_id
if index:
@ -173,6 +190,12 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
url = 'https://www.youtube.com/get_video_info?' + urllib.parse.urlencode(data)
video_info_page = util.fetch_url(url, debug_name='get_video_info', report_text='Fetched age restriction bypass page').decode('utf-8')
yt_data_extract.update_with_age_restricted_info(info, video_info_page)
# signature decryption
decryption_error = decrypt_signatures(info)
if decryption_error:
decryption_error = 'Error decrypting url signatures: ' + decryption_error
info['playability_error'] = decryption_error
# check if urls ready (non-live format) in former livestream
# urls not ready if all of them have no filesize
if info['was_live']: