Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Created daily_node_metrics table and received/stored station_id #188

Merged
merged 8 commits into from
Apr 30, 2024
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Set up [PostgreSQL](https://www.postgresql.org/) with default settings:
- Database: spark_stats

Alternatively, set the environment variable `$DATABASE_URL` with
`postgres://${USER}:${PASS}@${HOST}:${POST}/${DATABASE}`.
`postgres://${USER}:${PASS}@${HOST}:${PORT}/${DATABASE}`.

The Postgres user and database need to exist already, and the user
needs full management permissions for the database.
Expand Down
29 changes: 29 additions & 0 deletions lib/platform-stats.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import createDebug from 'debug'

const debug = createDebug('spark:platform-stats')

/**
 * Entry point for refreshing platform-level statistics from one batch of
 * honest measurements. Currently this delegates to the daily station stats
 * updater only.
 *
 * @param {import('pg').Client} pgClient
 * @param {import('./preprocess').Measurement[]} honestMeasurements
 * @returns {Promise<void>} settles once the delegated update has settled
 */
export const updatePlatformStats = async (pgClient, honestMeasurements) => {
  const dailyStationsUpdate = updateDailyStationStats(pgClient, honestMeasurements)
  await dailyStationsUpdate
}

/**
 * Record which stations submitted honest measurements today.
 *
 * Inserts one `(station_id, day)` row into `daily_stations` for every distinct
 * station id found in `honestMeasurements`, using the database's `now()` as
 * the day. Rows that already exist are left untouched
 * (`ON CONFLICT ... DO NOTHING`).
 *
 * Measurements without a station id are skipped. The station id is optional
 * when measurements are validated, while `daily_stations.station_id` is
 * declared NOT NULL, so a single missing id would otherwise make the whole
 * INSERT fail.
 *
 * @param {import('pg').Client} pgClient
 * @param {import('./preprocess').Measurement[]} honestMeasurements
 * @returns {Promise<void>}
 */
export const updateDailyStationStats = async (pgClient, honestMeasurements) => {
  // TODO: when we add more fields, we will update the ON CONFLICT clause
  // to update those fields, and we won't just use a Set for the stationIds
  // which currently removes all granular measurement details
  const stationIds = [...new Set(
    honestMeasurements
      .map(m => m.stationId)
      // Drop null/undefined/empty ids so they never reach the NOT NULL column.
      .filter(stationId => stationId != null && stationId !== '')
  )]
  debug('Updating daily station stats, unique_count=%s', stationIds.length)

  // Nothing to record: avoid a pointless database round trip.
  if (stationIds.length === 0) return

  await pgClient.query(`
    INSERT INTO daily_stations (station_id, day)
    SELECT unnest($1::text[]), now()
    ON CONFLICT (station_id, day) DO NOTHING
  `, [stationIds])
}
8 changes: 8 additions & 0 deletions lib/preprocess.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export class Measurement {
this.end_at = parseDateTime(m.end_at)
this.status_code = m.status_code
this.indexerResult = pointerize(m.indexer_result)
this.stationId = pointerize(m.station_id)
}
}

Expand Down Expand Up @@ -200,6 +201,13 @@ const assertValidMeasurement = measurement => {
assert(ethers.isAddress(measurement.participantAddress), 'valid participant address required')
assert(typeof measurement.inet_group === 'string', 'valid inet group required')
assert(typeof measurement.finished_at === 'number', 'field `finished_at` must be set to a number')
if (measurement.stationId) {
assert(
typeof measurement.stationId === 'string' &&
measurement.stationId.match(/^[0-9a-fA-F]{88}$/),
'stationId must be a hex string with 88 characters'
)
}
}

/**
Expand Down
2 changes: 2 additions & 0 deletions lib/public-stats.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import assert from 'node:assert'
import createDebug from 'debug'

import { updatePlatformStats } from './platform-stats.js'
import { getTaskId } from './retrieval-stats.js'

const debug = createDebug('spark:public-stats')
Expand Down Expand Up @@ -31,6 +32,7 @@ export const updatePublicStats = async ({ createPgClient, honestMeasurements })
}
await updateDailyParticipants(pgClient, participants)
await updateIndexerQueryStats(pgClient, honestMeasurements)
await updatePlatformStats(pgClient, honestMeasurements)
} finally {
await pgClient.end()
}
Expand Down
5 changes: 5 additions & 0 deletions migrations/006.do.daily-node-metrics.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Stations seen per day: one row for each (day, station_id) pair.
CREATE TABLE daily_stations (
day DATE NOT NULL,
station_id TEXT NOT NULL,
-- The composite key allows at most one row per station per day.
PRIMARY KEY (day, station_id)
)
2 changes: 2 additions & 0 deletions test/helpers/test-data.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Participant address fixture shared by the tests as a known-good value.
export const VALID_PARTICIPANT_ADDRESS = '0x000000000000000000000000000000000000dEaD'
// Station id fixture shared by the tests as a known-good value.
// NOTE(review): presumably an 88-character hex string, as measurement validation requires — confirm the length.
export const VALID_STATION_ID = '8800000000000000000000000000000000000000000000000000000000000000000000000000000000000000'

export const VALID_TASK = {
cid: 'QmUuEoBdjC8D1PfWZCc7JCSK8nj7TV6HbXWDHYHzZHCVGS',
Expand All @@ -13,6 +14,7 @@ export const VALID_MEASUREMENT = {
provider_address: '/dns4/production-ipfs-peer.pinata.cloud/tcp/3000/ws/p2p/Qma8ddFEQWEU8ijWvdxXm3nxU7oHsRtCykAaVz8WUYhiKn',
protocol: 'bitswap',
participantAddress: VALID_PARTICIPANT_ADDRESS,
stationId: VALID_STATION_ID,
inet_group: 'some-group-id',
status_code: 200,
timeout: false,
Expand Down
81 changes: 81 additions & 0 deletions test/platform-stats.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import assert from 'node:assert'
import pg from 'pg'
import { beforeEach, describe, it } from 'mocha'

import { DATABASE_URL } from '../lib/config.js'
import { migrateWithPgClient } from '../lib/migrate.js'
import { VALID_MEASUREMENT, VALID_STATION_ID } from './helpers/test-data.js'
import { updateDailyStationStats } from '../lib/platform-stats.js'

/**
 * Build a Postgres client configured with `DATABASE_URL` and connect it.
 *
 * @returns {Promise<import('pg').Client>} the client, after `connect()` has resolved
 */
