diff --git a/backend/agents/otel-tracing.js b/backend/agents/otel-tracing.js index 84240b6e..3789c48b 100644 --- a/backend/agents/otel-tracing.js +++ b/backend/agents/otel-tracing.js @@ -1,6 +1,8 @@ 'use strict'; const opentelemetry = require('@opentelemetry/sdk-node'); +const { MongoDBInstrumentation } = require('@opentelemetry/instrumentation-mongodb'); + const { TraceIdRatioBasedSampler } = require('@opentelemetry/sdk-trace-node'); const { OTLPTraceExporter } = require('@opentelemetry/exporter-trace-otlp-http'); const { getNodeAutoInstrumentations } = require('@opentelemetry/auto-instrumentations-node'); @@ -24,6 +26,9 @@ const sdk = new opentelemetry.NodeSDK({ traceExporter: new OTLPTraceExporter(), sampler: new TraceIdRatioBasedSampler(samplePercentage), instrumentations: [ + new MongoDBInstrumentation({ + // see under for available configuration + }), getNodeAutoInstrumentations({ // we recommend disabling fs autoinstrumentation since it can be noisy // and expensive during startup diff --git a/backend/controllers/uploads.js b/backend/controllers/uploads.js index a3bcb3f5..4a9b12c1 100644 --- a/backend/controllers/uploads.js +++ b/backend/controllers/uploads.js @@ -5,7 +5,7 @@ var sysStats = require("../sys_stats"); var System = require("../models/system"); var Talkgroup = require("../models/talkgroup"); var { callModel: Call } = require("../models/call"); -const { trace } = opentelemetry; +const { trace, context } = opentelemetry; const { PutObjectCommand, S3Client } = require("@aws-sdk/client-s3"); const { fromIni } = require("@aws-sdk/credential-providers"); @@ -37,70 +37,81 @@ const client = new S3Client({ exports.upload = async function (req, res, next) { const tracer = trace.getTracer("upload-service") - const parentSpan = trace.getActiveSpan(); - await tracer.startActiveSpan('upload_handler', { parent: parentSpan }, async (span) => { - try { - span.setAttribute('call.shortName', req.params.shortName.toLowerCase()); - span.setAttribute('call.talkgroup_num', req.body.talkgroup_num); - span.setAttribute('call.start_time', req.body.start_time); - - if (!req.file || ((path.extname(req.file.originalname) != '.m4a') && (path.extname(req.file.originalname) != '.mp3'))) { - console.warn("[" + req.params.shortName + "] Error file name is wrong or file does not exist"); - span.setStatus({ - code: opentelemetry.SpanStatusCode.ERROR, - message: "Invalid file or filename", - }); - res.status(500); - res.send("Error, invalid filename:\n"); - return; - } + return context.with(context.active(), async () => { + const parentSpan = trace.getActiveSpan(context.active()); + await tracer.startActiveSpan('upload_handler', { parent: parentSpan }, async (span) => { + try { + span.setAttribute('call.shortName', req.params.shortName.toLowerCase()); + span.setAttribute('call.talkgroup_num', req.body.talkgroup_num); + span.setAttribute('call.start_time', req.body.start_time); - // Extract and parse request data - const parseSpan = tracer.startSpan('parse_request_data'); - - var shortName = req.params.shortName.toLowerCase(); - var apiKey = req.body.api_key; - var talkgroupNum = parseInt(req.body.talkgroup_num); - var freq = parseFloat(req.body.freq); - var time = new Date(parseInt(req.body.start_time) * 1000); - var stopTime = new Date(); - if (req.body.stop_time) { - var stopTime = new Date(parseInt(req.body.stop_time) * 1000); - } - var startTime = req.body.start_time; - var emergency = parseInt(req.body.emergency); + if (!req.file || ((path.extname(req.file.originalname) != '.m4a') && (path.extname(req.file.originalname) != '.mp3'))) { + console.warn("[" + req.params.shortName + "] Error file name is wrong or file does not exist"); + span.setStatus({ + code: opentelemetry.SpanStatusCode.ERROR, + message: "Invalid file or filename", + }); + res.status(500); + res.send("Error, invalid filename:\n"); + return; + } - let errorCount = parseInt(req.body.error_count); - let spikeCount = parseInt(req.body.spike_count); + // Extract and parse request data - if (Number.isNaN(errorCount)) { - errorCount = 0; - } - if (Number.isNaN(spikeCount)) { - spikeCount = 0; - } - try { - var srcList = JSON.parse(req.body.source_list); - } catch (err) { - var srcList = []; - console.warn("[" + req.params.shortName + "] Error /:shortName/upload Parsing Source/Freq List - Error: " + err); - res.status(500); - res.send("Error parsing sourcelist " + err); - return; - } - parseSpan.end(); + var shortName = req.params.shortName.toLowerCase(); + var apiKey = req.body.api_key; + var talkgroupNum = parseInt(req.body.talkgroup_num); + var freq = parseFloat(req.body.freq); + var time = new Date(parseInt(req.body.start_time) * 1000); + var stopTime = new Date(); + if (req.body.stop_time) { + var stopTime = new Date(parseInt(req.body.stop_time) * 1000); + } + var startTime = req.body.start_time; + var emergency = parseInt(req.body.emergency); - let item = null; + let errorCount = parseInt(req.body.error_count); + let spikeCount = parseInt(req.body.spike_count); - // Validate that the system exists and the API key is correct - const validateSpan = tracer.startSpan("validate_system"); + if (Number.isNaN(errorCount)) { + errorCount = 0; + } + if (Number.isNaN(spikeCount)) { + spikeCount = 0; + } - try { - item = await System.findOne({ 'shortName': shortName }, ["key", "ignoreUnknownTalkgroup"]); - validateSpan.setAttribute('system.exists', !!item); + try { + var srcList = JSON.parse(req.body.source_list); + } catch (err) { + var srcList = []; + console.warn("[" + req.params.shortName + "] Error /:shortName/upload Parsing Source/Freq List - Error: " + err); + res.status(500); + res.send("Error parsing sourcelist " + err); + return; + } + let item = null; + + // Validate that the system exists and the API key is correct + await tracer.startActiveSpan('validate_system', { parent: trace.getActiveSpan(context.active()) }, async (validateSpan) => { + + try { + item = await System.findOne({ 'shortName': shortName }, ["key", "ignoreUnknownTalkgroup"]); + validateSpan.setAttribute('system.exists', !!item); + + } catch (err) { + validateSpan.recordException(err); + validateSpan.setStatus({ + code: opentelemetry.SpanStatusCode.ERROR, + message: "Error validating system: " + err.message, + }); + } finally { + validateSpan.end(); + } + }); + if (!item) { validateSpan.setStatus({ code: opentelemetry.SpanStatusCode.ERROR, @@ -122,167 +133,158 @@ exports.upload = async function (req, res, next) { res.send("API Keys do not match!\n"); return; } - } catch (err) { - validateSpan.recordException(err); - validateSpan.setStatus({ - code: opentelemetry.SpanStatusCode.ERROR, - message: "Error validating system: " + err.message, - }); - res.status(500).send("Error, invalid shortName:\n" + err); - return; - } finally { - validateSpan.end(); - } - - // Blocking sensitive talkgroups - const sensitiveSpan = tracer.startSpan("check_sensitive_talkgroups"); - if ((shortName == "hennearmer") && ((talkgroupNum == 3421) || (talkgroupNum == 3423))) { - sensitiveSpan.setStatus({ - code: opentelemetry.SpanStatusCode.ERROR, - message: "Sensitive Talkgroup", - }); - res.status(200).end(); - return; - } - sensitiveSpan.end(); - if (item.ignoreUnknownTalkgroup == true) { - const talkgroupSpan = tracer.startSpan("check_talkgroup_exists"); - talkgroupExists = await Talkgroup.exists({ - 'shortName': shortName, - 'num': talkgroupNum - }); - if (!talkgroupExists) { - try { - fs.unlinkSync(req.file.path) - //file removed - } catch (err) { - console.error("[" + call.shortName + "] error deleting: " + req.file.path); - } - res.status(500); - res.send("Talkgroup does not exist, skipping.\n"); + // Blocking sensitive talkgroups + if ((shortName == "hennearmer") && ((talkgroupNum == 3421) || (talkgroupNum == 3423))) { + sensitiveSpan.setStatus({ + code: opentelemetry.SpanStatusCode.ERROR, + message: "Sensitive Talkgroup", + }); + res.status(200).end(); return; } - talkgroupSpan.end(); - } - res.status(200).end(); - - // Prepare call object - let call; - const prepareSpan = tracer.startSpan('prepare_call_object'); - - var local_path = "/" + shortName + "/" + time.getFullYear() + "/" + (time.getMonth() + 1) + "/" + time.getDate() + "/"; - var object_key = "media/" + shortName + "-" + talkgroupNum + "-" + startTime + path.extname(req.file.originalname); - - var endpoint = s3_endpoint; - var bucket = s3_bucket; - var url = s3_public_url + "/" + object_key; - - call = new Call({ - shortName: shortName, - talkgroupNum: talkgroupNum, - objectKey: object_key, - endpoint: endpoint, - bucket: bucket, - time: time, - name: talkgroupNum + "-" + startTime + ".m4a", - freq: freq, - errorCount: errorCount, - spikeCount: spikeCount, - url: url, - emergency: emergency, - path: local_path, - srcList: srcList, - len: -1 - }); + if (item.ignoreUnknownTalkgroup == true) { + talkgroupExists = await Talkgroup.exists({ + 'shortName': shortName, + 'num': talkgroupNum + }); - try { - if (req.body.call_length) { - call.len = parseFloat(req.body.call_length); - } else { - call.len = (stopTime - time) / 1000; + if (!talkgroupExists) { + try { + fs.unlinkSync(req.file.path) + //file removed + } catch (err) { + console.error("[" + call.shortName + "] error deleting: " + req.file.path); + } + res.status(500); + res.send("Talkgroup does not exist, skipping.\n"); + return; + } } - } catch (err) { - console.error(err); - } - prepareSpan.setAttribute('call.object_key', object_key); - prepareSpan.setAttribute('call.url', url); - prepareSpan.end(); - let fileContent; - // Upload file to S3 - const uploadSpan = tracer.startSpan('upload_to_s3'); - try { - fileContent = fs.readFileSync(req.file.path); - //s3Src = fs.createReadStream(req.file.path); - - const command = new PutObjectCommand({ - Bucket: s3_bucket, - Key: object_key, - Body: fileContent, - ACL: 'public-read' - }); - await client.send(command); + res.status(200).end(); - } catch (err) { - uploadSpan.recordException(err); - uploadSpan.setStatus({ - code: opentelemetry.SpanStatusCode.ERROR, - message: "Upload Error: " + err.message, + // Prepare call object + let call; + //const prepareSpan = tracer.startSpan('prepare_call_object'); + + var local_path = "/" + shortName + "/" + time.getFullYear() + "/" + (time.getMonth() + 1) + "/" + time.getDate() + "/"; + var object_key = "media/" + shortName + "-" + talkgroupNum + "-" + startTime + path.extname(req.file.originalname); + + var endpoint = s3_endpoint; + var bucket = s3_bucket; + var url = s3_public_url + "/" + object_key; + + call = new Call({ + shortName: shortName, + talkgroupNum: talkgroupNum, + objectKey: object_key, + endpoint: endpoint, + bucket: bucket, + time: time, + name: talkgroupNum + "-" + startTime + ".m4a", + freq: freq, + errorCount: errorCount, + spikeCount: spikeCount, + url: url, + emergency: emergency, + path: local_path, + srcList: srcList, + len: -1 }); - console.warn(`[${call.shortName}] Upload Error: ${err}`); - } finally { - uploadSpan.end(); - } - // Save call to database - const saveSpan = tracer.startSpan('save_call'); - try { - await call.save(); + try { + if (req.body.call_length) { + call.len = parseFloat(req.body.call_length); + } else { + call.len = (stopTime - time) / 1000; + } + } catch (err) { + console.error(err); + } + // prepareSpan.setAttribute('call.object_key', object_key); + // prepareSpan.setAttribute('call.url', url); + // prepareSpan.end(); + + let fileContent; + // Upload file to S3 + await tracer.startActiveSpan('upload_to_s3', { parent: trace.getActiveSpan(context.active()) }, async (uploadSpan) => { + try { + fileContent = fs.readFileSync(req.file.path); + //s3Src = fs.createReadStream(req.file.path); + + const command = new PutObjectCommand({ + Bucket: s3_bucket, + Key: object_key, + Body: fileContent, + ACL: 'public-read' + }); + await client.send(command); + + } catch (err) { + uploadSpan.recordException(err); + uploadSpan.setStatus({ + code: opentelemetry.SpanStatusCode.ERROR, + message: "Upload Error: " + err.message, + }); + console.warn(`[${call.shortName}] Upload Error: ${err}`); + } finally { + uploadSpan.end(); + } + }); + // Save call to database + await tracer.startActiveSpan('save_call', { parent: trace.getActiveSpan(context.active()) }, async (saveSpan) => { + try { + await call.save(); + } catch (err) { + saveSpan.recordException(err); + saveSpan.setStatus({ + code: opentelemetry.SpanStatusCode.ERROR, + message: "Error saving call: " + err.message, + }); + console.warn(`[${call.shortName}] Error saving call: ${err}`); + } finally { + saveSpan.end(); + } + }); + + // Update system stats sysStats.addCall(call.toObject()); + // we only want to notify clients if the clip is longer than 1 second. if (call.len >= 1) { req.call = call.toObject(); next(); } - } catch (err) { - saveSpan.recordException(err); - saveSpan.setStatus({ - code: opentelemetry.SpanStatusCode.ERROR, - message: "Error saving call: " + err.message, + + // Clean up temporary file + await tracer.startActiveSpan('cleanup_temp_file', { parent: trace.getActiveSpan(context.active()) }, async (cleanupSpan) => { + try { + fs.unlinkSync(req.file.path); + cleanupSpan.setAttribute('file.deleted', true); + } catch (err) { + cleanupSpan.recordException(err); + console.warn("There was an Error deleting: " + req.file.path); + } finally { + cleanupSpan.end(); + } }); - console.warn(`[${call.shortName}] Error saving call: ${err}`); - } finally { - saveSpan.end(); - } - // Clean up temporary file - const cleanupSpan = tracer.startSpan('cleanup_temp_file'); - try { - fs.unlinkSync(req.file.path); - cleanupSpan.setAttribute('file.deleted', true); - } catch (err) { - cleanupSpan.recordException(err); - console.warn("There was an Error deleting: " + req.file.path); + } catch (error) { + console.error("Error processing call upload: " + error); + span.recordException(error); + span.setStatus({ + code: opentelemetry.SpanStatusCode.ERROR, + message: error.message, + }); } finally { - cleanupSpan.end(); + span.end(); } - - - } catch (error) { - console.error("Error processing call upload: " + error); - span.recordException(error); - span.setStatus({ - code: opentelemetry.SpanStatusCode.ERROR, - message: error.message, - }); - } finally { - span.end(); - } + }); }); }; diff --git a/backend/package-lock.json b/backend/package-lock.json index bfa52fb5..ac76ffb3 100644 --- a/backend/package-lock.json +++ b/backend/package-lock.json @@ -15,6 +15,7 @@ "@honeycombio/opentelemetry-node": "^0.7.2", "@opentelemetry/api": "^1.7.0", "@opentelemetry/auto-instrumentations-node": "^0.41.1", + "@opentelemetry/instrumentation-mongodb": "^0.46.0", "@opentelemetry/resource-detector-docker": "^0.1.2", "@smithy/node-http-handler": "^3.1.4", "adm-zip": "^0.5.10", @@ -1743,6 +1744,22 @@ "@opentelemetry/api": "^1.0.0" } }, + "node_modules/@opentelemetry/auto-instrumentations-node/node_modules/@opentelemetry/instrumentation-mongodb": { + "version": "0.39.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/instrumentation-mongodb/-/instrumentation-mongodb-0.39.0.tgz", + "integrity": "sha512-m9dMj39pcCshzlfCEn2lGrlNo7eV5fb9pGBnPyl/Am9Crh7Or8vOqvByCNd26Dgf5J978zTdLGF+6tM8j1WOew==", + "dependencies": { + "@opentelemetry/instrumentation": "^0.48.0", + "@opentelemetry/sdk-metrics": "^1.9.1", + "@opentelemetry/semantic-conventions": "^1.0.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": "^1.3.0" + } + }, "node_modules/@opentelemetry/auto-instrumentations-node/node_modules/@opentelemetry/otlp-exporter-base": { "version": "0.48.0", "resolved": "https://registry.npmjs.org/@opentelemetry/otlp-exporter-base/-/otlp-exporter-base-0.48.0.tgz", @@ -2508,13 +2525,43 @@ } }, "node_modules/@opentelemetry/instrumentation-mongodb": { - "version": "0.39.0", - "resolved": "https://registry.npmjs.org/@opentelemetry/instrumentation-mongodb/-/instrumentation-mongodb-0.39.0.tgz", - "integrity": "sha512-m9dMj39pcCshzlfCEn2lGrlNo7eV5fb9pGBnPyl/Am9Crh7Or8vOqvByCNd26Dgf5J978zTdLGF+6tM8j1WOew==", + "version": "0.46.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/instrumentation-mongodb/-/instrumentation-mongodb-0.46.0.tgz", + "integrity": "sha512-VF/MicZ5UOBiXrqBslzwxhN7TVqzu1/LN/QDpkskqM0Zm0aZ4CVRbUygL8d7lrjLn15x5kGIe8VsSphMfPJzlA==", "dependencies": { - "@opentelemetry/instrumentation": "^0.48.0", + "@opentelemetry/instrumentation": "^0.52.0", "@opentelemetry/sdk-metrics": "^1.9.1", - "@opentelemetry/semantic-conventions": "^1.0.0" + "@opentelemetry/semantic-conventions": "^1.22.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": "^1.3.0" + } + }, + "node_modules/@opentelemetry/instrumentation-mongodb/node_modules/@opentelemetry/api-logs": { + "version": "0.52.1", + "resolved": "https://registry.npmjs.org/@opentelemetry/api-logs/-/api-logs-0.52.1.tgz", + "integrity": "sha512-qnSqB2DQ9TPP96dl8cDubDvrUyWc0/sK81xHTK8eSUspzDM3bsewX903qclQFvVhgStjRWdC5bLb3kQqMkfV5A==", + "dependencies": { + "@opentelemetry/api": "^1.0.0" + }, + "engines": { + "node": ">=14" + } + }, + "node_modules/@opentelemetry/instrumentation-mongodb/node_modules/@opentelemetry/instrumentation": { + "version": "0.52.1", + "resolved": "https://registry.npmjs.org/@opentelemetry/instrumentation/-/instrumentation-0.52.1.tgz", + "integrity": "sha512-uXJbYU/5/MBHjMp1FqrILLRuiJCs3Ofk0MeRDk8g1S1gD47U8X3JnSwcMO1rtRo1x1a7zKaQHaoYu49p/4eSKw==", + "dependencies": { + "@opentelemetry/api-logs": "0.52.1", + "@types/shimmer": "^1.0.2", + "import-in-the-middle": "^1.8.1", + "require-in-the-middle": "^7.1.1", + "semver": "^7.5.2", + "shimmer": "^1.2.1" }, "engines": { "node": ">=14" @@ -2523,6 +2570,17 @@ "@opentelemetry/api": "^1.3.0" } }, + "node_modules/@opentelemetry/instrumentation-mongodb/node_modules/import-in-the-middle": { + "version": "1.11.0", + "resolved": "https://registry.npmjs.org/import-in-the-middle/-/import-in-the-middle-1.11.0.tgz", + "integrity": "sha512-5DimNQGoe0pLUHbR9qK84iWaWjjbsxiqXnw6Qz64+azRgleqv9k2kTt5fw7QsOpmaGYtuxxursnPPsnTKEx10Q==", + "dependencies": { + "acorn": "^8.8.2", + "acorn-import-attributes": "^1.9.5", + "cjs-module-lexer": "^1.2.2", + "module-details-from-path": "^1.0.3" + } + }, "node_modules/@opentelemetry/instrumentation-mongoose": { "version": "0.35.0", "resolved": "https://registry.npmjs.org/@opentelemetry/instrumentation-mongoose/-/instrumentation-mongoose-0.35.0.tgz", @@ -3584,6 +3642,14 @@ "acorn": "^8" } }, + "node_modules/acorn-import-attributes": { + "version": "1.9.5", + "resolved": "https://registry.npmjs.org/acorn-import-attributes/-/acorn-import-attributes-1.9.5.tgz", + "integrity": "sha512-n02Vykv5uA3eHGM/Z2dQrcD56kL8TyDb2p1+0P83PClMnC/nc+anbQRhIOWnSq4Ke/KvDPrY3C9hDtC/A3eHnQ==", + "peerDependencies": { + "acorn": "^8" + } + }, "node_modules/adm-zip": { "version": "0.5.10", "resolved": "https://registry.npmjs.org/adm-zip/-/adm-zip-0.5.10.tgz", diff --git a/backend/package.json b/backend/package.json index 504231a3..53a2b579 100644 --- a/backend/package.json +++ b/backend/package.json @@ -9,6 +9,7 @@ "@honeycombio/opentelemetry-node": "^0.7.2", "@opentelemetry/api": "^1.7.0", "@opentelemetry/auto-instrumentations-node": "^0.41.1", + "@opentelemetry/instrumentation-mongodb": "^0.46.0", "@opentelemetry/resource-detector-docker": "^0.1.2", "@smithy/node-http-handler": "^3.1.4", "adm-zip": "^0.5.10",