This commit is contained in:
pluja 2021-03-12 19:50:13 +01:00
parent 1d52e68b3e
commit d0aa476f70
6 changed files with 158 additions and 38 deletions

View File

@ -429,8 +429,7 @@ def channel(id):
if sort is None: if sort is None:
sort = 3 sort = 3
data = ytch.get_channel_tab_info(id, page, sort) data = ytch.get_channel_tab(id, page, sort)
for video in data['items']: for video in data['items']:
if config['isInstance']: if config['isInstance']:
hostName = urllib.parse.urlparse(video['thumbnail'][1:]).netloc hostName = urllib.parse.urlparse(video['thumbnail'][1:]).netloc

View File

@ -1,7 +1,7 @@
{ {
"serverName": "yotter.xyz", "serverName": "yotter.xyz",
"nitterInstance": "https://nitter.net/", "nitterInstance": "https://nitter.mastodont.cat/",
"maxInstanceUsers": 120, "maxInstanceUsers": 200,
"serverLocation": "Germany", "serverLocation": "Germany",
"restrictPublicUsage":true, "restrictPublicUsage":true,
"isInstance":true, "isInstance":true,

View File

@ -105,25 +105,36 @@ def channel_ctoken_v1(channel_id, page, sort, tab, view=1):
return base64.urlsafe_b64encode(pointless_nest).decode('ascii') return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
def get_channel_tab_info(channel_id, page="1", sort=3, tab='videos', view=1, print_status=True): def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1,
ctoken=None, print_status=True):
message = 'Got channel tab' if print_status else None message = 'Got channel tab' if print_status else None
if int(sort) == 2 and int(page) > 1: if not ctoken:
ctoken = channel_ctoken_v1(channel_id, page, sort, tab, view)
ctoken = ctoken.replace('=', '%3D')
url = ('https://www.youtube.com/channel/' + channel_id + '/' + tab
+ '?action_continuation=1&continuation=' + ctoken
+ '&pbj=1')
content = util.fetch_url(url, headers_desktop + real_cookie,
debug_name='channel_tab', report_text=message)
else:
ctoken = channel_ctoken_v3(channel_id, page, sort, tab, view) ctoken = channel_ctoken_v3(channel_id, page, sort, tab, view)
ctoken = ctoken.replace('=', '%3D') ctoken = ctoken.replace('=', '%3D')
url = 'https://www.youtube.com/browse_ajax?ctoken=' + ctoken
content = util.fetch_url(url,
headers_desktop + generic_cookie,
debug_name='channel_tab', report_text=message)
# Not sure what the purpose of the key is or whether it will change
# For now it seems to be constant for the API endpoint, not dependent
# on the browsing session or channel
key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
data = {
'context': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'WEB',
'clientVersion': '2.20180830',
},
},
'continuation': ctoken,
}
content_type_header = (('Content-Type', 'application/json'),)
content = util.fetch_url(
url, headers_desktop + content_type_header,
data=json.dumps(data), debug_name='channel_tab', report_text=message)
info = yt_data_extract.extract_channel_info(json.loads(content), tab) info = yt_data_extract.extract_channel_info(json.loads(content), tab)
if info['error'] is not None: if info['error'] is not None:
return False return False
@ -174,12 +185,31 @@ def get_number_of_videos_general(base_url):
return get_number_of_videos_channel(get_channel_id(base_url)) return get_number_of_videos_channel(get_channel_id(base_url))
def get_channel_search_json(channel_id, query, page): def get_channel_search_json(channel_id, query, page):
params = proto.string(2, 'search') + proto.string(15, str(page)) offset = proto.unpadded_b64encode(proto.uint(3, (page-1)*30))
params = proto.string(2, 'search') + proto.string(15, offset)
params = proto.percent_b64encode(params) params = proto.percent_b64encode(params)
ctoken = proto.string(2, channel_id) + proto.string(3, params) + proto.string(11, query) ctoken = proto.string(2, channel_id) + proto.string(3, params) + proto.string(11, query)
ctoken = base64.urlsafe_b64encode(proto.nested(80226972, ctoken)).decode('ascii') ctoken = base64.urlsafe_b64encode(proto.nested(80226972, ctoken)).decode('ascii')
polymer_json = util.fetch_url("https://www.youtube.com/browse_ajax?ctoken=" + ctoken, headers_desktop, debug_name='channel_search') key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
data = {
'context': {
'client': {
'hl': 'en',
'gl': 'US',
'clientName': 'WEB',
'clientVersion': '2.20180830',
},
},
'continuation': ctoken,
}
content_type_header = (('Content-Type', 'application/json'),)
polymer_json = util.fetch_url(
url, headers_desktop + content_type_header,
data=json.dumps(data), debug_name='channel_search')
return polymer_json return polymer_json

