diff --git a/lib/platform-stats.js b/lib/platform-stats.js index 96d7e032..f144bc71 100644 --- a/lib/platform-stats.js +++ b/lib/platform-stats.js @@ -2,6 +2,14 @@ import createDebug from 'debug' const debug = createDebug('spark:platform-stats') +/** + * @param {import('pg').Client} pgClient + * @param {import('./preprocess').Measurement[]} honestMeasurements + */ +export const updatePlatformStats = async (pgClient, honestMeasurements) => { + await updateDailyNodeMetrics(pgClient, honestMeasurements) +} + /** * @param {import('pg').Client} pgClient * @param {import('./preprocess').Measurement[]} honestMeasurements @@ -11,8 +19,8 @@ export const updateDailyNodeMetrics = async (pgClient, honestMeasurements) => { for (const m of honestMeasurements) { await pgClient.query(` INSERT INTO daily_node_metrics (station_id, metric_date) - VALUES ($1, now()::date) + VALUES ($1, now()) ON CONFLICT (station_id, metric_date) DO NOTHING - `, [m.station_id]) // TODO: when we add more fields, we should update the ON CONFLICT clause to update the fields + `, [m.stationId]) // TODO: when we add more fields, we should update the ON CONFLICT clause to update the fields } } diff --git a/lib/preprocess.js b/lib/preprocess.js index 2cdaec0b..94594301 100644 --- a/lib/preprocess.js +++ b/lib/preprocess.js @@ -29,7 +29,7 @@ export class Measurement { this.end_at = parseDateTime(m.end_at) this.status_code = m.status_code this.indexerResult = pointerize(m.indexer_result) - this.station_id = pointerize(m.station_id) + this.stationId = pointerize(m.station_id) } } @@ -201,6 +201,9 @@ const assertValidMeasurement = measurement => { assert(ethers.isAddress(measurement.participantAddress), 'valid participant address required') assert(typeof measurement.inet_group === 'string', 'valid inet group required') assert(typeof measurement.finished_at === 'number', 'field `finished_at` must be set to a number') + if (measurement.stationId) { + assert(typeof measurement.stationId === 'string' && measurement.stationId.length === 64, 'stationId must be a string of 64 characters') + } } /** diff --git a/lib/public-stats.js b/lib/public-stats.js index 2ba2e6e5..bc10feb5 100644 --- a/lib/public-stats.js +++ b/lib/public-stats.js @@ -1,7 +1,7 @@ import assert from 'node:assert' import createDebug from 'debug' -import { updateDailyNodeMetrics } from './platform-stats.js' +import { updatePlatformStats } from './platform-stats.js' import { getTaskId } from './retrieval-stats.js' const debug = createDebug('spark:public-stats') @@ -32,7 +32,7 @@ export const updatePublicStats = async ({ createPgClient, honestMeasurements }) } await updateDailyParticipants(pgClient, participants) await updateIndexerQueryStats(pgClient, honestMeasurements) - await updateDailyNodeMetrics(pgClient, honestMeasurements) + await updatePlatformStats(pgClient, honestMeasurements) } finally { await pgClient.end() } diff --git a/test/helpers/test-data.js b/test/helpers/test-data.js index 695d99f5..76668b08 100644 --- a/test/helpers/test-data.js +++ b/test/helpers/test-data.js @@ -1,4 +1,5 @@ export const VALID_PARTICIPANT_ADDRESS = '0x000000000000000000000000000000000000dEaD' +export const VALID_STATION_ID = '6400000000000000000000000000000000000000000000000000000000000000' export const VALID_TASK = { cid: 'QmUuEoBdjC8D1PfWZCc7JCSK8nj7TV6HbXWDHYHzZHCVGS', @@ -13,6 +14,7 @@ export const VALID_MEASUREMENT = { provider_address: '/dns4/production-ipfs-peer.pinata.cloud/tcp/3000/ws/p2p/Qma8ddFEQWEU8ijWvdxXm3nxU7oHsRtCykAaVz8WUYhiKn', protocol: 'bitswap', participantAddress: VALID_PARTICIPANT_ADDRESS, + stationId: VALID_STATION_ID, inet_group: 'some-group-id', status_code: 200, timeout: false, @@ -23,8 +25,7 @@ export const VALID_MEASUREMENT = { finished_at: new Date('2023-11-01T09:00:10.000Z').getTime(), byte_length: 1024, retrievalResult: 'OK', - indexerResult: 'OK', - station_id: 'station1' + indexerResult: 'OK' } // Fraud detection is mutating the measurements parsed from JSON diff --git a/test/platform-stats.test.js b/test/platform-stats.test.js index 7b933cbc..bf19c347 100644 --- a/test/platform-stats.test.js +++ b/test/platform-stats.test.js @@ -4,7 +4,7 @@ import { beforeEach, describe, it } from 'mocha' import { DATABASE_URL } from '../lib/config.js' import { migrateWithPgClient } from '../lib/migrate.js' -import { VALID_MEASUREMENT } from './helpers/test-data.js' +import { VALID_MEASUREMENT, VALID_STATION_ID } from './helpers/test-data.js' import { updateDailyNodeMetrics } from '../lib/platform-stats.js' const createPgClient = async () => { @@ -15,15 +15,16 @@ const createPgClient = async () => { describe('platform-stats', () => { let pgClient - before(async () => { pgClient = await createPgClient() await migrateWithPgClient(pgClient) }) + let today beforeEach(async () => { await pgClient.query('DELETE FROM daily_node_metrics') await pgClient.query('BEGIN TRANSACTION') + today = await getCurrentDate() }) afterEach(async () => { @@ -34,29 +35,40 @@ describe('platform-stats', () => { await pgClient.end() }) - it('updates daily node metrics with new measurements', async () => { - const honestMeasurements = [ - { ...VALID_MEASUREMENT, station_id: 'station1' }, - { ...VALID_MEASUREMENT, station_id: 'station2' } - ] + describe('updateDailyNodeMetrics', () => { + it('updates daily node metrics for today with multiple measurements', async () => { + const validStationId2 = VALID_STATION_ID.slice(0, -1) + '1' + const honestMeasurements = [ + { ...VALID_MEASUREMENT, stationId: VALID_STATION_ID }, + { ...VALID_MEASUREMENT, stationId: validStationId2 } + ] - await updateDailyNodeMetrics(pgClient, honestMeasurements) + await updateDailyNodeMetrics(pgClient, honestMeasurements) - const { rows } = await pgClient.query('SELECT station_id FROM daily_node_metrics') - assert.strictEqual(rows.length, 2) - assert.deepStrictEqual(rows.map(row => row.station_id).sort(), ['station1', 'station2']) - }) + const { rows } = await pgClient.query('SELECT station_id, metric_date::TEXT FROM daily_node_metrics ORDER BY station_id') + assert.strictEqual(rows.length, 2) + assert.deepStrictEqual(rows, [ + { station_id: VALID_STATION_ID, metric_date: today }, + { station_id: validStationId2, metric_date: today } + ]) + }) - it('ignores duplicate measurements for the same station on the same day', async () => { - const honestMeasurements = [ - { ...VALID_MEASUREMENT, station_id: 'station1' }, - { ...VALID_MEASUREMENT, station_id: 'station1' } // Duplicate station_id - ] + it('ignores duplicate measurements for the same station on the same day', async () => { + const honestMeasurements = [ + { ...VALID_MEASUREMENT, stationId: VALID_STATION_ID }, + { ...VALID_MEASUREMENT, stationId: VALID_STATION_ID } + ] - await updateDailyNodeMetrics(pgClient, honestMeasurements) + await updateDailyNodeMetrics(pgClient, honestMeasurements) - const { rows } = await pgClient.query('SELECT station_id FROM daily_node_metrics') - assert.strictEqual(rows.length, 1) - assert.strictEqual(rows[0].station_id, 'station1') + const { rows } = await pgClient.query('SELECT station_id, metric_date::TEXT FROM daily_node_metrics') + assert.strictEqual(rows.length, 1) + assert.deepStrictEqual(rows, [{ station_id: VALID_STATION_ID, metric_date: today }]) + }) }) + + const getCurrentDate = async () => { + const { rows: [{ today }] } = await pgClient.query('SELECT now()::DATE::TEXT as today') + return today + } }) diff --git a/test/preprocess.js b/test/preprocess.js index d99edb7e..a69c11b1 100644 --- a/test/preprocess.js +++ b/test/preprocess.js @@ -3,6 +3,7 @@ import { Point } from '../lib/telemetry.js' import assert from 'node:assert' import createDebug from 'debug' import { assertPointFieldValue, assertRecordedTelemetryPoint } from './helpers/assertions.js' +import { VALID_STATION_ID } from './helpers/test-data.js' import { RoundData } from '../lib/round.js' const debug = createDebug('test') @@ -23,6 +24,7 @@ describe('preprocess', () => { const roundIndex = 0 const measurements = [{ participant_address: 'f410ftgmzttyqi3ti4nxbvixa4byql3o5d4eo3jtc43i', + station_id: VALID_STATION_ID, spark_version: '1.2.3', inet_group: 'ig1', finished_at: '2023-11-01T09:00:00.000Z', @@ -41,6 +43,7 @@ describe('preprocess', () => { assert.deepStrictEqual(round.measurements, [ new Measurement({ participant_address: '0x999999cf1046e68e36E1aA2E0E07105eDDD1f08E', + station_id: VALID_STATION_ID, spark_version: '1.2.3', inet_group: 'ig1', finished_at: '2023-11-01T09:00:00.000Z', @@ -97,6 +100,7 @@ describe('getRetrievalResult', () => { spark_version: '1.5.2', zinnia_version: '0.14.0', participant_address: 'f410fgkhpcrbmdvic52o3nivftrjxr7nzw47updmuzra', + station_id: VALID_STATION_ID, finished_at: '2023-11-01T09:42:03.246Z', timeout: false, start_at: '2023-11-01T09:40:03.393Z',