[INLONG-1011][Doc] Add Transform component introduction. #1016

Merged
merged 1 commit into from
Sep 29, 2024
4 changes: 4 additions & 0 deletions docs/modules/transform/_category_.json
@@ -0,0 +1,4 @@
{
"label": "Transform",
"position": 9
}
165 changes: 165 additions & 0 deletions docs/modules/transform/configuration.md
@@ -0,0 +1,165 @@
---
title: Configuration Instructions
sidebar_position: 2
---

# Parameter Configuration Description
## TransformConfig Configuration Description
```java
public class TransformConfig {

    @JsonProperty("sourceInfo")
    private SourceInfo sourceInfo;   // Definition of data source decoding
    @JsonProperty("sinkInfo")
    private SinkInfo sinkInfo;       // Definition of data result encoding
    @JsonProperty("transformSql")
    private String transformSql;     // Data transformation SQL
}
```

## SourceInfo Configuration Description
### CSV
```java
public CsvSourceInfo(
        @JsonProperty("charset") String charset,       // Character set
        @JsonProperty("delimiter") String delimiter,   // Delimiter
        @JsonProperty("escapeChar") String escapeChar, // Escape character; if empty, no unescaping is performed during decoding
        @JsonProperty("fields") List<FieldInfo> fields // Field list; if empty, decode by the delimiter and name the fields $1, $2, $3... starting from 1;
                                                       // if fewer fields are defined than decoded, the extra decoded fields are discarded
);
```
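
For illustration, a minimal sketch of building a CSV source definition with the constructor above. The concrete values are examples only, and passing null for the field list is assumed here to count as an empty list:

```java
// Illustrative values: a pipe-delimited, backslash-escaped CSV source.
// The field list is left empty (null here), so decoded fields are named $1, $2, $3...
CsvSourceInfo csvSource = new CsvSourceInfo(
        "UTF-8", // charset
        "|",     // delimiter
        "\\",    // escapeChar: a single backslash
        null);   // fields: empty, fall back to default naming
```
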

### KV
```java
public KvSourceInfo(
        @JsonProperty("charset") String charset,       // Character set
        @JsonProperty("fields") List<FieldInfo> fields // Field list; if empty, decode by default using the Key in the KV data as the field name;
                                                       // if a field name does not match any decoded key, its value will be empty, and extra field names are discarded
);
```

### ProtoBuffer
```java
public PbSourceInfo(
        @JsonProperty("charset") String charset,                    // Character set
        @JsonProperty("protoDescription") String protoDescription,  // Base64-encoded ProtoBuf protocol description
        @JsonProperty("rootMessageType") String rootMessageType,    // MessageType of the decoded source data; the MessageType must be defined in the ProtoBuf protocol
        @JsonProperty("rowsNodePath") String rowsNodePath           // Array node path of the ProtoBuf protocol containing the multiple rows of data to be converted
);
```

#### Generate ProtoBuf Protocol Description
- Install Protocol Buffers compiler
```shell
PB_REL="https://github.com/protocolbuffers/protobuf/releases"
curl -LO $PB_REL/download/v3.15.8/protoc-3.15.8-linux-x86_64.zip
unzip protoc-3.15.8-linux-x86_64.zip -d $HOME/.local
export PATH="$HOME/.local/bin:$PATH"
protoc --version
# Displays libprotoc 3.15.8
```
- Parse the protocol to generate a Base64 encoded description
```shell
# transform.proto is the proto protocol file, transform.description is the binary description file after parsing the protocol
protoc --descriptor_set_out=transform.description ./transform.proto --proto_path=.
# Base64-encode the binary description file transform.description and write it to transform.base64; its content is the protoDescription parameter in the configuration
base64 transform.description | tr -d '\n' > transform.base64
```
- Example of transform.proto
```ProtoBuf
syntax = "proto3";
package test;
message SdkMessage {
bytes msg = 1;
int64 msgTime = 2;
map<string, string> extinfo = 3;
}
message SdkDataRequest {
string sid = 1;
repeated SdkMessage msgs = 2;
uint64 packageID = 3;
}
```
- Example of transform.base64
```text
CrcCCg90cmFuc2Zvcm0ucHJvdG8SBHRlc3QirQEKClNka01lc3NhZ2USEAoDbXNnGAEgASgMUgNtc2cSGAoHbXNnVGltZRgCIAEoA1IHbXNnVGltZRI3CgdleHRpbmZvGAMgAygLMh0udGVzdC5TZGtNZXNzYWdlLkV4dGluZm9FbnRyeVIHZXh0aW5mbxo6CgxFeHRpbmZvRW50cnkSEAoDa2V5GAEgASgJUgNrZXkSFAoFdmFsdWUYAiABKAlSBXZhbHVlOgI4ASJmCg5TZGtEYXRhUmVxdWVzdBIQCgNzaWQYASABKAlSA3NpZBIkCgRtc2dzGAIgAygLMhAudGVzdC5TZGtNZXNzYWdlUgRtc2dzEhwKCXBhY2thZ2VJRBgDIAEoBFIJcGFja2FnZUlEYgZwcm90bzM=
```
- Example of transform.description
![](img/transform_description.png)
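
Putting the steps above together, a hedged sketch of building a PbSourceInfo for the sample protocol: the Base64 description is read from transform.base64 (Java 11+ Files.readString), and the rootMessageType and rowsNodePath values ("SdkDataRequest" and "msgs") are assumptions derived from the sample transform.proto.

```java
import java.nio.file.Files;
import java.nio.file.Paths;

// Hedged sketch for the sample transform.proto above (IOException handling omitted).
String protoDescription = Files.readString(Paths.get("transform.base64")).trim();
PbSourceInfo pbSource = new PbSourceInfo(
        "UTF-8",           // charset
        protoDescription,  // Base64-encoded protocol description generated above
        "SdkDataRequest",  // assumed: the MessageType of the source data
        "msgs");           // assumed: the repeated SdkMessage node holding the rows to convert
```
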

### Json
```java
public JsonSourceInfo(
        @JsonProperty("charset") String charset,           // Character set
        @JsonProperty("rowsNodePath") String rowsNodePath  // Array node path of the Json protocol containing the multiple rows of data to be converted
);
```
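
A hedged sketch for a JSON source, assuming payloads shaped like the SdkDataRequest example above, with the rows to convert under a "msgs" array node; the exact node-path syntax expected by rowsNodePath is an assumption:

```java
// Hedged sketch: payloads are assumed to look like
// {"sid":"s-01","packageID":1,"msgs":[{"msg":"a","msgTime":1},{"msg":"b","msgTime":2}]}
JsonSourceInfo jsonSource = new JsonSourceInfo(
        "UTF-8",  // charset
        "msgs");  // rowsNodePath: the array node containing the rows to convert (path syntax assumed)
```
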
## SinkInfo Configuration Description
### CSV
```java
public CsvSinkInfo(
        @JsonProperty("charset") String charset,       // Character set
        @JsonProperty("delimiter") String delimiter,   // Delimiter
        @JsonProperty("escapeChar") String escapeChar, // Escape character; if empty, no escaping is performed during encoding
        @JsonProperty("fields") List<FieldInfo> fields // Field list; if empty, encode by default in the Select field order of the TransformSQL
);
```
### KV
```java
public KvSinkInfo(
        @JsonProperty("charset") String charset,       // Character set
        @JsonProperty("fields") List<FieldInfo> fields // Field list; if empty, encode by default using the Alias of the Select fields in the TransformSQL as the Key
);
```
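
As a wrap-up, a small illustrative sketch pairing the sink encoders with a transform SQL: with an empty field list, a CSV sink writes fields in the Select order of the SQL, and a KV sink uses the Select aliases as keys. The values are examples only, and passing null is assumed to count as an empty field list.

```java
// Illustrative only: the KV sink will use the Select aliases (ftime, extinfo) as keys;
// the CSV sink will encode fields in the Select order, joined by the delimiter.
CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", null);
KvSinkInfo kvSink = new KvSinkInfo("UTF-8", null);
String transformSql = "select $1 ftime, $2 extinfo from source where $2 != 'ok'";
```
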
# TransformSQL Configuration Description
## CSV / KV Field Reference
- When SourceInfo has no configured field list:
  - For CSV format, fields are referenced as $1, $2, $3..., numbered starting from 1.
  - For KV format, fields are referenced directly by their Key in the source data.
- If the field names in SourceInfo and SinkInfo are inconsistent, use the Alias of the Select fields to map between them.
## ProtoBuf / Json Tree Structure Field Reference
- All fields must be prefixed with "$root." or "$child.".
- "$root" denotes the root node.
- "$child" denotes the array node containing the rows to be converted.
- For multi-level nodes, separate the levels with a dot, such as $root.extParams.name.
- For array nodes, use parentheses with the array index, such as $root.msgs(1).msgTime.
## Operator Support
- Currently supported operators
- Arithmetic operators: +, -, *, /, (, )
- Comparison operators: =, !=, >, >=, <, <=
- Logical operators: and, or, !, not, (, )
## Function Description
- CONCAT(string1, string2, ...) returns the string formed by concatenating string1, string2, ...; returns NULL if any argument is NULL. For example, CONCAT('AA', 'BB', 'CC') returns "AABBCC".
- NOW(), returns the current SQL timestamp in the local timezone.
- See the function description section for details.
## SQL Example
```sql
select ftime,extinfo from source where extinfo='ok'

select $1 ftime,$2 extinfo from source where $2!='ok'

select $root.sid,$root.packageID,$child.msgTime,$child.msg from source

select $root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source

select $root.sid,
($root.msgs(1).msgTime-$root.msgs(0).msgTime)/$root.packageID field2,
$root.packageID*($root.msgs(0).msgTime*$root.packageID+$root.msgs(1).msgTime/$root.packageID)*$root.packageID field3,
$root.msgs(0).msg field4
from source
where $root.packageID<($root.msgs(0).msgTime+$root.msgs(1).msgTime+$root.msgs(0).msgTime+$root.msgs(1).msgTime)

select $root.sid,
$root.packageID,
$child.msgTime,
concat($root.sid,$root.packageID,$child.msgTime,$child.msg) msg,$root.msgs.msgTime.msg
from source

select now() from source
```
# Common Issues
- SDK calls are thread-safe.
- Configuration changes: directly modifying the parameters of an existing configuration object does not take effect; you need to rebuild the SDK object.
- If CSV or KV source data contains line breaks, delimiter characters (vertical bars, commas, etc.), or backslashes (escape characters), configure the correct escape character and line separator.
- If these are not configured, the field order of the converted data may be scrambled: a line break splits one record into two, and an unescaped vertical-bar delimiter misaligns the fields.
- Avoid creating an SDK object for every record processed: initializing an SDK object compiles the transform SQL and builds an AST for semantic parsing, so frequent re-initialization causes performance problems. The recommended usage is to initialize one SDK object and reuse it for all data processed in the program, as in the sketch below.
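
A hedged sketch of the recommended usage pattern: build the configuration and the SDK object once, then reuse it for every record (and, since calls are thread-safe, across threads). The TransformProcessor name and its create/transform signatures below are assumptions and may differ between SDK versions; the point is the build-once, reuse-many pattern.

```java
import java.util.List;

public class TransformReuseSketch {

    // Hedged: the processor class and method names are assumed and may differ by SDK version.
    private final TransformProcessor processor;

    public TransformReuseSketch(TransformConfig config) {
        // Initialization compiles the transform SQL and builds the AST, so do it only once.
        this.processor = TransformProcessor.create(config); // assumed factory method
    }

    public List<String> handle(String rawRecord) {
        // Reused for every record; safe to call from multiple threads per the note above.
        return processor.transform(rawRecord);              // assumed transform entry point
    }
}
```
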
35 changes: 35 additions & 0 deletions docs/modules/transform/overview.md
@@ -0,0 +1,35 @@
---
title: Overview
sidebar_position: 1
---

# Introduction
InLong Transform extends InLong's access and distribution capabilities: on the access side it adapts to a richer variety of data protocols and reporting scenarios, and on the distribution side it accommodates complex and diverse data analysis scenarios. It improves data quality and collaboration by providing computational capabilities decoupled from the computing engine, such as connection, aggregation, filtering, grouping, value extraction, and sampling. It simplifies the preliminary work users must do to report data, lowers the threshold for using data, and keeps the focus on the business value of data, following the principle that "what is visible is usable."

![](img/transform_introduction.png)

# Application Scenarios
- Data Cleaning: During the data integration process, it is necessary to clean data from different sources to eliminate errors, duplicates, and inconsistencies. Transform capabilities can help enterprises clean data more effectively, improving data quality.
- Data Fusion: Combining data from different sources for unified analysis and reporting. Transform capabilities can handle data in various formats and structures, achieving data fusion and integration.
- Data Standardization: Converting data into a unified standard format for cross-system and cross-platform data analysis. Transform capabilities can help enterprises standardize and normalize data.
- Data Partitioning and Indexing: To improve the performance of data querying and analysis, partition data and create indexes. Transform capabilities can dynamically adjust the field values for partitioning and indexing, thereby improving the performance of data warehouses.
- Data Aggregation and Calculation: During data analysis, extract valuable business information through data aggregation and calculation. Transform capabilities can achieve complex data aggregation and calculation, covering multi-dimensional data analysis.
- Data Security and Privacy Protection: Ensure data security and privacy during the data integration process. Transform capabilities can achieve data masking, encryption, and authorization management, protecting data security and privacy.
- Cross-Team Data Sharing: For data security, share only a filtered subset of a data stream; agree on data interfaces with partner teams to decouple data dependencies, dynamically adjusting how multiple streams are merged into the shared data stream interface.

# Features
- Describe the Transform processing logic of the data stream through SQL, supporting standard SQL syntax.
- Provide a rich SQL Function to handle various Transform needs and support UDF extensions.
- Support decoding CSV, KV, ProtoBuffer, JSON, and other flat-table and tree-structured data source formats.
- Support encoding CSV, KV, and other data target formats.
- Data source decoders and data target encoders can be extended with custom implementations.

# Future Planning
- Support richer Transform UDFs, data source decoders, and data target encoders.
- Support Group and Join capabilities based on Time Window.
- Integrate Transform into each module of InLong to enhance processing capabilities and user experience.
- Agent: Responsible for collecting raw data from various data sources. With Transform, it gains support for complex data source protocols such as PB and Json, and adds data filtering and format conversion capabilities.
- Realtime Synchronization: Currently implemented with FlinkSQL transformations, one data stream per job. With Transform, it gains support for complex data source protocols such as PB and Json, and can carry multiple data streams per job.
- Offline Synchronization: Planned to be implemented on Flink Batch, with InLongTransform as a custom function extending its transformation capabilities. It can use the data target of an InLong data stream as a data source to integrate and preprocess data internally, and downstream offline jobs can be triggered when the pre-sort job or the offline synchronization job finishes, or by a partition-closure event.
- Manager: With Transform, the Manager interface provides preliminary transformation of raw data and verifies the correctness of the transformation logic configuration, improving the user experience.
- Sort: Currently, a data stream has only one offline data target per type; with Transform, multiple copies and subsets can be stored, and the stored content can be enriched by joining with static database tables, optimizing subsequent business tasks.