This repository has been archived on 2022-06-28. You can view files and clone it, but cannot push or open issues or pull requests.
Yotter/youtube/yt_data_extract/common.py

471 lines
17 KiB
Python

import re
import urllib.parse
import collections
def get(object, key, default=None, types=()):
'''Like dict.get(), but returns default if the result doesn't match one of the types.
Also works for indexing lists.'''
try:
result = object[key]
except (TypeError, IndexError, KeyError):
return default
if not types or isinstance(result, types):
return result
else:
return default
def multi_get(object, *keys, default=None, types=()):
'''Like get, but try other keys if the first fails'''
for key in keys:
try:
result = object[key]
except (TypeError, IndexError, KeyError):
pass
else:
if not types or isinstance(result, types):
return result
else:
continue
return default
def deep_get(object, *keys, default=None, types=()):
'''Like dict.get(), but for nested dictionaries/sequences, supporting keys or indices.
Last argument is the default value to use in case of any IndexErrors or KeyErrors.
If types is given and the result doesn't match one of those types, default is returned'''
try:
for key in keys:
object = object[key]
except (TypeError, IndexError, KeyError):
return default
else:
if not types or isinstance(object, types):
return object
else:
return default
def multi_deep_get(object, *key_sequences, default=None, types=()):
'''Like deep_get, but can try different key sequences in case one fails.
Return default if all of them fail. key_sequences is a list of lists'''
for key_sequence in key_sequences:
_object = object
try:
for key in key_sequence:
_object = _object[key]
except (TypeError, IndexError, KeyError):
pass
else:
if not types or isinstance(_object, types):
return _object
else:
continue
return default
def liberal_update(obj, key, value):
'''Updates obj[key] with value as long as value is not None.
Ensures obj[key] will at least get a value of None, however'''
if (value is not None) or (key not in obj):
obj[key] = value
def conservative_update(obj, key, value):
'''Only updates obj if it doesn't have key or obj[key] is None'''
if obj.get(key) is None:
obj[key] = value
def concat_or_none(*strings):
'''Concatenates strings. Returns None if any of the arguments are None'''
result = ''
for string in strings:
if string is None:
return None
result += string
return result
def remove_redirect(url):
if url is None:
return None
if re.fullmatch(r'(((https?:)?//)?(www.)?youtube.com)?/redirect\?.*', url) is not None: # youtube puts these on external links to do tracking
query_string = url[url.find('?')+1: ]
return urllib.parse.parse_qs(query_string)['q'][0]
return url
youtube_url_re = re.compile(r'^(?:(?:(?:https?:)?//)?(?:www\.)?youtube\.com)?(/.*)$')
def normalize_url(url):
if url is None:
return None
match = youtube_url_re.fullmatch(url)
if match is None:
raise Exception()
return 'https://www.youtube.com' + match.group(1)
def _recover_urls(runs):
for run in runs:
url = deep_get(run, 'navigationEndpoint', 'urlEndpoint', 'url')
text = run.get('text', '')
# second condition is necessary because youtube makes other things into urls, such as hashtags, which we want to keep as text
if url is not None and (text.startswith('http://') or text.startswith('https://')):
url = remove_redirect(url)
run['url'] = url
run['text'] = url # youtube truncates the url text, use actual url instead
def extract_str(node, default=None, recover_urls=False):
'''default is the value returned if the extraction fails. If recover_urls is true, will attempt to fix Youtube's truncation of url text (most prominently seen in descriptions)'''
if isinstance(node, str):
return node
try:
return node['simpleText']
except (KeyError, TypeError):
pass
if isinstance(node, dict) and 'runs' in node:
if recover_urls:
_recover_urls(node['runs'])
return ''.join(text_run.get('text', '') for text_run in node['runs'])
return default
def extract_formatted_text(node):
if not node:
return []
if 'runs' in node:
_recover_urls(node['runs'])
return node['runs']
elif 'simpleText' in node:
return [{'text': node['simpleText']}]
return []
def extract_int(string, default=None):
if isinstance(string, int):
return string
if not isinstance(string, str):
string = extract_str(string)
if not string:
return default
match = re.search(r'\b(\d+)\b', string.replace(',', ''))
if match is None:
return default
try:
return int(match.group(1))
except ValueError:
return default
def extract_approx_int(string):
'''e.g. "15.1M" from "15.1M subscribers"'''
if not isinstance(string, str):
string = extract_str(string)
if not string:
return None
match = re.search(r'\b(\d+(?:\.\d+)?[KMBTkmbt]?)\b', string.replace(',', ''))
if match is None:
return None
return match.group(1)
MONTH_ABBREVIATIONS = {'jan':'1', 'feb':'2', 'mar':'3', 'apr':'4', 'may':'5', 'jun':'6', 'jul':'7', 'aug':'8', 'sep':'9', 'oct':'10', 'nov':'11', 'dec':'12'}
def extract_date(date_text):
'''Input: "Mar 9, 2019". Output: "2019-3-9"'''
if not isinstance(date_text, str):
date_text = extract_str(date_text)
if date_text is None:
return None
date_text = date_text.replace(',', '').lower()
parts = date_text.split()
if len(parts) >= 3:
month, day, year = parts[-3:]
month = MONTH_ABBREVIATIONS.get(month[0:3]) # slicing in case they start writing out the full month name
if month and (re.fullmatch(r'\d\d?', day) is not None) and (re.fullmatch(r'\d{4}', year) is not None):
return year + '-' + month + '-' + day
return None
def check_missing_keys(object, *key_sequences):
for key_sequence in key_sequences:
_object = object
try:
for key in key_sequence:
_object = _object[key]
except (KeyError, IndexError, TypeError):
return 'Could not find ' + key
return None
def extract_item_info(item, additional_info={}):
if not item:
return {'error': 'No item given'}
type = get(list(item.keys()), 0)
if not type:
return {'error': 'Could not find type'}
item = item[type]
info = {'error': None}
if type in ('itemSectionRenderer', 'compactAutoplayRenderer'):
return extract_item_info(deep_get(item, 'contents', 0), additional_info)
if type in ('movieRenderer', 'clarificationRenderer'):
info['type'] = 'unsupported'
return info
info.update(additional_info)
# type looks like e.g. 'compactVideoRenderer' or 'gridVideoRenderer'
# camelCase split, https://stackoverflow.com/a/37697078
type_parts = [s.lower() for s in re.sub(r'([A-Z][a-z]+)', r' \1', type).split()]
if len(type_parts) < 2:
info['type'] = 'unsupported'
return
primary_type = type_parts[-2]
if primary_type == 'video':
info['type'] = 'video'
elif primary_type in ('playlist', 'radio', 'show'):
info['type'] = 'playlist'
elif primary_type == 'channel':
info['type'] = 'channel'
elif type == 'videoWithContextRenderer': # stupid exception
info['type'] = 'video'
primary_type = 'video'
else:
info['type'] = 'unsupported'
# videoWithContextRenderer changes it to 'headline' just to be annoying
info['title'] = extract_str(multi_get(item, 'title', 'headline'))
if primary_type != 'channel':
info['author'] = extract_str(multi_get(item, 'longBylineText', 'shortBylineText', 'ownerText'))
info['author_id'] = extract_str(multi_deep_get(item,
['longBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
['shortBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId']
))
info['author_url'] = ('https://www.youtube.com/channel/' + info['author_id']) if info['author_id'] else None
info['description'] = extract_formatted_text(multi_get(item, 'descriptionSnippet', 'descriptionText'))
info['thumbnail'] = multi_deep_get(item,
['thumbnail', 'thumbnails', 0, 'url'], # videos
['thumbnails', 0, 'thumbnails', 0, 'url'], # playlists
['thumbnailRenderer', 'showCustomThumbnailRenderer', 'thumbnail', 'thumbnails', 0, 'url'], # shows
)
info['badges'] = []
for badge_node in multi_get(item, 'badges', 'ownerBadges', default=()):
badge = deep_get(badge_node, 'metadataBadgeRenderer', 'label')
if badge:
info['badges'].append(badge)
if primary_type in ('video', 'playlist'):
info['time_published'] = None
timestamp = re.search(r'(\d+ \w+ ago)',
extract_str(item.get('publishedTimeText'), default=''))
if timestamp:
info['time_published'] = timestamp.group(1)
if primary_type == 'video':
info['id'] = item.get('videoId')
info['view_count'] = extract_int(item.get('viewCountText'))
# dig into accessibility data to get view_count for videos marked as recommended, and to get time_published
accessibility_label = multi_deep_get(item,
['title', 'accessibility', 'accessibilityData', 'label'],
['headline', 'accessibility', 'accessibilityData', 'label'],
default='')
timestamp = re.search(r'(\d+ \w+ ago)', accessibility_label)
if timestamp:
conservative_update(info, 'time_published', timestamp.group(1))
view_count = re.search(r'(\d+) views', accessibility_label.replace(',', ''))
if view_count:
conservative_update(info, 'view_count', int(view_count.group(1)))
if info['view_count']:
info['approx_view_count'] = '{:,}'.format(info['view_count'])
else:
info['approx_view_count'] = extract_approx_int(item.get('shortViewCountText'))
# handle case where it is "No views"
if not info['approx_view_count']:
if ('No views' in item.get('shortViewCountText', '')
or 'no views' in accessibility_label.lower()):
info['view_count'] = 0
info['approx_view_count'] = '0'
info['duration'] = extract_str(item.get('lengthText'))
# if it's an item in a playlist, get its index
if 'index' in item: # url has wrong index on playlist page
info['index'] = extract_int(item.get('index'))
elif 'indexText' in item:
# Current item in playlist has ▶ instead of the actual index, must
# dig into url
match = re.search(r'index=(\d+)', deep_get(item,
'navigationEndpoint', 'commandMetadata', 'webCommandMetadata',
'url', default=''))
if match is None: # worth a try then
info['index'] = extract_int(item.get('indexText'))
else:
info['index'] = int(match.group(1))
else:
info['index'] = None
elif primary_type in ('playlist', 'radio'):
info['id'] = item.get('playlistId')
info['video_count'] = extract_int(item.get('videoCount'))
elif primary_type == 'channel':
info['id'] = item.get('channelId')
info['approx_subscriber_count'] = extract_approx_int(item.get('subscriberCountText'))
elif primary_type == 'show':
info['id'] = deep_get(item, 'navigationEndpoint', 'watchEndpoint', 'playlistId')
if primary_type in ('playlist', 'channel'):
conservative_update(info, 'video_count', extract_int(item.get('videoCountText')))
for overlay in item.get('thumbnailOverlays', []):
conservative_update(info, 'duration', extract_str(deep_get(
overlay, 'thumbnailOverlayTimeStatusRenderer', 'text'
)))
# show renderers don't have videoCountText
conservative_update(info, 'video_count', extract_int(deep_get(
overlay, 'thumbnailOverlayBottomPanelRenderer', 'text'
)))
return info
def extract_response(polymer_json):
'''return response, error'''
response = multi_deep_get(polymer_json, [1, 'response'], ['response'])
if response is None:
return None, 'Failed to extract response'
else:
return response, None
_item_types = {
'movieRenderer',
'didYouMeanRenderer',
'showingResultsForRenderer',
'videoRenderer',
'compactVideoRenderer',
'compactAutoplayRenderer',
'videoWithContextRenderer',
'gridVideoRenderer',
'playlistVideoRenderer',
'playlistRenderer',
'compactPlaylistRenderer',
'gridPlaylistRenderer',
'radioRenderer',
'compactRadioRenderer',
'gridRadioRenderer',
'showRenderer',
'compactShowRenderer',
'gridShowRenderer',
'channelRenderer',
'compactChannelRenderer',
'gridChannelRenderer',
}
def _traverse_browse_renderer(renderer):
for tab in get(renderer, 'tabs', ()):
tab_renderer = multi_get(tab, 'tabRenderer', 'expandableTabRenderer')
if tab_renderer is None:
continue
if tab_renderer.get('selected', False):
return get(tab_renderer, 'content', {})
print('Could not find tab with content')
return {}
def _traverse_standard_list(renderer):
renderer_list = multi_get(renderer, 'contents', 'items', default=())
continuation = deep_get(renderer, 'continuations', 0, 'nextContinuationData', 'continuation')
return renderer_list, continuation
# these renderers contain one inside them
nested_renderer_dispatch = {
'singleColumnBrowseResultsRenderer': _traverse_browse_renderer,
'twoColumnBrowseResultsRenderer': _traverse_browse_renderer,
'twoColumnSearchResultsRenderer': lambda renderer: get(renderer, 'primaryContents', {}),
}
# these renderers contain a list of renderers inside them
nested_renderer_list_dispatch = {
'sectionListRenderer': _traverse_standard_list,
'itemSectionRenderer': _traverse_standard_list,
'gridRenderer': _traverse_standard_list,
'playlistVideoListRenderer': _traverse_standard_list,
'singleColumnWatchNextResults': lambda r: (deep_get(r, 'results', 'results', 'contents', default=[]), None),
}
def get_nested_renderer_list_function(key):
if key in nested_renderer_list_dispatch:
return nested_renderer_list_dispatch[key]
elif key.endswith('Continuation'):
return _traverse_standard_list
return None
def extract_items_from_renderer(renderer, item_types=_item_types):
ctoken = None
items = []
iter_stack = collections.deque()
current_iter = iter(())
while True:
# mode 1: get a new renderer by iterating.
# goes down the stack for an iterator if one has been exhausted
if not renderer:
try:
renderer = current_iter.__next__()
except StopIteration:
try:
current_iter = iter_stack.pop()
except IndexError:
return items, ctoken
# Get new renderer or check that the one we got is good before
# proceeding to mode 2
continue
# mode 2: dig into the current renderer
key, value = list(renderer.items())[0]
# the renderer is an item
if key in item_types:
items.append(renderer)
# has a list in it, add it to the iter stack
elif get_nested_renderer_list_function(key):
renderer_list, cont = get_nested_renderer_list_function(key)(value)
if renderer_list:
iter_stack.append(current_iter)
current_iter = iter(renderer_list)
if cont:
ctoken = cont
# new renderer nested inside this one
elif key in nested_renderer_dispatch:
renderer = nested_renderer_dispatch[key](value)
continue # don't reset renderer to None
renderer = None
def extract_items(response, item_types=_item_types):
'''return items, ctoken'''
if 'continuationContents' in response:
# sometimes there's another, empty, junk [something]Continuation key
# find real one
for key, renderer_cont in get(response,
'continuationContents', {}).items():
# e.g. commentSectionContinuation, playlistVideoListContinuation
if key.endswith('Continuation'):
items, cont = extract_items_from_renderer({key: renderer_cont},
item_types=item_types)
if items:
return items, cont
return [], None
elif 'contents' in response:
renderer = get(response, 'contents', {})
return extract_items_from_renderer(renderer, item_types=item_types)
else:
return [], None