-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
93 lines (81 loc) · 2.38 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
const split = require('split')
const BatchStream = require('batch-stream')
const traverse = require('traverse')
const hash = require('object-hash')
const Database = require('better-sqlite3')
const database = new Database('db.sqlite')
const executeQuery = (sql, bindings) => {
database.prepare(sql).run(bindings)
}
const createTables = () => {
executeQuery(`CREATE TABLE IF NOT EXISTS logs (
hash TEXT,
line TEXT
)`, {})
executeQuery(`CREATE TABLE IF NOT EXISTS log_fields (
hash TEXT,
path TEXT,
value TEXT
)`, {})
}
const indexMessage = (message) => {
const messageHash = hash(message)
const messageIndex = {}
traverse(message).forEach(function (x) {
if (!this.isLeaf) {
return
}
messageIndex[this.path.join('.')] = (typeof x === 'object' || typeof x === 'boolean') ? JSON.stringify(x) : x
})
return {
messageHash,
messageIndex
}
}
const insertMessage = (hash, line) => {
const sql = `INSERT INTO logs (hash, line) VALUES ($hash, $line)`
const values = { hash, line }
try {
executeQuery(sql, values)
} catch (err) {
if (!err.message.includes('UNIQUE constraint failed')) {
throw err
}
}
}
const insertMessageIndex = (hash, messageIndex) => {
const sql = `INSERT INTO log_fields (hash, path, value) VALUES ${Object.keys(messageIndex).map((key, index) => `($hash, $path${index}, $value${index})`).join(', ')}`
const values = Object.keys(messageIndex).reduce((prev, key, index) => Object.assign(prev, { [`path${index}`]: key, [`value${index}`]: messageIndex[key] }), { hash: hash })
try {
executeQuery(sql, values)
} catch (err) {
if (!err.message.includes('UNIQUE constraint failed') && !err.message.includes('too many SQL variables')) {
throw err
}
}
}
let lineCounter = 0
const processBatch = (lines) => {
lines.forEach((line, index) => {
try {
const message = JSON.parse(line)
const { messageHash, messageIndex } = indexMessage(message)
insertMessage(messageHash, line)
insertMessageIndex(messageHash, messageIndex)
lineCounter += 1
console.log(lineCounter)
} catch (err) {
console.error(`Invalid log line: ${line}`)
console.error(err.stack)
}
})
}
const run = () => {
database.pragma('journal_mode = WAL')
createTables()
process.stdin
.pipe(split())
.pipe(new BatchStream({ size: 2 }))
.on('data', processBatch)
}
run()