from youtube_data import proto
import json
import base64
import urllib.parse
import requests
import re
import bleach
from flask import Markup

URL_ORIGIN = "/https://www.youtube.com"

def make_comment_ctoken(video_id, sort=0, offset=0, lc='', secret_key=''):
    video_id = proto.as_bytes(video_id)
    secret_key = proto.as_bytes(secret_key)

    page_info = proto.string(4, video_id) + proto.uint(6, sort)
    offset_information = proto.nested(4, page_info) + proto.uint(5, offset)
    if secret_key:
        offset_information = proto.string(1, secret_key) + offset_information

    page_params = proto.string(2, video_id)
    if lc:
        page_params += proto.string(6, proto.percent_b64encode(proto.string(15, lc)))

    result = proto.nested(2, page_params) + proto.uint(3, 6) + proto.nested(6, offset_information)
    return base64.urlsafe_b64encode(result).decode('ascii')

def comment_replies_ctoken(video_id, comment_id, max_results=500):
    params = proto.string(2, comment_id) + proto.uint(9, max_results)
    params = proto.nested(3, params)

    result = proto.nested(2, proto.string(2, video_id)) + proto.uint(3, 6) + proto.nested(6, params)
    return base64.urlsafe_b64encode(result).decode('ascii')
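
# Illustrative sketch (not part of the original module): a reply continuation token
# produced above is meant to be fed to request_comments() below with replies=True.
# VIDEO_ID and COMMENT_ID are hypothetical placeholders.
#
#     ctoken = comment_replies_ctoken(VIDEO_ID, COMMENT_ID)
#     replies_json = request_comments(ctoken, replies=True)
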
mobile_headers = {
    'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1',
    'Accept': '*/*',
    'Accept-Language': 'en-US,en;q=0.5',
    'X-YouTube-Client-Name': '2',
    'X-YouTube-Client-Version': '2.20180823',
}

def request_comments(ctoken, replies=False):
    # Replies use a different endpoint parameter even though the distinction is already encoded in the ctoken
    if replies:
        base_url = "https://m.youtube.com/watch_comment?action_get_comment_replies=1&ctoken="
    else:
        base_url = "https://m.youtube.com/watch_comment?action_get_comments=1&ctoken="
    url = base_url + ctoken.replace("=", "%3D") + "&pbj=1"

    content = b''
    for i in range(0, 8):  # don't retry more than 8 times
        content = requests.get(url, headers=mobile_headers).content
        if content[0:4] == b")]}'":  # stray characters prepended to the JSON response
            content = content[4:]
        elif content[0:10] == b'\n<!DOCTYPE':  # occasionally returns HTML instead of JSON
            content = b''
            print("got <!DOCTYPE>, retrying")
            continue
        break

    polymer_json = json.loads(content)
    return polymer_json

def single_comment_ctoken(video_id, comment_id):
    page_params = proto.string(2, video_id) + proto.string(6, proto.percent_b64encode(proto.string(15, comment_id)))

    result = proto.nested(2, page_params) + proto.uint(3, 6)
    return base64.urlsafe_b64encode(result).decode('ascii')

def concat_texts(strings):
    '''Concatenate the 'text' fields of the given runs. Returns None if any of them is None.'''
    result = ''
    for string in strings:
        if string['text'] is None:
            return None
        result += string['text']
    return result

def parse_comment(raw_comment):
    cmnt = {}
    print(raw_comment)  # debug output of the raw renderer
    raw_comment = raw_comment['commentThreadRenderer']['comment']['commentRenderer']

    # Strip the image host from the thumbnail URL and pass it along as a ?host= parameter instead
    imgHostName = urllib.parse.urlparse(raw_comment['authorThumbnail']['thumbnails'][0]['url']).netloc
    cmnt['author'] = raw_comment['authorText']['runs'][0]['text']
    cmnt['thumbnail'] = raw_comment['authorThumbnail']['thumbnails'][0]['url'].replace("https://{}".format(imgHostName), "") + "?host=" + imgHostName
    cmnt['channel'] = raw_comment['authorEndpoint']['commandMetadata']['webCommandMetadata']['url']
    cmnt['text'] = Markup(bleach.linkify(concat_texts(raw_comment['contentText']['runs']).replace("\n", "<br>")))
    cmnt['date'] = raw_comment['publishedTimeText']['runs'][0]['text']

    try:
        cmnt['creatorHeart'] = raw_comment['creatorHeart']['creatorHeartRenderer']['creatorThumbnail']['thumbnails'][0]['url']
    except KeyError:
        cmnt['creatorHeart'] = False

    try:
        cmnt['likes'] = raw_comment['likeCount']
    except KeyError:
        cmnt['likes'] = 0

    try:
        cmnt['replies'] = raw_comment['replyCount']
    except KeyError:
        cmnt['replies'] = 0

    cmnt['authorIsChannelOwner'] = raw_comment['authorIsChannelOwner']
    cmnt['pinned'] = 'pinnedCommentBadge' in raw_comment
    return cmnt

def post_process_comments_info(comments_info):
    comments = []
    for comment in comments_info[1]['response']['continuationContents']['commentSectionContinuation']['items']:
        comments.append(parse_comment(comment))
    return comments

def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''):
    comments_info = request_comments(make_comment_ctoken(video_id, sort, offset, lc, secret_key))
    comments_info = post_process_comments_info(comments_info)
    return comments_info
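
# Minimal usage sketch (assumption: not part of the original module; the video ID
# below is a hypothetical placeholder for a quick manual check from the command line).
if __name__ == '__main__':
    for c in video_comments('SOME_VIDEO_ID'):
        print(c['author'], c['likes'], c['date'])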