mirror of
https://github.com/TeamPiped/Piped-Backend.git
synced 2025-04-29 16:30:29 +05:30
Merge pull request #323 from TeamPiped/unauthenticated-subscriptions
Implement unauthenticated subscriptions api
This commit is contained in:
commit
ac2723e0e5
@ -29,6 +29,8 @@ DISABLE_RYD:false
|
|||||||
DISABLE_SERVER:false
|
DISABLE_SERVER:false
|
||||||
# Disable the inclusion of LBRY streams
|
# Disable the inclusion of LBRY streams
|
||||||
DISABLE_LBRY:false
|
DISABLE_LBRY:false
|
||||||
|
# How long should unauthenticated subscriptions last for
|
||||||
|
SUBSCRIPTIONS_EXPIRY:30
|
||||||
# Hibernate properties
|
# Hibernate properties
|
||||||
hibernate.connection.url:jdbc:postgresql://postgres:5432/piped
|
hibernate.connection.url:jdbc:postgresql://postgres:5432/piped
|
||||||
hibernate.connection.driver_class:org.postgresql.Driver
|
hibernate.connection.driver_class:org.postgresql.Driver
|
||||||
|
@ -5,10 +5,7 @@ import jakarta.persistence.criteria.CriteriaBuilder;
|
|||||||
import jakarta.persistence.criteria.CriteriaQuery;
|
import jakarta.persistence.criteria.CriteriaQuery;
|
||||||
import me.kavin.piped.consts.Constants;
|
import me.kavin.piped.consts.Constants;
|
||||||
import me.kavin.piped.utils.*;
|
import me.kavin.piped.utils.*;
|
||||||
import me.kavin.piped.utils.obj.db.PlaylistVideo;
|
import me.kavin.piped.utils.obj.db.*;
|
||||||
import me.kavin.piped.utils.obj.db.PubSub;
|
|
||||||
import me.kavin.piped.utils.obj.db.User;
|
|
||||||
import me.kavin.piped.utils.obj.db.Video;
|
|
||||||
import org.hibernate.Session;
|
import org.hibernate.Session;
|
||||||
import org.hibernate.StatelessSession;
|
import org.hibernate.StatelessSession;
|
||||||
import org.schabi.newpipe.extractor.NewPipe;
|
import org.schabi.newpipe.extractor.NewPipe;
|
||||||
@ -69,10 +66,17 @@ public class Main {
|
|||||||
CriteriaQuery<PubSub> criteria = cb.createQuery(PubSub.class);
|
CriteriaQuery<PubSub> criteria = cb.createQuery(PubSub.class);
|
||||||
var root = criteria.from(PubSub.class);
|
var root = criteria.from(PubSub.class);
|
||||||
var userRoot = criteria.from(User.class);
|
var userRoot = criteria.from(User.class);
|
||||||
|
var subquery = criteria.subquery(UnauthenticatedSubscription.class);
|
||||||
|
var subRoot = subquery.from(UnauthenticatedSubscription.class);
|
||||||
|
subquery.select(subRoot.get("id"))
|
||||||
|
.where(cb.gt(subRoot.get("subscribedAt"), System.currentTimeMillis() - TimeUnit.DAYS.toMillis(Constants.SUBSCRIPTIONS_EXPIRY)));
|
||||||
criteria.select(root)
|
criteria.select(root)
|
||||||
.where(cb.and(
|
.where(cb.or(
|
||||||
|
cb.and(
|
||||||
cb.lessThan(root.get("subbedAt"), System.currentTimeMillis() - TimeUnit.DAYS.toMillis(4)),
|
cb.lessThan(root.get("subbedAt"), System.currentTimeMillis() - TimeUnit.DAYS.toMillis(4)),
|
||||||
cb.isMember(root.get("id"), userRoot.<Collection<String>>get("subscribed_ids"))
|
cb.isMember(root.get("id"), userRoot.<Collection<String>>get("subscribed_ids"))
|
||||||
|
),
|
||||||
|
root.get("id").in(subquery)
|
||||||
));
|
));
|
||||||
|
|
||||||
List<PubSub> pubSubList = s.createQuery(criteria).list();
|
List<PubSub> pubSubList = s.createQuery(criteria).list();
|
||||||
|
@ -266,6 +266,22 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
|
|||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
return getErrorResponse(e, request.getPath());
|
return getErrorResponse(e, request.getPath());
|
||||||
}
|
}
|
||||||
|
})).map(GET, "/feed/unauthenticated", AsyncServlet.ofBlocking(executor, request -> {
|
||||||
|
try {
|
||||||
|
return getJsonResponse(ResponseHelper.unauthenticatedFeedResponse(
|
||||||
|
Objects.requireNonNull(request.getQueryParameter("channels")).split(",")
|
||||||
|
), "public, s-maxage=120");
|
||||||
|
} catch (Exception e) {
|
||||||
|
return getErrorResponse(e, request.getPath());
|
||||||
|
}
|
||||||
|
})).map(GET, "/feed/unauthenticated/rss", AsyncServlet.ofBlocking(executor, request -> {
|
||||||
|
try {
|
||||||
|
return getRawResponse(ResponseHelper.unauthenticatedFeedResponseRSS(
|
||||||
|
Objects.requireNonNull(request.getQueryParameter("channels")).split(",")
|
||||||
|
), "application/atom+xml", "public, s-maxage=120");
|
||||||
|
} catch (Exception e) {
|
||||||
|
return getErrorResponse(e, request.getPath());
|
||||||
|
}
|
||||||
})).map(POST, "/import", AsyncServlet.ofBlocking(executor, request -> {
|
})).map(POST, "/import", AsyncServlet.ofBlocking(executor, request -> {
|
||||||
try {
|
try {
|
||||||
String[] subscriptions = Constants.mapper.readValue(request.loadBody().getResult().asArray(),
|
String[] subscriptions = Constants.mapper.readValue(request.loadBody().getResult().asArray(),
|
||||||
|
@ -59,6 +59,8 @@ public class Constants {
|
|||||||
|
|
||||||
public static final boolean DISABLE_LBRY;
|
public static final boolean DISABLE_LBRY;
|
||||||
|
|
||||||
|
public static final int SUBSCRIPTIONS_EXPIRY;
|
||||||
|
|
||||||
public static final String VERSION;
|
public static final String VERSION;
|
||||||
|
|
||||||
public static final ObjectMapper mapper = new ObjectMapper().addMixIn(Page.class, PageMixin.class);
|
public static final ObjectMapper mapper = new ObjectMapper().addMixIn(Page.class, PageMixin.class);
|
||||||
@ -90,6 +92,7 @@ public class Constants {
|
|||||||
DISABLE_RYD = Boolean.parseBoolean(getProperty(prop, "DISABLE_RYD", "false"));
|
DISABLE_RYD = Boolean.parseBoolean(getProperty(prop, "DISABLE_RYD", "false"));
|
||||||
DISABLE_SERVER = Boolean.parseBoolean(getProperty(prop, "DISABLE_SERVER", "false"));
|
DISABLE_SERVER = Boolean.parseBoolean(getProperty(prop, "DISABLE_SERVER", "false"));
|
||||||
DISABLE_LBRY = Boolean.parseBoolean(getProperty(prop, "DISABLE_LBRY", "false"));
|
DISABLE_LBRY = Boolean.parseBoolean(getProperty(prop, "DISABLE_LBRY", "false"));
|
||||||
|
SUBSCRIPTIONS_EXPIRY = Integer.parseInt(getProperty(prop, "SUBSCRIPTIONS_EXPIRY", "30"));
|
||||||
System.getenv().forEach((key, value) -> {
|
System.getenv().forEach((key, value) -> {
|
||||||
if (key.startsWith("hibernate"))
|
if (key.startsWith("hibernate"))
|
||||||
hibernateProperties.put(key, value);
|
hibernateProperties.put(key, value);
|
||||||
|
@ -20,7 +20,7 @@ public class DatabaseSessionFactory {
|
|||||||
|
|
||||||
sessionFactory = configuration.addAnnotatedClass(User.class).addAnnotatedClass(Channel.class)
|
sessionFactory = configuration.addAnnotatedClass(User.class).addAnnotatedClass(Channel.class)
|
||||||
.addAnnotatedClass(Video.class).addAnnotatedClass(PubSub.class).addAnnotatedClass(Playlist.class)
|
.addAnnotatedClass(Video.class).addAnnotatedClass(PubSub.class).addAnnotatedClass(Playlist.class)
|
||||||
.addAnnotatedClass(PlaylistVideo.class).buildSessionFactory();
|
.addAnnotatedClass(PlaylistVideo.class).addAnnotatedClass(UnauthenticatedSubscription.class).buildSessionFactory();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Session createSession() {
|
public static Session createSession() {
|
||||||
|
@ -15,11 +15,10 @@ import jakarta.persistence.criteria.JoinType;
|
|||||||
import jakarta.persistence.criteria.Root;
|
import jakarta.persistence.criteria.Root;
|
||||||
import me.kavin.piped.consts.Constants;
|
import me.kavin.piped.consts.Constants;
|
||||||
import me.kavin.piped.ipfs.IPFS;
|
import me.kavin.piped.ipfs.IPFS;
|
||||||
|
import me.kavin.piped.utils.obj.Channel;
|
||||||
|
import me.kavin.piped.utils.obj.Playlist;
|
||||||
import me.kavin.piped.utils.obj.*;
|
import me.kavin.piped.utils.obj.*;
|
||||||
import me.kavin.piped.utils.obj.db.PlaylistVideo;
|
import me.kavin.piped.utils.obj.db.*;
|
||||||
import me.kavin.piped.utils.obj.db.PubSub;
|
|
||||||
import me.kavin.piped.utils.obj.db.User;
|
|
||||||
import me.kavin.piped.utils.obj.db.Video;
|
|
||||||
import me.kavin.piped.utils.obj.search.SearchChannel;
|
import me.kavin.piped.utils.obj.search.SearchChannel;
|
||||||
import me.kavin.piped.utils.obj.search.SearchPlaylist;
|
import me.kavin.piped.utils.obj.search.SearchPlaylist;
|
||||||
import me.kavin.piped.utils.resp.*;
|
import me.kavin.piped.utils.resp.*;
|
||||||
@ -959,6 +958,178 @@ public class ResponseHelper {
|
|||||||
return mapper.writeValueAsBytes(new AuthenticationFailureResponse());
|
return mapper.writeValueAsBytes(new AuthenticationFailureResponse());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static byte[] unauthenticatedFeedResponse(String[] channelIds) throws Exception {
|
||||||
|
|
||||||
|
Set<String> filtered = Arrays.stream(channelIds)
|
||||||
|
.filter(StringUtils::isNotBlank)
|
||||||
|
.filter(id -> id.matches("[A-Za-z\\d_-]+"))
|
||||||
|
.collect(Collectors.toUnmodifiableSet());
|
||||||
|
|
||||||
|
if (filtered.isEmpty())
|
||||||
|
return mapper.writeValueAsBytes(Collections.EMPTY_LIST);
|
||||||
|
|
||||||
|
try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) {
|
||||||
|
|
||||||
|
CriteriaBuilder cb = s.getCriteriaBuilder();
|
||||||
|
|
||||||
|
// Get all videos from subscribed channels, with channel info
|
||||||
|
CriteriaQuery<Video> criteria = cb.createQuery(Video.class);
|
||||||
|
var root = criteria.from(Video.class);
|
||||||
|
root.fetch("channel", JoinType.INNER);
|
||||||
|
|
||||||
|
criteria.select(root)
|
||||||
|
.where(cb.and(
|
||||||
|
root.get("channel").get("id").in(filtered)
|
||||||
|
))
|
||||||
|
.orderBy(cb.desc(root.get("uploaded")));
|
||||||
|
|
||||||
|
List<StreamItem> feedItems = new ObjectArrayList<>();
|
||||||
|
|
||||||
|
for (Video video : s.createQuery(criteria).setTimeout(20).list()) {
|
||||||
|
var channel = video.getChannel();
|
||||||
|
|
||||||
|
feedItems.add(new StreamItem("/watch?v=" + video.getId(), video.getTitle(),
|
||||||
|
rewriteURL(video.getThumbnail()), channel.getUploader(), "/channel/" + channel.getUploaderId(),
|
||||||
|
rewriteURL(channel.getUploaderAvatar()), null, null, video.getDuration(), video.getViews(),
|
||||||
|
video.getUploaded(), channel.isVerified()));
|
||||||
|
}
|
||||||
|
|
||||||
|
updateSubscribedTime(filtered);
|
||||||
|
addMissingChannels(filtered);
|
||||||
|
|
||||||
|
return mapper.writeValueAsBytes(feedItems);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static byte[] unauthenticatedFeedResponseRSS(String[] channelIds) throws Exception {
|
||||||
|
|
||||||
|
Set<String> filtered = Arrays.stream(channelIds)
|
||||||
|
.filter(StringUtils::isNotBlank)
|
||||||
|
.filter(id -> id.matches("[A-Za-z\\d_-]+"))
|
||||||
|
.collect(Collectors.toUnmodifiableSet());
|
||||||
|
|
||||||
|
if (filtered.isEmpty())
|
||||||
|
return mapper.writeValueAsBytes(mapper.createObjectNode()
|
||||||
|
.put("error", "No valid channel IDs provided"));
|
||||||
|
|
||||||
|
try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) {
|
||||||
|
|
||||||
|
CriteriaBuilder cb = s.getCriteriaBuilder();
|
||||||
|
|
||||||
|
// Get all videos from subscribed channels, with channel info
|
||||||
|
CriteriaQuery<Video> criteria = cb.createQuery(Video.class);
|
||||||
|
var root = criteria.from(Video.class);
|
||||||
|
root.fetch("channel", JoinType.INNER);
|
||||||
|
|
||||||
|
criteria.select(root)
|
||||||
|
.where(cb.and(
|
||||||
|
root.get("channel").get("id").in(filtered)
|
||||||
|
))
|
||||||
|
.orderBy(cb.desc(root.get("uploaded")));
|
||||||
|
|
||||||
|
List<StreamItem> feedItems = new ObjectArrayList<>();
|
||||||
|
|
||||||
|
List<Video> videos = s.createQuery(criteria)
|
||||||
|
.setTimeout(20)
|
||||||
|
.setMaxResults(100)
|
||||||
|
.list();
|
||||||
|
|
||||||
|
SyndFeed feed = new SyndFeedImpl();
|
||||||
|
feed.setFeedType("atom_1.0");
|
||||||
|
feed.setTitle("Piped - Feed");
|
||||||
|
feed.setDescription("Piped's RSS unauthenticated subscription feed.");
|
||||||
|
feed.setUri(Constants.FRONTEND_URL + "/feed");
|
||||||
|
feed.setPublishedDate(new Date());
|
||||||
|
|
||||||
|
final List<SyndEntry> entries = new ObjectArrayList<>();
|
||||||
|
|
||||||
|
for (Video video : videos) {
|
||||||
|
var channel = video.getChannel();
|
||||||
|
SyndEntry entry = new SyndEntryImpl();
|
||||||
|
|
||||||
|
SyndPerson person = new SyndPersonImpl();
|
||||||
|
person.setName(channel.getUploader());
|
||||||
|
person.setUri(Constants.FRONTEND_URL + "/channel/" + channel.getUploaderId());
|
||||||
|
|
||||||
|
entry.setAuthors(Collections.singletonList(person));
|
||||||
|
|
||||||
|
entry.setLink(Constants.FRONTEND_URL + "/watch?v=" + video.getId());
|
||||||
|
entry.setUri(Constants.FRONTEND_URL + "/watch?v=" + video.getId());
|
||||||
|
entry.setTitle(video.getTitle());
|
||||||
|
entry.setPublishedDate(new Date(video.getUploaded()));
|
||||||
|
entries.add(entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
feed.setEntries(entries);
|
||||||
|
|
||||||
|
updateSubscribedTime(filtered);
|
||||||
|
addMissingChannels(filtered);
|
||||||
|
|
||||||
|
return new SyndFeedOutput().outputString(feed).getBytes(UTF_8);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void updateSubscribedTime(Collection<String> channelIds) {
|
||||||
|
Multithreading.runAsync(() -> {
|
||||||
|
try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) {
|
||||||
|
var tr = s.beginTransaction();
|
||||||
|
var cb = s.getCriteriaBuilder();
|
||||||
|
var cu = cb.createCriteriaUpdate(UnauthenticatedSubscription.class);
|
||||||
|
var root = cu.getRoot();
|
||||||
|
cu.where(root.get("id").in(channelIds))
|
||||||
|
.set(root.get("subscribedAt"), System.currentTimeMillis())
|
||||||
|
.where(cb.lt(root.get("subscribedAt"), System.currentTimeMillis() - (TimeUnit.DAYS.toMillis(Constants.SUBSCRIPTIONS_EXPIRY) / 2)));
|
||||||
|
s.createMutationQuery(cu).executeUpdate();
|
||||||
|
tr.commit();
|
||||||
|
} catch (Exception e) {
|
||||||
|
ExceptionHandler.handle(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void addMissingChannels(Collection<String> channelIds) {
|
||||||
|
Multithreading.runAsyncLimited(() -> {
|
||||||
|
try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) {
|
||||||
|
|
||||||
|
var cb = s.getCriteriaBuilder();
|
||||||
|
|
||||||
|
{
|
||||||
|
var query = cb.createQuery();
|
||||||
|
var root = query.from(UnauthenticatedSubscription.class);
|
||||||
|
query.select(root.get("id"))
|
||||||
|
.where(root.get("id").in(channelIds));
|
||||||
|
|
||||||
|
List<Object> existing = s.createQuery(query).setTimeout(20).list();
|
||||||
|
|
||||||
|
var tr = s.beginTransaction();
|
||||||
|
channelIds.stream()
|
||||||
|
.filter(id -> !existing.contains(id))
|
||||||
|
.map(UnauthenticatedSubscription::new)
|
||||||
|
.forEach(s::insert);
|
||||||
|
tr.commit();
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
var query = cb.createQuery();
|
||||||
|
var root = query.from(me.kavin.piped.utils.obj.db.Channel.class);
|
||||||
|
query.select(root.get("id"))
|
||||||
|
.where(root.get("id").in(channelIds));
|
||||||
|
|
||||||
|
List<Object> existing = s.createQuery(query).setTimeout(20).list();
|
||||||
|
|
||||||
|
channelIds.stream()
|
||||||
|
.filter(id -> !existing.contains(id))
|
||||||
|
.forEach(id -> Multithreading.runAsyncLimited(() -> {
|
||||||
|
saveChannel(id);
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
ExceptionHandler.handle(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
public static byte[] importResponse(String session, String[] channelIds, boolean override) throws IOException {
|
public static byte[] importResponse(String session, String[] channelIds, boolean override) throws IOException {
|
||||||
|
|
||||||
|
|
||||||
|
@ -0,0 +1,38 @@
|
|||||||
|
package me.kavin.piped.utils.obj.db;
|
||||||
|
|
||||||
|
import jakarta.persistence.*;
|
||||||
|
|
||||||
|
@Entity
|
||||||
|
@Table(name = "unauthenticated_subscriptions", indexes = {
|
||||||
|
@Index(columnList = "id", name = "unauthenticated_subscriptions_id_idx"),
|
||||||
|
@Index(columnList = "subscribed_at", name = "unauthenticated_subscriptions_subscribed_at_idx")
|
||||||
|
})
|
||||||
|
public class UnauthenticatedSubscription {
|
||||||
|
|
||||||
|
public UnauthenticatedSubscription() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public UnauthenticatedSubscription(String id) {
|
||||||
|
this.id = id;
|
||||||
|
this.subscribedAt = System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Id
|
||||||
|
@Column(name = "id", unique = true, nullable = false, length = 24)
|
||||||
|
private String id;
|
||||||
|
|
||||||
|
@Column(name = "subscribed_at", nullable = false)
|
||||||
|
private long subscribedAt;
|
||||||
|
|
||||||
|
public long getSubscribedAt() {
|
||||||
|
return subscribedAt;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSubscribedAt(long subscribedAt) {
|
||||||
|
this.subscribedAt = subscribedAt;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getId() {
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
}
|
@ -26,7 +26,7 @@ curl ${CURLOPTS[@]} $HOST/playlists/PLQSoWXSpjA3-egtFq45DcUydZ885W7MTT || exit 1
|
|||||||
|
|
||||||
# Playlist Nextpage
|
# Playlist Nextpage
|
||||||
PLAYLIST_NEXTPAGE=$(curl -s -o - -f $HOST/playlists/PLQSoWXSpjA3-egtFq45DcUydZ885W7MTT | jq -r .nextpage)
|
PLAYLIST_NEXTPAGE=$(curl -s -o - -f $HOST/playlists/PLQSoWXSpjA3-egtFq45DcUydZ885W7MTT | jq -r .nextpage)
|
||||||
curl ${CURLOPTS[@]} $HOST/nextpage/playlists/UCsXVk37bltHxD1rDPwtNM8Q -G --data-urlencode "nextpage=$PLAYLIST_NEXTPAGE" || exit 1
|
curl ${CURLOPTS[@]} $HOST/nextpage/playlists/PLQSoWXSpjA3-egtFq45DcUydZ885W7MTT -G --data-urlencode "nextpage=$PLAYLIST_NEXTPAGE" || exit 1
|
||||||
|
|
||||||
# Clips
|
# Clips
|
||||||
curl ${CURLOPTS[@]} $HOST/clips/Ugkx71jS31nwsms_Cc65oi7yXF1mILflhhrO || exit 1
|
curl ${CURLOPTS[@]} $HOST/clips/Ugkx71jS31nwsms_Cc65oi7yXF1mILflhhrO || exit 1
|
||||||
@ -126,3 +126,8 @@ curl ${CURLOPTS[@]} $HOST/import/playlist -X POST -H "Content-Type: application/
|
|||||||
|
|
||||||
# Delete User Test
|
# Delete User Test
|
||||||
curl ${CURLOPTS[@]} $HOST/user/delete -X POST -H "Content-Type: application/json" -H "Authorization: $AUTH_TOKEN" -d $(jq -n --compact-output --arg password "$PASS" '{"password": $password}') || exit 1
|
curl ${CURLOPTS[@]} $HOST/user/delete -X POST -H "Content-Type: application/json" -H "Authorization: $AUTH_TOKEN" -d $(jq -n --compact-output --arg password "$PASS" '{"password": $password}') || exit 1
|
||||||
|
|
||||||
|
# Unauthenticated subscription tests
|
||||||
|
CHANNEL_IDS=UCsXVk37bltHxD1rDPwtNM8Q,UCXuqSBlHAE6Xw-yeJA0Tunw
|
||||||
|
curl ${CURLOPTS[@]} $HOST/feed/unauthenticated -G --data-urlencode "channels=$CHANNEL_IDS" || exit 1
|
||||||
|
curl ${CURLOPTS[@]} $HOST/feed/unauthenticated/rss -G --data-urlencode "channels=$CHANNEL_IDS" || exit 1
|
||||||
|
Loading…
x
Reference in New Issue
Block a user