[Enhancement] Support column mismatch fill property in files()
Signed-off-by: wyb <[email protected]>
wyb committed Dec 26, 2024
1 parent f074803 commit 0de41ef
Showing 15 changed files with 392 additions and 58 deletions.
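A usage sketch for the new property (hedged: this assumes the usual files() key/value property syntax; the bucket and path below are hypothetical):

    SELECT * FROM FILES(
        "path" = "s3://my-bucket/data/*.csv",   -- hypothetical location
        "format" = "csv",
        "fill_mismatch_column_with" = "null"    -- pad missing columns with NULL instead of erroring
    );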
70 changes: 48 additions & 22 deletions be/src/exec/csv_scanner.cpp
@@ -45,8 +45,8 @@ static std::string string_2_asc(const std::string& input) {
     return oss.str();
 }
 
-static std::string make_column_count_not_matched_error_message(int expected_count, int actual_count,
-                                                               CSVParseOptions& parse_options) {
+static std::string make_column_count_not_matched_error_message_for_load(int expected_count, int actual_count,
+                                                                        CSVParseOptions& parse_options) {
     std::stringstream error_msg;
     error_msg << "Target column count: " << expected_count
               << " doesn't match source value column count: " << actual_count << ". "
@@ -55,6 +55,20 @@ static std::string make_column_count_not_matched_error_message(int expected_coun
     return error_msg.str();
 }
 
+static std::string make_column_count_not_matched_error_message_for_query(int expected_count, int actual_count,
+                                                                          CSVParseOptions& parse_options,
+                                                                          const std::string& row,
+                                                                          const std::string& filename) {
+    std::stringstream error_msg;
+    error_msg << "Schema column count: " << expected_count
+              << " doesn't match source value column count: " << actual_count << ". "
+              << "Column separator: " << string_2_asc(parse_options.column_delimiter) << ", "
+              << "Row delimiter: " << string_2_asc(parse_options.row_delimiter) << ", "
+              << "Row: '" << row << "', File: " << filename << ". "
+              << "Consider setting 'fill_mismatch_column_with' = 'null'";
+    return error_msg.str();
+}
+
 static std::string make_value_type_not_matched_error_message(int field_pos, const Slice& field,
                                                              const SlotDescriptor* slot) {
     std::stringstream error_msg;
@@ -357,17 +371,23 @@ Status CSVScanner::_parse_csv_v2(Chunk* chunk) {
             if (status.is_end_of_file()) {
                 break;
             }
-            if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) {
-                std::string error_msg = make_column_count_not_matched_error_message(_num_fields_in_csv,
-                                                                                    row.columns.size(), _parse_options);
-                _report_error(record, error_msg);
-            }
-            if (_state->enable_log_rejected_record()) {
-                std::string error_msg = make_column_count_not_matched_error_message(_num_fields_in_csv,
-                                                                                    row.columns.size(), _parse_options);
-                _report_rejected_record(record, error_msg);
-            }
-            continue;
+            if (_is_load) {
+                std::string error_msg = make_column_count_not_matched_error_message_for_load(
+                        _num_fields_in_csv, row.columns.size(), _parse_options);
+                if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) {
+                    _report_error(record, error_msg);
+                }
+                if (_state->enable_log_rejected_record()) {
+                    _report_rejected_record(record, error_msg);
+                }
+                continue;
+            } else {
+                // files() query return error
+                std::string error_msg = make_column_count_not_matched_error_message_for_query(
+                        _num_fields_in_csv, row.columns.size(), _parse_options, record.to_string(),
+                        _curr_reader->filename());
+                return Status::DataQualityError(error_msg);
+            }
         }
         if (!validate_utf8(record.data, record.size)) {
             if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) {
@@ -459,17 +479,23 @@ Status CSVScanner::_parse_csv(Chunk* chunk) {
         _curr_reader->split_record(record, &fields);
 
         if (fields.size() != _num_fields_in_csv && !_scan_range.params.flexible_column_mapping) {
-            if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) {
-                std::string error_msg =
-                        make_column_count_not_matched_error_message(_num_fields_in_csv, fields.size(), _parse_options);
-                _report_error(record, error_msg);
-            }
-            if (_state->enable_log_rejected_record()) {
-                std::string error_msg =
-                        make_column_count_not_matched_error_message(_num_fields_in_csv, fields.size(), _parse_options);
-                _report_rejected_record(record, error_msg);
-            }
-            continue;
+            if (_is_load) {
+                std::string error_msg = make_column_count_not_matched_error_message_for_load(
+                        _num_fields_in_csv, fields.size(), _parse_options);
+                if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) {
+                    _report_error(record, error_msg);
+                }
+                if (_state->enable_log_rejected_record()) {
+                    _report_rejected_record(record, error_msg);
+                }
+                continue;
+            } else {
+                // files() query return error
+                std::string error_msg = make_column_count_not_matched_error_message_for_query(
+                        _num_fields_in_csv, fields.size(), _parse_options, record.to_string(),
+                        _curr_reader->filename());
+                return Status::DataQualityError(error_msg);
+            }
         }
         if (!validate_utf8(record.data, record.size)) {
             if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) {
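An illustration of the query-side behavior above (hedged: the file name and column counts are invented; the message shape is taken from make_column_count_not_matched_error_message_for_query):

    -- a 2-value row in a file with a 3-column schema now aborts the query:
    SELECT * FROM FILES("path" = "s3://my-bucket/bad.csv", "format" = "csv");
    -- ERROR: Schema column count: 3 doesn't match source value column count: 2.
    --        ... File: s3://my-bucket/bad.csv. Consider setting 'fill_mismatch_column_with' = 'null'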
5 changes: 5 additions & 0 deletions be/src/exec/file_scanner.cpp
@@ -46,6 +46,7 @@ FileScanner::FileScanner(starrocks::RuntimeState* state, starrocks::RuntimeProfi
           _row_desc(nullptr),
           _strict_mode(false),
           _error_counter(0),
+          _is_load(true),
           _schema_only(schema_only) {}
 
 FileScanner::~FileScanner() = default;
@@ -135,6 +136,10 @@ Status FileScanner::open() {
         _strict_mode = _params.strict_mode;
     }
 
+    if (_params.__isset.is_load) {
+        _is_load = _params.is_load;
+    }
+
     if (_strict_mode && !_params.__isset.dest_sid_to_src_sid_without_trans) {
         return Status::InternalError("Slot map of dest to src must be set in strict mode");
     }
5 changes: 5 additions & 0 deletions be/src/exec/file_scanner.h
@@ -100,6 +100,11 @@ class FileScanner {
 
     bool _strict_mode;
     int64_t _error_counter;
+    // When column mismatch, query and load have different behaviors.
+    // Query returns error, while load counts the filtered rows, and return error or not based on max filter ratio,
+    // so need to check query or load in scanner.
+    // Currently only used in csv scanner.
+    bool _is_load;
 
     // sources
     std::vector<SlotDescriptor*> _src_slot_descriptors;
fe/fe-core/src/main/java/com/starrocks/catalog/TableFunctionTable.java
@@ -119,6 +119,8 @@ public class TableFunctionTable extends Table {
     public static final String PROPERTY_AUTO_DETECT_SAMPLE_FILES = "auto_detect_sample_files";
     public static final String PROPERTY_AUTO_DETECT_SAMPLE_ROWS = "auto_detect_sample_rows";
 
+    private static final String PROPERTY_FILL_MISMATCH_COLUMN_WITH = "fill_mismatch_column_with";
+
     public static final String PROPERTY_CSV_COLUMN_SEPARATOR = "csv.column_separator";
     public static final String PROPERTY_CSV_ROW_DELIMITER = "csv.row_delimiter";
     public static final String PROPERTY_CSV_SKIP_HEADER = "csv.skip_header";
@@ -130,19 +132,48 @@ public class TableFunctionTable extends Table {
     private static final String PROPERTY_LIST_FILES_ONLY = "list_files_only";
     private static final String PROPERTY_LIST_RECURSIVELY = "list_recursively";
 
+    public enum MisMatchFillValue {
+        NONE, // error
+        NULL;
+
+        public static MisMatchFillValue fromString(String value) {
+            for (MisMatchFillValue fillValue : values()) {
+                if (fillValue.name().equalsIgnoreCase(value)) {
+                    return fillValue;
+                }
+            }
+            return null;
+        }
+
+        public static List<String> getCandidates() {
+            return Arrays.stream(values()).map(p -> p.name().toLowerCase()).collect(Collectors.toList());
+        }
+    }
+
+    public enum FilesTableType {
+        LOAD,
+        UNLOAD,
+        QUERY,
+        LIST
+    }
+
     private String path;
     private String format;
 
-    // for load data
-    private int autoDetectSampleFiles;
-    private int autoDetectSampleRows;
+    private FilesTableType filesTableType = FilesTableType.QUERY;
+
+    // for load/query data
+    private int autoDetectSampleFiles = DEFAULT_AUTO_DETECT_SAMPLE_FILES;
+    private int autoDetectSampleRows = DEFAULT_AUTO_DETECT_SAMPLE_ROWS;
 
     private List<String> columnsFromPath = new ArrayList<>();
     private boolean strictMode = false;
     private final Map<String, String> properties;
 
     private List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
 
+    private MisMatchFillValue misMatchFillValue = MisMatchFillValue.NONE;
+
     // for unload data
     private String compressionType;
     private Optional<List<Integer>> partitionColumnIDs = Optional.empty();
Expand All @@ -164,12 +195,12 @@ public class TableFunctionTable extends Table {
private boolean listFilesOnly = false;
private boolean listRecursively = false;

// Ctor for load data / list files via table function
// Ctor for load data / query data / list files via table function
public TableFunctionTable(Map<String, String> properties) throws DdlException {
this(properties, null);
}

// Ctor for load data / list files via table function
// Ctor for load data / query data / list files via table function
public TableFunctionTable(Map<String, String> properties, Consumer<TableFunctionTable> pushDownSchemaFunc)
throws DdlException {
super(TableType.TABLE_FUNCTION);
@@ -180,9 +211,11 @@ public TableFunctionTable(Map<String, String> properties, Consumer<TableFunction
         parseProperties();
 
         if (listFilesOnly) {
+            this.filesTableType = FilesTableType.LIST;
             setSchemaForListFiles();
         } else {
-            setSchemaForLoad();
+            // set filesTableType as LOAD in insert analyzer, and default is QUERY
+            setSchemaForLoadAndQuery();
         }
 
         if (pushDownSchemaFunc != null) {
@@ -196,12 +229,13 @@ public TableFunctionTable(List<Column> columns, Map<String, String> properties,
         checkNotNull(properties, "properties is null");
         checkNotNull(sessionVariable, "sessionVariable is null");
         this.properties = properties;
+        this.filesTableType = FilesTableType.UNLOAD;
         parsePropertiesForUnload(columns, sessionVariable);
         setNewFullSchema(columns);
     }
 
-    private void setSchemaForLoad() throws DdlException {
-        parseFilesForLoad();
+    private void setSchemaForLoadAndQuery() throws DdlException {
+        parseFilesForLoadAndQuery();
 
         // infer schema from files
         List<Column> columns = new ArrayList<>();
@@ -338,6 +372,14 @@ public String getPath() {
         return path;
     }
 
+    public void setFilesTableType(FilesTableType filesTableType) {
+        this.filesTableType = filesTableType;
+    }
+
+    public boolean isLoadType() {
+        return filesTableType == FilesTableType.LOAD;
+    }
+
     public boolean isListFilesOnly() {
         return listFilesOnly;
    }
@@ -393,23 +435,34 @@ private void parsePropertiesForLoad(Map<String, String> properties) throws DdlEx
             strictMode = Boolean.parseBoolean(properties.get(PROPERTY_STRICT_MODE));
         }
 
-        if (!properties.containsKey(PROPERTY_AUTO_DETECT_SAMPLE_FILES)) {
-            autoDetectSampleFiles = DEFAULT_AUTO_DETECT_SAMPLE_FILES;
-        } else {
+        if (properties.containsKey(PROPERTY_AUTO_DETECT_SAMPLE_FILES)) {
+            String property = properties.get(PROPERTY_AUTO_DETECT_SAMPLE_FILES);
             try {
-                autoDetectSampleFiles = Integer.parseInt(properties.get(PROPERTY_AUTO_DETECT_SAMPLE_FILES));
+                autoDetectSampleFiles = Integer.parseInt(property);
             } catch (NumberFormatException e) {
-                throw new DdlException("failed to parse auto_detect_sample_files: ", e);
+                ErrorReport.reportDdlException(
+                        ErrorCode.ERR_INVALID_VALUE, PROPERTY_AUTO_DETECT_SAMPLE_FILES, property, "int number");
             }
         }
 
-        if (!properties.containsKey(PROPERTY_AUTO_DETECT_SAMPLE_ROWS)) {
-            autoDetectSampleRows = DEFAULT_AUTO_DETECT_SAMPLE_ROWS;
-        } else {
+        if (properties.containsKey(PROPERTY_FILL_MISMATCH_COLUMN_WITH)) {
+            String property = properties.get(PROPERTY_FILL_MISMATCH_COLUMN_WITH);
+            misMatchFillValue = MisMatchFillValue.fromString(property);
+            if (misMatchFillValue == null) {
+                String msg = String.format("%s (case insensitive)", String.join(", ", MisMatchFillValue.getCandidates()));
+                ErrorReport.reportSemanticException(
+                        ErrorCode.ERR_INVALID_VALUE, PROPERTY_FILL_MISMATCH_COLUMN_WITH, property, msg);
+            }
+        }
+
+        // csv properties
+        if (properties.containsKey(PROPERTY_AUTO_DETECT_SAMPLE_ROWS)) {
+            String property = properties.get(PROPERTY_AUTO_DETECT_SAMPLE_ROWS);
             try {
-                autoDetectSampleRows = Integer.parseInt(properties.get(PROPERTY_AUTO_DETECT_SAMPLE_ROWS));
+                autoDetectSampleRows = Integer.parseInt(property);
             } catch (NumberFormatException e) {
-                throw new DdlException("failed to parse auto_detect_sample_files: ", e);
+                ErrorReport.reportDdlException(
+                        ErrorCode.ERR_INVALID_VALUE, PROPERTY_AUTO_DETECT_SAMPLE_ROWS, property, "int number");
             }
         }
 
@@ -467,7 +520,7 @@ private void parsePropertiesForLoad(Map<String, String> properties) throws DdlEx
         }
     }
 
-    private void parseFilesForLoad() throws DdlException {
+    private void parseFilesForLoadAndQuery() throws DdlException {
         try {
             // fake:// is a faked path, for testing purpose
             if (path.startsWith("fake://")) {
@@ -635,6 +688,10 @@ public boolean isStrictMode() {
         return strictMode;
     }
 
+    public boolean isFlexibleColumnMapping() {
+        return misMatchFillValue != MisMatchFillValue.NONE;
+    }
+
     @Override
     public String toString() {
         return String.format("TABLE('path'='%s', 'format'='%s')", path, format);
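The FilesTableType above defaults to QUERY and, per the inline comment, is switched to LOAD by the insert analyzer. A hedged sketch of the two statement shapes (target_table and the bucket are hypothetical):

    -- QUERY (default): a column mismatch is an error unless 'fill_mismatch_column_with' = 'null'
    SELECT * FROM FILES("path" = "s3://my-bucket/data/*.csv", "format" = "csv");

    -- LOAD: the insert analyzer marks the table function LOAD, so mismatched rows are counted
    -- as filtered and the job fails only if the max filter ratio is exceeded
    INSERT INTO target_table
    SELECT * FROM FILES("path" = "s3://my-bucket/data/*.csv", "format" = "csv");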
23 changes: 18 additions & 5 deletions fe/fe-core/src/main/java/com/starrocks/planner/FileScanNode.java
@@ -161,7 +161,13 @@ public int compare(TBrokerFileStatus o1, TBrokerFileStatus o2) {
     private boolean useVectorizedLoad;
 
     private LoadJob.JSONOptions jsonOptions = new LoadJob.JSONOptions();
+
     private boolean flexibleColumnMapping = false;
+    // When column mismatch, query and load have different behaviors.
+    // Query returns error, while load counts the filtered rows, and return error or not based on max filter ratio,
+    // so need to check query or load in scanner.
+    // Currently only used in csv scanner.
+    private boolean isLoad = true;
 
     private boolean nullExprInAutoIncrement;
 
@@ -224,8 +230,10 @@ public void init(Analyzer analyzer) throws StarRocksException {
         }
     }
 
-    private boolean isLoad() {
-        return desc.getTable() == null;
+    // broker table is deprecated
+    // TODO: remove
+    private boolean isBrokerTable() {
+        return desc.getTable() != null;
     }
 
     @Deprecated
@@ -257,6 +265,10 @@ public void setFlexibleColumnMapping(boolean enable) {
         this.flexibleColumnMapping = enable;
     }
 
+    public void setIsLoad(boolean isLoad) {
+        this.isLoad = isLoad;
+    }
+
     public void setUseVectorizedLoad(boolean useVectorizedLoad) {
         this.useVectorizedLoad = useVectorizedLoad;
     }
@@ -314,6 +326,7 @@ private void initParams(ParamCreateContext context)
         params.setEscape(fileGroup.getEscape());
         params.setJson_file_size_limit(Config.json_file_size_limit);
         params.setFlexible_column_mapping(flexibleColumnMapping);
+        params.setIs_load(isLoad);
         initColumns(context);
         initWhereExpr(fileGroup.getWhereExpr(), analyzer);
     }
@@ -338,7 +351,7 @@ private void initColumns(ParamCreateContext context) throws StarRocksException {
         // for query, there is no column exprs, they will be got from table's schema in "Load.initColumns"
         List<ImportColumnDesc> columnExprs = Lists.newArrayList();
         List<String> columnsFromPath = Lists.newArrayList();
-        if (isLoad()) {
+        if (!isBrokerTable()) {
             columnExprs = context.fileGroup.getColumnExprList();
             columnsFromPath = context.fileGroup.getColumnsFromPath();
         }
@@ -494,7 +507,7 @@ private void getFileStatusAndCalcInstance() throws StarRocksException {
         }
         Preconditions.checkState(fileStatusesList.size() == fileGroups.size());
 
-        if (isLoad() && filesAdded == 0) {
+        if (!isBrokerTable() && filesAdded == 0) {
             // return at most 3 paths to users
             int limit = 3;
             List<String> allFilePaths =
@@ -740,7 +753,7 @@ public void updateScanRangeLocations() {
     @Override
     protected String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
         StringBuilder output = new StringBuilder();
-        if (!isLoad()) {
+        if (isBrokerTable()) {
             BrokerTable brokerTable = (BrokerTable) targetTable;
             output.append(prefix).append("TABLE: ").append(brokerTable.getName()).append("\n");
             output.append(prefix).append("PATH: ")
