// SplitLine splits line around every occurrence of delim that is neither
// inside a double-quoted section nor directly preceded by an unescaped
// backslash. Quote characters and backslashes are not interpreted any further:
// every returned field is a verbatim substring of line.
//
// For input without quotes or backslashes the result matches
// strings.Split(line, string(delim)): empty fields are preserved wherever they
// occur, including after a trailing delimiter, and the result always holds at
// least one element (a single empty string for an empty line).
//
// A backslash is examined before the delimiter, so passing '\\' as delim never
// splits. An unterminated quote keeps the remainder of the line in one field.
func SplitLine(line string, delim rune) []string {
	// Three is the number of sections in an Influx line-protocol record;
	// inputs with more fields simply grow the slice.
	output := make([]string, 0, 3)

	// beginning is the byte offset where the current field starts.
	beginning := 0
	// afterDelim records that the previous rune closed a field. The next field
	// starts at the byte offset of the following rune, which is only known on
	// the next iteration; deriving it this way stays correct for delimiters
	// that occupy more than one byte.
	afterDelim := false
	quoting := false
	escaping := false

	for index, symbol := range line {
		if afterDelim {
			beginning = index
			afterDelim = false
		}

		switch {
		case symbol == '\\':
			// A second consecutive backslash cancels the first one.
			escaping = !escaping
			continue
		case symbol == delim:
			if !escaping && !quoting {
				output = append(output, line[beginning:index])
				afterDelim = true
			}
		case symbol == '"':
			if !escaping {
				quoting = !quoting
			}
		}

		// An escape only ever applies to the rune right after the backslash.
		escaping = false
	}

	if afterDelim {
		// The line ended on a delimiter: the final field is empty.
		beginning = len(line)
	}

	return append(output, line[beginning:])
}
index 000000000..a51186aa7 --- /dev/null +++ b/internal/utils/parsing_test.go @@ -0,0 +1,144 @@ +package utils + +import ( + "strings" + "testing" +) + +func isEquals(left []string, right []string) bool { + if len(left) != len(right) { + return false + } + for i, value := range left { + if value != right[i] { + return false + } + } + return true +} + +func formatStrings(values []string) string { + if len(values) == 0 { + return "[]" + } else { + return "[\"" + strings.Join(values, "\", \"") + "\"]" + } +} + +func TestInfluxLineParsing(t *testing.T) { + cases := []struct { + desc string + input string + expected []string + }{ + { + desc: "simple line", + input: "measurement,tag1_name=tag1_value,tag2_name=tag2_value field1_name=field1_value,field1_name=field2_value 1556813561098000000", + expected: []string{ + "measurement,tag1_name=tag1_value,tag2_name=tag2_value", + "field1_name=field1_value,field1_name=field2_value", + "1556813561098000000", + }, + }, + { + desc: "line with single quoted tag", + input: "measurement,\"tag1 name=tag1_value,tag2_name=tag2 value\" field1_name=field1_value,field1_name=field2_value 1556813561098000000", + expected: []string{ + "measurement,\"tag1 name=tag1_value,tag2_name=tag2 value\"", + "field1_name=field1_value,field1_name=field2_value", + "1556813561098000000", + }, + }, + { + desc: "line with multiple quoted tags", + input: "measurement,\"tag1 name\"=tag1_value,tag2_name=\"tag2 value\" field1_name=field1_value,field1_name=field2_value 1556813561098000000", + expected: []string{ + "measurement,\"tag1 name\"=tag1_value,tag2_name=\"tag2 value\"", + "field1_name=field1_value,field1_name=field2_value", + "1556813561098000000", + }, + }, + { + desc: "line with single quoted field", + input: "measurement,tag1_name=tag1_value,tag2_name=tag2_value \"field1 name=field1_value,field1_name=field2 value\" 1556813561098000000", + expected: []string{ + "measurement,tag1_name=tag1_value,tag2_name=tag2_value", + "\"field1 
name=field1_value,field1_name=field2 value\"", + "1556813561098000000", + }, + }, + { + desc: "line with multiple quoted fields", + input: "measurement,tag1_name=tag1_value,tag2_name=tag2_value \"field1 name\"=field1_value,field1_name=\"field2 value\" 1556813561098000000", + expected: []string{ + "measurement,tag1_name=tag1_value,tag2_name=tag2_value", + "\"field1 name\"=field1_value,field1_name=\"field2 value\"", + "1556813561098000000", + }, + }, + { + desc: "line with escaped quotas", + input: "measurement,tag1_name=tag1_value,tag2_name=\\\"tag2_value field1_name\\\"=field1_value,field1_name=field2_value 1556813561098000000", + expected: []string{ + "measurement,tag1_name=tag1_value,tag2_name=\\\"tag2_value", + "field1_name\\\"=field1_value,field1_name=field2_value", + "1556813561098000000", + }, + }, + } + + delim := ' ' + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + actual := SplitLine(c.input, delim) + if !isEquals(actual, c.expected) { + t.Errorf("%s: expected '%s', actual: '%s'", c.desc, formatStrings(c.expected), formatStrings(actual)) + } + }) + } +} + +func TestInfluxFieldsParsing(t *testing.T) { + cases := []struct { + desc string + input string + expected []string + }{ + { + desc: "simple line", + input: "measurement,tag1_name=tag1_value,tag2_name=tag2_value", + expected: []string{ + "measurement", + "tag1_name=tag1_value", + "tag2_name=tag2_value", + }, + }, + { + desc: "line with quoted value with delimiter", + input: "measurement,tag1_name=\"tag1_value,tag2_name=tag2_value\"", + expected: []string{ + "measurement", + "tag1_name=\"tag1_value,tag2_name=tag2_value\"", + }, + }, + { + desc: "line with quoted value without delimiter", + input: "measurement,tag1_name=\"tag1_value\",tag2_name=tag2_value", + expected: []string{ + "measurement", + "tag1_name=\"tag1_value\"", + "tag2_name=tag2_value", + }, + }, + } + + delim := ',' + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + actual := SplitLine(c.input, delim) + if 
!isEquals(actual, c.expected) { + t.Errorf("%s: expected '%s', actual: '%s'", c.desc, formatStrings(c.expected), formatStrings(actual)) + } + }) + } +} diff --git a/pkg/targets/timescaledb/process.go b/pkg/targets/timescaledb/process.go index 15871dd87..5ec600b48 100644 --- a/pkg/targets/timescaledb/process.go +++ b/pkg/targets/timescaledb/process.go @@ -13,6 +13,7 @@ import ( "sync" "time" + "github.com/timescale/tsbs/internal/utils" "github.com/timescale/tsbs/pkg/targets" "github.com/jackc/pgx/v4" @@ -147,7 +148,7 @@ func (p *processor) splitTagsAndMetrics(rows []*insertData, dataCols int) ([][]s json = subsystemTagsToJSON(strings.Split(tags[commonTagsLen], ",")) } - metrics := strings.Split(data.fields, ",") + metrics := utils.SplitLine(data.fields, ',') numMetrics += uint64(len(metrics) - 1) // 1 field is timestamp timeInt, err := strconv.ParseInt(metrics[0], 10, 64)