From 60fd99bb80dde1bc7755b2af6a2ea37c4aa939c3 Mon Sep 17 00:00:00 2001 From: Pagan Gazzard Date: Mon, 16 Dec 2024 20:51:40 +0000 Subject: [PATCH] Loki: make sure to transform logs synchronously Change-type: patch --- src/features/device-logs/lib/backends/loki.ts | 38 +++++++++++-------- test/13_loki-backend.ts | 6 ++- 2 files changed, 27 insertions(+), 17 deletions(-) diff --git a/src/features/device-logs/lib/backends/loki.ts b/src/features/device-logs/lib/backends/loki.ts index 85545d794..1dc9b78d8 100644 --- a/src/features/device-logs/lib/backends/loki.ts +++ b/src/features/device-logs/lib/backends/loki.ts @@ -229,21 +229,23 @@ export class LokiBackend implements DeviceLogsBackend { } public async publish( - $ctx: LogContext, + ctx: LogContext, logs: Array, ): Promise { - const ctx = await assertLokiLogContext($ctx); + const logEntries = this.fromDeviceLogsToEntries(ctx, logs); + const countLogs = logs.length; incrementPublishCallTotal(); incrementPublishLogMessagesTotal(countLogs); - const stream = this.fromDeviceLogsToStream(ctx, logs); + const lokiCtx = await assertLokiLogContext(ctx); + const stream = this.fromLogEntriesToStream(lokiCtx, logEntries); try { - await this.push(ctx, stream); + await this.push(lokiCtx, stream); incrementPublishCallSuccessTotal(); } catch (err) { incrementPublishCallFailedTotal(); incrementPublishLogMessagesDropped(countLogs); - let message = `Failed to publish logs for device ${ctx.uuid}`; + let message = `Failed to publish logs for device ${lokiCtx.uuid}`; if (VERBOSE_ERROR_MESSAGE) { message += JSON.stringify(logs, omitNanoTimestamp, '\t').substring( 0, @@ -252,7 +254,7 @@ export class LokiBackend implements DeviceLogsBackend { } captureException(err, message); throw new BadRequestError( - `Failed to publish logs for device ${ctx.uuid}`, + `Failed to publish logs for device ${lokiCtx.uuid}`, ); } } @@ -394,14 +396,11 @@ export class LokiBackend implements DeviceLogsBackend { } } - private fromDeviceLogsToStream( - ctx: LokiLogContext, + private fromDeviceLogsToEntries( + ctx: LogContext, logs: Array, ) { - const labels = this.getLabels(ctx); - const stream = new loki.StreamAdapter(); - stream.setLabels(labels); - for (const log of logs) { + return logs.map((log) => { this.validateLog(log); log.version = VERSION; const timestamp = new loki.Timestamp(); @@ -411,13 +410,20 @@ export class LokiBackend implements DeviceLogsBackend { const logJson = JSON.stringify(log, omitNanoTimestamp); const structuredMetadata = this.getStructuredMetadata(ctx); // create entry with labels, line and timestamp - const entry = new loki.EntryAdapter() + return new loki.EntryAdapter() .setLine(logJson) .setTimestamp(timestamp) .setStructuredmetadataList(structuredMetadata); - // append entry to stream - stream.addEntries(entry); - } + }); + } + private fromLogEntriesToStream( + ctx: LokiLogContext, + logEntries: loki.EntryAdapter[], + ) { + const labels = this.getLabels(ctx); + const stream = new loki.StreamAdapter(); + stream.setLabels(labels); + stream.setEntriesList(logEntries); return stream; } } diff --git a/test/13_loki-backend.ts b/test/13_loki-backend.ts index 3d792ddfc..9da4c2e62 100644 --- a/test/13_loki-backend.ts +++ b/test/13_loki-backend.ts @@ -58,7 +58,11 @@ export default () => { createLog({ serviceId: 3 }), ]; // @ts-expect-error usage of private function - const stream = loki.fromDeviceLogsToStream(ctx, _.cloneDeep(logs)); + const stream = loki.fromLogEntriesToStream( + ctx, + // @ts-expect-error usage of private function + loki.fromDeviceLogsToEntries(ctx, _.cloneDeep(logs)), + ); // @ts-expect-error usage of private function const logsFromStream = loki.fromStreamToDeviceLogs(stream); expect(logsFromStream).to.deep.equal(logs);