Fix search results sometimes didnt show up

This commit is contained in:
pluja 2020-10-10 18:08:49 +02:00
parent 7a71b6914a
commit fb2d3a962b
3 changed files with 146 additions and 240 deletions

View File

@ -28,10 +28,9 @@ from youtube_search import YoutubeSearch
from app import app, db
from app.forms import LoginForm, RegistrationForm, EmptyForm, SearchForm, ChannelForm
from app.models import User, twitterPost, ytPost, Post, youtubeFollow, twitterFollow
from youtube import comments, utils
from youtube import comments, utils, search as yts
from youtube import watch as ytwatch
#########################################
from youtube_data import search as yts
#########################################
@ -270,7 +269,6 @@ def u(username):
return render_template('user.html', posts=posts, user=user, form=form, config=config)
#########################
#### Youtube Logic ######
#########################
@ -326,12 +324,15 @@ def ytsearch():
else:
prev_page = "/ytsearch?q={q}&s={s}&p={p}".format(q=query, s=sort, p=int(page) - 1)
for video in results['videos']:
hostname = urllib.parse.urlparse(video['videoThumb']).netloc
video['videoThumb'] = video['videoThumb'].replace("https://{}".format(hostname), "") + "&host=" + hostname
for channel in results['channels']:
if config['nginxVideoStream']:
channel['thumbnail'] = channel['thumbnail'].replace("~", "/")
hostName = urllib.parse.urlparse(channel['thumbnail']).netloc
channel['thumbnail'] = channel['thumbnail'].replace("https://{}".format(hostName),
"") + "?host=" + hostName
channel['thumbnail'] = channel['thumbnail'].replace("https://{}".format(hostName),"") + "?host=" + hostName
return render_template('ytsearch.html', form=form, btform=button_form, results=results,
restricted=config['restrictPublicUsage'], config=config, npage=next_page,
ppage=prev_page)

View File

