-
Notifications
You must be signed in to change notification settings - Fork 5
/
benchmark_throughput.js
157 lines (141 loc) · 4.68 KB
/
benchmark_throughput.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// @ts-check
const { intoConfig, replicateLDES } = require("ldes-client");
const fs = require("fs");
import { getEndpointUrls } from "./urlSource.js";
/**
 * Builds an LDES replication client for a single endpoint.
 *
 * The URL is wrapped into a client configuration via `intoConfig`, with
 * `onlyDefaultGraph` set to `true`, and handed to `replicateLDES`.
 * @param {string} url - URL to the root of an LDES.
 * @returns {Object} Whatever `replicateLDES` returns for that configuration.
 */
const createClient = (url) =>
  replicateLDES(intoConfig({ url, onlyDefaultGraph: true }));
/**
 * Measures the time needed to replicate a stream of data from a given URL.
 * The logic for obtaining members is based on https://github.com/pietercolpaert/ldes-benchmark/tree/main/throughput
 *
 * Members are read one at a time until the stream ends, `maxMembers` members
 * have been read, or `maxDurationSeconds` has elapsed. The duration limit is
 * also enforced while a read is still pending, so a stalled endpoint cannot
 * block the benchmark indefinitely. The reader is always cancelled before the
 * function returns, including when reading fails; that teardown is not part
 * of the measured duration.
 *
 * @param {string} url - The LDES endpoint to replicate
 * @param {number} maxMembers - The maximum number of members to replicate, set to 0 to replicate all members.
 * @param {number} maxDurationSeconds - Optionally, the maximum amount of time in seconds to consume members from the stream.
 * @returns {Promise<Object|null>} - Returns an object including the `url`, `quads`, `members` and `durationSec`,
 * or `null` when replication failed (the error is logged).
 */
const replicateStrem = async (url, maxMembers, maxDurationSeconds) => {
  console.log("Replicating stream from", url);

  const NANOS_PER_SECOND = 1e9;
  // Largest delay setTimeout accepts; larger values would fire immediately.
  const MAX_TIMER_DELAY_MS = 2147483647;
  // Sentinel returned by readNext() when the time budget ran out first.
  const TIMED_OUT = Symbol("timed out");
  const secondsSince = (since) =>
    Number(process.hrtime.bigint() - since) / NANOS_PER_SECOND;

  let reader = null;
  try {
    const client = createClient(url);
    // request current high-resolution time for benchmarking
    const start = process.hrtime.bigint();
    reader = client.stream({ highWaterMark: 10 }).getReader();

    // Reads the next chunk, giving up once the remaining time budget is spent.
    const readNext = () => {
      const pendingRead = reader.read();
      if (!maxDurationSeconds) {
        return pendingRead;
      }
      const remainingMs = Math.min(
        MAX_TIMER_DELAY_MS,
        Math.max(0, (maxDurationSeconds - secondsSince(start)) * 1000)
      );
      let timer;
      const deadline = new Promise((resolve) => {
        timer = setTimeout(resolve, remainingMs, TIMED_OUT);
      });
      // Clear the timer as soon as either side settles so it cannot keep the
      // process alive after the benchmark has finished.
      return Promise.race([pendingRead, deadline]).finally(() =>
        clearTimeout(timer)
      );
    };

    let quads = 0;
    let members = 0;
    while (true) {
      const el = await readNext();
      if (el === TIMED_OUT) {
        console.info("reached maximum duration");
        break;
      }
      if (!el) {
        break;
      }
      if (el.value) {
        quads += el.value.quads.length;
        members += 1;
      }
      // stop if we've reached the end of the stream or the maximum number of members
      if (el.done || (maxMembers && members >= maxMembers)) {
        console.info("reached end of stream or maximum number of members");
        break;
      }
      // stop if we've reached the maximum duration
      if (maxDurationSeconds && secondsSince(start) > maxDurationSeconds) {
        console.info("reached maximum duration");
        break;
      }
    }

    // Take the end time before tearing the reader down so that cancellation
    // cost does not distort the measured throughput.
    const durationSec = secondsSince(start);
    return {
      url,
      quads,
      members,
      durationSec,
    };
  } catch (error) {
    console.error(error);
    return null;
  } finally {
    // Always release the stream, also when a read failed half-way through.
    if (reader) {
      try {
        await reader.cancel();
      } catch (cancelError) {
        console.error("Failed to cancel stream reader", cancelError);
      }
    }
  }
};
/**
 * Serialises `items` as pretty-printed JSON (2-space indent) and writes it to
 * `filename`. The write is asynchronous and is not awaited; its outcome is
 * only reported on the console.
 * @param {*} items - The value to serialise.
 * @param {string} filename - Path of the file to write.
 */
