Merge pull request #372 from TeamPiped/pubsub-opt

Optimize pubsub subscription query
This commit is contained in:
Kavin 2022-09-17 20:33:11 +05:30 committed by GitHub
commit bfa7bfa79a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -2,10 +2,10 @@ package me.kavin.piped;
import io.activej.inject.Injector;
import jakarta.persistence.criteria.CriteriaBuilder;
import jakarta.persistence.criteria.CriteriaQuery;
import me.kavin.piped.consts.Constants;
import me.kavin.piped.utils.*;
import me.kavin.piped.utils.obj.db.*;
import me.kavin.piped.utils.obj.db.PlaylistVideo;
import me.kavin.piped.utils.obj.db.Video;
import org.hibernate.Session;
import org.hibernate.StatelessSession;
import org.schabi.newpipe.extractor.NewPipe;
@ -17,6 +17,7 @@ import org.schabi.newpipe.extractor.services.youtube.extractors.YoutubeStreamExt
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class Main {
@ -64,41 +65,28 @@ public class Main {
public void run() {
try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) {
CriteriaBuilder cb = s.getCriteriaBuilder();
CriteriaQuery<PubSub> criteria = cb.createQuery(PubSub.class);
var root = criteria.from(PubSub.class);
var userSq = criteria.subquery(User.class);
var userRoot = userSq.from(User.class);
userSq.select(userRoot.get("subscribed_ids"));
var subquery = criteria.subquery(UnauthenticatedSubscription.class);
var subRoot = subquery.from(UnauthenticatedSubscription.class);
subquery.select(subRoot.get("id"))
.where(cb.gt(subRoot.get("subscribedAt"), System.currentTimeMillis() - TimeUnit.DAYS.toMillis(Constants.SUBSCRIPTIONS_EXPIRY)));
criteria.select(root)
.where(cb.and(
cb.lessThan(root.get("subbedAt"), System.currentTimeMillis() - TimeUnit.DAYS.toMillis(4)),
cb.or(
root.get("id").in(userSq),
root.get("id").in(subquery)
)
)
);
final Random random = new Random();
List<PubSub> pubSubList = s.createQuery(criteria).list();
Collections.shuffle(pubSubList);
pubSubList.stream().parallel()
.forEach(pubSub -> {
if (pubSub != null)
Multithreading.runAsyncLimitedPubSub(() -> {
try {
ResponseHelper.subscribePubSub(pubSub.getId());
} catch (IOException e) {
ExceptionHandler.handle(e);
}
});
});
s.createNativeQuery("SELECT id FROM pubsub WHERE subbed_at < :subbedTime AND id IN (" +
"SELECT DISTINCT channel FROM users_subscribed" +
" UNION " +
"SELECT id FROM unauthenticated_subscriptions WHERE subscribed_at > :unauthSubbed" +
")", String.class)
.setParameter("subbedTime", System.currentTimeMillis() - TimeUnit.DAYS.toMillis(4))
.setParameter("unauthSubbed", System.currentTimeMillis() - TimeUnit.DAYS.toMillis(Constants.SUBSCRIPTIONS_EXPIRY))
.stream()
.filter(Objects::nonNull)
.collect(Collectors.toUnmodifiableSet())
.stream()
.sorted(Comparator.comparingInt(o -> random.nextInt()))
.parallel()
.forEach(id -> Multithreading.runAsyncLimitedPubSub(() -> {
try {
ResponseHelper.subscribePubSub(id);
} catch (IOException e) {
ExceptionHandler.handle(e);
}
}));
} catch (Exception e) {
e.printStackTrace();