Add support for feed pagination

This commit is contained in:
Kavin 2023-05-01 21:02:35 +01:00
parent 0536236657
commit 148457e5d0
No known key found for this signature in database
GPG Key ID: 49451E4482CC5BCD
2 changed files with 26 additions and 11 deletions

View File

@ -320,7 +320,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
} }
})).map(GET, "/feed", AsyncServlet.ofBlocking(executor, request -> { })).map(GET, "/feed", AsyncServlet.ofBlocking(executor, request -> {
try { try {
return getJsonResponse(FeedHandlers.feedResponse(request.getQueryParameter("authToken")), return getJsonResponse(FeedHandlers.feedResponse(request.getQueryParameter("authToken"), request.getQueryParameter("start")),
"private"); "private");
} catch (Exception e) { } catch (Exception e) {
return getErrorResponse(e, request.getPath()); return getErrorResponse(e, request.getPath());
@ -335,7 +335,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
})).map(GET, "/feed/unauthenticated", AsyncServlet.ofBlocking(executor, request -> { })).map(GET, "/feed/unauthenticated", AsyncServlet.ofBlocking(executor, request -> {
try { try {
return getJsonResponse(FeedHandlers.unauthenticatedFeedResponse( return getJsonResponse(FeedHandlers.unauthenticatedFeedResponse(
getArray(request.getQueryParameter("channels")) getArray(request.getQueryParameter("channels")), request.getQueryParameter("start")
), "public, s-maxage=120"); ), "public, s-maxage=120");
} catch (Exception e) { } catch (Exception e) {
return getErrorResponse(e, request.getPath()); return getErrorResponse(e, request.getPath());
@ -344,7 +344,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
try { try {
String[] subscriptions = mapper.readValue(request.loadBody().getResult().asArray(), String[] subscriptions = mapper.readValue(request.loadBody().getResult().asArray(),
String[].class); String[].class);
return getJsonResponse(FeedHandlers.unauthenticatedFeedResponse(subscriptions), "public, s-maxage=120"); return getJsonResponse(FeedHandlers.unauthenticatedFeedResponse(subscriptions, request.getQueryParameter("start")), "public, s-maxage=120");
} catch (Exception e) { } catch (Exception e) {
return getErrorResponse(e, request.getPath()); return getErrorResponse(e, request.getPath());
} }

View File

@ -93,7 +93,7 @@ public class FeedHandlers {
} }
} }
public static byte[] feedResponse(String session) throws IOException { public static byte[] feedResponse(String session, String start) throws IOException {
if (StringUtils.isBlank(session)) if (StringUtils.isBlank(session))
ExceptionHandler.throwErrorResponse(new InvalidRequestResponse("session is a required parameter")); ExceptionHandler.throwErrorResponse(new InvalidRequestResponse("session is a required parameter"));
@ -114,13 +114,20 @@ public class FeedHandlers {
subquery.select(subroot.get("subscribed_ids")) subquery.select(subroot.get("subscribed_ids"))
.where(cb.equal(subroot.get("id"), user.getId())); .where(cb.equal(subroot.get("id"), user.getId()));
var channelPredicate = root.get("channel").get("uploader_id").in(subquery);
criteria.select(root) criteria.select(root)
.where( .where(
root.get("channel").get("uploader_id").in(subquery) start == null ? channelPredicate : cb.and(
channelPredicate,
cb.lessThan(root.get("uploaded"), Long.parseLong(start))
)
) )
.orderBy(cb.desc(root.get("uploaded"))); .orderBy(cb.desc(root.get("uploaded")));
List<StreamItem> feedItems = s.createQuery(criteria).setTimeout(20).stream() List<StreamItem> feedItems = s.createQuery(criteria)
.setMaxResults(100)
.setTimeout(20).stream()
.parallel().map(video -> { .parallel().map(video -> {
var channel = video.getChannel(); var channel = video.getChannel();
@ -193,7 +200,7 @@ public class FeedHandlers {
return null; return null;
} }
public static byte[] unauthenticatedFeedResponse(String[] channelIds) throws Exception { public static byte[] unauthenticatedFeedResponse(String[] channelIds, String start) throws Exception {
Set<String> filtered = Arrays.stream(channelIds) Set<String> filtered = Arrays.stream(channelIds)
.filter(ChannelHelpers::isValidId) .filter(ChannelHelpers::isValidId)
@ -211,13 +218,20 @@ public class FeedHandlers {
var root = criteria.from(Video.class); var root = criteria.from(Video.class);
root.fetch("channel", JoinType.RIGHT); root.fetch("channel", JoinType.RIGHT);
var channelPredicate = root.get("channel").get("id").in(filtered);
criteria.select(root) criteria.select(root)
.where(cb.and( .where(
root.get("channel").get("id").in(filtered) start == null ? channelPredicate : cb.and(
)) channelPredicate,
cb.lessThan(root.get("uploaded"), Long.parseLong(start))
)
)
.orderBy(cb.desc(root.get("uploaded"))); .orderBy(cb.desc(root.get("uploaded")));
List<StreamItem> feedItems = s.createQuery(criteria).setTimeout(20).stream() List<StreamItem> feedItems = s.createQuery(criteria)
.setMaxResults(100)
.setTimeout(20).stream()
.parallel().map(video -> { .parallel().map(video -> {
var channel = video.getChannel(); var channel = video.getChannel();
@ -334,6 +348,7 @@ public class FeedHandlers {
var tr = s.beginTransaction(); var tr = s.beginTransaction();
channelIds.stream() channelIds.stream()
.filter(ChannelHelpers::isValidId)
.filter(id -> !existing.contains(id)) .filter(id -> !existing.contains(id))
.map(UnauthenticatedSubscription::new) .map(UnauthenticatedSubscription::new)
.forEach(s::insert); .forEach(s::insert);