import 'dart:async' as async;
import 'dart:collection';

/// Caps how many asynchronous resource loads are in flight at once.
///
/// Tasks passed to [run] start immediately while fewer than [maxConcurrent]
/// tasks are active. Otherwise they wait in a first-in, first-out queue and
/// are started as earlier tasks finish.
class ResourceLoadLimiter {
  /// Creates a limiter that runs at most [maxConcurrent] tasks at a time.
  ///
  /// Throws an [ArgumentError] if [maxConcurrent] is less than 1.
  ResourceLoadLimiter(int maxConcurrent)
      : maxConcurrent = _validateMaxConcurrent(maxConcurrent);

  /// The maximum number of tasks allowed to be active simultaneously.
  final int maxConcurrent;

  /// Loads accepted by [run] that have not been started yet, oldest first.
  ///
  /// Typed with the non-generic base so loads with different result types can
  /// share one queue.
  final Queue<_QueuedLoadBase> _queue = Queue<_QueuedLoadBase>();

  /// Number of loads that have been started and have not yet finished.
  var _active = 0;

  /// The number of tasks that have been started and have not yet finished.
  int get activeCount => _active;

  /// The number of tasks waiting in the queue for a free slot.
  int get pendingCount => _queue.length;

  /// Schedules [task] and returns a future for its outcome.
  ///
  /// If a slot is free, [task] is invoked before this method returns;
  /// otherwise it is queued behind earlier pending tasks.
  ///
  /// The returned future completes with the value produced by [task], or with
  /// the error (and stack trace) that [task] threw. If the task is discarded
  /// by [clearPending] before it starts, the future completes with a
  /// [ResourceLoadCancelledException].
  Future<T> run<T>(Future<T> Function() task) {
    final queued = _QueuedLoad<T>(task);
    _queue.add(queued);
    _pump();
    return queued.completer.future;
  }

  /// Discards every task that has not started yet.
  ///
  /// The future of each discarded task completes with a
  /// [ResourceLoadCancelledException]. Tasks that are already active are not
  /// affected.
  void clearPending() {
    while (_queue.isNotEmpty) {
      _queue.removeFirst().cancel();
    }
  }

  /// Starts queued loads, oldest first, until the limit is reached or the
  /// queue is empty.
  void _pump() {
    while (_active < maxConcurrent && _queue.isNotEmpty) {
      final queued = _queue.removeFirst();
      _active++;
      // Deliberately not awaited: the caller observes the outcome through the
      // load's own completer, not through this future.
      async.unawaited(_runQueued(queued));
    }
  }

  /// Runs [queued] and then releases its slot and starts further loads.
  Future<void> _runQueued(_QueuedLoadBase queued) async {
    try {
      await queued.run();
    } finally {
      // Free the slot before pumping so the next queued load can take it.
      _active--;
      _pump();
    }
  }

  /// Returns [value] unchanged, or throws an [ArgumentError] if it is below 1.
  static int _validateMaxConcurrent(int value) {
    if (value < 1) {
      throw ArgumentError.value(value, 'maxConcurrent', 'must be >= 1');
    }
    return value;
  }
}

/// Type-erased view of a queued load, letting one queue hold loads whose
/// result types differ.
abstract class _QueuedLoadBase {
  /// Executes the load and reports its outcome to whoever scheduled it.
  Future<void> run();

  /// Abandons the load without executing it.
  void cancel();
}

/// A scheduled task together with the completer that reports its outcome.
class _QueuedLoad<T> implements _QueuedLoadBase {
  _QueuedLoad(this._task);

  /// The work to perform once a slot is available.
  final Future<T> Function() _task;

  /// Completes with the task's value or error, or with a
  /// [ResourceLoadCancelledException] when [cancel] is called first.
  final completer = async.Completer<T>();

  @override
  Future<void> run() async {
    // A load whose completer is already completed (e.g. by [cancel]) must not
    // execute its task.
    if (completer.isCompleted) {
      return;
    }
    try {
      completer.complete(await _task());
    } catch (error, stackTrace) {
      // Route every failure, synchronous or asynchronous, to the caller's
      // future instead of letting it escape into the scheduler.
      completer.completeError(error, stackTrace);
    }
  }

  @override
  void cancel() {
    if (!completer.isCompleted) {
      completer.completeError(const ResourceLoadCancelledException());
    }
  }
}

/// The error a scheduled load's future completes with when the load is
/// discarded before it starts.
class ResourceLoadCancelledException implements Exception {
  /// Creates the cancellation marker.
  const ResourceLoadCancelledException();

  @override
  String toString() => 'ResourceLoadCancelledException';
}