const createPgClient = async () => {
  const options = { connectionString: DATABASE_URL }
  const client = new pg.Client(options)
  await client.connect()
  return client
}

// Integration tests for the platform stats updaters. They run against the
// database that `createPgClient()` connects to, after applying migrations.
describe('platform-stats', () => {
  // NOTE(review): `before`, `afterEach` and `after` are not in this file's
  // mocha import list; presumably they are provided as globals by the mocha
  // runner — confirm.
  let pgClient
  before(async () => {
    pgClient = await createPgClient()
    await migrateWithPgClient(pgClient)
  })

  // Current date as reported by the database, refreshed for every test.
  let today
  beforeEach(async () => {
    await pgClient.query('DELETE FROM daily_stations')

    // Run all tests inside a transaction to ensure `now()` always returns the same value
    // See https://dba.stackexchange.com/a/63549/125312
    // This avoids subtle race conditions when the tests are executed around midnight.
    await pgClient.query('BEGIN TRANSACTION')
    today = await getCurrentDate()
  })

  afterEach(async () => {
    await pgClient.query('END TRANSACTION')
  })

  after(async () => {
    await pgClient.end()
  })

  describe('updateDailyStationStats', () => {
    it('updates daily station stats for today with multiple measurements', async () => {
      // Differs from VALID_STATION_ID in the last character only.
      const validStationId2 = VALID_STATION_ID.slice(0, -1) + '1'
      const honestMeasurements = [
        { ...VALID_MEASUREMENT, stationId: VALID_STATION_ID },
        { ...VALID_MEASUREMENT, stationId: validStationId2 }
      ]

      await updateDailyStationStats(pgClient, honestMeasurements)

      const { rows } = await pgClient.query(`
        SELECT station_id, day::TEXT FROM daily_stations
        ORDER BY station_id`
      )
      assert.strictEqual(rows.length, 2)
      assert.deepStrictEqual(rows, [
        { station_id: VALID_STATION_ID, day: today },
        { station_id: validStationId2, day: today }
      ])
    })

    it('ignores duplicate measurements for the same station on the same day', async () => {
      const honestMeasurements = [
        { ...VALID_MEASUREMENT, stationId: VALID_STATION_ID },
        { ...VALID_MEASUREMENT, stationId: VALID_STATION_ID }
      ]

      await updateDailyStationStats(pgClient, honestMeasurements)

      const { rows } = await pgClient.query('SELECT station_id, day::TEXT FROM daily_stations')
      assert.strictEqual(rows.length, 1)
      assert.deepStrictEqual(rows, [{ station_id: VALID_STATION_ID, day: today }])
    })
  })

  // Ask the database for the current date as text, in the same form that
  // `day::TEXT` yields in the assertions above.
  const getCurrentDate = async () => {
    const { rows: [{ today }] } = await pgClient.query('SELECT now()::DATE::TEXT as today')
    return today
  }
})
4 changes: 4 additions & 0 deletions test/preprocess.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Point } from '../lib/telemetry.js'
import assert from 'node:assert'
import createDebug from 'debug'
import { assertPointFieldValue, assertRecordedTelemetryPoint } from './helpers/assertions.js'
import { VALID_STATION_ID } from './helpers/test-data.js'
import { RoundData } from '../lib/round.js'

const debug = createDebug('test')
Expand All @@ -23,6 +24,7 @@ describe('preprocess', () => {
const roundIndex = 0
const measurements = [{
participant_address: 'f410ftgmzttyqi3ti4nxbvixa4byql3o5d4eo3jtc43i',
station_id: VALID_STATION_ID,
spark_version: '1.2.3',
inet_group: 'ig1',
finished_at: '2023-11-01T09:00:00.000Z',
Expand All @@ -41,6 +43,7 @@ describe('preprocess', () => {
assert.deepStrictEqual(round.measurements, [
new Measurement({
participant_address: '0x999999cf1046e68e36E1aA2E0E07105eDDD1f08E',
station_id: VALID_STATION_ID,
spark_version: '1.2.3',
inet_group: 'ig1',
finished_at: '2023-11-01T09:00:00.000Z',
Expand Down Expand Up @@ -97,6 +100,7 @@ describe('getRetrievalResult', () => {
spark_version: '1.5.2',
zinnia_version: '0.14.0',
participant_address: 'f410fgkhpcrbmdvic52o3nivftrjxr7nzw47updmuzra',
station_id: VALID_STATION_ID,
finished_at: '2023-11-01T09:42:03.246Z',
timeout: false,
start_at: '2023-11-01T09:40:03.393Z',
Expand Down