Merge pull request #550 from TeamPiped/randomization-cleanup

Cleanup list randomization code for pubsub.
This commit is contained in:
Kavin 2023-03-11 21:02:11 +00:00 committed by GitHub
commit f8622cabeb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -2,6 +2,7 @@ package me.kavin.piped;
import io.activej.inject.Injector; import io.activej.inject.Injector;
import io.sentry.Sentry; import io.sentry.Sentry;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import jakarta.persistence.criteria.CriteriaBuilder; import jakarta.persistence.criteria.CriteriaBuilder;
import me.kavin.piped.consts.Constants; import me.kavin.piped.consts.Constants;
import me.kavin.piped.server.ServerLauncher; import me.kavin.piped.server.ServerLauncher;
@ -81,9 +82,7 @@ public class Main {
public void run() { public void run() {
try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) { try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) {
final Random random = new Random(); List<String> channelIds = s.createNativeQuery("SELECT id FROM pubsub WHERE subbed_at < :subbedTime AND id IN (" +
s.createNativeQuery("SELECT id FROM pubsub WHERE subbed_at < :subbedTime AND id IN (" +
"SELECT DISTINCT channel FROM users_subscribed" + "SELECT DISTINCT channel FROM users_subscribed" +
" UNION " + " UNION " +
"SELECT id FROM unauthenticated_subscriptions WHERE subscribed_at > :unauthSubbed" + "SELECT id FROM unauthenticated_subscriptions WHERE subscribed_at > :unauthSubbed" +
@ -92,9 +91,12 @@ public class Main {
.setParameter("unauthSubbed", System.currentTimeMillis() - TimeUnit.DAYS.toMillis(Constants.SUBSCRIPTIONS_EXPIRY)) .setParameter("unauthSubbed", System.currentTimeMillis() - TimeUnit.DAYS.toMillis(Constants.SUBSCRIPTIONS_EXPIRY))
.stream() .stream()
.filter(Objects::nonNull) .filter(Objects::nonNull)
.collect(Collectors.toUnmodifiableSet()) .distinct()
.stream() .collect(Collectors.toCollection(ObjectArrayList::new));
.sorted(Comparator.comparingInt(o -> random.nextInt()))
Collections.shuffle(channelIds);
channelIds.stream()
.parallel() .parallel()
.forEach(id -> Multithreading.runAsyncLimitedPubSub(() -> { .forEach(id -> Multithreading.runAsyncLimitedPubSub(() -> {
try { try {