Merge pull request #7214 from FineFindus/feat/increase-batch-size

perf(LocalFeedRepository): speed up local feed extraction
This commit is contained in:
FineFindus 2025-04-15 20:23:14 +02:00 committed by GitHub
commit e1b286d1e9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -82,31 +82,32 @@ class LocalFeedRepository : FeedRepository {
if (channelIds.isEmpty()) return if (channelIds.isEmpty()) return
val totalExtractionCount = AtomicInteger() val totalExtractionCount = AtomicInteger()
val chunkedExtractionCount = AtomicInteger() val channelExtractionCount = AtomicInteger()
withContext(Dispatchers.Main) { withContext(Dispatchers.Main) {
onProgressUpdate(FeedProgress(0, channelIds.size)) onProgressUpdate(FeedProgress(0, channelIds.size))
} }
for (channelIdChunk in channelIds.chunked(CHUNK_SIZE)) { for (channelIdChunk in channelIds.chunked(CHUNK_SIZE)) {
// add a delay after each BATCH_SIZE amount of visited channels val count = channelExtractionCount.get();
val count = chunkedExtractionCount.get();
if (count >= BATCH_SIZE) { if (count >= BATCH_SIZE) {
delay(BATCH_DELAY.random()) // add a delay after each BATCH_SIZE amount of fully-fetched channels
chunkedExtractionCount.set(0) delay(CHANNEL_BATCH_DELAY.random())
channelExtractionCount.set(0)
} }
val collectedFeedItems = channelIdChunk.parallelMap { channelId -> val collectedFeedItems = channelIdChunk.parallelMap { channelId ->
try { try {
getRelatedStreams(channelId, minimumDateMillis) getRelatedStreams(channelId, minimumDateMillis).also {
if (it.isNotEmpty())
// increase counter if we had to fully fetch the channel
channelExtractionCount.incrementAndGet()
}
} catch (e: Exception) { } catch (e: Exception) {
Log.e(channelId, e.stackTraceToString()) Log.e(channelId, e.stackTraceToString())
null null
} finally { } finally {
chunkedExtractionCount.incrementAndGet()
val currentProgress = totalExtractionCount.incrementAndGet()
withContext(Dispatchers.Main) { withContext(Dispatchers.Main) {
onProgressUpdate(FeedProgress(currentProgress, channelIds.size)) onProgressUpdate(FeedProgress(totalExtractionCount.incrementAndGet(), channelIds.size))
} }
} }
}.filterNotNull().flatten().map(StreamItem::toFeedItem) }.filterNotNull().flatten().map(StreamItem::toFeedItem)
@ -161,7 +162,12 @@ class LocalFeedRepository : FeedRepository {
} }
companion object { companion object {
private const val CHUNK_SIZE = 2 /**
* Amount of feeds that are fetched concurrently.
*
* Should ideally be a factor of `BATCH_SIZE` to be correctly applied.
*/
private const val CHUNK_SIZE = 5
/** /**
* Maximum amount of feeds that should be fetched together, before a delay should be applied. * Maximum amount of feeds that should be fetched together, before a delay should be applied.
@ -169,9 +175,13 @@ class LocalFeedRepository : FeedRepository {
private const val BATCH_SIZE = 50 private const val BATCH_SIZE = 50
/** /**
* Millisecond delay between two consecutive batches to avoid throttling. * Millisecond delay after fetching `BATCH_SIZE` channels to avoid throttling.
*
* A channel is only counted as fetched when it had a recent upload, which requires fetching
* the channelInfo via Innertube.
*/ */
private val BATCH_DELAY = (500L..1500L) private val CHANNEL_BATCH_DELAY = (500L..1500L)
private const val MAX_FEED_AGE_DAYS = 30L // 30 days private const val MAX_FEED_AGE_DAYS = 30L // 30 days
} }
} }