-
Notifications
You must be signed in to change notification settings - Fork 751
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add OrcSchemaConversionValidator to avoid infinite recursion in AvroOrcSchemaConverter.getOrcSchema()
- Loading branch information
Tao Qin
committed
Oct 18, 2023
1 parent
6266a12
commit e0bd51d
Showing
3 changed files
with
259 additions
and
5 deletions.
There are no files selected for viewing
61 changes: 61 additions & 0 deletions
61
...apache/gobblin/source/extractor/extract/kafka/validator/OrcSchemaConversionValidator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.gobblin.source.extractor.extract.kafka.validator; | ||
|
||
import java.io.IOException; | ||
import org.apache.avro.Schema; | ||
import org.apache.gobblin.configuration.State; | ||
import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistry; | ||
import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryFactory; | ||
import org.apache.gobblin.kafka.schemareg.SchemaRegistryException; | ||
import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic; | ||
import org.apache.gobblin.util.orc.AvroOrcSchemaConverter; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
|
||
public class OrcSchemaConversionValidator extends TopicValidatorBase { | ||
private static final Logger LOGGER = LoggerFactory.getLogger(OrcSchemaConversionValidator.class); | ||
|
||
public static final String MAX_RECURSIVE_DEPTH_KEY = "gobblin.kafka.topicValidators.orcSchemaConversionValidator.maxRecursiveDepth"; | ||
public static final int DEFAULT_MAX_RECURSIVE_DEPTH = 200; | ||
|
||
private final KafkaSchemaRegistry schemaRegistry; | ||
|
||
public OrcSchemaConversionValidator(State sourceState) { | ||
super(sourceState); | ||
this.schemaRegistry = KafkaSchemaRegistryFactory.getSchemaRegistry(sourceState.getProperties()); | ||
} | ||
|
||
@Override | ||
public boolean validate(KafkaTopic topic) { | ||
LOGGER.debug("Validating ORC schema conversion for topic {}", topic.getName()); | ||
try { | ||
Schema schema = (Schema) this.schemaRegistry.getLatestSchema(topic.getName()); | ||
// Try converting the avro schema to orc schema to check if any errors. | ||
int maxRecursiveDepth = this.state.getPropAsInt(MAX_RECURSIVE_DEPTH_KEY, DEFAULT_MAX_RECURSIVE_DEPTH); | ||
AvroOrcSchemaConverter.tryGetOrcSchema(schema, 0, maxRecursiveDepth); | ||
} catch (StackOverflowError e) { | ||
LOGGER.warn("Failed to covert latest schema to ORC schema for topic: {}", topic.getName()); | ||
return false; | ||
} catch (IOException | SchemaRegistryException e) { | ||
LOGGER.warn("Failed to get latest schema for topic: {}, validation is skipped, exception: ", topic.getName(), e); | ||
return true; | ||
} | ||
return true; | ||
} | ||
} |
176 changes: 176 additions & 0 deletions
176
...he/gobblin/source/extractor/extract/kafka/validator/OrcSchemaConversionValidatorTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.gobblin.source.extractor.extract.kafka.validator; | ||
|
||
import com.google.common.collect.ImmutableList; | ||
import com.google.common.collect.ImmutableMap; | ||
import java.io.IOException; | ||
import java.util.Map; | ||
import java.util.Properties; | ||
import org.apache.avro.Schema; | ||
import org.apache.gobblin.configuration.State; | ||
import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistry; | ||
import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys; | ||
import org.apache.gobblin.kafka.schemareg.SchemaRegistryException; | ||
import org.apache.gobblin.kafka.serialize.MD5Digest; | ||
import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic; | ||
import org.testng.Assert; | ||
import org.testng.annotations.Test; | ||
|
||
|
||
public class OrcSchemaConversionValidatorTest { | ||
@Test | ||
public void testOrcSchemaConversionValidator() { | ||
KafkaTopic topic1 = new KafkaTopic("topic1", ImmutableList.of()); | ||
KafkaTopic topic2 = new KafkaTopic("topic2", ImmutableList.of()); | ||
KafkaTopic topic3 = new KafkaTopic("topic3", ImmutableList.of()); | ||
State state = new State(); | ||
state.setProp(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_CLASS, TestKafkaSchemaRegistry.class.getName()); | ||
|
||
OrcSchemaConversionValidator validator = new OrcSchemaConversionValidator(state); | ||
Assert.assertTrue(validator.validate(topic1)); // Pass validation | ||
Assert.assertTrue(validator.validate(topic2)); // Pass validation | ||
Assert.assertFalse(validator.validate(topic3)); // Fail validation, default max_recursive_depth = 200, the validation returns early | ||
|
||
state.setProp(OrcSchemaConversionValidator.MAX_RECURSIVE_DEPTH_KEY, 1); | ||
Assert.assertTrue(validator.validate(topic1)); // Pass validation | ||
Assert.assertFalse(validator.validate(topic2)); // Fail validation, because max_recursive_depth is set to 1, the validation returns early | ||
Assert.assertFalse(validator.validate(topic3)); // Fail validation, because max_recursive_depth is set to 1, the validation returns early | ||
} | ||
|
||
@Test | ||
public void testGetLatestSchemaFail() { | ||
KafkaTopic topic1 = new KafkaTopic("topic1", ImmutableList.of()); | ||
KafkaTopic topic2 = new KafkaTopic("topic2", ImmutableList.of()); | ||
KafkaTopic topic3 = new KafkaTopic("topic3", ImmutableList.of()); | ||
State state = new State(); | ||
state.setProp(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_CLASS, BadKafkaSchemaRegistry.class.getName()); | ||
|
||
OrcSchemaConversionValidator validator = new OrcSchemaConversionValidator(state); | ||
// Validator should always return PASS when it fails to get latest schema. | ||
Assert.assertTrue(validator.validate(topic1)); | ||
Assert.assertTrue(validator.validate(topic2)); | ||
Assert.assertTrue(validator.validate(topic3)); | ||
} | ||
|
||
// A KafkaSchemaRegistry class that returns the hardcoded schemas for the test topics. | ||
public static class TestKafkaSchemaRegistry implements KafkaSchemaRegistry<MD5Digest, Schema> { | ||
private final String schemaMaxInnerFieldDepthIs1 = "{" | ||
+ "\"type\": \"record\"," | ||
+ " \"name\": \"test\"," | ||
+ " \"fields\": [" | ||
+ " {\n" | ||
+ " \"name\": \"id\"," | ||
+ " \"type\": \"int\"" | ||
+ " }," | ||
+ " {" | ||
+ " \"name\": \"timestamp\"," | ||
+ " \"type\": \"string\"" | ||
+ " }" | ||
+ " ]" | ||
+ "}"; | ||
|
||
private final String schemaMaxInnerFieldDepthIs2 = "{" | ||
+ " \"type\": \"record\"," | ||
+ " \"name\": \"nested\"," | ||
+ " \"fields\": [" | ||
+ " {" | ||
+ " \"name\": \"nestedId\"," | ||
+ " \"type\": {\n" | ||
+ " \"type\": \"array\"," | ||
+ " \"items\": \"string\"" | ||
+ " }" | ||
+ " }," | ||
+ " {" | ||
+ " \"name\": \"timestamp\"," | ||
+ " \"type\": \"string\"" | ||
+ " }" | ||
+ " ]" | ||
+ "}"; | ||
|
||
private final String schemaWithRecursiveRef = "{" | ||
+ " \"type\": \"record\"," | ||
+ " \"name\": \"TreeNode\"," | ||
+ " \"fields\": [" | ||
+ " {" | ||
+ " \"name\": \"value\"," | ||
+ " \"type\": \"int\"" | ||
+ " }," | ||
+ " {" | ||
+ " \"name\": \"children\"," | ||
+ " \"type\": {" | ||
+ " \"type\": \"array\"," | ||
+ " \"items\": \"TreeNode\"" | ||
+ " }" | ||
+ " }" | ||
+ " ]" | ||
+ "}"; | ||
private final Map<String, Schema> topicToSchema; | ||
|
||
public TestKafkaSchemaRegistry(Properties props) { | ||
topicToSchema = ImmutableMap.of( | ||
"topic1", new Schema.Parser().parse(schemaMaxInnerFieldDepthIs1), | ||
"topic2", new Schema.Parser().parse(schemaMaxInnerFieldDepthIs2), | ||
"topic3", new Schema.Parser().parse(schemaWithRecursiveRef)); | ||
} | ||
@Override | ||
public Schema getLatestSchema(String topicName) { | ||
return topicToSchema.get(topicName); | ||
} | ||
|
||
@Override | ||
public MD5Digest register(String name, Schema schema) { | ||
return null; | ||
} | ||
|
||
@Override | ||
public Schema getById(MD5Digest id) { | ||
return null; | ||
} | ||
|
||
@Override | ||
public boolean hasInternalCache() { | ||
return false; | ||
} | ||
} | ||
|
||
// A KafkaSchemaRegistry class that always fail to get latest schema. | ||
public static class BadKafkaSchemaRegistry implements KafkaSchemaRegistry<MD5Digest, Schema> { | ||
public BadKafkaSchemaRegistry(Properties props) { | ||
} | ||
|
||
@Override | ||
public Schema getLatestSchema(String name) throws IOException, SchemaRegistryException { | ||
throw new SchemaRegistryException("Exception in getLatestSchema()"); | ||
} | ||
|
||
@Override | ||
public MD5Digest register(String name, Schema schema) throws IOException, SchemaRegistryException { | ||
return null; | ||
} | ||
|
||
@Override | ||
public Schema getById(MD5Digest id) throws IOException, SchemaRegistryException { | ||
return null; | ||
} | ||
|
||
@Override | ||
public boolean hasInternalCache() { | ||
return false; | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters