-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.js
48 lines (43 loc) · 1.48 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
module.exports = function delayer(channel, opts) {
opts = opts || {};
opts.separator = opts.separator || '.';
opts.prefix = opts.prefix || 'delay';
opts.threshold = opts.threshold || 10000;
opts.round = opts.round || 1000;
channel.delay = function (delayMs) {
return {
publish: function (exchange, routingKey, content, options) {
delayMs = Math.ceil(delayMs / opts.round) * opts.round;
var ttl = delayMs;
var time = { ms: 1000, s: 60, m: 60, h: 24, d: 30, mo: 12, y: 999999 };
delayMs = Object.keys(time).map(function(unit) {
var mod = delayMs % time[unit];
delayMs = Math.floor(delayMs / time[unit]);
if (!mod) {
return '';
}
return mod + unit;
}).reverse().join('');
var name = [opts.prefix, exchange, delayMs].join(opts.separator);
return channel.assertExchange(name, 'fanout', {
durable: true,
autoDelete: true
}).then(function () {
return channel.assertQueue(name, {
durable: true,
autoDelete: true,
arguments: {
'x-dead-letter-exchange': exchange,
'x-message-ttl': ttl,
'x-expires': ttl + opts.threshold
}
});
}).then(function () {
return channel.bindQueue(name, name, '#');
}).then(function () {
return channel.publish(name, routingKey, content, options);
});
}
};
};
};