@ -1,38 +1,10 @@
from youtube import proto
from youtube import utils
from flask import Markup
import urllib.parse
import requests
import base64
import json
import urllib
import flask
from flask import request
from werkzeug.exceptions import abort
from youtube import util, yt_data_extract, proto
from youtube import yt_app
# Sort: 1
# Upload date: 2
# View count: 3
# Rating: 1
# Relevance: 0
# Offset: 9
# Filters: 2
# Upload date: 1
# Type: 2
# Duration: 3
features = {
'4k': 14,
'hd': 4,
'hdr': 25,
'subtitles': 5,
'creative_commons': 6,
'3d': 7,
'live': 8,
'purchased': 9,
'360': 15,
'location': 23,
}
def page_number_to_sp_parameter(page, autocorrect, sort, filters):
offset = (int(page) - 1)*20 # 20 results per page
@ -41,8 +13,8 @@ def page_number_to_sp_parameter(page, autocorrect, sort, filters):
result = proto.uint(1, sort) + filters_enc + autocorrect + proto.uint(9, offset) + proto.string(61, b'')
return base64.urlsafe_b64encode(result).decode('ascii')
def get_search_json(query, page, autocorrect, sort, filters):
url = "https://www.youtube.com/results?search_query=" + urllib.parse.quote_plus(query)
def search_by_terms(search_terms, page, autocorrect, sort, filters):
url = "https://www.youtube.com/results?search_query=" + urllib.parse.quote_plus(search_terms)
headers = {
'Host': 'www.youtube.com',
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)',
@ -52,54 +24,145 @@ def get_search_json(query, page, autocorrect, sort, filters):
'X-YouTube-Client-Version': '2.20180418',
}
url += "&pbj=1&sp=" + page_number_to_sp_parameter(page, autocorrect, sort, filters).replace("=", "%3D")
content = util.fetch_url(url, headers=headers, report_text="Got search results", debug_name='search_results')
content = requests.get(url, headers=headers).text
info = json.loads(content)
return info
videos = get_videos_from_search(info)
channels = get_channels_from_search(info)
results = {
"videos": videos,
"channels": channels
}
return results
@yt_app.route('/search')
def get_search_page():
if len(request.args) == 0:
return flask.render_template('base.html', title="Search")
def get_channels_from_search(search):
results = []
search = search[1]['response']
primaryContents = search['contents']['twoColumnSearchResultsRenderer']['primaryContents']
contents = primaryContents['sectionListRenderer']['contents']
if 'query' not in request.args:
abort(400)
for content in contents:
try:
items = content['itemSectionRenderer']['contents']
except:
continue
query = request.args.get("query")
page = request.args.get("page", "1")
autocorrect = int(request.args.get("autocorrect", "1"))
sort = int(request.args.get("sort", "0"))
filters = {}
filters['time'] = int(request.args.get("time", "0"))
filters['type'] = int(request.args.get("type", "0"))
filters['duration'] = int(request.args.get("duration", "0"))
polymer_json = get_search_json(query, page, autocorrect, sort, filters)
for item in items:
try:
item['channelRenderer']
channel = get_channel_renderer_item_info(item['channelRenderer'])
results.append(channel)
except KeyError:
continue
return results
search_info = yt_data_extract.extract_search_info(polymer_json)
if search_info['error']:
return flask.render_template('error.html', error_message = search_info['error'])
def get_channel_renderer_item_info(item):
try:
suscribers = item['subscriberCountText']['simpleText'].split(" ")[0]
except:
suscribers = "?"
try:
description = utils.get_description_snippet_text(item['descriptionSnippet']['runs'])
except KeyError:
description = ""
for extract_item_info in search_info['items']:
util.prefix_urls(extract_item_info)
util.add_extra_html_info(extract_item_info)
try:
channel = {
"channelId": item['channelId'],
"username": item['title']['simpleText'],
"thumbnail": "https:{}".format(item['thumbnail']['thumbnails'][0]['url'].replace("/", "~")),
"description": Markup(str(description)),
"suscribers": suscribers,
"videos": item['videoCountText']['runs'][0]['text']
}
except KeyError:
channel = {
"channelId": item['channelId'],
"username": item['title']['simpleText'],
"avatar": item['thumbnail']['thumbnails'][0]['url'],
"suscribers": suscribers
}
return channel
corrections = search_info['corrections']
if corrections['type'] == 'did_you_mean':
corrected_query_string = request.args.to_dict(flat=False)
corrected_query_string['query'] = [corrections['corrected_query']]
corrections['corrected_query_url'] = util.URL_ORIGIN + '/search?' + urllib.parse.urlencode(corrected_query_string, doseq=True)
elif corrections['type'] == 'showing_results_for':
no_autocorrect_query_string = request.args.to_dict(flat=False)
no_autocorrect_query_string['autocorrect'] = ['0']
no_autocorrect_query_url = util.URL_ORIGIN + '/search?' + urllib.parse.urlencode(no_autocorrect_query_string, doseq=True)
corrections['original_query_url'] = no_autocorrect_query_url
def get_videos_from_search(search):
latest = []
results = []
search = search[1]['response']
primaryContents = search['contents']['twoColumnSearchResultsRenderer']['primaryContents']
contents = primaryContents['sectionListRenderer']['contents']
for content in contents:
try:
items = content['itemSectionRenderer']['contents']
except:
continue
for item in items:
try:
item['videoRenderer']
video = get_video_renderer_item_info(item['videoRenderer'])
results.append(video)
except KeyError:
continue
# Sometimes Youtube will return an empty query. Try again.
return results
def get_video_renderer_item_info(item):
published = ""
views = ""
isLive = False
isUpcoming = False
thumbnailOverlays = item['thumbnailOverlays']
try:
if 'UPCOMING' in str(thumbnailOverlays):
start_time = item['upcomingEventData']['startTime']
isUpcoming = True
views = "-"
published = "Scheduled"
except KeyError:
isUpcoming = False
try:
if 'LIVE' in str(thumbnailOverlays):
isLive = True
try:
views = item['viewCountText']['simpleText']
except:
views = "Live"
try:
duration = item['lengthText']['simpleText']
except:
duration = "-"
if published != "Scheduled":
try:
published = item['publishedTimeText']['simpleText']
except KeyError:
published = "None"
except:
isUpcoming = False
isLive = False
if not isUpcoming and not isLive:
views = item['viewCountText']['simpleText']
published = item['publishedTimeText']['simpleText']
duration = item['lengthText']['simpleText']
video = {
'videoTitle':item['title']['runs'][0]['text'],
'description':Markup(str(utils.get_description_snippet_text(item['descriptionSnippet']['runs']))),
'views':views,
'timeStamp':published,
'duration':duration,
'channelName':item['ownerText']['runs'][0]['text'],
'authorUrl':"/channel/{}".format(item['ownerText']['runs'][0]['navigationEndpoint']['browseEndpoint']['browseId']),
'channelId':item['ownerText']['runs'][0]['navigationEndpoint']['browseEndpoint']['browseId'],
'id':item['videoId'],
'videoUrl':"/watch?v={}".format(item['videoId']),
'isLive':isLive,
'isUpcoming':isUpcoming,
'videoThumb':item['thumbnail']['thumbnails'][0]['url']
}
return video
return flask.render_template('search.html',
header_playlist_names = local_playlist.get_playlist_names(),
query = query,
estimated_results = search_info['estimated_results'],
estimated_pages = search_info['estimated_pages'],
corrections = search_info['corrections'],
results = search_info['items'],
parameters_dictionary = request.args,
)

View File

