-
Notifications
You must be signed in to change notification settings - Fork 0
/
11.txt
151 lines (125 loc) · 5.93 KB
/
11.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import {CircularQueue} from "./queue";
class SocketWorker {
private ws: WebSocket;
private requestQueue: CircularQueue;
private responseQueue: CircularQueue;
private queueIndex: number; // Worker 的队列索引
private batchCount: number = 0; // 当前批量计数
private batchThreshold: number = 100; // 批次阈值
private notifyCooldown: boolean = false; // 冷却标志位
private notifyCooldownTime: number = 500; // 冷却时间(ms)
private timeoutId: ReturnType<typeof setTimeout> | null = null;
private messageBuffer: string [] = [];
constructor(wsUrl: string, requestBuffer: SharedArrayBuffer, responseBuffer: SharedArrayBuffer, queueIndex: number) {
this.requestQueue = new CircularQueue(requestBuffer);
this.responseQueue = new CircularQueue(responseBuffer);
this.queueIndex = queueIndex;
this.ws = new WebSocket(wsUrl);
this.setupWebSocket();
this.startSendRequest();
this.startWriteResponse();
}
private setupWebSocket() {
this.ws.onmessage = (event) => {
let response: string = event.data;
this.messageBuffer.push(response);
};
this.ws.onopen = () => console.log("WebSocket connected");
this.ws.onerror = (error) => console.error("WebSocket error", error);
this.ws.onclose = () => console.warn("WebSocket closed");
}
private async startSendRequest() {
const BATCH_SIZE = 10; // 每批次处理的消息数
const DELAY_WHEN_EMPTY = 500; // 队列为空时的延时(毫秒)
const DELAY_BETWEEN_BATCHES = 10; // 批量处理后的延时(毫秒)
const processBatch = async () => {
let batch: string[] = this.requestQueue.readBatch(BATCH_SIZE)
console.log("11Received processBatch:1111", batch.length);
// 如果批量中有消息,逐条处理
if (batch.length > 0) {
for (const message of batch) {
if (this.ws.readyState === WebSocket.OPEN) {
try {
this.ws.send(JSON.stringify(message));
// console.log("Message sent:", message);
} catch (error) {
console.error("Failed to send message:", error);
}
} else {
console.warn("WebSocket not open, skipping message:", message);
}
}
// 处理完一批后,让出时间片延时一段时间
// console.log(`Processed ${batch.length} messages, pausing for ${DELAY_BETWEEN_BATCHES}ms...`);
setTimeout(processBatch, DELAY_BETWEEN_BATCHES); // 延时后继续处理
} else {
// 如果队列为空,延时再检查
//console.log("Queue is empty, waiting for new messages...");
setTimeout(processBatch, DELAY_WHEN_EMPTY);
}
};
// 启动消息处理
processBatch();
}
private async startWriteResponse() {
// const BATCH_SIZE = 100; // 每批次处理的消息数
const DELAY_WHEN_EMPTY = 50; // 队列为空时的延时(毫秒)
const DELAY_BETWEEN_BATCHES = 10; // 批量处理后的延时(毫秒)
const processBatch = async () => {
let batch: string[] = this.getBatchResponseMessage()
// 如果批量中有消息,逐条处理
if (batch.length > 0) {
this.responseQueue.writeBatch(batch);
// 处理完一批后,让出时间片延时一段时间
// console.log(`Processed ${batch.length} messages, pausing for ${DELAY_BETWEEN_BATCHES}ms...`);
setTimeout(processBatch, DELAY_BETWEEN_BATCHES); // 延时后继续处理
} else {
// 如果队列为空,延时再检查
//console.log("Queue is empty, waiting for new messages...");
setTimeout(processBatch, DELAY_WHEN_EMPTY);
}
};
// 启动消息处理
processBatch();
}
private getBatchResponseMessage(): string[] {
const batchSize = 100; // 每次批量处理的消息数量
const batchMessages: string[] = [];
while (this.messageBuffer.length > 0 && batchMessages.length < batchSize) {
batchMessages.push(this.messageBuffer.shift()!);
}
return batchMessages;
}
private notifyMainThread() {
// 准备通知主线程,清除超时任务
if (this.timeoutId) {
clearTimeout(this.timeoutId);
this.timeoutId = null;
}
// 如果已经冷却完毕,则开始通知
if (!this.notifyCooldown) {
// 向主线程发送通知
postMessage({type: "NOTIFY", queueIndex: this.queueIndex});
// 避免频繁通知,这里要冷却一下
this.notifyCooldown = true;
// 冷却时间后解锁通知
setTimeout(() => {
this.notifyCooldown = false;
}, this.notifyCooldownTime);
}
this.batchCount = 0; // 重置批次计数
}
private startTimeoutNotification() {
// 如果当前还没有超时任务,就去创建一个,有的话忽略
if (!this.timeoutId) {
// 创建一个超时任务,超时时间 notifyCooldownTime,超过没取消这个超时任务就会通知主线程
this.timeoutId = setTimeout(() => {
this.notifyMainThread();
}, this.notifyCooldownTime);
}
}
}
self.onmessage = (e) => {
const {wsUrl, requestBuffer, responseBuffer, queueIndex} = e.data;
new SocketWorker(wsUrl, requestBuffer, responseBuffer, queueIndex);
};