Skip to content

Commit

Permalink
Merge pull request #1905 from balena-io/loki-sync
Browse files Browse the repository at this point in the history
Loki: make sure to transform logs synchronously
  • Loading branch information
Page- authored Dec 16, 2024
2 parents ad6224f + 60fd99b commit f373294
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 17 deletions.
38 changes: 22 additions & 16 deletions src/features/device-logs/lib/backends/loki.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,21 +229,23 @@ export class LokiBackend implements DeviceLogsBackend {
}

public async publish(
$ctx: LogContext,
ctx: LogContext,
logs: Array<DeviceLog & { version?: number }>,
): Promise<any> {
const ctx = await assertLokiLogContext($ctx);
const logEntries = this.fromDeviceLogsToEntries(ctx, logs);

const countLogs = logs.length;
incrementPublishCallTotal();
incrementPublishLogMessagesTotal(countLogs);
const stream = this.fromDeviceLogsToStream(ctx, logs);
const lokiCtx = await assertLokiLogContext(ctx);
const stream = this.fromLogEntriesToStream(lokiCtx, logEntries);
try {
await this.push(ctx, stream);
await this.push(lokiCtx, stream);
incrementPublishCallSuccessTotal();
} catch (err) {
incrementPublishCallFailedTotal();
incrementPublishLogMessagesDropped(countLogs);
let message = `Failed to publish logs for device ${ctx.uuid}`;
let message = `Failed to publish logs for device ${lokiCtx.uuid}`;
if (VERBOSE_ERROR_MESSAGE) {
message += JSON.stringify(logs, omitNanoTimestamp, '\t').substring(
0,
Expand All @@ -252,7 +254,7 @@ export class LokiBackend implements DeviceLogsBackend {
}
captureException(err, message);
throw new BadRequestError(
`Failed to publish logs for device ${ctx.uuid}`,
`Failed to publish logs for device ${lokiCtx.uuid}`,
);
}
}
Expand Down Expand Up @@ -394,14 +396,11 @@ export class LokiBackend implements DeviceLogsBackend {
}
}

private fromDeviceLogsToStream(
ctx: LokiLogContext,
private fromDeviceLogsToEntries(
ctx: LogContext,
logs: Array<DeviceLog & { version?: number }>,
) {
const labels = this.getLabels(ctx);
const stream = new loki.StreamAdapter();
stream.setLabels(labels);
for (const log of logs) {
return logs.map((log) => {
this.validateLog(log);
log.version = VERSION;
const timestamp = new loki.Timestamp();
Expand All @@ -411,13 +410,20 @@ export class LokiBackend implements DeviceLogsBackend {
const logJson = JSON.stringify(log, omitNanoTimestamp);
const structuredMetadata = this.getStructuredMetadata(ctx);
// create entry with labels, line and timestamp
const entry = new loki.EntryAdapter()
return new loki.EntryAdapter()
.setLine(logJson)
.setTimestamp(timestamp)
.setStructuredmetadataList(structuredMetadata);
// append entry to stream
stream.addEntries(entry);
}
});
}
private fromLogEntriesToStream(
ctx: LokiLogContext,
logEntries: loki.EntryAdapter[],
) {
const labels = this.getLabels(ctx);
const stream = new loki.StreamAdapter();
stream.setLabels(labels);
stream.setEntriesList(logEntries);
return stream;
}
}
6 changes: 5 additions & 1 deletion test/13_loki-backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ export default () => {
createLog({ serviceId: 3 }),
];
// @ts-expect-error usage of private function
const stream = loki.fromDeviceLogsToStream(ctx, _.cloneDeep(logs));
const stream = loki.fromLogEntriesToStream(
ctx,
// @ts-expect-error usage of private function
loki.fromDeviceLogsToEntries(ctx, _.cloneDeep(logs)),
);
// @ts-expect-error usage of private function
const logsFromStream = loki.fromStreamToDeviceLogs(stream);
expect(logsFromStream).to.deep.equal(logs);
Expand Down

0 comments on commit f373294

Please sign in to comment.