-
Notifications
You must be signed in to change notification settings - Fork 0
/
eventWorker.js
61 lines (57 loc) · 1.83 KB
/
eventWorker.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
'use strict';
var config = require('./config').config;
var util = require('util');
var queue = require('raintank-queue');
var cluster = require('cluster');
var EventDefinitions = require("./lib/eventDefinitions");
var numCPUs = config.numCPUs;
var client;
function init() {
console.log("initializing");
var eventConsumer = new queue.Consumer({
url: config.queue.url,
exchangeName: "grafana_events", //this should match the name of the exchange the producer is using.
exchangeType: "topic", // this should match the exchangeType the producer is using.
queueName: '', //leave blank for an auto generated name. recommended when creating an exclusive queue.
exclusive: true, //make the queue exclusive.
durable: false,
autoDelete: true,
queuePattern: 'EVENT.#', //match all EVENTS
retryCount: -1, // keep trying to connect forever.
handler: processEvent
});
eventConsumer.on('error', function(err) {
console.log("eventConsumer emitted fatal error.")
console.log(err);
process.exit(1);
});
}
function processEvent(message) {
var event = JSON.parse(message.content.toString());
var obj = new EventDefinitions.Model(event);
obj.save(function(err) {
if (err) {
console.log('failed to save serviceEvent.');
console.log(err);
return;
}
console.log(obj);
console.log('Event saved.');
});
};
process.on( "SIGINT", function() {
console.log('CLOSING [SIGINT]');
process.exit();
});
if (cluster.isMaster) {
// Fork workers.
for (var i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('exit', function(worker, code, signal) {
console.log('worker ' + worker.process.pid + ' died');
cluster.fork();
});
} else {
init();
}