View File

@ -120,9 +120,9 @@ def fetch_url_response(url, headers=(), timeout=15, data=None,
if data is not None: if data is not None:
method = "POST" method = "POST"
if isinstance(data, str): if isinstance(data, str):
data = data.encode('ascii') data = data.encode('utf-8')
elif not isinstance(data, bytes): elif not isinstance(data, bytes):
data = urllib.parse.urlencode(data).encode('ascii') data = urllib.parse.urlencode(data).encode('utf-8')
if cookiejar_send is not None or cookiejar_receive is not None: # Use urllib if cookiejar_send is not None or cookiejar_receive is not None: # Use urllib
req = urllib.request.Request(url, data=data, headers=headers) req = urllib.request.Request(url, data=data, headers=headers)
@ -143,7 +143,7 @@ def fetch_url_response(url, headers=(), timeout=15, data=None,
else: else:
retries = urllib3.Retry(3) retries = urllib3.Retry(3)
pool = get_pool(use_tor) pool = get_pool(use_tor)
response = pool.request(method, url, headers=headers, response = pool.request(method, url, headers=headers, body=data,
timeout=timeout, preload_content=False, timeout=timeout, preload_content=False,
decode_content=False, retries=retries) decode_content=False, retries=retries)
cleanup_func = (lambda r: r.release_conn()) cleanup_func = (lambda r: r.release_conn())
@ -156,7 +156,7 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None,
start_time = time.time() start_time = time.time()
response, cleanup_func = fetch_url_response( response, cleanup_func = fetch_url_response(
url, headers, timeout=timeout, url, headers, timeout=timeout, data=data,
cookiejar_send=cookiejar_send, cookiejar_receive=cookiejar_receive, cookiejar_send=cookiejar_send, cookiejar_receive=cookiejar_receive,
use_tor=use_tor) use_tor=use_tor)
response_time = time.time() response_time = time.time()

View File

@ -290,7 +290,7 @@ def extract_item_info(item, additional_info={}):
info['duration'] = extract_str(item.get('lengthText')) info['duration'] = extract_str(item.get('lengthText'))
# if it's an item in a playlist, get its index # if it's an item in a playlist, get its index
if 'index' in item: # url has wrong index on playlist page if 'index' in item: # url has wrong index on playlist page
info['index'] = extract_int(item.get('index')) info['index'] = extract_int(item.get('index'))
elif 'indexText' in item: elif 'indexText' in item:
# Current item in playlist has ▶ instead of the actual index, must # Current item in playlist has ▶ instead of the actual index, must
@ -329,6 +329,11 @@ def extract_item_info(item, additional_info={}):
def extract_response(polymer_json): def extract_response(polymer_json):
'''return response, error''' '''return response, error'''
# /youtubei/v1/browse endpoint returns response directly
if isinstance(polymer_json, dict) and 'responseContext' in polymer_json:
# this is the response
return polymer_json, None
response = multi_deep_get(polymer_json, [1, 'response'], ['response']) response = multi_deep_get(polymer_json, [1, 'response'], ['response'])
if response is None: if response is None:
return None, 'Failed to extract response' return None, 'Failed to extract response'

View File

@ -172,14 +172,13 @@ def _extract_watch_info_mobile(top_level):
else: else:
info['playlist'] = {} info['playlist'] = {}
info['playlist']['title'] = playlist.get('title') info['playlist']['title'] = playlist.get('title')
info['playlist']['author'] = extract_str(multi_get(playlist, info['playlist']['author'] = extract_str(multi_get(playlist,
'ownerName', 'longBylineText', 'shortBylineText', 'ownerText')) 'ownerName', 'longBylineText', 'shortBylineText', 'ownerText'))
author_id = deep_get(playlist, 'longBylineText', 'runs', 0, author_id = deep_get(playlist, 'longBylineText', 'runs', 0,
'navigationEndpoint', 'browseEndpoint', 'browseId') 'navigationEndpoint', 'browseEndpoint', 'browseId')
info['playlist']['author_id'] = author_id info['playlist']['author_id'] = author_id
if author_id: info['playlist']['author_url'] = concat_or_none(
info['playlist']['author_url'] = concat_or_none( 'https://www.youtube.com/channel/', author_id)
'https://www.youtube.com/channel/', author_id)
info['playlist']['id'] = playlist.get('playlistId') info['playlist']['id'] = playlist.get('playlistId')
info['playlist']['url'] = concat_or_none( info['playlist']['url'] = concat_or_none(
'https://www.youtube.com/playlist?list=', 'https://www.youtube.com/playlist?list=',
@ -447,7 +446,8 @@ def _extract_playability_error(info, player_response, error_prefix=''):
SUBTITLE_FORMATS = ('srv1', 'srv2', 'srv3', 'ttml', 'vtt') SUBTITLE_FORMATS = ('srv1', 'srv2', 'srv3', 'ttml', 'vtt')
def extract_watch_info(polymer_json): def extract_watch_info(polymer_json):
info = {'playability_error': None, 'error': None} info = {'playability_error': None, 'error': None,
'player_response_missing': None}
if isinstance(polymer_json, dict): if isinstance(polymer_json, dict):
top_level = polymer_json top_level = polymer_json
@ -509,6 +509,10 @@ def extract_watch_info(polymer_json):
if not info['formats']: if not info['formats']:
_extract_formats(info, player_response) _extract_formats(info, player_response)
# see https://github.com/user234683/youtube-local/issues/22#issuecomment-706395160
info['player_urls_missing'] = (
not info['formats'] and not embedded_player_response)
# playability errors # playability errors
_extract_playability_error(info, player_response) _extract_playability_error(info, player_response)
@ -565,6 +569,84 @@ def extract_watch_info(polymer_json):
info['author_url'] = 'https://www.youtube.com/channel/' + info['author_id'] if info['author_id'] else None info['author_url'] = 'https://www.youtube.com/channel/' + info['author_id'] if info['author_id'] else None
return info return info
single_char_codes = {
'n': '\n',
'\\': '\\',
'"': '"',
"'": "'",
'b': '\b',
'f': '\f',
'n': '\n',
'r': '\r',
't': '\t',
'v': '\x0b',
'0': '\x00',
'\n': '', # backslash followed by literal newline joins lines
}
def js_escape_replace(match):
r'''Resolves javascript string escape sequences such as \x..'''
# some js-strings in the watch page html include them for no reason
# https://mathiasbynens.be/notes/javascript-escapes
escaped_sequence = match.group(1)
if escaped_sequence[0] in ('x', 'u'):
return chr(int(escaped_sequence[1:], base=16))
# In javascript, if it's not one of those escape codes, it's just the
# literal character. e.g., "\a" = "a"
return single_char_codes.get(escaped_sequence, escaped_sequence)
# works but complicated and unsafe:
#PLAYER_RESPONSE_RE = re.compile(r'<script[^>]*?>[^<]*?var ytInitialPlayerResponse = ({(?:"(?:[^"\\]|\\.)*?"|[^"])+?});')
# Because there are sometimes additional statements after the json object
# so we just capture all of those until end of script and tell json decoder
# to ignore extra stuff after the json object
PLAYER_RESPONSE_RE = re.compile(r'<script[^>]*?>[^<]*?var ytInitialPlayerResponse = ({.*?)</script>')
INITIAL_DATA_RE = re.compile(r"<script[^>]*?>var ytInitialData = '(.+?[^\\])';")
BASE_JS_RE = re.compile(r'jsUrl":\s*"([\w\-\./]+?/base.js)"')
JS_STRING_ESCAPE_RE = re.compile(r'\\([^xu]|x..|u....)')
def extract_watch_info_from_html(watch_html):
base_js_match = BASE_JS_RE.search(watch_html)
player_response_match = PLAYER_RESPONSE_RE.search(watch_html)
initial_data_match = INITIAL_DATA_RE.search(watch_html)
if base_js_match is not None:
base_js_url = base_js_match.group(1)
else:
base_js_url = None
if player_response_match is not None:
decoder = json.JSONDecoder()
# this will make it ignore extra stuff after end of object
player_response = decoder.raw_decode(player_response_match.group(1))[0]
else:
return {'error': 'Could not find ytInitialPlayerResponse'}
player_response = None
if initial_data_match is not None:
initial_data = initial_data_match.group(1)
initial_data = JS_STRING_ESCAPE_RE.sub(js_escape_replace, initial_data)
initial_data = json.loads(initial_data)
else:
print('extract_watch_info_from_html: failed to find initialData')
initial_data = None
# imitate old format expected by extract_watch_info
fake_polymer_json = {
'player': {
'args': {},
'assets': {
'js': base_js_url
}
},
'playerResponse': player_response,
'response': initial_data,
}
return extract_watch_info(fake_polymer_json)
def get_caption_url(info, language, format, automatic=False, translation_language=None): def get_caption_url(info, language, format, automatic=False, translation_language=None):
'''Gets the url for captions with the given language and format. If automatic is True, get the automatic captions for that language. If translation_language is given, translate the captions from `language` to `translation_language`. If automatic is true and translation_language is given, the automatic captions will be translated.''' '''Gets the url for captions with the given language and format. If automatic is True, get the automatic captions for that language. If translation_language is given, translate the captions from `language` to `translation_language`. If automatic is true and translation_language is given, the automatic captions will be translated.'''
url = info['_captions_base_url'] url = info['_captions_base_url']
@ -580,7 +662,8 @@ def get_caption_url(info, language, format, automatic=False, translation_languag
return url return url
def update_with_age_restricted_info(info, video_info_page): def update_with_age_restricted_info(info, video_info_page):
ERROR_PREFIX = 'Error bypassing age-restriction: ' '''Inserts urls from 'player_response' in get_video_info page'''
ERROR_PREFIX = 'Error getting missing player or bypassing age-restriction: '
video_info = urllib.parse.parse_qs(video_info_page) video_info = urllib.parse.parse_qs(video_info_page)
player_response = deep_get(video_info, 'player_response', 0) player_response = deep_get(video_info, 'player_response', 0)
@ -603,7 +686,9 @@ def requires_decryption(info):
# adapted from youtube-dl and invidious: # adapted from youtube-dl and invidious:
# https://github.com/omarroth/invidious/blob/master/src/invidious/helpers/signatures.cr # https://github.com/omarroth/invidious/blob/master/src/invidious/helpers/signatures.cr
decrypt_function_re = re.compile(r'function\(a\)\{(a=a\.split\(""\)[^\}{]+)return a\.join\(""\)\}') decrypt_function_re = re.compile(r'function\(a\)\{(a=a\.split\(""\)[^\}{]+)return a\.join\(""\)\}')
op_with_arg_re = re.compile(r'[^\.]+\.([^\(]+)\(a,(\d+)\)') # gives us e.g. rt, .xK, 5 from rt.xK(a,5) or rt, ["xK"], 5 from rt["xK"](a,5)
# (var, operation, argument)
var_op_arg_re = re.compile(r'(\w+)(\.\w+|\["[^"]+"\])\(a,(\d+)\)')
def extract_decryption_function(info, base_js): def extract_decryption_function(info, base_js):
'''Insert decryption function into info. Return error string if not successful. '''Insert decryption function into info. Return error string if not successful.
Decryption function is a list of list[2] of numbers. Decryption function is a list of list[2] of numbers.
@ -617,10 +702,11 @@ def extract_decryption_function(info, base_js):
if not function_body: if not function_body:
return 'Empty decryption function body' return 'Empty decryption function body'
var_name = get(function_body[0].split('.'), 0) var_with_operation_match = var_op_arg_re.fullmatch(function_body[0])
if var_name is None: if var_with_operation_match is None:
return 'Could not find var_name' return 'Could not find var_name'
var_name = var_with_operation_match.group(1)
var_body_match = re.search(r'var ' + re.escape(var_name) + r'=\{(.*?)\};', base_js, flags=re.DOTALL) var_body_match = re.search(r'var ' + re.escape(var_name) + r'=\{(.*?)\};', base_js, flags=re.DOTALL)
if var_body_match is None: if var_body_match is None:
return 'Could not find var_body' return 'Could not find var_body'
@ -649,13 +735,13 @@ def extract_decryption_function(info, base_js):
decryption_function = [] decryption_function = []
for op_with_arg in function_body: for op_with_arg in function_body:
match = op_with_arg_re.fullmatch(op_with_arg) match = var_op_arg_re.fullmatch(op_with_arg)
if match is None: if match is None:
return 'Could not parse operation with arg' return 'Could not parse operation with arg'
op_name = match.group(1) op_name = match.group(2).strip('[].')
if op_name not in operation_definitions: if op_name not in operation_definitions:
return 'Unknown op_name: ' + op_name return 'Unknown op_name: ' + str(op_name)
op_argument = match.group(2) op_argument = match.group(3)
decryption_function.append([operation_definitions[op_name], int(op_argument)]) decryption_function.append([operation_definitions[op_name], int(op_argument)])
info['decryption_function'] = decryption_function info['decryption_function'] = decryption_function