const writeResults = (items, filename) => {
  // Reports the outcome of the write once fs has finished.
  const reportOutcome = (err) => {
    if (!err) {
      console.log(`Successfully wrote file at ${filename}`);
      return;
    }
    console.error("Error writing file", err);
  };

  fs.writeFile(filename, JSON.stringify(items, null, 2), reportOutcome);
};
export default {
  /**
   * Benchmarks every endpoint returned by `getEndpointUrls` and writes the
   * collected results to `./benchmarks_data/throughput.json`.
   *
   * Endpoints are processed one after another. For each endpoint that answers
   * a plain GET with an OK status, the stream is replicated once as a warm-up,
   * then (after a pause) replicated again; only the second run is recorded.
   * Endpoints that cannot be fetched or replicated are marked "offline" with
   * a description of the failure.
   * @returns {Promise<void>} Resolves once all endpoints were processed and the
   * (non-awaited) write of the results file has been started.
   */
  async run() {
    // Time budget, in seconds, for the warm-up and for the measured run.
    const WARMUP_SECONDS = 10;
    const BENCHMARK_SECONDS = 10;
    // Pause between warm-up and measurement, in milliseconds.
    const COOLDOWN_MS = 60000;

    // Formats an error for the dashboard; the cause is only prefixed when the
    // error actually carries one, so the text never starts with "undefined".
    const describeError = (error) => {
      const message = error?.message ?? String(error);
      return error?.cause ? `${error.cause} ${message}` : message;
    };

    const endpoints = await getEndpointUrls();
    // create objects with information we need for the dashboard
    const items = endpoints.map((endpoint) => {
      return {
        url: endpoint.url,
        title: endpoint.title,
        status: "unknown",
        error: null,
        quads: 0,
        members: 0,
        durationSec: 0,
        throughputQuands: 0,
        throughputMembers: 0,
      };
    });
    for (let ix = 0; ix < items.length; ix++) {
      const url = items[ix].url;
      try {
        // wakeup the endpoint
        const response = await fetch(url, { method: "GET" });
        if (response.ok) {
          // benchmark the stream for 10 seconds, to give servers the chance to cache the results
          await replicateStrem(url, 0, WARMUP_SECONDS);
          // sleep for 1 minute to avoid throttling
          await new Promise((resolve) => setTimeout(resolve, COOLDOWN_MS));
          // benchmark the stream for 10 seconds to get the actual results
          const result = await replicateStrem(url, 0, BENCHMARK_SECONDS);
          if (!result) {
            items[ix].status = "offline";
            items[ix].error = "Failed to replicate stream.";
            continue;
          }
          const item = items[ix];
          items[ix] = {
            ...item,
            ...result,
            // throughput values are rounded to one decimal
            throughputQuands: Number(
              (result.quads / result.durationSec).toFixed(1)
            ),
            throughputMembers: Number(
              (result.members / result.durationSec).toFixed(1)
            ),
            status: "online",
          };
        } else {
          items[ix].status = "offline";
          items[ix].error = `${response.status} ${response.statusText}`;
        }
      } catch (error) {
        items[ix].status = "offline";
        items[ix].error = describeError(error);
      }
    }
    writeResults(items, "./benchmarks_data/throughput.json");
  },
};