Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: migrate daily_stations primary key #274

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 42 additions & 14 deletions lib/platform-stats.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,43 +20,71 @@ export const updatePlatformStats = async (pgClient, honestMeasurements, allMeasu
* @param {import('./preprocess.js').Measurement[]} allMeasurements
*/
export const updateDailyStationStats = async (pgClient, honestMeasurements, allMeasurements) => {
// Aggregates accepted/total measurement counts and upserts them into the
// daily_stations table through a single parameterised query.
//
// NOTE(review): this text looks like a rendered diff without +/- markers, so
// statements from the previous and the new revision appear side by side
// (e.g. measurementCountsPerStation vs. statsPerStation, two debug() calls,
// two VALUES / ON CONFLICT clauses). Confirm against the real file before
// changing any code here.

// Counters keyed by stationId alone.
/** @type {Map<string, {accepted: number, total: number}>} */
const measurementCountsPerStation = new Map()
// Counters keyed by the JSON string produced by getKey() below.
/** @type {Map<string, {accepted: number, total: number}>} */
const statsPerStation = new Map()

// JSON stringify the key to make it comparable, since objects compare by reference
// NOTE(review): JSON.stringify omits properties whose value is undefined, so a
// measurement lacking participantAddress or inet_group produces a key without
// that field — confirm both are always set by the caller.
/** @type {(m: import('./preprocess.js').Measurement) => string} */
const getKey = (m) => JSON.stringify({
stationId: m.stationId,
participantAddress: m.participantAddress,
inet_group: m.inet_group
})

// Pass 1: every honest measurement increments `accepted`.
for (const m of honestMeasurements) {
// Measurements without a stationId (null or undefined) are not counted.
if (m.stationId == null) continue

const mc = measurementCountsPerStation.get(m.stationId) ?? { accepted: 0, total: 0 }
mc.accepted += 1
measurementCountsPerStation.set(m.stationId, mc)
const key = getKey(m)
// A key seen for the first time starts from zeroed counters.
const stationStats = statsPerStation.get(key) ?? { accepted: 0, total: 0 }

stationStats.accepted += 1
statsPerStation.set(key, stationStats)
}

// Pass 2: every measurement in allMeasurements increments `total`.
for (const m of allMeasurements) {
// Same stationId guard as in the first pass.
if (m.stationId == null) continue

const mc = measurementCountsPerStation.get(m.stationId) ?? { accepted: 0, total: 0 }
mc.total += 1
measurementCountsPerStation.set(m.stationId, mc)
const key = getKey(m)
const stationStats = statsPerStation.get(key) ?? { accepted: 0, total: 0 }

stationStats.total += 1
statsPerStation.set(key, stationStats)
}

debug('Updating daily station stats, station_count=%s', measurementCountsPerStation.size)
debug('Updating daily station stats, station_count=%s', statsPerStation.size)

// Convert the map to two arrays for the query
// Map keys() and values() both iterate in insertion order, so keys[i] and
// values[i] describe the same entry. The keys are parsed back into objects
// so their individual fields can be passed as separate query parameters.
const keys = Array.from(statsPerStation.keys()).map(k => JSON.parse(k))
const values = Array.from(statsPerStation.values())

// Upsert: `day` is taken from the database's now(); the array parameters are
// expanded row-wise with unnest(). When a row with the same conflict target
// already exists, the incoming counts are added to the stored counts instead
// of replacing them.
await pgClient.query(`
INSERT INTO daily_stations (
day,
station_id,
participant_address,
inet_group,
accepted_measurement_count,
total_measurement_count
)
VALUES (now(), unnest($1::text[]), unnest($2::int[]), unnest($3::int[]))
ON CONFLICT (station_id, day) DO UPDATE
VALUES (
now(),
unnest($1::text[]),
unnest($2::text[]),
unnest($3::text[]),
unnest($4::int[]),
unnest($5::int[])
)
ON CONFLICT (day, station_id, participant_address, inet_group) DO UPDATE
SET accepted_measurement_count = daily_stations.accepted_measurement_count
+ EXCLUDED.accepted_measurement_count,
total_measurement_count = daily_stations.total_measurement_count
+ EXCLUDED.total_measurement_count
`, [
Array.from(measurementCountsPerStation.keys()),
Array.from(measurementCountsPerStation.values()).map(v => v.accepted),
Array.from(measurementCountsPerStation.values()).map(v => v.total)
keys.map(k => k.stationId),
keys.map(k => k.participantAddress),
keys.map(k => k.inet_group),
values.map(v => v.accepted),
values.map(v => v.total)
])
}

Expand Down
11 changes: 11 additions & 0 deletions migrations/010.do.daily-stations-primary-key.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Adds inet_group and participant_address to daily_stations and makes both
-- columns part of the table's primary key.
-- Both columns are NOT NULL with a DEFAULT, so rows that already exist receive
-- the placeholder value 'pre_column_addition'.
ALTER TABLE daily_stations
ADD COLUMN inet_group TEXT NOT NULL DEFAULT 'pre_column_addition',
ADD COLUMN participant_address TEXT NOT NULL DEFAULT 'pre_column_addition';

-- Drop existing primary key
-- NOTE(review): presumably daily_stations_pkey is the name given to the
-- original primary key constraint; verify against the earlier migration.
ALTER TABLE daily_stations
DROP CONSTRAINT daily_stations_pkey;

-- Add new primary key
-- The key now spans the day, the station and the two newly added columns.
ALTER TABLE daily_stations
ADD PRIMARY KEY (day, station_id, inet_group, participant_address);
3 changes: 2 additions & 1 deletion test/helpers/test-data.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Shared fixture values, exported so tests can reference the same literals
// instead of repeating them.
export const VALID_PARTICIPANT_ADDRESS = '0x000000000000000000000000000000000000dEaD'
export const VALID_STATION_ID = '8800000000000000000000000000000000000000000000000000000000000000000000000000000000000000'
export const VALID_INET_GROUP = 'some-group-id'

export const VALID_TASK = {
cid: 'QmUuEoBdjC8D1PfWZCc7JCSK8nj7TV6HbXWDHYHzZHCVGS',
Expand All @@ -17,7 +18,7 @@ export const VALID_MEASUREMENT = {
protocol: 'bitswap',
participantAddress: VALID_PARTICIPANT_ADDRESS,
stationId: VALID_STATION_ID,
inet_group: 'some-group-id',
inet_group: VALID_INET_GROUP,
status_code: 200,
// TODO: these fields are not part of the Measurement object yet
// timeout: false,
Expand Down
55 changes: 47 additions & 8 deletions test/platform-stats.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ import { beforeEach, describe, it } from 'mocha'

import { DATABASE_URL } from '../lib/config.js'
import { migrateWithPgClient } from '../lib/migrate.js'
import { VALID_MEASUREMENT, VALID_STATION_ID } from './helpers/test-data.js'
import {
VALID_MEASUREMENT,
VALID_STATION_ID,
VALID_PARTICIPANT_ADDRESS,
VALID_INET_GROUP
} from './helpers/test-data.js'
import {
mapParticipantsToIds,
updateDailyParticipants,
Expand Down Expand Up @@ -54,36 +59,70 @@ describe('platform-stats', () => {

describe('updateDailyStationStats', () => {
it('updates daily station stats for today with multiple measurements', async () => {
const stationIdMeasurement1 = { ...VALID_MEASUREMENT, stationId: VALID_STATION_ID }
const stationIdMeasurement2 = { ...VALID_MEASUREMENT, stationId: VALID_STATION_ID_2 }

/** @type {Measurement[]} */
const honestMeasurements = [
{ ...VALID_MEASUREMENT, stationId: VALID_STATION_ID },
{ ...VALID_MEASUREMENT, stationId: VALID_STATION_ID_2 }
stationIdMeasurement1,
stationIdMeasurement2,
{ ...stationIdMeasurement1, participantAddress: '0x20' },
{ ...stationIdMeasurement2, inet_group: 'other-group' }
]
/** @type {Measurement[]} */
const allMeasurements = [
...honestMeasurements,
{ ...VALID_MEASUREMENT, stationId: VALID_STATION_ID, fraudAssessment: 'INVALID_TASK' },
{ ...VALID_MEASUREMENT, stationId: VALID_STATION_ID_2, fraudAssessment: 'INVALID_TASK' }
{ ...stationIdMeasurement1, fraudAssessment: 'INVALID_TASK' },
{ ...stationIdMeasurement2, fraudAssessment: 'INVALID_TASK' },
{ ...stationIdMeasurement1, participantAddress: '0x20', fraudAssessment: 'INVALID_TASK' },
{ ...stationIdMeasurement2, inet_group: 'other-group', fraudAssessment: 'INVALID_TASK' }
]

await updateDailyStationStats(pgClient, honestMeasurements, allMeasurements)

const { rows } = await pgClient.query(`
SELECT station_id, day::TEXT, accepted_measurement_count, total_measurement_count
FROM daily_stations
SELECT
station_id,
day::TEXT,
participant_address,
inet_group,
accepted_measurement_count,
total_measurement_count
FROM
daily_stations
ORDER BY station_id`
)
assert.strictEqual(rows.length, 2)
assert.strictEqual(rows.length, 4)
assert.deepStrictEqual(rows, [
{
day: today,
station_id: VALID_STATION_ID,
participant_address: VALID_PARTICIPANT_ADDRESS,
inet_group: VALID_INET_GROUP,
accepted_measurement_count: 1,
total_measurement_count: 2
},
{
day: today,
station_id: VALID_STATION_ID,
participant_address: '0x20',
inet_group: VALID_INET_GROUP,
accepted_measurement_count: 1,
total_measurement_count: 2
},
{
day: today,
station_id: VALID_STATION_ID_2,
participant_address: VALID_PARTICIPANT_ADDRESS,
inet_group: VALID_INET_GROUP,
accepted_measurement_count: 1,
total_measurement_count: 2
},
{
day: today,
station_id: VALID_STATION_ID_2,
participant_address: VALID_PARTICIPANT_ADDRESS,
inet_group: 'other-group',
accepted_measurement_count: 1,
total_measurement_count: 2
}
Expand Down