Fix bugs and implement more of unauthenticated subs.

This commit is contained in:
Kavin 2022-07-31 23:15:28 +05:30
parent 02067518b0
commit 9a45e33fdb
No known key found for this signature in database
GPG Key ID: 49451E4482CC5BCD
7 changed files with 128 additions and 56 deletions

View File

@ -29,6 +29,8 @@ DISABLE_RYD:false
DISABLE_SERVER:false
# Disable the inclusion of LBRY streams
DISABLE_LBRY:false
# How long should unauthenticated subscriptions last for
SUBSCRIPTIONS_EXPIRY:30
# Hibernate properties
hibernate.connection.url:jdbc:postgresql://postgres:5432/piped
hibernate.connection.driver_class:org.postgresql.Driver

View File

@ -5,10 +5,7 @@ import jakarta.persistence.criteria.CriteriaBuilder;
import jakarta.persistence.criteria.CriteriaQuery;
import me.kavin.piped.consts.Constants;
import me.kavin.piped.utils.*;
import me.kavin.piped.utils.obj.db.PlaylistVideo;
import me.kavin.piped.utils.obj.db.PubSub;
import me.kavin.piped.utils.obj.db.User;
import me.kavin.piped.utils.obj.db.Video;
import me.kavin.piped.utils.obj.db.*;
import org.hibernate.Session;
import org.hibernate.StatelessSession;
import org.schabi.newpipe.extractor.NewPipe;
@ -69,10 +66,17 @@ public class Main {
CriteriaQuery<PubSub> criteria = cb.createQuery(PubSub.class);
var root = criteria.from(PubSub.class);
var userRoot = criteria.from(User.class);
var subquery = criteria.subquery(UnauthenticatedSubscription.class);
var subRoot = subquery.from(UnauthenticatedSubscription.class);
subquery.select(subRoot.get("id"))
.where(cb.gt(subRoot.get("subscribedAt"), System.currentTimeMillis() - TimeUnit.DAYS.toMillis(Constants.SUBSCRIPTIONS_EXPIRY)));
criteria.select(root)
.where(cb.and(
cb.lessThan(root.get("subbedAt"), System.currentTimeMillis() - TimeUnit.DAYS.toMillis(4)),
cb.isMember(root.get("id"), userRoot.<Collection<String>>get("subscribed_ids"))
.where(cb.or(
cb.and(
cb.lessThan(root.get("subbedAt"), System.currentTimeMillis() - TimeUnit.DAYS.toMillis(4)),
cb.isMember(root.get("id"), userRoot.<Collection<String>>get("subscribed_ids"))
),
root.get("id").in(subquery)
));
List<PubSub> pubSubList = s.createQuery(criteria).list();

View File

@ -266,6 +266,22 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
} catch (Exception e) {
return getErrorResponse(e, request.getPath());
}
})).map(GET, "/feed/unauthenticated", AsyncServlet.ofBlocking(executor, request -> {
try {
return getJsonResponse(ResponseHelper.unauthenticatedFeedResponse(
Objects.requireNonNull(request.getQueryParameter("channels")).split(",")
), "public, s-maxage=120");
} catch (Exception e) {
return getErrorResponse(e, request.getPath());
}
})).map(GET, "/feed/unauthenticated/rss", AsyncServlet.ofBlocking(executor, request -> {
try {
return getRawResponse(ResponseHelper.unauthenticatedFeedResponseRSS(
Objects.requireNonNull(request.getQueryParameter("channels")).split(",")
), "application/atom+xml", "public, s-maxage=120");
} catch (Exception e) {
return getErrorResponse(e, request.getPath());
}
})).map(POST, "/import", AsyncServlet.ofBlocking(executor, request -> {
try {
String[] subscriptions = Constants.mapper.readValue(request.loadBody().getResult().asArray(),

View File

@ -59,6 +59,8 @@ public class Constants {
public static final boolean DISABLE_LBRY;
public static final int SUBSCRIPTIONS_EXPIRY;
public static final String VERSION;
public static final ObjectMapper mapper = new ObjectMapper().addMixIn(Page.class, PageMixin.class);
@ -90,6 +92,7 @@ public class Constants {
DISABLE_RYD = Boolean.parseBoolean(getProperty(prop, "DISABLE_RYD", "false"));
DISABLE_SERVER = Boolean.parseBoolean(getProperty(prop, "DISABLE_SERVER", "false"));
DISABLE_LBRY = Boolean.parseBoolean(getProperty(prop, "DISABLE_LBRY", "false"));
SUBSCRIPTIONS_EXPIRY = Integer.parseInt(getProperty(prop, "SUBSCRIPTIONS_EXPIRY", "30"));
System.getenv().forEach((key, value) -> {
if (key.startsWith("hibernate"))
hibernateProperties.put(key, value);

View File

@ -20,7 +20,7 @@ public class DatabaseSessionFactory {
sessionFactory = configuration.addAnnotatedClass(User.class).addAnnotatedClass(Channel.class)
.addAnnotatedClass(Video.class).addAnnotatedClass(PubSub.class).addAnnotatedClass(Playlist.class)
.addAnnotatedClass(PlaylistVideo.class).buildSessionFactory();
.addAnnotatedClass(PlaylistVideo.class).addAnnotatedClass(UnauthenticatedSubscription.class).buildSessionFactory();
}
public static Session createSession() {

View File

@ -962,8 +962,50 @@ public class ResponseHelper {
Set<String> filtered = Arrays.stream(channelIds)
.filter(StringUtils::isNotBlank)
.filter(StringUtils::isAlphanumeric)
.filter(id -> id.startsWith("UC"))
.filter(id -> id.matches("[A-Za-z\\d_-]+"))
.collect(Collectors.toUnmodifiableSet());
if (filtered.isEmpty())
return mapper.writeValueAsBytes(Collections.EMPTY_LIST);
try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) {
CriteriaBuilder cb = s.getCriteriaBuilder();
// Get all videos from subscribed channels, with channel info
CriteriaQuery<Video> criteria = cb.createQuery(Video.class);
var root = criteria.from(Video.class);
root.fetch("channel", JoinType.INNER);
criteria.select(root)
.where(cb.and(
root.get("channel").get("id").in(filtered)
))
.orderBy(cb.desc(root.get("uploaded")));
List<StreamItem> feedItems = new ObjectArrayList<>();
for (Video video : s.createQuery(criteria).setTimeout(20).list()) {
var channel = video.getChannel();
feedItems.add(new StreamItem("/watch?v=" + video.getId(), video.getTitle(),
rewriteURL(video.getThumbnail()), channel.getUploader(), "/channel/" + channel.getUploaderId(),
rewriteURL(channel.getUploaderAvatar()), null, null, video.getDuration(), video.getViews(),
video.getUploaded(), channel.isVerified()));
}
updateSubscribedTime(filtered);
addMissingChannels(filtered);
return mapper.writeValueAsBytes(feedItems);
}
}
public static byte[] unauthenticatedFeedResponseRSS(String[] channelIds) throws Exception {
Set<String> filtered = Arrays.stream(channelIds)
.filter(StringUtils::isNotBlank)
.filter(id -> id.matches("[A-Za-z\\d_-]+"))
.collect(Collectors.toUnmodifiableSet());
if (filtered.isEmpty())
@ -981,7 +1023,7 @@ public class ResponseHelper {
criteria.select(root)
.where(cb.and(
root.get("channel").in(filtered)
root.get("channel").get("id").in(filtered)
))
.orderBy(cb.desc(root.get("uploaded")));
@ -1021,62 +1063,67 @@ public class ResponseHelper {
feed.setEntries(entries);
updateSubscribedTime(filtered);
addMissingChannels(filtered);
return new SyndFeedOutput().outputString(feed).getBytes(UTF_8);
}
}
public static byte[] unauthenticatedFeedResponseRSS(String[] channelIds) throws Exception {
Set<String> filtered = Arrays.stream(channelIds)
.filter(StringUtils::isNotBlank)
.filter(StringUtils::isAlphanumeric)
.filter(id -> id.startsWith("UC"))
.collect(Collectors.toUnmodifiableSet());
if (filtered.isEmpty())
return mapper.writeValueAsBytes(Collections.EMPTY_LIST);
try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) {
CriteriaBuilder cb = s.getCriteriaBuilder();
// Get all videos from subscribed channels, with channel info
CriteriaQuery<Video> criteria = cb.createQuery(Video.class);
var root = criteria.from(Video.class);
root.fetch("channel", JoinType.INNER);
criteria.select(root)
.where(cb.and(
root.get("channel").in(filtered)
))
.orderBy(cb.desc(root.get("uploaded")));
List<StreamItem> feedItems = new ObjectArrayList<>();
for (Video video : s.createQuery(criteria).setTimeout(20).list()) {
var channel = video.getChannel();
feedItems.add(new StreamItem("/watch?v=" + video.getId(), video.getTitle(),
rewriteURL(video.getThumbnail()), channel.getUploader(), "/channel/" + channel.getUploaderId(),
rewriteURL(channel.getUploaderAvatar()), null, null, video.getDuration(), video.getViews(),
video.getUploaded(), channel.isVerified()));
}
updateSubscribedTime(filtered);
return mapper.writeValueAsBytes(feedItems);
}
}
private static void updateSubscribedTime(Collection<String> channelIds) {
Multithreading.runAsync(() -> {
try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) {
var tr = s.beginTransaction();
var cb = s.getCriteriaBuilder();
var cu = cb.createCriteriaUpdate(UnauthenticatedSubscription.class);
var root = cu.getRoot();
cu.where(root.get("id").in(channelIds)).set(root.get("lastSubscribed"), System.currentTimeMillis());
cu.where(root.get("id").in(channelIds))
.set(root.get("subscribedAt"), System.currentTimeMillis())
.where(cb.lt(root.get("subscribedAt"), System.currentTimeMillis() - (TimeUnit.DAYS.toMillis(Constants.SUBSCRIPTIONS_EXPIRY) / 2)));
s.createMutationQuery(cu).executeUpdate();
tr.commit();
} catch (Exception e) {
ExceptionHandler.handle(e);
}
});
}
private static void addMissingChannels(Collection<String> channelIds) {
Multithreading.runAsyncLimited(() -> {
try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) {
var cb = s.getCriteriaBuilder();
{
var query = cb.createQuery();
var root = query.from(UnauthenticatedSubscription.class);
query.select(root.get("id"))
.where(root.get("id").in(channelIds));
List<Object> existing = s.createQuery(query).setTimeout(20).list();
var tr = s.beginTransaction();
channelIds.stream()
.filter(id -> !existing.contains(id))
.map(UnauthenticatedSubscription::new)
.forEach(s::insert);
tr.commit();
}
{
var query = cb.createQuery();
var root = query.from(me.kavin.piped.utils.obj.db.Channel.class);
query.select(root.get("id"))
.where(root.get("id").in(channelIds));
List<Object> existing = s.createQuery(query).setTimeout(20).list();
channelIds.stream()
.filter(id -> !existing.contains(id))
.forEach(id -> Multithreading.runAsyncLimited(() -> {
saveChannel(id);
}));
}
} catch (Exception e) {
ExceptionHandler.handle(e);
}

View File

@ -12,7 +12,7 @@ public class UnauthenticatedSubscription {
public UnauthenticatedSubscription() {
}
public UnauthenticatedSubscription(String id, String channelId, long subscribedAt) {
public UnauthenticatedSubscription(String id) {
this.id = id;
this.subscribedAt = System.currentTimeMillis();
}