[doc](group-commit) Optimize the document of group commit
liaoxin01 committed Dec 16, 2024
1 parent 92aa6dc commit 9d605fb
Showing 9 changed files with 290 additions and 648 deletions.


@@ -1,6 +1,6 @@
---
{
"title": "Group Commit",
"title": "高并发导入优化(Group Commit",
"language": "zh-CN"
}
---
@@ -25,15 +25,15 @@ under the License.
-->


- Group Commit is not a new load method but an extension of `INSERT INTO tbl VALUES(...)`, `Stream Load`, and `Http Stream` that greatly improves the performance of high-concurrency small writes. Your application can write data into Doris at high frequency directly via JDBC, and using PreparedStatement yields even better performance. In logging scenarios, you can also use Stream Load or Http Stream to write data into Doris at high frequency.
+ Group Commit is not a new load method but an extension of `INSERT INTO tbl VALUES(...)` and `Stream Load` that greatly improves the performance of high-concurrency small writes. Your application can write data into Doris at high frequency directly via JDBC, and using PreparedStatement yields even better performance. In logging scenarios, you can also use Stream Load to write data into Doris at high frequency.

## Group Commit Modes

Group Commit writes support three modes (a brief sketch of switching between them follows this list):

* Off mode (`off_mode`)

-   Group Commit is disabled, keeping the default behavior of the three load methods above.
+   Group Commit is disabled.

* Sync mode (`sync_mode`)

@@ -43,7 +43,7 @@ Group Commit writes support three modes:

Doris first writes the data to a WAL (`Write Ahead Log`), and the load returns immediately. Doris commits the data asynchronously based on system load and the table's `group_commit_interval` property; the data becomes visible after the commit. To keep the WAL from occupying too much disk space, Doris automatically switches to `sync_mode` when a single load carries a large amount of data. This suits latency-sensitive and high-frequency write scenarios.

- The number of WALs can be checked through the FE HTTP interface, as described [here](../../../admin-manual/fe/get-wal-size-action.md); it can also be found by searching for the keyword `wal` in the BE metrics.
+ The number of WALs can be checked through the FE HTTP interface, as described [here](../../admin-manual/open-api/fe-http/get-wal-size-action); it can also be found by searching for the keyword `wal` in the BE metrics.
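
As a quick sketch of switching between these modes: group commit is chosen per session via the `group_commit` session variable (or per request header, as in the Stream Load examples below). The variable values come from this doc; the `group_commit_interval_ms` table property name is an assumption about current Doris versions, so verify it against your release.

```sql
-- Minimal sketch: pick a group commit mode for the current session
SET group_commit = async_mode;   -- alternatives: sync_mode, off_mode (default)

-- In async_mode, commits are driven by the table's group commit interval;
-- assuming the property is named group_commit_interval_ms, set it to 2 seconds:
ALTER TABLE dt SET ("group_commit_interval_ms" = "2000");
```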

## Using Group Commit

@@ -133,7 +133,7 @@ private static void groupCommitInsertBatch() throws Exception {
set enable_prepared_stmt_audit_log=true;
```

- For more **JDBC** usage, refer to [Synchronizing Data Using the Insert Method](./insert-into-manual.md).
+ For more **JDBC** usage, refer to [Synchronizing Data Using the Insert Method](./import-way/insert-into-manual.md).

### Using Golang for Group Commit

@@ -199,25 +199,23 @@ func groupCommitInsertBatch(db *sql.DB) {
valueStrings := make([]string, 0, batchSize)
valueArgs := make([]interface{}, 0, batchSize*16)
-	for i := 0; i < batchSize; i++ {
-		valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
-		valueArgs = append(valueArgs, rand.Intn(1000))
-		valueArgs = append(valueArgs, rand.Intn(1000))
-		valueArgs = append(valueArgs, rand.Intn(1000))
-		valueArgs = append(valueArgs, rand.Intn(1000))
-		valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true})
-		valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true})
-		valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true})
-		valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true})
-		valueArgs = append(valueArgs, "N")
-		valueArgs = append(valueArgs, "O")
-		valueArgs = append(valueArgs, time.Now())
-		valueArgs = append(valueArgs, time.Now())
-		valueArgs = append(valueArgs, time.Now())
-		valueArgs = append(valueArgs, "DELIVER IN PERSON")
-		valueArgs = append(valueArgs, "SHIP")
-		valueArgs = append(valueArgs, "N/A")
-	}
+	for i = 0; i < batchSize; i++ {
+		valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
+		valueArgs = append(valueArgs, rand.Intn(1000))
+		valueArgs = append(valueArgs, rand.Intn(1000))
+		valueArgs = append(valueArgs, rand.Intn(1000))
+		valueArgs = append(valueArgs, rand.Intn(1000))
+		valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true})
+		valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true})
+		valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true})
+		valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true})
+		valueArgs = append(valueArgs, "N")
+		valueArgs = append(valueArgs, "O")
+		valueArgs = append(valueArgs, time.Now())
+		valueArgs = append(valueArgs, time.Now())
+		valueArgs = append(valueArgs, time.Now())
+		valueArgs = append(valueArgs, "DELIVER IN PERSON")
+		valueArgs = append(valueArgs, "SHIP")
+		valueArgs = append(valueArgs, "N/A")
+	}
stmt := fmt.Sprintf("INSERT INTO %s VALUES %s",
table, strings.Join(valueStrings, ","))
@@ -372,67 +370,8 @@ func logInsertStatistics() {
# The returned Label starts with group_commit; it is the label of the load that actually consumes the data
```

- For more detailed syntax and best practices of Stream Load, see [Stream Load](./stream-load-manual).
+ For more detailed syntax and best practices of Stream Load, see [Stream Load](./import-way/stream-load-manual).
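
For context alongside the response shown above, a minimal Stream Load request with group commit enabled might look like the following sketch; the host, credentials, and `db.dt` table are placeholders, and the `group_commit:async_mode` header matches the Http Stream examples removed below:

```sql
# Sketch: Stream Load with group commit in async mode
curl --location-trusted -u {user}:{passwd} -T data.csv \
    -H "group_commit:async_mode" \
    -H "column_separator:," \
    http://{fe_host}:{http_port}/api/db/dt/_stream_load
```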

- ### Http Stream
-
- * Async mode
-
- ```sql
- # Add the "group_commit:async_mode" header when loading
- curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_mode" -H "sql:insert into db.dt select * from http_stream('column_separator'=',', 'format' = 'CSV')" http://{fe_host}:{http_port}/api/_http_stream
- {
-     "TxnId": 7011,
-     "Label": "group_commit_3b45c5750d5f15e5_703428e462e1ebb0",
-     "Comment": "",
-     "GroupCommit": true,
-     "Status": "Success",
-     "Message": "OK",
-     "NumberTotalRows": 2,
-     "NumberLoadedRows": 2,
-     "NumberFilteredRows": 0,
-     "NumberUnselectedRows": 0,
-     "LoadBytes": 19,
-     "LoadTimeMs": 65,
-     "StreamLoadPutTimeMs": 41,
-     "ReadDataTimeMs": 47,
-     "WriteDataTimeMs": 23
- }
- # GroupCommit is true in the response, indicating the load entered the group commit flow
- # The returned Label starts with group_commit; it is the label of the load that actually consumes the data
- ```
-
- * Sync mode
-
- ```sql
- # Add the "group_commit:sync_mode" header when loading
- curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mode" -H "sql:insert into db.dt select * from http_stream('column_separator'=',', 'format' = 'CSV')" http://{fe_host}:{http_port}/api/_http_stream
- {
-     "TxnId": 3011,
-     "Label": "group_commit_fe470e6752aadbe6_a8f3ac328b02ea91",
-     "Comment": "",
-     "GroupCommit": true,
-     "Status": "Success",
-     "Message": "OK",
-     "NumberTotalRows": 2,
-     "NumberLoadedRows": 2,
-     "NumberFilteredRows": 0,
-     "NumberUnselectedRows": 0,
-     "LoadBytes": 19,
-     "LoadTimeMs": 10066,
-     "StreamLoadPutTimeMs": 31,
-     "ReadDataTimeMs": 32,
-     "WriteDataTimeMs": 10034
- }
- # GroupCommit is true in the response, indicating the load entered the group commit flow
- # The returned Label starts with group_commit; it is the label of the load that actually consumes the data
- ```
-
- For more detailed syntax and best practices of Http Stream, see [Stream Load](./stream-load-manual.md#tvf-在-stream-load-中的应用---http_stream-模式).

## Automatic Commit Conditions

@@ -522,7 +461,7 @@ ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728");
group_commit_wal_path=/data1/storage/wal;/data2/storage/wal;/data3/storage/wal
```
- 2. `group_commit_memory_rows_for_max_filter_ratio`**
+ 2. `group_commit_memory_rows_for_max_filter_ratio`
* Description: when the total number of rows in a group commit load does not exceed this value, `max_filter_ratio` works as usual; otherwise it does not take effect.
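
To illustrate the condition above, a hedged sketch that combines group commit with error-row tolerance; `max_filter_ratio` is a standard Stream Load header, and per the description it only takes effect while the load's total rows stay at or below `group_commit_memory_rows_for_max_filter_ratio` (host, credentials, and `db.dt` are placeholders):

```sql
# Sketch: group commit plus max_filter_ratio; the ratio is honored only
# while total rows do not exceed group_commit_memory_rows_for_max_filter_ratio
curl --location-trusted -u {user}:{passwd} -T data.csv \
    -H "group_commit:async_mode" \
    -H "max_filter_ratio:0.01" \
    -H "column_separator:," \
    http://{fe_host}:{http_port}/api/db/dt/_stream_load
```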
