From ac25ccbbeafff3b7b75750444efbe43279f527ad Mon Sep 17 00:00:00 2001 From: Koheiru Date: Thu, 6 Jun 2024 14:48:03 +0100 Subject: [PATCH] Fixed influx data format parsing Influx line parsing improved because influx line protocol allows to use space and comma symbols in tag name, tag value, field name and field value. In this case such symbols should be escaped or quoted. Signed-off-by: Koheiru --- cmd/tsbs_load_influx/scan.go | 7 +- internal/utils/parsing.go | 35 +++++++ internal/utils/parsing_test.go | 144 +++++++++++++++++++++++++++++ pkg/targets/timescaledb/process.go | 3 +- 4 files changed, 185 insertions(+), 4 deletions(-) create mode 100644 internal/utils/parsing.go create mode 100644 internal/utils/parsing_test.go diff --git a/cmd/tsbs_load_influx/scan.go b/cmd/tsbs_load_influx/scan.go index c4605d7aa..e92b4e0df 100644 --- a/cmd/tsbs_load_influx/scan.go +++ b/cmd/tsbs_load_influx/scan.go @@ -5,12 +5,13 @@ import ( "bytes" "strings" + "github.com/timescale/tsbs/internal/utils" "github.com/timescale/tsbs/pkg/data" "github.com/timescale/tsbs/pkg/data/usecases/common" "github.com/timescale/tsbs/pkg/targets" ) -const errNotThreeTuplesFmt = "parse error: line does not have 3 tuples, has %d" +const errNotThreeTuplesFmt = "parse error: line does not have 3 tuples, has %d -> %s" var newLine = []byte("\n") @@ -47,9 +48,9 @@ func (b *batch) Append(item data.LoadedPoint) { b.rows++ // Each influx line is format "csv-tags csv-fields timestamp", so we split by space // and then on the middle element, we split by comma to count number of fields added - args := strings.Split(thatStr, " ") + args := utils.SplitLine(thatStr, ' ') if len(args) != 3 { - fatal(errNotThreeTuplesFmt, len(args)) + fatal(errNotThreeTuplesFmt, len(args), thatStr) return } b.metrics += uint64(len(strings.Split(args[1], ","))) diff --git a/internal/utils/parsing.go b/internal/utils/parsing.go new file mode 100644 index 000000000..ff7189a8c --- /dev/null +++ b/internal/utils/parsing.go @@ -0,0 +1,35 @@ +package utils + +func SplitLine(line string, delim rune) []string { + output := make([]string, 0, 3) + beginning := 0 + quoting := false + escaping := false + for index, symbol := range line { + if symbol == '\\' { + escaping = !escaping + continue + } + + if symbol == delim { + if !escaping && !quoting { + item := line[beginning:index] + output = append(output, string(item)) + beginning = index + 1 + } + } else if symbol == '"' { + if !escaping { + quoting = !quoting + } + } + + escaping = false + } + + if beginning < len(line) { + item := line[beginning:] + output = append(output, string(item)) + } + + return output +} diff --git a/internal/utils/parsing_test.go b/internal/utils/parsing_test.go new file mode 100644 index 000000000..a51186aa7 --- /dev/null +++ b/internal/utils/parsing_test.go @@ -0,0 +1,144 @@ +package utils + +import ( + "strings" + "testing" +) + +func isEquals(left []string, right []string) bool { + if len(left) != len(right) { + return false + } + for i, value := range left { + if value != right[i] { + return false + } + } + return true +} + +func formatStrings(values []string) string { + if len(values) == 0 { + return "[]" + } else { + return "[\"" + strings.Join(values, "\", \"") + "\"]" + } +} + +func TestInfluxLineParsing(t *testing.T) { + cases := []struct { + desc string + input string + expected []string + }{ + { + desc: "simple line", + input: "measurement,tag1_name=tag1_value,tag2_name=tag2_value field1_name=field1_value,field1_name=field2_value 1556813561098000000", + expected: []string{ + "measurement,tag1_name=tag1_value,tag2_name=tag2_value", + "field1_name=field1_value,field1_name=field2_value", + "1556813561098000000", + }, + }, + { + desc: "line with single quoted tag", + input: "measurement,\"tag1 name=tag1_value,tag2_name=tag2 value\" field1_name=field1_value,field1_name=field2_value 1556813561098000000", + expected: []string{ + "measurement,\"tag1 name=tag1_value,tag2_name=tag2 value\"", + "field1_name=field1_value,field1_name=field2_value", + "1556813561098000000", + }, + }, + { + desc: "line with multiple quoted tags", + input: "measurement,\"tag1 name\"=tag1_value,tag2_name=\"tag2 value\" field1_name=field1_value,field1_name=field2_value 1556813561098000000", + expected: []string{ + "measurement,\"tag1 name\"=tag1_value,tag2_name=\"tag2 value\"", + "field1_name=field1_value,field1_name=field2_value", + "1556813561098000000", + }, + }, + { + desc: "line with single quoted field", + input: "measurement,tag1_name=tag1_value,tag2_name=tag2_value \"field1 name=field1_value,field1_name=field2 value\" 1556813561098000000", + expected: []string{ + "measurement,tag1_name=tag1_value,tag2_name=tag2_value", + "\"field1 name=field1_value,field1_name=field2 value\"", + "1556813561098000000", + }, + }, + { + desc: "line with multiple quoted fields", + input: "measurement,tag1_name=tag1_value,tag2_name=tag2_value \"field1 name\"=field1_value,field1_name=\"field2 value\" 1556813561098000000", + expected: []string{ + "measurement,tag1_name=tag1_value,tag2_name=tag2_value", + "\"field1 name\"=field1_value,field1_name=\"field2 value\"", + "1556813561098000000", + }, + }, + { + desc: "line with escaped quotas", + input: "measurement,tag1_name=tag1_value,tag2_name=\\\"tag2_value field1_name\\\"=field1_value,field1_name=field2_value 1556813561098000000", + expected: []string{ + "measurement,tag1_name=tag1_value,tag2_name=\\\"tag2_value", + "field1_name\\\"=field1_value,field1_name=field2_value", + "1556813561098000000", + }, + }, + } + + delim := ' ' + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + actual := SplitLine(c.input, delim) + if !isEquals(actual, c.expected) { + t.Errorf("%s: expected '%s', actual: '%s'", c.desc, formatStrings(c.expected), formatStrings(actual)) + } + }) + } +} + +func TestInfluxFieldsParsing(t *testing.T) { + cases := []struct { + desc string + input string + expected []string + }{ + { + desc: "simple line", + input: "measurement,tag1_name=tag1_value,tag2_name=tag2_value", + expected: []string{ + "measurement", + "tag1_name=tag1_value", + "tag2_name=tag2_value", + }, + }, + { + desc: "line with quoted value with delimiter", + input: "measurement,tag1_name=\"tag1_value,tag2_name=tag2_value\"", + expected: []string{ + "measurement", + "tag1_name=\"tag1_value,tag2_name=tag2_value\"", + }, + }, + { + desc: "line with quoted value without delimiter", + input: "measurement,tag1_name=\"tag1_value\",tag2_name=tag2_value", + expected: []string{ + "measurement", + "tag1_name=\"tag1_value\"", + "tag2_name=tag2_value", + }, + }, + } + + delim := ',' + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + actual := SplitLine(c.input, delim) + if !isEquals(actual, c.expected) { + t.Errorf("%s: expected '%s', actual: '%s'", c.desc, formatStrings(c.expected), formatStrings(actual)) + } + }) + } +} diff --git a/pkg/targets/timescaledb/process.go b/pkg/targets/timescaledb/process.go index 15871dd87..5ec600b48 100644 --- a/pkg/targets/timescaledb/process.go +++ b/pkg/targets/timescaledb/process.go @@ -13,6 +13,7 @@ import ( "sync" "time" + "github.com/timescale/tsbs/internal/utils" "github.com/timescale/tsbs/pkg/targets" "github.com/jackc/pgx/v4" @@ -147,7 +148,7 @@ func (p *processor) splitTagsAndMetrics(rows []*insertData, dataCols int) ([][]s json = subsystemTagsToJSON(strings.Split(tags[commonTagsLen], ",")) } - metrics := strings.Split(data.fields, ",") + metrics := utils.SplitLine(data.fields, ',') numMetrics += uint64(len(metrics) - 1) // 1 field is timestamp timeInt, err := strconv.ParseInt(metrics[0], 10, 64)