@ -1,158 +0,0 @@
from youtube_data import proto
from youtube import utils
from flask import Markup
import urllib.parse
import requests
import base64
import json
def page_number_to_sp_parameter(page, autocorrect, sort, filters):
offset = (int(page) - 1)*20 # 20 results per page
autocorrect = proto.nested(8, proto.uint(1, 1 - int(autocorrect) ))
filters_enc = proto.nested(2, proto.uint(1, filters['time']) + proto.uint(2, filters['type']) + proto.uint(3, filters['duration']))
result = proto.uint(1, sort) + filters_enc + autocorrect + proto.uint(9, offset) + proto.string(61, b'')
return base64.urlsafe_b64encode(result).decode('ascii')
def search_by_terms(search_terms, page, autocorrect, sort, filters):
url = "https://www.youtube.com/results?search_query=" + urllib.parse.quote_plus(search_terms)
headers = {
'Host': 'www.youtube.com',
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)',
'Accept': '*/*',
'Accept-Language': 'en-US,en;q=0.5',
'X-YouTube-Client-Name': '1',
'X-YouTube-Client-Version': '2.20180418',
}
url += "&pbj=1&sp=" + page_number_to_sp_parameter(page, autocorrect, sort, filters).replace("=", "%3D")
content = requests.get(url, headers=headers).text
info = json.loads(content)
videos = get_videos_from_search(info)
channels = get_channels_from_search(info)
results = {
"videos": videos,
"channels": channels
}
return results
def get_channels_from_search(search):
results = []
search = search[1]['response']
primaryContents = search['contents']['twoColumnSearchResultsRenderer']['primaryContents']
items = primaryContents['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents']
for item in items:
try:
item['channelRenderer']
channel = get_channel_renderer_item_info(item['channelRenderer'])
results.append(channel)
except KeyError:
continue
return results
def get_channel_renderer_item_info(item):
try:
suscribers = item['subscriberCountText']['simpleText'].split(" ")[0]
except:
suscribers = "?"
try:
description = utils.get_description_snippet_text(item['descriptionSnippet']['runs'])
except KeyError:
description = ""
try:
channel = {
"channelId": item['channelId'],
"username": item['title']['simpleText'],
"thumbnail": "https:{}".format(item['thumbnail']['thumbnails'][0]['url'].replace("/", "~")),
"description": Markup(str(description)),
"suscribers": suscribers,
"videos": item['videoCountText']['runs'][0]['text']
}
except KeyError:
channel = {
"channelId": item['channelId'],
"username": item['title']['simpleText'],
"avatar": item['thumbnail']['thumbnails'][0]['url'],
"suscribers": suscribers
}
return channel
def get_videos_from_search(search):
latest = []
results = []
search = search[1]['response']
primaryContents = search['contents']['twoColumnSearchResultsRenderer']['primaryContents']
items = primaryContents['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents']
for item in items:
try:
item['videoRenderer']
video = get_video_renderer_item_info(item['videoRenderer'])
results.append(video)
except KeyError:
continue
# Sometimes Youtube will return an empty query. Try again.
return results
def get_video_renderer_item_info(item):
published = ""
views = ""
isLive = False
isUpcoming = False
thumbnailOverlays = item['thumbnailOverlays']
try:
if 'UPCOMING' in str(thumbnailOverlays):
start_time = item['upcomingEventData']['startTime']
isUpcoming = True
views = "-"
published = "Scheduled"
except KeyError:
isUpcoming = False
try:
if 'LIVE' in str(thumbnailOverlays):
isLive = True
try:
views = item['viewCountText']['simpleText']
except:
views = "Live"
try:
duration = item['lengthText']['simpleText']
except:
duration = "-"
if published != "Scheduled":
try:
published = item['publishedTimeText']['simpleText']
except KeyError:
published = "None"
except:
isUpcoming = False
isLive = False
if not isUpcoming and not isLive:
views = item['viewCountText']['simpleText']
published = item['publishedTimeText']['simpleText']
duration = item['lengthText']['simpleText']
video = {
'videoTitle':item['title']['runs'][0]['text'],
'description':Markup(str(utils.get_description_snippet_text(item['descriptionSnippet']['runs']))),
'views':views,
'timeStamp':published,
'duration':duration,
'channelName':item['ownerText']['runs'][0]['text'],
'authorUrl':"/channel/{}".format(item['ownerText']['runs'][0]['navigationEndpoint']['browseEndpoint']['browseId']),
'channelId':item['ownerText']['runs'][0]['navigationEndpoint']['browseEndpoint']['browseId'],
'id':item['videoId'],
'videoUrl':"/watch?v={}".format(item['videoId']),
'isLive':isLive,
'isUpcoming':isUpcoming,
'videoThumb':item['thumbnail']['thumbnails'][0]['url']
}
return video