Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add endpoint for participants with highest measurement count #164

Merged
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions observer/test/observer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ describe('observer', () => {
beforeEach(async () => {
await pgPools.evaluate.query('DELETE FROM daily_participants')
await pgPools.evaluate.query('DELETE FROM participants')
await pgPools.stats.query('DELETE FROM daily_scheduled_rewards')
await givenDailyParticipants(pgPools.evaluate, today(), ['0xCURRENT'])
await givenDailyParticipants(pgPools.evaluate, '2000-01-01', ['0xOLD'])
})
Expand Down
3 changes: 3 additions & 0 deletions stats/lib/platform-routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
fetchDailyStationCount,
fetchMonthlyStationCount,
fetchDailyRewardTransfers,
fetchTopMeasurementParticipants,
bajtos marked this conversation as resolved.
Show resolved Hide resolved
fetchDailyStationAcceptedMeasurementCount
} from './platform-stats-fetchers.js'

Expand Down Expand Up @@ -30,6 +31,8 @@ export const handlePlatformRoutes = async (req, res, pgPools) => {
await respond(pgPools.evaluate, fetchMonthlyStationCount)
} else if (req.method === 'GET' && url === '/measurements/daily') {
await respond(pgPools.evaluate, fetchDailyStationAcceptedMeasurementCount)
} else if (req.method === 'GET' && url === '/participants/top-measurements') {
await respond(pgPools.evaluate, fetchTopMeasurementParticipants)
} else if (req.method === 'GET' && url === '/transfers/daily') {
await respond(pgPools.stats, fetchDailyRewardTransfers)
} else {
Expand Down
18 changes: 18 additions & 0 deletions stats/lib/platform-stats-fetchers.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import assert from 'http-assert'
import { getDailyDistinctCount, getMonthlyDistinctCount } from './request-helpers.js'

/** @typedef {import('@filecoin-station/spark-stats-db').Queryable} Queryable */
Expand Down Expand Up @@ -43,6 +44,23 @@ export const fetchDailyStationAcceptedMeasurementCount = async (pgPool, filter)
return rows
}

/**
* @param {Queryable} pgPool
* @param {import('./typings.js').DateRangeFilter} filter
*/
export const fetchTopMeasurementParticipants = async (pgPool, filter) => {
assert(filter.to === filter.from, 400, 'Multi-day queries are not supported for this endpoint')
const yesterdayUTC = new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString().split('T')[0]
assert(filter.to === yesterdayUTC, 400, 'filter.to must be set to yesterday, other values are not supported yet')
// Ignore the filter for this query
juliangruber marked this conversation as resolved.
Show resolved Hide resolved
// Get the top measurement stations from the Materialized View
return (await pgPool.query('SELECT * FROM top_measurement_participants_yesterday_mv')).rows
}

/**
* @param {Queryable} pgPool
* @param {import('./typings.js').DateRangeFilter} filter
*/
export const fetchDailyRewardTransfers = async (pgPool, filter) => {
const { rows } = await pgPool.query(`
SELECT day::TEXT, SUM(amount) as amount
Expand Down
113 changes: 94 additions & 19 deletions stats/test/platform-routes.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { getPgPools } from '@filecoin-station/spark-stats-db'
import { assertResponseStatus, getPort } from './test-helpers.js'
import { createHandler } from '../lib/handler.js'

const STATION_STATS = { stationId: 'station1', participantAddress: 'f1abcdef', inetGroup: 'group1' }

const debug = createDebug('test')

describe('Platform Routes HTTP request handler', () => {
Expand Down Expand Up @@ -42,23 +44,25 @@ describe('Platform Routes HTTP request handler', () => {

beforeEach(async () => {
await pgPools.evaluate.query('DELETE FROM daily_stations')
await pgPools.evaluate.query('REFRESH MATERIALIZED VIEW top_measurement_participants_yesterday_mv')

await pgPools.stats.query('DELETE FROM daily_reward_transfers')
})

describe('GET /stations/daily', () => {
it('returns daily station metrics for the given date range', async () => {
await givenDailyStationMetrics(pgPools.evaluate, '2024-01-10', [
{ stationId: 'station1', acceptedMeasurementCount: 1 }
{ ...STATION_STATS, acceptedMeasurementCount: 1 }
])
await givenDailyStationMetrics(pgPools.evaluate, '2024-01-11', [
{ stationId: 'station2', acceptedMeasurementCount: 1 }
{ ...STATION_STATS, stationId: 'station2', acceptedMeasurementCount: 1 }
])
await givenDailyStationMetrics(pgPools.evaluate, '2024-01-12', [
{ stationId: 'station2', acceptedMeasurementCount: 2 },
{ stationId: 'station3', acceptedMeasurementCount: 1 }
{ ...STATION_STATS, stationId: 'station2', acceptedMeasurementCount: 2 },
{ ...STATION_STATS, stationId: 'station3', acceptedMeasurementCount: 1 }
])
await givenDailyStationMetrics(pgPools.evaluate, '2024-01-13', [
{ stationId: 'station1', acceptedMeasurementCount: 1 }
{ ...STATION_STATS, acceptedMeasurementCount: 1 }
])

const res = await fetch(
Expand All @@ -82,25 +86,25 @@ describe('Platform Routes HTTP request handler', () => {
it('returns monthly station metrics for the given date range ignoring the day number', async () => {
// before the date range
await givenDailyStationMetrics(pgPools.evaluate, '2023-12-31', [
{ stationId: 'station1', acceptedMeasurementCount: 1 }
{ ...STATION_STATS, acceptedMeasurementCount: 1 }
])
// in the date range
await givenDailyStationMetrics(pgPools.evaluate, '2024-01-10', [
{ stationId: 'station1', acceptedMeasurementCount: 1 }
{ ...STATION_STATS, acceptedMeasurementCount: 1 }
])
await givenDailyStationMetrics(pgPools.evaluate, '2024-01-11', [
{ stationId: 'station2', acceptedMeasurementCount: 1 }
{ ...STATION_STATS, stationId: 'station2', acceptedMeasurementCount: 1 }
])
await givenDailyStationMetrics(pgPools.evaluate, '2024-01-12', [
{ stationId: 'station2', acceptedMeasurementCount: 2 },
{ stationId: 'station3', acceptedMeasurementCount: 1 }
{ ...STATION_STATS, stationId: 'station2', acceptedMeasurementCount: 2 },
{ ...STATION_STATS, stationId: 'station3', acceptedMeasurementCount: 1 }
])
await givenDailyStationMetrics(pgPools.evaluate, '2024-02-13', [
{ stationId: 'station1', acceptedMeasurementCount: 1 }
{ ...STATION_STATS, acceptedMeasurementCount: 1 }
])
// after the date range
await givenDailyStationMetrics(pgPools.evaluate, '2024-03-01', [
{ stationId: 'station1', acceptedMeasurementCount: 1 }
{ ...STATION_STATS, acceptedMeasurementCount: 1 }
])

const res = await fetch(
Expand All @@ -123,17 +127,17 @@ describe('Platform Routes HTTP request handler', () => {
describe('GET /measurements/daily', () => {
it('returns daily total accepted measurement count for the given date range', async () => {
await givenDailyStationMetrics(pgPools.evaluate, '2024-01-10', [
{ stationId: 'station1', acceptedMeasurementCount: 1 }
{ ...STATION_STATS, acceptedMeasurementCount: 1 }
])
await givenDailyStationMetrics(pgPools.evaluate, '2024-01-11', [
{ stationId: 'station2', acceptedMeasurementCount: 1 }
{ ...STATION_STATS, stationId: 'station2', acceptedMeasurementCount: 1 }
])
await givenDailyStationMetrics(pgPools.evaluate, '2024-01-12', [
{ stationId: 'station2', acceptedMeasurementCount: 2 },
{ stationId: 'station3', acceptedMeasurementCount: 1 }
{ ...STATION_STATS, stationId: 'station2', acceptedMeasurementCount: 2 },
{ ...STATION_STATS, stationId: 'station3', acceptedMeasurementCount: 1 }
])
await givenDailyStationMetrics(pgPools.evaluate, '2024-01-13', [
{ stationId: 'station1', acceptedMeasurementCount: 1 }
{ ...STATION_STATS, acceptedMeasurementCount: 1 }
])

const res = await fetch(
Expand All @@ -153,6 +157,64 @@ describe('Platform Routes HTTP request handler', () => {
})
})

describe('GET /participants/top-measurements', () => {
it('returns top measurement stations for the given date', async () => {
const today = new Date()
const yesterday = new Date()
yesterday.setDate(today.getDate() - 1)

const todayUTC = today.toISOString().split('T')[0]
const yesterdayUTC = yesterday.toISOString().split('T')[0]

await givenDailyStationMetrics(pgPools.evaluate, yesterdayUTC, [
{ ...STATION_STATS, stationId: 's3', participantAddress: 'f1ghijkl', acceptedMeasurementCount: 50 },
{ ...STATION_STATS, acceptedMeasurementCount: 20 },
{ ...STATION_STATS, stationId: 's2', acceptedMeasurementCount: 30 },
{ ...STATION_STATS, stationId: 's2', inetGroup: 'group2', acceptedMeasurementCount: 40 }
])
await givenDailyStationMetrics(pgPools.evaluate, todayUTC, [
{ ...STATION_STATS, acceptedMeasurementCount: 10 }
])

await pgPools.evaluate.query('REFRESH MATERIALIZED VIEW top_measurement_participants_yesterday_mv')

const res = await fetch(
new URL(
`/participants/top-measurements?from=${yesterdayUTC}&to=${yesterdayUTC}`,
baseUrl
), {
redirect: 'manual'
}
)
await assertResponseStatus(res, 200)
const metrics = await res.json()
assert.deepStrictEqual(metrics, [{
participant_address: STATION_STATS.participantAddress,
inet_group_count: '2',
station_count: '2',
accepted_measurement_count: '90'
},
{
participant_address: 'f1ghijkl',
inet_group_count: '1',
station_count: '1',
accepted_measurement_count: '50'
}])
})

it('returns 400 if the date range is more than one day', async () => {
const res = await fetch(
new URL(
'/participants/top-measurements?from=2024-01-11&to=2024-01-12',
baseUrl
), {
redirect: 'manual'
}
)
await assertResponseStatus(res, 400)
})
})

describe('GET /transfers/daily', () => {
it('returns daily total Rewards sent for the given date range', async () => {
await givenDailyRewardTransferMetrics(pgPools.stats, '2024-01-10', [
Expand Down Expand Up @@ -189,12 +251,25 @@ describe('Platform Routes HTTP request handler', () => {

const givenDailyStationMetrics = async (pgPoolEvaluate, day, stationStats) => {
await pgPoolEvaluate.query(`
INSERT INTO daily_stations (day, station_id, accepted_measurement_count)
SELECT $1 AS day, UNNEST($2::text[]) AS station_id, UNNEST($3::int[]) AS accepted_measurement_count
INSERT INTO daily_stations (
day,
station_id,
participant_address,
inet_group,
accepted_measurement_count
)
SELECT
$1 AS day,
UNNEST($2::text[]) AS station_id,
UNNEST($3::text[]) AS participant_address,
UNNEST($4::text[]) AS inet_group,
UNNEST($5::int[]) AS accepted_measurement_count
ON CONFLICT DO NOTHING
`, [
day,
stationStats.map(s => s.stationId),
stationStats.map(s => s.participantAddress),
stationStats.map(s => s.inetGroup),
stationStats.map(s => s.acceptedMeasurementCount)
])
}
Expand Down
Loading