using WorkerCore = Sonex.Library.WorkersCore.Worker; namespace Sonex.Worker.WebSync; internal sealed class WebSyncProductLinksProcessor { private readonly WebSyncProductTaskRunner _taskRunner; public WebSyncProductLinksProcessor(WebSyncProductTaskRunner taskRunner) { _taskRunner = taskRunner; } public async Task ProcessAsync( IReadOnlyList productLinks, string targetImagesPath, bool updateImages, WebSyncCollectionSnapshot collectionSnapshot, int maxParallelTasks, int retryCount, int downloadTimeoutSeconds, int retryDelaySeconds, WebSyncRunReport runReport, Func waitIfPaused, CancellationToken cancellationToken) { var progress = new WorkerProgressTracker(productLinks.Count); var processor = new SingleProductProcessor( _taskRunner, targetImagesPath, updateImages, collectionSnapshot, retryCount, downloadTimeoutSeconds, retryDelaySeconds, runReport, progress, waitIfPaused); await ProcessInParallelAsync( productLinks, maxParallelTasks, processor.ProcessAsync, waitIfPaused, cancellationToken).ConfigureAwait(false); } private static async ValueTask ProcessWithPauseAsync( string productUrl, CancellationToken cancellationToken, Func processor, Func waitIfPaused) { await waitIfPaused(cancellationToken).ConfigureAwait(false); await processor(cancellationToken).ConfigureAwait(false); } private static Task ProcessInParallelAsync( IReadOnlyList productLinks, int maxParallelTasks, Func processor, Func waitIfPaused, CancellationToken cancellationToken) { var options = new ParallelOptions { CancellationToken = cancellationToken, MaxDegreeOfParallelism = maxParallelTasks }; return Parallel.ForEachAsync( productLinks, options, (productUrl, token) => ProcessWithPauseAsync( productUrl, token, ct => processor(productUrl, ct), waitIfPaused)); } private sealed class WorkerProgressTracker { private readonly int _totalTasks; private int _processedTasks; public WorkerProgressTracker(int totalTasks) { _totalTasks = totalTasks; } public int TotalTasks => _totalTasks; public bool TryAdvance(out int doneTasks) { doneTasks = Interlocked.Increment(ref _processedTasks); return doneTasks == _totalTasks || doneTasks % 100 == 0; } } private sealed class SingleProductProcessor { private readonly WebSyncProductTaskRunner _taskRunner; private readonly string _targetImagesPath; private readonly bool _updateImages; private readonly WebSyncCollectionSnapshot _collectionSnapshot; private readonly int _retryCount; private readonly int _downloadTimeoutSeconds; private readonly int _retryDelaySeconds; private readonly WebSyncRunReport _runReport; private readonly WorkerProgressTracker _progress; private readonly Func _waitIfPaused; public SingleProductProcessor( WebSyncProductTaskRunner taskRunner, string targetImagesPath, bool updateImages, WebSyncCollectionSnapshot collectionSnapshot, int retryCount, int downloadTimeoutSeconds, int retryDelaySeconds, WebSyncRunReport runReport, WorkerProgressTracker progress, Func waitIfPaused) { _taskRunner = taskRunner; _targetImagesPath = targetImagesPath; _updateImages = updateImages; _collectionSnapshot = collectionSnapshot; _retryCount = retryCount; _downloadTimeoutSeconds = downloadTimeoutSeconds; _retryDelaySeconds = retryDelaySeconds; _runReport = runReport; _progress = progress; _waitIfPaused = waitIfPaused; } public async ValueTask ProcessAsync(string productUrl, CancellationToken cancellationToken) { await _taskRunner.ExecuteWithRetryAsync( productUrl, _targetImagesPath, _updateImages, _collectionSnapshot, _retryCount, _downloadTimeoutSeconds, _retryDelaySeconds, _runReport, _waitIfPaused, cancellationToken).ConfigureAwait(false); if (_progress.TryAdvance(out int doneTasks)) { WorkerCore.UpdateProgress(doneTasks, _progress.TotalTasks); } } } }