
Implement parquet simple record as a source for the druid spark indexer #111

Closed
wants to merge 6 commits

Conversation


@Igosuki Igosuki commented Oct 3, 2017

Probably can't be merged right away.

Works fine with the parquet file I included (I mapped the CSV from the test resources into a parquet file). The strategy is to map parquet records directly to JSON without going through Avro, since loading generic Avro data turned out to be more complicated than it was worth for me.
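For illustration, here is a minimal sketch of that kind of mapping, not the code in this PR: each parquet simple record (a Group) is turned into a plain map, recursing into nested groups, without ever materializing an Avro record. Repeated fields are simplified to their first value here.

import org.apache.parquet.example.data.Group

// Sketch only: convert a parquet Group ("simple record") into a Map,
// recursing into nested groups; repeated fields are truncated to one value.
def groupToMap(g: Group): Map[String, Any] = {
  val schema = g.getType
  (0 until schema.getFieldCount).flatMap { i =>
    val name = schema.getFieldName(i)
    if (g.getFieldRepetitionCount(i) == 0) None                 // field absent in this record
    else if (schema.getType(i).isPrimitive)
      Some(name -> g.getValueToString(i, 0))                    // primitive value as string
    else
      Some(name -> groupToMap(g.getGroup(i, 0)))                // nested record, recurse
  }.toMap
}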

I'm going to run production tests soon (against a few hundred gigabytes of data).

@drcrallen (Contributor)

For future maintainability: there is already a parquet input row parser in Druid (https://github.com/druid-io/druid/blob/master/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/ParquetHadoopInputRowParser.java). Is there any reason it cannot be used?

It would be nice if the row parser (or a factory) could be passed in, eliminating the giant switch statement, which is not really sustainable.

Also, regarding #10, it might make sense to have an RDD passed into the index method in the future, though I don't think such a refactoring is in scope for adding a specific new format.

Getting rid of the giant switch statement should be investigated further, though.
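As a rough illustration of that suggestion (the method name and signature here are hypothetical, not the project's actual API): the indexing code would take the format-specific parsing function as an argument, so a new input format only supplies its own parser instead of adding a case to a switch.

import io.druid.data.input.InputRow
import org.apache.spark.rdd.RDD

// Hypothetical sketch: the caller injects the format-specific parser,
// so no format switch is needed inside the indexing code itself.
def indexRows[T](raw: RDD[T], parse: T => InputRow): RDD[InputRow] =
  raw.map(parse)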

@drcrallen (Contributor)

The test failure looks like a Guava version problem:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 58, localhost, executor driver): java.lang.NoSuchMethodError: com.google.common.io.Files.asByteSource(Ljava/io/File;)Lcom/google/common/io/ByteSource;

Guava version reconciliation is always a huge issue.
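One common way to work around this, sketched below and assuming the project builds a fat jar with sbt-assembly (that plugin is an assumption, not something shown in this PR), is to shade Guava so the version bundled with the extension cannot collide with the older one on the Spark classpath. Pinning or excluding the transitive Guava dependency is the other usual option.

// build.sbt sketch (assumes the sbt-assembly plugin): rename Guava packages
// inside the assembled jar so Spark's older Guava no longer shadows them.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.common.**" -> "shaded.com.google.common.@1").inAll
)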

@Igosuki (Author) commented Oct 4, 2017

Looks like the checks have passed.
A few things:

  • This uses the simplest read API from parquet, which I copied over because parquet-tools, which this API belongs to, pulls in a very old version of Guava (a sketch of this read path follows after the list).
  • I didn't add push-down filters as an option, even though they can be very effective when reading parquet.
  • I pinned the Druid version to 0.10.1, but we can revert that to the snapshot.
  • I am unsure how the hadoopFile API behaves with Spark/Parquet over a directory, i.e. whether it splits the ParquetFileReader workload properly or whether it becomes a bottleneck.
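For reference, a sketch of that simple read path, assuming parquet-hadoop's example GroupReadSupport (the file path is illustrative):

import org.apache.hadoop.fs.Path
import org.apache.parquet.example.data.Group
import org.apache.parquet.hadoop.ParquetReader
import org.apache.parquet.hadoop.example.GroupReadSupport

// Sketch only: read parquet records one by one as Groups ("simple records").
val reader: ParquetReader[Group] =
  ParquetReader.builder(new GroupReadSupport(), new Path("/tmp/lineitems.parquet")).build()
Iterator.continually(reader.read()).takeWhile(_ != null).foreach(println)
reader.close()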

@Gauravshah

How would we specify nested columns?
Also, would it respect the partition column?

@Igosuki (Author) commented Oct 4, 2017 via email

@Gauravshah

I feel going down the DataFrame route might be better.
The current way needs two transforms: Parquet to JSON to Map<String, Object>.
With the DataFrame route we could go directly from Parquet to Map<String, Object>.
That would also let us apply other transforms, and it would make use of Spark's parquet pruning features.
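A sketch of what that DataFrame route could look like (the SparkSession setup and the path are assumptions for illustration; this is not code from the PR):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("parquet-to-maps").getOrCreate()

// Spark SQL handles input splits, column pruning and filter push-down for parquet;
// each Row is converted straight to a Map[String, Any], skipping the JSON step.
val maps = spark.read.parquet("hdfs:///path/to/data").rdd
  .map(row => row.getValuesMap[Any](row.schema.fieldNames))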

@Igosuki (Author) commented Oct 4, 2017

Sure, why not, let me have a look

@xanec left a comment

Maybe we should not commit a binary file (i.e., lineitems.parquet)? Could we generate it on the fly instead?
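One possible way to do that, as a sketch only: rebuild the parquet fixture from the committed CSV during test setup. This assumes Spark 2.x's built-in CSV reader is available to the tests; the CSV file name and temp location below are hypothetical.

import org.apache.spark.sql.SparkSession

// Test-setup sketch: regenerate the parquet fixture from the CSV test resource
// instead of committing the binary parquet file. File names are illustrative.
val spark = SparkSession.builder().master("local[*]").appName("build-fixture").getOrCreate()
val tmpDir = java.nio.file.Files.createTempDirectory("parquet-fixture").toString

spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("src/test/resources/lineitems.csv")    // hypothetical name for the CSV fixture
  .write.mode("overwrite")
  .parquet(s"$tmpDir/lineitems.parquet")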

@@ -143,7 +150,8 @@ resolvers += "JitPack.IO" at "https://jitpack.io"
publishMavenStyle := true

//TODO: remove this before moving to druid.io
publishTo := Some("central-local" at "https://metamx.artifactoryonline.com/metamx/libs-releases-local")
//publishTo := Some("central-local" at "https://metamx.artifactoryonline.com/metamx/libs-releases-local")
publishTo := Some(Resolver.file("file", new File(Path.userHome.absolutePath+"/.m2/repository")))

Just putting a note here as a reminder to correct this before merge.

s"[${x.getClass.getCanonicalName}]. " +
"Hoping it can handle string input"
val baseData =
dataSchema.getDelegate.getParser match {

There is a lot of repeated code (i.e., the parquet case and the text case largely duplicate each other). I suggest some refactoring to combine them.

return values.toString();
}

public void prettyPrint() {

May I ask whether prettyPrint is used in production, or only for testing/development? If it is the latter, then perhaps we should not include it in the class.

Also, if we remove it, a lot of the related methods/classes can be removed as well.

@xanec commented Oct 5, 2017

Please correct me if I am wrong, but I think the parser need not be "simple": the parsing can produce a POJO for our own manipulation. Hence, it is up to us to decide what record is produced and how we map it into Druid.

@Igosuki (Author) commented Oct 27, 2017

@xanec You're right. I've had other things to patch on Druid on our end; I'm getting back to this as soon as I can to refactor it.

Commits added to the pull request:

  • Copy over generic parquet read api to prevent importing the whole parquet-tools dependency
  • Revert "Copy over generic parquet read api to prevent importing the whole parquet-tools dependency" (this reverts commit e845195369a59a5aa0121e80b2bae7b3edbab395)
@Igosuki (Author) commented Dec 12, 2017

I rebased on 0.11 to use the transform and flatten APIs, but I had to copy the maps in the parser because the AbstractMaps produced by the ObjectFlatteners API aren't directly serializable by Kryo.
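For illustration, the kind of copy being described (a sketch; `flattened` stands in for the map the flattener returns, and the helper name is hypothetical):

// Sketch: the flattener returns a lazy Map view backed by an anonymous
// AbstractMap that Kryo can't serialize directly, so copy it into a
// concrete HashMap before it crosses a Kryo-serialized boundary.
def copyForKryo(flattened: java.util.Map[String, AnyRef]): java.util.Map[String, AnyRef] =
  new java.util.HashMap[String, AnyRef](flattened)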
