Skip to content

Commit

Permalink
fix: concurrent pool handle many promises correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
shevernitskiy committed Dec 14, 2024
1 parent 5916e95 commit 7147ca1
Showing 1 changed file with 23 additions and 27 deletions.
50 changes: 23 additions & 27 deletions src/core/async-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,10 @@ export class DelayQueue<T> implements AsyncQueue<T> {

export class ConcurrentPool<T> implements AsyncQueue<T> {
private stack: AsyncTask<T>[] = [];
private in_cycle = false;
private start_time = 0;
private concurrent = 0;
private timer = 0;

constructor(private size: number, private timeframe: number) {
constructor(private readonly size: number, private readonly timeframe: number) {
if (size <= 0) throw new Error("Invalid concurrent pool size");
if (timeframe <= 0) throw new Error("Invalid concurrent pool timeframe");
}
Expand All @@ -53,35 +52,32 @@ export class ConcurrentPool<T> implements AsyncQueue<T> {
return promise;
}

private resolve(): void {
if (this.in_cycle) return;
if (this.stack.length === 0) {
this.start_time = 0;
this.concurrent = 0;
private resolve() {
if (this.timer === 0) {
this.frame();
this.timer = setInterval(() => this.frame(), this.timeframe);
return;
}

const delta = Date.now() - this.start_time;

if (this.concurrent < this.size) {
if (delta > this.timeframe) {
this.start_time = 0;
this.concurrent = 0;
}

const promise = this.stack.shift();
promise?.resolve(promise.fn());

this.concurrent++;
if (this.concurrent === 1) this.start_time = Date.now();
} else {
this.in_cycle = true;
setTimeout(() => {
this.in_cycle = false;
this.start_time = 0;
this.concurrent = 0;
this.resolve();
}, this.timeframe - delta);
const item = this.stack.shift();
item?.resolve(item.fn());
return;
}
}

private frame(): void {
if (this.stack.length === 0) {
this.concurrent = 0;
clearInterval(this.timer);
this.timer = 0;
return;
}

const chunk = this.stack.splice(0, this.size);
this.concurrent = chunk.length;

chunk.map((item) => item.resolve(item.fn()));
}
}

0 comments on commit 7147ca1

Please sign in to comment.