From 4a2086447ff1b81c7b0afa345d1468c3fd7ed2e9 Mon Sep 17 00:00:00 2001 From: Guillaume Balaine Date: Mon, 19 Jun 2017 11:53:53 +0200 Subject: [PATCH 1/6] Implement parquet simple record as a source for the druid spark indexer --- build.sbt | 11 +- .../spark/parquet/ParquetInputRowParser.java | 45 ++++++ .../tools/read/SimpleParquetRecord.java | 15 ++ .../indexer/spark/SparkDruidIndexer.scala | 146 ++++++++++++------ .../spark/SparkDruidIndexerModule.scala | 6 + .../index_spark_2.10_parquet_spec.json | 136 ++++++++++++++++ src/test/resources/lineitems.parquet | Bin 0 -> 9606 bytes .../spark/TestScalaBatchIndexTask.scala | 6 +- .../indexer/spark/TestSparkDruidIndexer.scala | 142 ++++++++++++++++- 9 files changed, 454 insertions(+), 53 deletions(-) create mode 100644 src/main/java/io/druid/indexer/spark/parquet/ParquetInputRowParser.java create mode 100644 src/main/java/org/apache/parquet/tools/read/SimpleParquetRecord.java create mode 100644 src/test/resources/index_spark_2.10_parquet_spec.json create mode 100644 src/test/resources/lineitems.parquet diff --git a/build.sbt b/build.sbt index 8ff15ae..c5bd75b 100644 --- a/build.sbt +++ b/build.sbt @@ -30,6 +30,13 @@ val hadoop_version = "2.7.3" val spark_version = "2.1.0" val guava_version = "16.0.1" val mesos_version = "0.25.0" +val parquet_version = "1.8.2" + +libraryDependencies += "org.apache.parquet" % "parquet-common" % parquet_version +libraryDependencies += "org.apache.parquet" % "parquet-encoding" % parquet_version +libraryDependencies += "org.apache.parquet" % "parquet-column" % parquet_version +libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % parquet_version +libraryDependencies += "org.apache.parquet" % "parquet-tools" % parquet_version val sparkDep = ("org.apache.spark" %% "spark-core" % spark_version exclude("org.roaringbitmap", "RoaringBitmap") @@ -99,6 +106,7 @@ libraryDependencies += "io.druid" % "druid-processing" % druid_version % "provid libraryDependencies += "io.druid" % "druid-server" % druid_version % "provided" libraryDependencies += "io.druid" % "druid-indexing-service" % druid_version % "provided" libraryDependencies += "io.druid" % "druid-indexing-hadoop" % druid_version % "provided" + libraryDependencies += "org.joda" % "joda-convert" % "1.8.1" % "provided" // Prevents intellij silliness and sbt warnings libraryDependencies += "com.google.guava" % "guava" % guava_version % "provided"// Prevents serde problems for guice exceptions @@ -143,7 +151,8 @@ resolvers += "JitPack.IO" at "https://jitpack.io" publishMavenStyle := true //TODO: remove this before moving to druid.io -publishTo := Some("central-local" at "https://metamx.artifactoryonline.com/metamx/libs-releases-local") +//publishTo := Some("central-local" at "https://metamx.artifactoryonline.com/metamx/libs-releases-local") +publishTo := Some(Resolver.file("file", new File(Path.userHome.absolutePath+"/.m2/repository"))) pomIncludeRepository := { _ => false } pomExtra := ( diff --git a/src/main/java/io/druid/indexer/spark/parquet/ParquetInputRowParser.java b/src/main/java/io/druid/indexer/spark/parquet/ParquetInputRowParser.java new file mode 100644 index 0000000..79dd655 --- /dev/null +++ b/src/main/java/io/druid/indexer/spark/parquet/ParquetInputRowParser.java @@ -0,0 +1,45 @@ +package io.druid.indexer.spark.parquet; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.InputRowParser; +import 
io.druid.data.input.impl.MapInputRowParser; +import io.druid.data.input.impl.ParseSpec; +import org.apache.parquet.tools.read.SimpleParquetRecord; +import org.apache.parquet.tools.read.SimpleRecord; + +public class ParquetInputRowParser implements InputRowParser { + private final ParseSpec parseSpec; + private final MapInputRowParser mapParser; + + @JsonCreator + public ParquetInputRowParser( + @JsonProperty("parseSpec") ParseSpec parseSpec + ) + { + this.parseSpec = parseSpec; + this.mapParser = new MapInputRowParser(this.parseSpec); + } + + @JsonProperty + @Override + public ParseSpec getParseSpec() + { + return parseSpec; + } + + @Override + public ParquetInputRowParser withParseSpec(ParseSpec parseSpec) + { + return new ParquetInputRowParser(parseSpec); + } + + @Override + public InputRow parse(SimpleRecord input) + { + // We should really create a ParquetBasedInputRow that does not need an intermediate map but accesses + // the SimpleRecord directly... + return mapParser.parse(SimpleParquetRecord.toJson(input)); + } +} diff --git a/src/main/java/org/apache/parquet/tools/read/SimpleParquetRecord.java b/src/main/java/org/apache/parquet/tools/read/SimpleParquetRecord.java new file mode 100644 index 0000000..49bbcc3 --- /dev/null +++ b/src/main/java/org/apache/parquet/tools/read/SimpleParquetRecord.java @@ -0,0 +1,15 @@ +package org.apache.parquet.tools.read; + +import com.google.common.collect.Maps; + +import java.util.Map; + +public class SimpleParquetRecord extends SimpleRecord { + public static Map toJson(SimpleRecord record) { + Map result = Maps.newLinkedHashMap(); + for (NameValue value : record.getValues()) { + result.put(value.getName(), toJsonValue(value.getValue())); + } + return result; + } +} diff --git a/src/main/scala/io/druid/indexer/spark/SparkDruidIndexer.scala b/src/main/scala/io/druid/indexer/spark/SparkDruidIndexer.scala index c5e16b6..1897985 100644 --- a/src/main/scala/io/druid/indexer/spark/SparkDruidIndexer.scala +++ b/src/main/scala/io/druid/indexer/spark/SparkDruidIndexer.scala @@ -36,6 +36,7 @@ import io.druid.data.input.impl._ import io.druid.guice.annotations.{Json, Self} import io.druid.guice.{GuiceInjectors, JsonConfigProvider} import io.druid.indexer.HadoopyStringInputRowParser +import io.druid.indexer.spark.parquet.ParquetInputRowParser import io.druid.initialization.Initialization import io.druid.java.util.common.{IAE, ISE} import io.druid.java.util.common.granularity.Granularity @@ -53,6 +54,9 @@ import io.druid.timeline.partition.{HashBasedNumberedShardSpec, NoneShardSpec, S import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.Job +import org.apache.parquet.hadoop.ParquetInputFormat +import org.apache.parquet.tools.read.{SimpleReadSupport, SimpleRecord} import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{AccumulableInfo, SparkListener, SparkListenerApplicationEnd, SparkListenerStageCompleted} import org.apache.spark.storage.StorageLevel @@ -130,56 +134,104 @@ object SparkDruidIndexer { logInfo(s"Splitting [$totalGZSize] gz bytes into [$startingPartitions] partitions") + // Hadoopify the data so it doesn't look so silly in Spark's DAG - val baseData = sc.textFile(dataFiles mkString ",") - // Input data is probably in gigantic files, so redistribute - .filter( - s => { - val row = dataSchema.getDelegate.getParser match { - case x: StringInputRowParser => x.parse(s) - case x: HadoopyStringInputRowParser => x.parse(s) - case x => - logTrace( - 
"Could not figure out how to handle class " + - s"[${x.getClass.getCanonicalName}]. " + - "Hoping it can handle string input" + val baseData = + dataSchema.getDelegate.getParser match { + case x: ParquetInputRowParser => + val job = new Job() + ParquetInputFormat.setReadSupportClass(job, classOf[SimpleReadSupport]) + sc.newAPIHadoopFile(dataFiles.mkString(","), classOf[ParquetInputFormat[SimpleRecord]], + classOf[Void], classOf[SimpleRecord], job.getConfiguration).values.filter( + s => { + val row = dataSchema.getDelegate.getParser match { + case x: ParquetInputRowParser => x.parse(s) + case x => + logTrace( + "Could not figure out how to handle class " + + s"[${x.getClass.getCanonicalName}]. " + + "Hoping it can handle string input" + ) + x.asInstanceOf[InputRowParser[Any]].parse(s) + } + passableIntervals.exists(_.contains(row.getTimestamp)) + }).repartition(startingPartitions) + // Persist the strings only rather than the event map + // We have to do the parsing twice this way, but serde of the map is killer as well + .persist(StorageLevel.DISK_ONLY) + .mapPartitions( + it => { + val i = dataSchema.getDelegate.getParser match { + case x: ParquetInputRowParser => it.map(x.parse) + case x => + logTrace( + "Could not figure out how to handle class " + + s"[${x.getClass.getCanonicalName}]. " + + "Hoping it can handle string input" + ) + it.map(x.asInstanceOf[InputRowParser[Any]].parse) + } + val queryGran = dataSchema.getDelegate.getGranularitySpec.getQueryGranularity + val segmentGran = dataSchema.getDelegate.getGranularitySpec.getSegmentGranularity + i.map( + r => { + var k: Long = queryGran.bucketStart(new DateTime(r.getTimestampFromEpoch)).getMillis + if (k < 0) { + // Example: AllGranularity + k = segmentGran.bucketStart(new DateTime(r.getTimestampFromEpoch)).getMillis + } + k -> r.asInstanceOf[MapBasedInputRow].getEvent + } + ) + } ) - x.asInstanceOf[InputRowParser[Any]].parse(s) - } - passableIntervals.exists(_.contains(row.getTimestamp)) - } - ) - .repartition(startingPartitions) - // Persist the strings only rather than the event map - // We have to do the parsing twice this way, but serde of the map is killer as well - .persist(StorageLevel.DISK_ONLY) - .mapPartitions( - it => { - val i = dataSchema.getDelegate.getParser match { - case x: StringInputRowParser => it.map(x.parse) - case x: HadoopyStringInputRowParser => it.map(x.parse) - case x => - logTrace( - "Could not figure out how to handle class " + - s"[${x.getClass.getCanonicalName}]. " + - "Hoping it can handle string input" - ) - it.map(x.asInstanceOf[InputRowParser[Any]].parse) - } - val queryGran = dataSchema.getDelegate.getGranularitySpec.getQueryGranularity - val segmentGran = dataSchema.getDelegate.getGranularitySpec.getSegmentGranularity - i.map( - r => { - var k: Long = queryGran.bucketStart(new DateTime(r.getTimestampFromEpoch)).getMillis - if (k < 0) { - // Example: AllGranularity - k = segmentGran.bucketStart(new DateTime(r.getTimestampFromEpoch)).getMillis + case x => + sc.textFile(dataFiles mkString ",").filter( + s => { + val row = dataSchema.getDelegate.getParser match { + case x: StringInputRowParser => x.parse(s) + case x: HadoopyStringInputRowParser => x.parse(s) + case x => + logTrace( + "Could not figure out how to handle class " + + s"[${x.getClass.getCanonicalName}]. 
" + + "Hoping it can handle string input" + ) + x.asInstanceOf[InputRowParser[Any]].parse(s) } - k -> r.asInstanceOf[MapBasedInputRow].getEvent - } - ) - } - ) + passableIntervals.exists(_.contains(row.getTimestamp)) + }).repartition(startingPartitions) + // Persist the strings only rather than the event map + // We have to do the parsing twice this way, but serde of the map is killer as well + .persist(StorageLevel.DISK_ONLY) + .mapPartitions( + it => { + val i = dataSchema.getDelegate.getParser match { + case x: StringInputRowParser => it.map(x.parse) + case x: HadoopyStringInputRowParser => it.map(x.parse) + case x => + logTrace( + "Could not figure out how to handle class " + + s"[${x.getClass.getCanonicalName}]. " + + "Hoping it can handle string input" + ) + it.map(x.asInstanceOf[InputRowParser[Any]].parse) + } + val queryGran = dataSchema.getDelegate.getGranularitySpec.getQueryGranularity + val segmentGran = dataSchema.getDelegate.getGranularitySpec.getSegmentGranularity + i.map( + r => { + var k: Long = queryGran.bucketStart(new DateTime(r.getTimestampFromEpoch)).getMillis + if (k < 0) { + // Example: AllGranularity + k = segmentGran.bucketStart(new DateTime(r.getTimestampFromEpoch)).getMillis + } + k -> r.asInstanceOf[MapBasedInputRow].getEvent + } + ) + } + ) + } logInfo("Starting uniqes") val optionalDims: Option[Set[String]] = if (dataSchema.getDelegate.getParser.getParseSpec.getDimensionsSpec.hasCustomDimensions) { diff --git a/src/main/scala/io/druid/indexer/spark/SparkDruidIndexerModule.scala b/src/main/scala/io/druid/indexer/spark/SparkDruidIndexerModule.scala index 5c7176f..ddc1d8e 100644 --- a/src/main/scala/io/druid/indexer/spark/SparkDruidIndexerModule.scala +++ b/src/main/scala/io/druid/indexer/spark/SparkDruidIndexerModule.scala @@ -25,6 +25,9 @@ import com.fasterxml.jackson.databind.module.SimpleModule import com.google.inject.Binder import io.druid.initialization.DruidModule import java.util + +import io.druid.indexer.spark.parquet.ParquetInputRowParser + import scala.collection.JavaConverters._ class SparkDruidIndexerModule extends DruidModule @@ -32,6 +35,9 @@ class SparkDruidIndexerModule extends DruidModule override def getJacksonModules: util.List[_ <: Module] = { List( new SimpleModule("SparkDruidIndexer") + .registerSubtypes( + new NamedType(classOf[ParquetInputRowParser], "parquet") + ) .registerSubtypes( new NamedType(classOf[SparkBatchIndexTask], "index_spark") ) diff --git a/src/test/resources/index_spark_2.10_parquet_spec.json b/src/test/resources/index_spark_2.10_parquet_spec.json new file mode 100644 index 0000000..a6a22a9 --- /dev/null +++ b/src/test/resources/index_spark_2.10_parquet_spec.json @@ -0,0 +1,136 @@ +{ + "type": "index_spark", + "id": "taskId", + "dataSchema": { + "dataSource": "defaultDataSource", + "parser": { + "type": "string", + "parseSpec": { + "format": "parquet", + "timestampSpec": { + "column": "l_shipdate", + "format": "yyyy-MM-dd", + "missingValue": null + }, + "dimensionsSpec": { + "dimensions": [ + "l_orderkey", + "l_partkey", + "l_suppkey", + "l_linenumber", + "l_returnflag", + "l_linestatus", + "l_shipinstruct", + "l_shipmode", + "l_comment" + ], + "dimensionExclusions": [ + "l_commitdate", + "l_receiptdate", + "l_tax", + "l_quantity", + "count", + "l_extendedprice", + "l_shipdate", + "l_discount" + ], + "spatialDimensions": [] + }, + "delimiter": "|", + "listDelimiter": ",", + "columns": [ + "l_orderkey", + "l_partkey", + "l_suppkey", + "l_linenumber", + "l_quantity", + "l_extendedprice", + "l_discount", + "l_tax", + 
"l_returnflag", + "l_linestatus", + "l_shipdate", + "l_commitdate", + "l_receiptdate", + "l_shipinstruct", + "l_shipmode", + "l_comment" + ] + }, + "encoding": "UTF-8" + }, + "metricsSpec": [ + { + "type": "count", + "name": "count" + }, + { + "type": "longSum", + "name": "L_QUANTITY_longSum", + "fieldName": "l_quantity" + }, + { + "type": "doubleSum", + "name": "L_EXTENDEDPRICE_doubleSum", + "fieldName": "l_extendedprice" + }, + { + "type": "doubleSum", + "name": "L_DISCOUNT_doubleSum", + "fieldName": "l_discount" + }, + { + "type": "doubleSum", + "name": "L_TAX_doubleSum", + "fieldName": "l_tax" + } + ], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "YEAR", + "queryGranularity": { + "type": "duration", + "duration": 86400000, + "origin": "1970-01-01T00:00:00.000Z" + }, + "intervals": [ + "1992-01-01T00:00:00.000Z/1999-01-01T00:00:00.000Z" + ] + } + }, + "intervals": [ + "1992-01-01T00:00:00.000Z/1999-01-01T00:00:00.000Z" + ], + "paths": [ + "file:/someFile" + ], + "targetPartitionSize": 8139, + "maxRowsInMemory": 389, + "properties": { + "some.property": "someValue", + "java.util.logging.manager": "org.apache.logging.log4j.jul.LogManager", + "user.timezone": "UTC", + "org.jboss.logging.provider": "log4j2", + "file.encoding": "UTF-8", + "druid.processing.columnCache.sizeBytes": "1000000000" + }, + "master": "local[999]", + "context": {}, + "indexSpec": { + "bitmap": { + "type": "concise" + }, + "dimensionCompression": null, + "metricCompression": null + }, + "classpathPrefix": "somePrefix.jar", + "hadoopDependencyCoordinates": [ + "org.apache.spark:spark-core_2.10:1.6.1-mmx0" + ], + "groupId": "taskId", + "dataSource": "defaultDataSource", + "resource": { + "availabilityGroup": "taskId", + "requiredCapacity": 1 + } +} diff --git a/src/test/resources/lineitems.parquet b/src/test/resources/lineitems.parquet new file mode 100644 index 0000000000000000000000000000000000000000..84becd943f1c4cb5c180ed482c6d38f32ccfa30c GIT binary patch literal 9606 zcma)i3wTu3)%M;nJDdY&$Q;as6HIVMn3&XLIQL5kG=l^aB_Tw>Xcd?wC&}QDgvkVh zVgIs|sG@}y5HBbyB3i1!B2bHnN-bKn+Dc2keO}OyTGanN2@>G<|DP|b0#&hS|vz=xAVb${fdHnDXueuT&~p5u_dZx6a4M=74^s0UP>vxloTUG zrNfElUFf(lOk9{WlUQ>Z$?69hOyFP%wId>L%WTzr{qB>!{o-ewYU7WK@e zFx#Yhm$cNM!LGb6gRs<{814aw&ywlSF+&tKGKDXP?FP+mkju8WmveH2;%^QPE*f;Y z5zH}7-H<;9M#~* z2MkNqY>2j1OV(L_z`?wNdtFydg?&UUeu`ilw!(%93TF7H1Vz(KS`;v4*vWn>XqIlW zzX_V=DE!YjTQegU3fMv68v~|e=z2Ke(+&Qwf~hz_wlZ@)6E94L%}ZU1R$o&~PGe3x;ke z>{-FlHIsjqGaLgxH5{|p5j0yd=wzsE+w8vu#ZVNeVQIR}$^=z0HTEK8D31SDPBUfZ z3bth^k(&h_2jkBb;Choz;VjwoU&6t6EXrxR!}Hk5Qu(_$)$(-lz)-j=pgAghz09Y| zg^M^@SBk&qG)pb~37llek&xi1vc+ZyifzbjnPBRQ%1;Y8CZND3;y`fLuL`<>W6B4I zq7`NehG`g)uYHQc4+)rtqnLDvpy|5CyBu&>_`D!%IL3iK$Cz_IQNu5XdLXD7np1cm z{Ha@!H#prf3Ks+{%XAz%4R)ytKT4M!OV>P<>N;!#(zgl0;_c@NONj^oE&7PRO zFT*|wa}UiHxERN2ib643GUEi@)HE7^hc%s!;!MqEnNXaa$0?S>0)lDiGL@jcrBe<* z!0C-b2oDtyQ7RqE8Sp~D12y~UNI0@51~r2Y_o<2)^~n|uaF%7Ts|7&Rp#h&J^H=+T zso9WkFo^?fpoC=szMVc@XIH~RCjW-d(%CBDQe`)Ax}vZdh#rGoi|sUaKL;DxM*^_x z(2I%9ujdp;WkV4-7PB}ND4N0Py5Z>sZ-qF^1SWkr{|7uI-pF}?-R(0JzQCv0{JTJ* zZ1Bxsb!?TtoKrN5e&REfb8NYIYqH)nE+fG@SLr{&=o}GSNPzui+!q9mIcMDXY&^sGLHTehOF}4`4 zG1&{@TWHAVs8wbDPdOFPt3d+fe}$8+Z1*`z;bzEpKx-j(cX{r53@*nK&UP8U0&vGc z!7n>KG}>A|;t*>GM4+ zbGt~%2$GYTGq7Hv^;eR&zak|oY5fe6b6?ItwxK}r%8lv`8dSQM58lMiP|1<{DZNSk z6`y<(?OsJtESe!9?cvgD-9eO?+#5ceimA{%jQ+PRl#HZWz2)~#pkwsAPI-J5pc0Q~a#MWTUD87Y# zK-9==-eGrOd!djPpT`ejiwdt$W2DILspdTn)8XF(LxL|3KHa|q9Me{!sxAR>hNDN7 z-psOzuQ=&z&O08|89S-cbqNa>X)piUcYU)goR9n@f&{)XQ9aEK~ zye#Ru3CTJu&GmxOj>@P6l4&_+RJW)Q(j427qqaSOb_Gnuj#_L~uBJM+8AUjbK;^YG 
z&3}!AtPe7Wzd1`oorxOkGRcrpDx(gQB@_=8^lW4STd zqqI0to4p`uiVf8rb_*C$pD;JYhcfo4v4TF$8ZB?9{<)GL_O?Ey4kq%-l0WXH&bhOS2S$S}#WWt^79q=2ofhFZK$GBnFpv2R5{*EC!63liE4 z9p>^93ae%pUnikvm~eD?Cx>1k=0G3{FO+rIrT}^($MA1Lde#i(0SO%nT;-AA76+h5 zzK~3>Oh?V~s|8clWdD5g41WSB-4~lk2(YK7E2SEEzi|uMc1R29NsUe z=!5*f6Aa7J=!DqM0_d%5$Nzp7JPPBng7!@=b_0NwY!=@w8LFijaNISVVOw+qyBWQg z?P#<6qi<2HuuE~0IPqxZ>*%4dv*OEAEyV&{?v(&87+})nl47cs1+zB?PzXUh5Z}6G z`437O>H-cXlJ8+q#&H(bXIUDMr?Sym271q^73mM?>d5gnE@c!t4 zVcWV=6_juWfUe1Az>$`zRE3eH!?}oJNkZ$YJ0PwIKvl!?OTdF>0h*BqDo{<;sHOLW zqNV`3wv1TbF9hH2e|jwYq0I2yxBxP?UlK#az7$Ju{bhGRC$OQBsJw022}>WY3)# zkBXb1#@N-wbKA@w2Q3h5PPsva8&KBYJhQCVN;KAkh%?K2^&)hewR8mPT#OE6EGLu2 zSSIVK>n!I}M%aWL#%AoUAvrNNaA}ONL`gBJ<@MCDo|=S7P-_kY~*uNUA}Ul@qI; zK(b<9$%*wUvh++vCP{R{k0>(9Q)B`iQI9QS)x$~8=HE>v{M3>OWa%9wN6jh5wmHL} z97u95-93=ZO_1d^r1!`_=1-3SjvU^+{|zW0ab)enZ-0ZUld&-IWI-bf9-G&VX;M*~ zwG++TmhAs}0S)sli^id+TvPHIdP-r;8YUGT-t*~hk8W?(i7lWjy<@`TpnG)Zdaw7a z*@N-GtKXpEuK380@$eY55kk?w+O|=d+xyV@Xnar9zOWVDx2Fae$A7$vb#c3H-NW8te|Ie|$ zW%6^sr6F-O=L@{g)&ds?gt~PPzk%MhW=XxL%jW$Uk1g@DC0x?E-DoqU7CE!b3n}?2TK#RcC_^A zoE5k}RCgHTk`YJ2&y7FuG?j(5OWu1HUMkzZ;kVFv;*i;p^TAt}J%MM0HTlzVQ})0w z_Tpy2x_?Ewg;0sR2RB{keFQnjKU@K)@162jNK1d;`Wm!ZwXYm32frWauyY4Ak#^r|Y!E)|15XhAltSq-V zed8T$6E{9~2=k>cyoGVcmpVge2}j30`zoyvuG)If0jNK7`o8^8f9;CjW0%HXT?)eE z?|hGa_gz-U+PU#<*RTaby6V-3VezgU|0AG1`uw9%bro6k3=P;d%QK(xT(;vM>%H$@ zgdRQ%p$AtjI*Cw8zj8gu&PM3=QGb2GlhSp!((P$;eGN45poybB2mo*&x4py)iVi-u zu?L`6eboeja>t*}fg(q5zV&hVti5F}bid^ch_iRAnz2PlLGhmTj2Z zYM!$V2=&y!90CJ2-p<9qvt-XsjEC;>Z0AnoVtn*ZLopVu+X224hr4#*@n<`o_`4`a zxRvJ?W3#RUQ{e9{NBiS?&5A6xTHMR+>eO&|`>|KRH02NPzYb4()?>5zj}wTiQ^W|z zCD(^=&AD%|LtO5mzv0>o5Q*!4;vH}w`1@=eZRmrKHh{ad{i2QF-us=4xf9=B{}OB! zWFb-~eEi3ccHw6JA3_XSlmmEK0k>ZI8ai#i_RdIIQL^HjXFad(o3Qseh$stR1*Njy zdd`FB{3{k?*Uw76<7>E?v-VJlD;rRX8T->Op~vydCO{7$7khFKzI6bB*8Sz*!TrJN zn_mMrz=L&rV;RQdgYNPq0kWJB*u8%%Xn|8q&0jnW1zgdxN)XwZ|MMz{8qK>gbyZ%K3a=l0^&wbVAgr14`BPZy zdK>{PEa`stVQ932yadAT*m#V-{*%j>*yHtz9%N=2?{9+>dXf3fS;(CE+!rU2nT!qn zF=Xx-u&LYgVZqEXUhZsZSm))=1?9ho{_1-|x@iX?1W%D*eJnq|q5U6;~)1i!9@dP$pedJx-OcC!!@)Qm&YTM)O zF>@9q9q`QoHnXn%C$2qg&J@jxUEmQ@WayyUs#$wr*brk_p)l{|7BIEp6D z!%XP-bR~1&{#BTAOnVtAl-sv0j`gkm-tcsI;K}{4;n1aa4>ukG@uAcSFNM}UczF*{_esX~UZ6fK1-JQ5FYT;3$brlhn>IA7RC#?#3v7CriwX;ZdHF+S5*n>cJ+J-N-c(m=L zLi%9J~M0Bt=cReW?PQ*)oeLX2D^WKz*lRo_= z;eS~>j)*f#=91bXQvEt9u`w@lwH4%>AQ7*c&XL*~WO^~W!s^3B%uP)u)eb2MQZ!=X zRZB1QUa~+`>VAS9JdWpTFDBI=k>yYPU+NW^u;P!h-Q|Q5Z{DJ;w=9h zYS9YHQJHel#|aIHAu8q7i06*GnTR?5209?`PCA^<@(<0M9IT}0k_m+mi}OK0)L&V0 z8wr-stLcS#)2LZeMd*!}{DjbR=oMHl$0rb+UNVjNZ>QCH!$i~HMF;o`#VJ&w`65Rf z=>`6cG?Si3xxArtY;fr;o`|EtrRUujG$qeHP#wf4fyp_w<8*AN_I9i zCBWp)bz8B7Cms@_mrIG(=1@~x7ahr8si#~oVB#sasWX|J=MQGNUFTtfj)vShoo+|E zBMO0S9SLYzpK{|3Eur`b5VW;6cO)9!P+Jq-BZQhdQ~cT@5X50dyrH2jn_e$w9FH`o zpd;zJ?V)&6+D)-#q=EiP&&NV>XijIM;Tl+`7(#oz;Tl%O^nz$8(U_b!1nX&jao*m7 z2(F-isI$ZM&lM+@cC@<<37kiwwIiMCY&bXkF4xxBHg+NPe?vEr_PjcAEIUDlQ@5xi z)ShTfw}tB6c&mRg7jKT!=^TzHd|`Mi%?yD#ooH+A$c$b=3pde967_Eq>-k|JY}OKQ zZcbhhPWuJo7LJG<&q}1)S`!V%k%>{@;wMmPR$CILyIl>QUUx?>XSy$*9P@pyA8Q_N zjHYl+WQL-WD7`Iakfd6G!_MpUH6^Q_p$|^|OKd{SC?CNFlT`U$U%D-PH1>kBCK2P{ z2f9P*m^ZsU-IlJ>XnE8k(J%016uu<4Ine~&8shZJoFl&I`C)}L`;%jK7AsYk&|f50 z`qC{^Gh+Kf(NIU)jVIGB@n!CA2`YL(tCGZ+1Z1^!)~ES-@f*U+=~nvh8*bnLnn)M% z#~zEcCial_*kE5gRasN{XJ2SA{qy86Njls~@?r@#e$4KvF6V}t7G6YupBEBiG^PA8 z5bA7AC+Nmp?3qZqt?9}CH1zo^Cs_{{8r(S zaPGn^lBUm)#KMbn3b=;&z}RvQk+AS}U)l}tDt{yFBT4%Ft;@$e%7x;yV^0`6NL2id zFdUXFE6$2Hq}n<_rA@Q6! 
zv*_=A$$7IsW;yC+B3`g_Xs}s)Spxo2@s5_w4k?JeL)@vq`RZzZ^5+GW`QiGzAHMr0 z`rydUWIEBFOt`6z@@2PvsEml-$o%(`=p9*PB>%y`M!Ai1{x1Ieq4&Z~#J#mB`r&{q z1Xfd`tN6c3{QcWp^T<~V^$iOPfPA+(1=B)JbMnK(d<_keHTPfO>+Hz+CA8YP=KW*r zo?B&7KS$t@-`1^)M*9UntqhT-PyQ5NBmPlx+Y{+{drSF5k{ULei)@Dj^_|x&ykzo! zDrI_bLqFjhxVy4@^5#fg`Kst@B6uk>{E(1#8`4HOyPsZ2T3_-_P2p_AgYk&mymL!V zyBIfS(fQba;n1qDD<{VG%})K~22R--E%1?s_~OWY(SLJuTFVEs2XgMs0x+YYIpIzI z*~ES2W$}J-moGZ#7sH7Tj{G6A5`E|Hx-Ib(c>WT2L1h>7ocua?Yyq`N*;5%=< zLW&IMkdxP~^u_1J8$M}MZ^@`?BlGBTU!r056D0jgZC)b0g%pO5z5lHEvvpr=*63qH zj_|ze1K0)_os(ZyZXlsyV|e9Q`CY?i@#0$A>aXt1Ci4bg$iGr2_a+kLI5*xA3iiu6 zAy)WOZEb11!3zqJe$v3?jJ8yxo4UrGhs@VA#V2R9$5ZJuXF59D+fSPjk~5NtR=2ft zcDYlQZ0|v{#QB+f#`K_cYSRLFwH)j;A|2yh&fL1syGk_C{bYGNgA3%nfa`XD8C9Px?KH4Q`_S)U0R! z;N%RC`($=Mr3_5*rf0V`o>CcZ@q{AjVCU?;G$%LbL?EBb4GK91;WBbgcV)022(n-1 zvzPYI=Soli4~qOd`}cA5dAgTjeP;H-L7*s=e&pX3^8YdT49=4$(#!dskloFLqEtHj z=imH|QXktH$JWdFB$vIhABak&z907T_YLdgIqg(?IS%pJ-3%n)==b zIpZa7{z0pc6Tj_5%@YJoR2Q{~yj7Jbh3+#j~4&2t21=Vdg(NIydO+1IQ0Z z?d90$&)ziUtiNF;mjB~CF1pgpz=ig z_@jOP*H`u#epbNr%2=I~y`F)rRQg{$b>-hXvrpV3`Q)XTSLsMU0Z1A}Omel}ILwuT7! + segment.getBinaryVersion should be(9) + segment.getDataSource should equal(dataSource) + interval.contains(segment.getInterval) should be(true) + segment.getInterval.contains(interval) should be(false) + segment.getSize should be > 0L + segment.getDimensions.asScala.toSet should equal(Set("l_suppkey")) + segment.getMetrics.asScala.toSet should equal(Set("sum_val")) + val file = new File(segment.getLoadSpec.get("path").toString) + file.exists() should be(true) + val segDir = Files.createTempDirectory(outDir.toPath, "loadableSegment-%s" format segment.getIdentifier).toFile + val copyResult = CompressionUtils.unzip(file, segDir) + copyResult.size should be > 0L + copyResult.getFiles.asScala.map(_.getName).toSet should equal( + Set("00000.smoosh", "meta.smoosh", "version.bin", "factory.json") + ) + val index = StaticIndex.INDEX_IO.loadIndex(segDir) + try { + val qindex = new QueryableIndexIndexableAdapter(index) + qindex.getDimensionNames.asScala.toSet should equal(Set("l_suppkey")) + val dimVal = qindex.getDimValueLookup("l_suppkey").asScala + dimVal should not be 'Empty + dimVal should contain allOf("1883", "1989", "2073") + qindex.getMetricNames.asScala.toSet should equal(Set(aggName)) + qindex.getMetricType(aggName) should equal(aggregatorFactory.getTypeName) + qindex.getNumRows should be(11) + qindex.getRows.asScala.head.getMetrics()(0) should be(1) + index.getDataInterval.getEnd.getMillis should not be JodaUtils.MAX_INSTANT + } + finally { + index.close() + } + } + loadResults(1) match { + case segment => + segment.getBinaryVersion should be(9) + segment.getDataSource should equal(dataSource) + interval.contains(segment.getInterval) should be(true) + segment.getInterval.contains(interval) should be(false) + segment.getSize should be > 0L + segment.getDimensions.asScala.toSet should equal(Set("l_suppkey")) + segment.getMetrics.asScala.toSet should equal(Set("sum_val")) + val file = new File(segment.getLoadSpec.get("path").toString) + file.exists() should be(true) + val segDir = Files.createTempDirectory(outDir.toPath, "loadableSegment-%s" format segment.getIdentifier).toFile + val copyResult = CompressionUtils.unzip(file, segDir) + copyResult.size should be > 0L + copyResult.getFiles.asScala.map(_.getName).toSet should equal( + Set("00000.smoosh", "meta.smoosh", "version.bin", "factory.json") + ) + val index = StaticIndex.INDEX_IO.loadIndex(segDir) + try { + val qindex = new QueryableIndexIndexableAdapter(index) 
+ qindex.getDimensionNames.asScala.toSet should equal(Set("l_suppkey")) + val dimVal = qindex.getDimValueLookup("l_suppkey").asScala + dimVal should not be 'Empty + dimVal should contain allOf("1807", "2150", "5133", "5405") + qindex.getMetricNames.asScala.toSet should equal(Set(aggName)) + qindex.getMetricType(aggName) should equal(aggregatorFactory.getTypeName) + qindex.getNumRows should be(4) + qindex.getRows.asScala.head.getMetrics()(0) should be(1) + index.getDataInterval.getEnd.getMillis should not be JodaUtils.MAX_INSTANT + } + finally { + index.close() + } + } + } + finally { + closer.close() + } + } } object StaticTestSparkDruidIndexer From 4c9b4d60da2535bd35dd7e9905a8ba9231bb92dd Mon Sep 17 00:00:00 2001 From: Guillaume Balaine Date: Tue, 3 Oct 2017 18:59:36 +0200 Subject: [PATCH 2/6] Exclude guava deps Copy over generic parquet read api to prevent importing the whole parquet-tools dependency --- build.sbt | 3 +- project/plugins.sbt | 2 + .../parquet/tools/read/SimpleListRecord.java | 30 ++++ .../tools/read/SimpleListRecordConverter.java | 34 ++++ .../parquet/tools/read/SimpleMapRecord.java | 43 +++++ .../tools/read/SimpleMapRecordConverter.java | 34 ++++ .../parquet/tools/read/SimpleReadSupport.java | 41 +++++ .../parquet/tools/read/SimpleRecord.java | 149 ++++++++++++++++ .../tools/read/SimpleRecordConverter.java | 159 ++++++++++++++++++ .../tools/read/SimpleRecordMaterializer.java | 42 +++++ 10 files changed, 535 insertions(+), 2 deletions(-) create mode 100644 src/main/java/org/apache/parquet/tools/read/SimpleListRecord.java create mode 100644 src/main/java/org/apache/parquet/tools/read/SimpleListRecordConverter.java create mode 100644 src/main/java/org/apache/parquet/tools/read/SimpleMapRecord.java create mode 100644 src/main/java/org/apache/parquet/tools/read/SimpleMapRecordConverter.java create mode 100644 src/main/java/org/apache/parquet/tools/read/SimpleReadSupport.java create mode 100644 src/main/java/org/apache/parquet/tools/read/SimpleRecord.java create mode 100644 src/main/java/org/apache/parquet/tools/read/SimpleRecordConverter.java create mode 100644 src/main/java/org/apache/parquet/tools/read/SimpleRecordMaterializer.java diff --git a/build.sbt b/build.sbt index c5bd75b..7c5d75a 100644 --- a/build.sbt +++ b/build.sbt @@ -36,7 +36,6 @@ libraryDependencies += "org.apache.parquet" % "parquet-common" % parquet_version libraryDependencies += "org.apache.parquet" % "parquet-encoding" % parquet_version libraryDependencies += "org.apache.parquet" % "parquet-column" % parquet_version libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % parquet_version -libraryDependencies += "org.apache.parquet" % "parquet-tools" % parquet_version val sparkDep = ("org.apache.spark" %% "spark-core" % spark_version exclude("org.roaringbitmap", "RoaringBitmap") @@ -172,5 +171,5 @@ pomExtra := ( testOptions += Tests.Argument(TestFrameworks.JUnit, "-Duser.timezone=UTC") // WTF SBT? 
-javaOptions in Test += "-Duser.timezone=UTC" +javaOptions in Test += "-Duser.timezone=UTC" fork in Test := true diff --git a/project/plugins.sbt b/project/plugins.sbt index 652b260..6ac98bb 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -4,3 +4,5 @@ addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.2") addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.3") addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0") addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1") +addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.2") + diff --git a/src/main/java/org/apache/parquet/tools/read/SimpleListRecord.java b/src/main/java/org/apache/parquet/tools/read/SimpleListRecord.java new file mode 100644 index 0000000..1c2a96c --- /dev/null +++ b/src/main/java/org/apache/parquet/tools/read/SimpleListRecord.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.tools.read; + +public class SimpleListRecord extends SimpleRecord { + @Override + protected Object toJsonObject() { + Object[] result = new Object[values.size()]; + for (int i = 0; i < values.size(); i++) { + result[i] = toJsonValue(values.get(i).getValue()); + } + return result; + } +} diff --git a/src/main/java/org/apache/parquet/tools/read/SimpleListRecordConverter.java b/src/main/java/org/apache/parquet/tools/read/SimpleListRecordConverter.java new file mode 100644 index 0000000..1f09a31 --- /dev/null +++ b/src/main/java/org/apache/parquet/tools/read/SimpleListRecordConverter.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.tools.read; + +import org.apache.parquet.schema.GroupType; + +public class SimpleListRecordConverter extends SimpleRecordConverter { + + public SimpleListRecordConverter(GroupType schema, String name, SimpleRecordConverter parent) { + super(schema, name, parent); + } + + @Override + public void start() { + record = new SimpleListRecord(); + } + +} diff --git a/src/main/java/org/apache/parquet/tools/read/SimpleMapRecord.java b/src/main/java/org/apache/parquet/tools/read/SimpleMapRecord.java new file mode 100644 index 0000000..9b9243e --- /dev/null +++ b/src/main/java/org/apache/parquet/tools/read/SimpleMapRecord.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.tools.read; + +import com.google.common.collect.Maps; + +import java.util.Map; + +public class SimpleMapRecord extends SimpleRecord { + @Override + protected Object toJsonObject() { + Map result = Maps.newLinkedHashMap(); + for (NameValue value : values) { + String key = null; + Object val = null; + for (NameValue kv : ((SimpleRecord) value.getValue()).values) { + if (kv.getName().equals("key")) { + key = (String) kv.getValue(); + } else if (kv.getName().equals("value")) { + val = toJsonValue(kv.getValue()); + } + } + result.put(key, val); + } + return result; + } +} diff --git a/src/main/java/org/apache/parquet/tools/read/SimpleMapRecordConverter.java b/src/main/java/org/apache/parquet/tools/read/SimpleMapRecordConverter.java new file mode 100644 index 0000000..e26cc1a --- /dev/null +++ b/src/main/java/org/apache/parquet/tools/read/SimpleMapRecordConverter.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.tools.read; + +import org.apache.parquet.schema.GroupType; + +public class SimpleMapRecordConverter extends SimpleRecordConverter { + + public SimpleMapRecordConverter(GroupType schema, String name, SimpleRecordConverter parent) { + super(schema, name, parent); + } + + @Override + public void start() { + record = new SimpleMapRecord(); + } + +} diff --git a/src/main/java/org/apache/parquet/tools/read/SimpleReadSupport.java b/src/main/java/org/apache/parquet/tools/read/SimpleReadSupport.java new file mode 100644 index 0000000..49aeeaf --- /dev/null +++ b/src/main/java/org/apache/parquet/tools/read/SimpleReadSupport.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.tools.read; + +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.parquet.hadoop.api.InitContext; +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.io.api.RecordMaterializer; +import org.apache.parquet.schema.MessageType; + +public class SimpleReadSupport extends ReadSupport { + @Override + public RecordMaterializer prepareForRead(Configuration conf, Map metaData, MessageType schema, ReadContext context) { + return new SimpleRecordMaterializer(schema); + } + + @Override + public ReadContext init(InitContext context) { + return new ReadContext(context.getFileSchema()); + } +} + diff --git a/src/main/java/org/apache/parquet/tools/read/SimpleRecord.java b/src/main/java/org/apache/parquet/tools/read/SimpleRecord.java new file mode 100644 index 0000000..5f97c88 --- /dev/null +++ b/src/main/java/org/apache/parquet/tools/read/SimpleRecord.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.tools.read; + +import java.io.IOException; +import java.io.PrintWriter; +import java.util.*; + +import com.google.common.base.Strings; +import com.google.common.collect.Maps; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.node.BinaryNode; + +public class SimpleRecord { + public static final int TAB_SIZE = 2; + protected final List values; + + public SimpleRecord() { + this.values = new ArrayList(); + } + + public void add(String name, Object value) { + values.add(new NameValue(name,value)); + } + + public List getValues() { + return Collections.unmodifiableList(values); + } + + public String toString() { + return values.toString(); + } + + public void prettyPrint() { + prettyPrint(new PrintWriter(System.out,true)); + } + + public void prettyPrint(PrintWriter out) { + prettyPrint(out, 0); + } + + public void prettyPrint(PrintWriter out, int depth) { + for (NameValue value : values) { + out.print(Strings.repeat(".", depth)); + + out.print(value.getName()); + Object val = value.getValue(); + if (val == null) { + out.print(" = "); + out.print(""); + } else if (byte[].class == val.getClass()) { + out.print(" = "); + out.print(new BinaryNode((byte[]) val).asText()); + } else if (short[].class == val.getClass()) { + out.print(" = "); + out.print(Arrays.toString((short[])val)); + } else if (int[].class == val.getClass()) { + out.print(" = "); + out.print(Arrays.toString((int[])val)); + } else if (long[].class == val.getClass()) { + out.print(" = "); + out.print(Arrays.toString((long[])val)); + } else if (float[].class == val.getClass()) { + out.print(" = "); + out.print(Arrays.toString((float[])val)); + } else if (double[].class == val.getClass()) { + out.print(" = "); + out.print(Arrays.toString((double[])val)); + } else if (boolean[].class == val.getClass()) { + out.print(" = "); + out.print(Arrays.toString((boolean[])val)); + } else if (val.getClass().isArray()) { + out.print(" = "); + out.print(Arrays.deepToString((Object[])val)); + } else if (SimpleRecord.class.isAssignableFrom(val.getClass())) { + out.println(":"); + ((SimpleRecord)val).prettyPrint(out, depth+1); + continue; + } else { + out.print(" = "); + out.print(String.valueOf(val)); + } + + out.println(); + } + } + + public void prettyPrintJson(PrintWriter out) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + out.write(mapper.writeValueAsString(this.toJsonObject())); + } + + protected Object toJsonObject() { + Map result = Maps.newLinkedHashMap(); + for (NameValue value : values) { + result.put(value.getName(), toJsonValue(value.getValue())); + } + return result; + } + + protected static Object toJsonValue(Object val) { + if (SimpleRecord.class.isAssignableFrom(val.getClass())) { + return ((SimpleRecord) val).toJsonObject(); + } else if (byte[].class == val.getClass()) { + return new BinaryNode((byte[]) val); + } else { + return val; + } + } + + public static final class NameValue { + private final String name; + private final Object value; + + public NameValue(String name, Object value) { + this.name = name; + this.value = value; + } + + public String toString() { + return name + ": " + value; + } + + public String getName() { + return name; + } + + public Object getValue() { + return value; + } + } +} + diff --git a/src/main/java/org/apache/parquet/tools/read/SimpleRecordConverter.java b/src/main/java/org/apache/parquet/tools/read/SimpleRecordConverter.java new file mode 100644 index 0000000..5f78444 --- /dev/null +++ 
b/src/main/java/org/apache/parquet/tools/read/SimpleRecordConverter.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.tools.read; + +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; + +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.Converter; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.io.api.PrimitiveConverter; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.Type; + +/** + * + * + * @author + */ +public class SimpleRecordConverter extends GroupConverter { + private static final Charset UTF8 = Charset.forName("UTF-8"); + private static final CharsetDecoder UTF8_DECODER = UTF8.newDecoder(); + + private final Converter converters[]; + private final String name; + private final SimpleRecordConverter parent; + protected SimpleRecord record; + + public SimpleRecordConverter(GroupType schema) { + this(schema, null, null); + } + + public SimpleRecordConverter(GroupType schema, String name, SimpleRecordConverter parent) { + this.converters = new Converter[schema.getFieldCount()]; + this.parent = parent; + this.name = name; + + int i = 0; + for (Type field: schema.getFields()) { + converters[i++] = createConverter(field); + } + } + + private Converter createConverter(Type field) { + OriginalType otype = field.getOriginalType(); + + if (field.isPrimitive()) { + if (otype != null) { + switch (otype) { + case MAP: break; + case LIST: break; + case UTF8: return new StringConverter(field.getName()); + case MAP_KEY_VALUE: break; + case ENUM: break; + } + } + + return new SimplePrimitiveConverter(field.getName()); + } + + GroupType groupType = field.asGroupType(); + if (otype != null) { + switch (otype) { + case MAP: return new SimpleMapRecordConverter(groupType, field.getName(), this); + case LIST: return new SimpleListRecordConverter(groupType, field.getName(), this); + } + } + return new SimpleRecordConverter(groupType, field.getName(), this); + } + + @Override + public Converter getConverter(int fieldIndex) { + return converters[fieldIndex]; + } + + SimpleRecord getCurrentRecord() { + return record; + } + + @Override + public void start() { + record = new SimpleRecord(); + } + + @Override + public void end() { + if (parent != null) { + parent.getCurrentRecord().add(name, record); + } + } + + private class SimplePrimitiveConverter extends PrimitiveConverter { + protected final String name; + + public SimplePrimitiveConverter(String name) { + this.name = name; + } + + @Override + public void addBinary(Binary value) { + record.add(name, value.getBytes()); + } + + @Override + public void addBoolean(boolean 
value) { + record.add(name, value); + } + + @Override + public void addDouble(double value) { + record.add(name, value); + } + + @Override + public void addFloat(float value) { + record.add(name, value); + } + + @Override + public void addInt(int value) { + record.add(name, value); + } + + @Override + public void addLong(long value) { + record.add(name, value); + } + } + + private class StringConverter extends SimplePrimitiveConverter { + public StringConverter(String name) { + super(name); + } + + @Override + public void addBinary(Binary value) { + record.add(name, value.toStringUsingUTF8()); + } + } +} + diff --git a/src/main/java/org/apache/parquet/tools/read/SimpleRecordMaterializer.java b/src/main/java/org/apache/parquet/tools/read/SimpleRecordMaterializer.java new file mode 100644 index 0000000..98c41e8 --- /dev/null +++ b/src/main/java/org/apache/parquet/tools/read/SimpleRecordMaterializer.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.tools.read; + +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.io.api.RecordMaterializer; +import org.apache.parquet.schema.MessageType; + +public class SimpleRecordMaterializer extends RecordMaterializer { + public final SimpleRecordConverter root; + + public SimpleRecordMaterializer(MessageType schema) { + this.root = new SimpleRecordConverter(schema); + } + + @Override + public SimpleRecord getCurrentRecord() { + return root.getCurrentRecord(); + } + + @Override + public GroupConverter getRootConverter() { + return root; + } +} + From d37e617ebade9f000126b9838c586f4d59db88b6 Mon Sep 17 00:00:00 2001 From: Guillaume Balaine Date: Thu, 12 Oct 2017 17:02:19 +0200 Subject: [PATCH 3/6] DC/OS git submit script --- bin/dcos_submit.sh | 121 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100644 bin/dcos_submit.sh diff --git a/bin/dcos_submit.sh b/bin/dcos_submit.sh new file mode 100644 index 0000000..fd31730 --- /dev/null +++ b/bin/dcos_submit.sh @@ -0,0 +1,121 @@ +#!/usr/bin/shs + +set -x + +# Define target environment +if [ "$environment" = "staging" ]; then SUFFIX="-staging"; fi + + +# SPARK_DCOS_URL the spark service url +# SPARK_MASTER spark master url, including port +# SPARK_ARGS spark arguments, as a json array's content +# SPARK_JARS list of urls where to download jars from +# SPARK_CLASS_NAME class name of the spark app +# SPARK_APP_NAME spark app name for display +# SPARK_DRIVER_CORES driver cores +# SPARK_DRIVER_MEMORY driver memory +# SPARK_CORES_MAX max available cores on mesos +# SPARK_EXECUTOR_MEMORY executor memory +# SPARK_CASSANDRA_HOST cassandra host name +# SPARK_CASSANDRA_USER cassandra user name +# SPARK_CASSANDRA_PASSWORD cassandra password +# 
SPARK_POSTGRES_HOST postgres host +# SPARK_POSTGRES_USER postgres user +# SPARK_POSTGRES_PASSWORD postgres password + +SPARK_DCOS_URL="mydcosurl" +SPARK_MASTER="mysparkmaster" +SPARK_ARGS="" +SPARK_JARS="myjarurl" +SPARK_CLASS_NAME="$jobclass" +SPARK_APP_NAME="SparkJobs$SUFFIX" +SPARK_DRIVER_CORES="2" +SPARK_DRIVER_MEMORY="1024" +SPARK_CORES_MAX="10" +SPARK_EXECUTOR_MEMORY="2g" +SPARK_CASSANDRA_HOST="xxx" +DCOS_ACS_TOKEN="read acs token" +SPARK_DOCKER_IMAGE="mesosphere/spark:1.1.0-2.1.1-hadoop-2.6" + +if [ "$environment" = "staging" ]; then +SPARK_CASSANDRA_USER="xxx" +SPARK_CASSANDRA_PASSWORD="xxxx" +SPARK_POSTGRES_HOST="xxxx" +SPARK_POSTGRES_USER="xxxx" +SPARK_POSTGRES_PASSWORD="xxxx" +else +SPARK_CASSANDRA_USER="xxx" +SPARK_CASSANDRA_PASSWORD="xxxx" +SPARK_POSTGRES_HOST="xxxx" +SPARK_POSTGRES_USER="xxxx" +SPARK_POSTGRES_PASSWORD="xxxx" +fi + +generate_payload() { + cat <<-EOF +{ + "action" : "CreateSubmissionRequest", + "appArgs" : [ $SPARK_ARGS ], + "appResource" : "$SPARK_JARS", + "clientSparkVersion" : "2.1.1", + "environmentVariables" : { + "SPARK_ENV_LOADED" : "1" + }, + "mainClass" : "$SPARK_CLASS_NAME", + "sparkProperties" : { + "spark.jars" : "$SPARK_JARS", + "spark.driver.supervise" : "false", + "spark.app.name" : "$SPARK_APP_NAME", + "spark.eventLog.enabled": "true", + "spark.submit.deployMode" : "cluster", + "spark.master" : "spark://${SPARK_MASTER}", + "spark.driver.cores" : "${SPARK_DRIVER_CORES}", + "spark.driver.memory" : "${SPARK_DRIVER_MEMORY}", + "spark.cores.max" : "${SPARK_CORES_MAX}", + "spark.executor.memory" : "${SPARK_EXECUTOR_MEMORY}", + "spark.postgres.hostname" : "${SPARK_POSTGRES_HOST}", + "spark.postgres.username" : "${SPARK_POSTGRES_USER}", + "spark.postgres.password" : "${SPARK_POSTGRES_PASSWORD}", + "spark.mesos.executor.docker.image": "${SPARK_DOCKER_IMAGE}", + "spark.driver.supervise": false, + "spark.ssl.noCertVerification": true + } +} +EOF +} + +acsHeader="Authorization: token=$DCOS_ACS_TOKEN" + +submission=$(curl -sS -X POST "${SPARK_DCOS_URL}/v1/submissions/create" --header "$acsHeader" --header "Content-Type:application/json;charset=UTF-8" --data "$(generate_payload)") +submissionSuccess=$(echo $submission | jq -r 'select(.success)') + +if [[ $submissionSuccess ]]; then + SECONDS=0 + + submissionId=$(echo $submission | jq -rj '.submissionId') + # Wait for the job to finish + until test "$(curl --header "$acsHeader" -sS $SPARK_DCOS_URL/v1/submissions/status/$submissionId | jq -r '.driverState')" != "RUNNING"; do sleep 10; echo "Waiting..."; done + lastStatus=$(curl --header "$acsHeader" -sS $SPARK_DCOS_URL/v1/submissions/status/$submissionId | jq -r '.') + echo $lastStatus + taskState=$(echo $lastStatus | jq -r '.message' | grep state | cut -d " " -f 2) + taskMessage=$(echo $lastStatus | jq -r '.message' | grep message | cut -d " " -f 2) + + if [ "$taskState" = "TASK_FINISHED" ] || [ "$taskState" = "TASK_OK" ]; then + echo "Job succeeded." + echo "$taskMessage" + DURATION=$SECONDS + curl -s https://metrics-api.librato.com/v1/metrics -u $LIBRATO_USERNAME:$LIBRATO_TOKEN -H 'Content-Type: application/json' \ + -d "{\"gauges\":{\"jenkins.spark_jobs.$SPARK_APP_NAME.duration.s\":{\"value\":$DURATION,\"source\":\"jenkins\"}}}" + exit 0; + else + echo "Job failed:" + echo "$taskMessage" + # TODO: notify people! + exit 1; + fi +else + echo "Submission failed ! Exiting..." 
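    # print the error message returned in the submission response, then exit non-zero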
+ echo $submission | jq -r '.message' + exit 1 +fi + From 558f3fef8992a324a2d0bbe06f598a240c8c144c Mon Sep 17 00:00:00 2001 From: Guillaume Balaine Date: Thu, 7 Dec 2017 18:40:18 +0100 Subject: [PATCH 4/6] Druid 0.11.0 Revert "Copy over generic parquet read api to prevent importing the whole parquet-tools dependency" This reverts commit e845195369a59a5aa0121e80b2bae7b3edbab395. --- build.sbt | 11 +- .../parquet/tools/read/SimpleListRecord.java | 30 ---- .../tools/read/SimpleListRecordConverter.java | 34 ---- .../parquet/tools/read/SimpleMapRecord.java | 43 ----- .../tools/read/SimpleMapRecordConverter.java | 34 ---- .../parquet/tools/read/SimpleReadSupport.java | 41 ----- .../parquet/tools/read/SimpleRecord.java | 149 ---------------- .../tools/read/SimpleRecordConverter.java | 159 ------------------ .../tools/read/SimpleRecordMaterializer.java | 42 ----- 9 files changed, 6 insertions(+), 537 deletions(-) delete mode 100644 src/main/java/org/apache/parquet/tools/read/SimpleListRecord.java delete mode 100644 src/main/java/org/apache/parquet/tools/read/SimpleListRecordConverter.java delete mode 100644 src/main/java/org/apache/parquet/tools/read/SimpleMapRecord.java delete mode 100644 src/main/java/org/apache/parquet/tools/read/SimpleMapRecordConverter.java delete mode 100644 src/main/java/org/apache/parquet/tools/read/SimpleReadSupport.java delete mode 100644 src/main/java/org/apache/parquet/tools/read/SimpleRecord.java delete mode 100644 src/main/java/org/apache/parquet/tools/read/SimpleRecordConverter.java delete mode 100644 src/main/java/org/apache/parquet/tools/read/SimpleRecordMaterializer.java diff --git a/build.sbt b/build.sbt index 7c5d75a..fc2744c 100644 --- a/build.sbt +++ b/build.sbt @@ -24,7 +24,7 @@ homepage := Some(url("https://github.com/metamx/druid-spark-batch")) crossScalaVersions := Seq("2.11.7", "2.10.6") releaseIgnoreUntrackedFiles := true -val druid_version = "0.11.0-SNAPSHOT" +val druid_version = "0.11.0" // This is just used here for Path, so anything that doesn't break spark should be fine val hadoop_version = "2.7.3" val spark_version = "2.1.0" @@ -32,10 +32,11 @@ val guava_version = "16.0.1" val mesos_version = "0.25.0" val parquet_version = "1.8.2" -libraryDependencies += "org.apache.parquet" % "parquet-common" % parquet_version -libraryDependencies += "org.apache.parquet" % "parquet-encoding" % parquet_version -libraryDependencies += "org.apache.parquet" % "parquet-column" % parquet_version -libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % parquet_version +libraryDependencies += "org.apache.parquet" % "parquet-common" % parquet_version exclude("com.google.guava", "guava") +libraryDependencies += "org.apache.parquet" % "parquet-encoding" % parquet_version exclude("com.google.guava", "guava") +libraryDependencies += "org.apache.parquet" % "parquet-column" % parquet_version exclude("com.google.guava", "guava") +libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % parquet_version exclude("com.google.guava", "guava") +libraryDependencies += "org.apache.parquet" % "parquet-tools" % parquet_version exclude("com.google.guava", "guava") val sparkDep = ("org.apache.spark" %% "spark-core" % spark_version exclude("org.roaringbitmap", "RoaringBitmap") diff --git a/src/main/java/org/apache/parquet/tools/read/SimpleListRecord.java b/src/main/java/org/apache/parquet/tools/read/SimpleListRecord.java deleted file mode 100644 index 1c2a96c..0000000 --- a/src/main/java/org/apache/parquet/tools/read/SimpleListRecord.java +++ /dev/null @@ -1,30 +0,0 @@ -/* 
- * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.tools.read; - -public class SimpleListRecord extends SimpleRecord { - @Override - protected Object toJsonObject() { - Object[] result = new Object[values.size()]; - for (int i = 0; i < values.size(); i++) { - result[i] = toJsonValue(values.get(i).getValue()); - } - return result; - } -} diff --git a/src/main/java/org/apache/parquet/tools/read/SimpleListRecordConverter.java b/src/main/java/org/apache/parquet/tools/read/SimpleListRecordConverter.java deleted file mode 100644 index 1f09a31..0000000 --- a/src/main/java/org/apache/parquet/tools/read/SimpleListRecordConverter.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.tools.read; - -import org.apache.parquet.schema.GroupType; - -public class SimpleListRecordConverter extends SimpleRecordConverter { - - public SimpleListRecordConverter(GroupType schema, String name, SimpleRecordConverter parent) { - super(schema, name, parent); - } - - @Override - public void start() { - record = new SimpleListRecord(); - } - -} diff --git a/src/main/java/org/apache/parquet/tools/read/SimpleMapRecord.java b/src/main/java/org/apache/parquet/tools/read/SimpleMapRecord.java deleted file mode 100644 index 9b9243e..0000000 --- a/src/main/java/org/apache/parquet/tools/read/SimpleMapRecord.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.tools.read; - -import com.google.common.collect.Maps; - -import java.util.Map; - -public class SimpleMapRecord extends SimpleRecord { - @Override - protected Object toJsonObject() { - Map result = Maps.newLinkedHashMap(); - for (NameValue value : values) { - String key = null; - Object val = null; - for (NameValue kv : ((SimpleRecord) value.getValue()).values) { - if (kv.getName().equals("key")) { - key = (String) kv.getValue(); - } else if (kv.getName().equals("value")) { - val = toJsonValue(kv.getValue()); - } - } - result.put(key, val); - } - return result; - } -} diff --git a/src/main/java/org/apache/parquet/tools/read/SimpleMapRecordConverter.java b/src/main/java/org/apache/parquet/tools/read/SimpleMapRecordConverter.java deleted file mode 100644 index e26cc1a..0000000 --- a/src/main/java/org/apache/parquet/tools/read/SimpleMapRecordConverter.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.tools.read; - -import org.apache.parquet.schema.GroupType; - -public class SimpleMapRecordConverter extends SimpleRecordConverter { - - public SimpleMapRecordConverter(GroupType schema, String name, SimpleRecordConverter parent) { - super(schema, name, parent); - } - - @Override - public void start() { - record = new SimpleMapRecord(); - } - -} diff --git a/src/main/java/org/apache/parquet/tools/read/SimpleReadSupport.java b/src/main/java/org/apache/parquet/tools/read/SimpleReadSupport.java deleted file mode 100644 index 49aeeaf..0000000 --- a/src/main/java/org/apache/parquet/tools/read/SimpleReadSupport.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.tools.read; - -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; - -import org.apache.parquet.hadoop.api.InitContext; -import org.apache.parquet.hadoop.api.ReadSupport; -import org.apache.parquet.io.api.RecordMaterializer; -import org.apache.parquet.schema.MessageType; - -public class SimpleReadSupport extends ReadSupport { - @Override - public RecordMaterializer prepareForRead(Configuration conf, Map metaData, MessageType schema, ReadContext context) { - return new SimpleRecordMaterializer(schema); - } - - @Override - public ReadContext init(InitContext context) { - return new ReadContext(context.getFileSchema()); - } -} - diff --git a/src/main/java/org/apache/parquet/tools/read/SimpleRecord.java b/src/main/java/org/apache/parquet/tools/read/SimpleRecord.java deleted file mode 100644 index 5f97c88..0000000 --- a/src/main/java/org/apache/parquet/tools/read/SimpleRecord.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.parquet.tools.read; - -import java.io.IOException; -import java.io.PrintWriter; -import java.util.*; - -import com.google.common.base.Strings; -import com.google.common.collect.Maps; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.node.BinaryNode; - -public class SimpleRecord { - public static final int TAB_SIZE = 2; - protected final List values; - - public SimpleRecord() { - this.values = new ArrayList(); - } - - public void add(String name, Object value) { - values.add(new NameValue(name,value)); - } - - public List getValues() { - return Collections.unmodifiableList(values); - } - - public String toString() { - return values.toString(); - } - - public void prettyPrint() { - prettyPrint(new PrintWriter(System.out,true)); - } - - public void prettyPrint(PrintWriter out) { - prettyPrint(out, 0); - } - - public void prettyPrint(PrintWriter out, int depth) { - for (NameValue value : values) { - out.print(Strings.repeat(".", depth)); - - out.print(value.getName()); - Object val = value.getValue(); - if (val == null) { - out.print(" = "); - out.print(""); - } else if (byte[].class == val.getClass()) { - out.print(" = "); - out.print(new BinaryNode((byte[]) val).asText()); - } else if (short[].class == val.getClass()) { - out.print(" = "); - out.print(Arrays.toString((short[])val)); - } else if (int[].class == val.getClass()) { - out.print(" = "); - out.print(Arrays.toString((int[])val)); - } else if (long[].class == val.getClass()) { - out.print(" = "); - out.print(Arrays.toString((long[])val)); - } else if (float[].class == val.getClass()) { - out.print(" = "); - out.print(Arrays.toString((float[])val)); - } else if (double[].class == val.getClass()) { - out.print(" = "); - out.print(Arrays.toString((double[])val)); - } else if (boolean[].class == val.getClass()) { - out.print(" = "); - out.print(Arrays.toString((boolean[])val)); - } else if (val.getClass().isArray()) { - out.print(" = "); - out.print(Arrays.deepToString((Object[])val)); - } else if (SimpleRecord.class.isAssignableFrom(val.getClass())) { - out.println(":"); - ((SimpleRecord)val).prettyPrint(out, depth+1); - continue; - } else { - out.print(" = "); - out.print(String.valueOf(val)); - } - - out.println(); - } - } - - public void prettyPrintJson(PrintWriter out) throws IOException { - ObjectMapper mapper = new ObjectMapper(); - out.write(mapper.writeValueAsString(this.toJsonObject())); - } - - protected Object toJsonObject() { - Map result = Maps.newLinkedHashMap(); - for (NameValue value : values) { - result.put(value.getName(), toJsonValue(value.getValue())); - } - return result; - } - - protected static Object toJsonValue(Object val) { - if (SimpleRecord.class.isAssignableFrom(val.getClass())) { - return ((SimpleRecord) val).toJsonObject(); - } else if (byte[].class == val.getClass()) { - return new BinaryNode((byte[]) val); - } else { - return val; - } - } - - public static final class NameValue { - private final String name; - private final Object value; - - public NameValue(String name, Object value) { - this.name = name; - this.value = value; - } - - public String toString() { - return name + ": " + value; - } - - public String getName() { - return name; - } - - public Object getValue() { - return value; - } - } -} - diff --git a/src/main/java/org/apache/parquet/tools/read/SimpleRecordConverter.java b/src/main/java/org/apache/parquet/tools/read/SimpleRecordConverter.java deleted file mode 100644 index 5f78444..0000000 --- 
a/src/main/java/org/apache/parquet/tools/read/SimpleRecordConverter.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.tools.read; - -import java.nio.CharBuffer; -import java.nio.charset.Charset; -import java.nio.charset.CharsetDecoder; - -import org.apache.parquet.io.api.Binary; -import org.apache.parquet.io.api.Converter; -import org.apache.parquet.io.api.GroupConverter; -import org.apache.parquet.io.api.PrimitiveConverter; -import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.OriginalType; -import org.apache.parquet.schema.Type; - -/** - * - * - * @author - */ -public class SimpleRecordConverter extends GroupConverter { - private static final Charset UTF8 = Charset.forName("UTF-8"); - private static final CharsetDecoder UTF8_DECODER = UTF8.newDecoder(); - - private final Converter converters[]; - private final String name; - private final SimpleRecordConverter parent; - protected SimpleRecord record; - - public SimpleRecordConverter(GroupType schema) { - this(schema, null, null); - } - - public SimpleRecordConverter(GroupType schema, String name, SimpleRecordConverter parent) { - this.converters = new Converter[schema.getFieldCount()]; - this.parent = parent; - this.name = name; - - int i = 0; - for (Type field: schema.getFields()) { - converters[i++] = createConverter(field); - } - } - - private Converter createConverter(Type field) { - OriginalType otype = field.getOriginalType(); - - if (field.isPrimitive()) { - if (otype != null) { - switch (otype) { - case MAP: break; - case LIST: break; - case UTF8: return new StringConverter(field.getName()); - case MAP_KEY_VALUE: break; - case ENUM: break; - } - } - - return new SimplePrimitiveConverter(field.getName()); - } - - GroupType groupType = field.asGroupType(); - if (otype != null) { - switch (otype) { - case MAP: return new SimpleMapRecordConverter(groupType, field.getName(), this); - case LIST: return new SimpleListRecordConverter(groupType, field.getName(), this); - } - } - return new SimpleRecordConverter(groupType, field.getName(), this); - } - - @Override - public Converter getConverter(int fieldIndex) { - return converters[fieldIndex]; - } - - SimpleRecord getCurrentRecord() { - return record; - } - - @Override - public void start() { - record = new SimpleRecord(); - } - - @Override - public void end() { - if (parent != null) { - parent.getCurrentRecord().add(name, record); - } - } - - private class SimplePrimitiveConverter extends PrimitiveConverter { - protected final String name; - - public SimplePrimitiveConverter(String name) { - this.name = name; - } - - @Override - public void addBinary(Binary value) { - record.add(name, value.getBytes()); - } - - @Override - public void 
addBoolean(boolean value) { - record.add(name, value); - } - - @Override - public void addDouble(double value) { - record.add(name, value); - } - - @Override - public void addFloat(float value) { - record.add(name, value); - } - - @Override - public void addInt(int value) { - record.add(name, value); - } - - @Override - public void addLong(long value) { - record.add(name, value); - } - } - - private class StringConverter extends SimplePrimitiveConverter { - public StringConverter(String name) { - super(name); - } - - @Override - public void addBinary(Binary value) { - record.add(name, value.toStringUsingUTF8()); - } - } -} - diff --git a/src/main/java/org/apache/parquet/tools/read/SimpleRecordMaterializer.java b/src/main/java/org/apache/parquet/tools/read/SimpleRecordMaterializer.java deleted file mode 100644 index 98c41e8..0000000 --- a/src/main/java/org/apache/parquet/tools/read/SimpleRecordMaterializer.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.tools.read; - -import org.apache.parquet.io.api.GroupConverter; -import org.apache.parquet.io.api.RecordMaterializer; -import org.apache.parquet.schema.MessageType; - -public class SimpleRecordMaterializer extends RecordMaterializer { - public final SimpleRecordConverter root; - - public SimpleRecordMaterializer(MessageType schema) { - this.root = new SimpleRecordConverter(schema); - } - - @Override - public SimpleRecord getCurrentRecord() { - return root.getCurrentRecord(); - } - - @Override - public GroupConverter getRootConverter() { - return root; - } -} - From ab98e37852909ede231f8cff5735152e03e40f22 Mon Sep 17 00:00:00 2001 From: Guillaume Balaine Date: Fri, 8 Dec 2017 15:11:31 +0100 Subject: [PATCH 5/6] Use druid 0.11.1-SNAPSHOT --- build.sbt | 5 +- .../spark/parquet/ParquetInputRowParser.java | 18 ++--- .../indexer/spark/SparkDruidIndexer.scala | 70 ++++++++++--------- 3 files changed, 50 insertions(+), 43 deletions(-) diff --git a/build.sbt b/build.sbt index fc2744c..3b65e19 100644 --- a/build.sbt +++ b/build.sbt @@ -24,7 +24,7 @@ homepage := Some(url("https://github.com/metamx/druid-spark-batch")) crossScalaVersions := Seq("2.11.7", "2.10.6") releaseIgnoreUntrackedFiles := true -val druid_version = "0.11.0" +val druid_version = "0.11.1-SNAPSHOT" // This is just used here for Path, so anything that doesn't break spark should be fine val hadoop_version = "2.7.3" val spark_version = "2.1.0" @@ -36,7 +36,7 @@ libraryDependencies += "org.apache.parquet" % "parquet-common" % parquet_version libraryDependencies += "org.apache.parquet" % "parquet-encoding" % parquet_version exclude("com.google.guava", "guava") libraryDependencies += "org.apache.parquet" % "parquet-column" % parquet_version exclude("com.google.guava", 
"guava") libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % parquet_version exclude("com.google.guava", "guava") -libraryDependencies += "org.apache.parquet" % "parquet-tools" % parquet_version exclude("com.google.guava", "guava") +libraryDependencies += "org.apache.parquet" % "parquet-avro" % parquet_version exclude("com.google.guava", "guava") val sparkDep = ("org.apache.spark" %% "spark-core" % spark_version exclude("org.roaringbitmap", "RoaringBitmap") @@ -106,6 +106,7 @@ libraryDependencies += "io.druid" % "druid-processing" % druid_version % "provid libraryDependencies += "io.druid" % "druid-server" % druid_version % "provided" libraryDependencies += "io.druid" % "druid-indexing-service" % druid_version % "provided" libraryDependencies += "io.druid" % "druid-indexing-hadoop" % druid_version % "provided" +libraryDependencies += "io.druid.extensions" % "druid-avro-extensions" % druid_version % "provided" libraryDependencies += "org.joda" % "joda-convert" % "1.8.1" % "provided" // Prevents intellij silliness and sbt warnings diff --git a/src/main/java/io/druid/indexer/spark/parquet/ParquetInputRowParser.java b/src/main/java/io/druid/indexer/spark/parquet/ParquetInputRowParser.java index 79dd655..f11fb49 100644 --- a/src/main/java/io/druid/indexer/spark/parquet/ParquetInputRowParser.java +++ b/src/main/java/io/druid/indexer/spark/parquet/ParquetInputRowParser.java @@ -3,15 +3,17 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import io.druid.data.input.InputRow; +import io.druid.data.input.avro.AvroParsers; import io.druid.data.input.impl.InputRowParser; -import io.druid.data.input.impl.MapInputRowParser; import io.druid.data.input.impl.ParseSpec; -import org.apache.parquet.tools.read.SimpleParquetRecord; -import org.apache.parquet.tools.read.SimpleRecord; +import io.druid.java.util.common.parsers.ObjectFlattener; +import org.apache.avro.generic.GenericRecord; -public class ParquetInputRowParser implements InputRowParser { +import java.util.List; + +public class ParquetInputRowParser implements InputRowParser { private final ParseSpec parseSpec; - private final MapInputRowParser mapParser; + private final ObjectFlattener avroFlattener; @JsonCreator public ParquetInputRowParser( @@ -19,7 +21,7 @@ public ParquetInputRowParser( ) { this.parseSpec = parseSpec; - this.mapParser = new MapInputRowParser(this.parseSpec); + this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false, false); } @JsonProperty @@ -36,10 +38,10 @@ public ParquetInputRowParser withParseSpec(ParseSpec parseSpec) } @Override - public InputRow parse(SimpleRecord input) + public List parseBatch(GenericRecord input) { // We should really create a ParquetBasedInputRow that does not need an intermediate map but accesses // the SimpleRecord directly... 
- return mapParser.parse(SimpleParquetRecord.toJson(input)); + return AvroParsers.parseGenericRecord(input, parseSpec, avroFlattener); } } diff --git a/src/main/scala/io/druid/indexer/spark/SparkDruidIndexer.scala b/src/main/scala/io/druid/indexer/spark/SparkDruidIndexer.scala index 1897985..96e1fd8 100644 --- a/src/main/scala/io/druid/indexer/spark/SparkDruidIndexer.scala +++ b/src/main/scala/io/druid/indexer/spark/SparkDruidIndexer.scala @@ -48,15 +48,17 @@ import io.druid.segment.column.ColumnConfig import io.druid.segment.incremental.{IncrementalIndex, IncrementalIndexSchema} import io.druid.segment.indexing.DataSchema import io.druid.segment.loading.DataSegmentPusher +import io.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory import io.druid.server.DruidNode import io.druid.timeline.DataSegment import io.druid.timeline.partition.{HashBasedNumberedShardSpec, NoneShardSpec, ShardSpec} +import org.apache.avro.generic.GenericRecord import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.Job +import org.apache.parquet.avro.AvroReadSupport import org.apache.parquet.hadoop.ParquetInputFormat -import org.apache.parquet.tools.read.{SimpleReadSupport, SimpleRecord} import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{AccumulableInfo, SparkListener, SparkListenerApplicationEnd, SparkListenerStageCompleted} import org.apache.spark.storage.StorageLevel @@ -140,9 +142,9 @@ object SparkDruidIndexer { dataSchema.getDelegate.getParser match { case x: ParquetInputRowParser => val job = new Job() - ParquetInputFormat.setReadSupportClass(job, classOf[SimpleReadSupport]) - sc.newAPIHadoopFile(dataFiles.mkString(","), classOf[ParquetInputFormat[SimpleRecord]], - classOf[Void], classOf[SimpleRecord], job.getConfiguration).values.filter( + ParquetInputFormat.setReadSupportClass(job, classOf[AvroReadSupport[GenericRecord]]) + sc.newAPIHadoopFile(dataFiles.mkString(","), classOf[ParquetInputFormat[GenericRecord]], + classOf[Void], classOf[GenericRecord], job.getConfiguration).values.filter( s => { val row = dataSchema.getDelegate.getParser match { case x: ParquetInputRowParser => x.parse(s) @@ -373,6 +375,27 @@ object SparkDruidIndexer { incrementalIndex }).map( incIndex => { + val progressIndicator = new ProgressIndicator { + override def stop(): Unit = { + logTrace("Stop") + } + + override def stopSection(s: String): Unit = { + logTrace(s"Stop [$s]") + } + + override def progress(): Unit = { + logTrace("Progress") + } + + override def startSection(s: String): Unit = { + logTrace(s"Start [$s]") + } + + override def start(): Unit = { + logTrace("Start") + } + } val adapter = new QueryableIndexIndexableAdapter( closer.register( StaticIndex.INDEX_IO.loadIndex( @@ -381,7 +404,9 @@ object SparkDruidIndexer { incIndex, timeInterval, tmpPersistDir, - indexSpec_passable.getDelegate + indexSpec_passable.getDelegate, + progressIndicator, + OffHeapMemorySegmentWriteOutMediumFactory.instance() ) ) ) @@ -390,37 +415,13 @@ object SparkDruidIndexer { adapter } ).toList + val file = finalStaticIndexer.merge( indices, true, aggs.map(_.getDelegate), tmpMergeDir, - indexSpec_passable.getDelegate, - new ProgressIndicator { - override def stop(): Unit = { - logTrace("Stop") - } - - override def stopSection(s: String): Unit = { - logTrace(s"Stop [$s]") - } - - override def progress(): Unit = { - logTrace("Progress") - } - - override def startSection(s: String): Unit = { - logTrace(s"Start [$s]") - } - - 
override def progressSection(s: String, s1: String): Unit = { - logTrace(s"Progress [$s]:[$s1]") - } - - override def start(): Unit = { - logTrace("Start") - } - } + indexSpec_passable.getDelegate ) val allDimensions: util.List[String] = indices .map(_.getDimensionNames) @@ -801,9 +802,12 @@ class DateBucketAndHashPartitioner(@transient var gran: Granularity, } object StaticIndex { - val INDEX_IO = new IndexIO(SerializedJsonStatic.mapper, new ColumnConfig { + val INDEX_IO = new IndexIO( + SerializedJsonStatic.mapper, + OffHeapMemorySegmentWriteOutMediumFactory.instance(), + new ColumnConfig { override def columnCacheSizeBytes(): Int = 1000000 }) - val INDEX_MERGER_V9 = new IndexMergerV9(SerializedJsonStatic.mapper, INDEX_IO) + val INDEX_MERGER_V9 = new IndexMergerV9(SerializedJsonStatic.mapper, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance()) } From 3624573065292c97444d99f10ad0e13b112b99e7 Mon Sep 17 00:00:00 2001 From: Guillaume Balaine Date: Mon, 11 Dec 2017 17:05:26 +0100 Subject: [PATCH 6/6] Use latest druid and the avro flattener api with parquet --- bin/dcos_submit.sh | 2 +- build.sbt | 8 ++- .../spark/parquet/ParquetInputRowParser.java | 14 ++++- .../tools/read/SimpleParquetRecord.java | 15 ----- .../indexer/spark/SparkBatchIndexTask.scala | 28 ++++----- .../indexer/spark/SparkDruidIndexer.scala | 60 +++++++++---------- .../druid/indexer/spark/TaskIdGenerator.scala | 24 ++++++++ .../spark/TestScalaBatchIndexTask.scala | 4 +- .../indexer/spark/TestSparkDruidIndexer.scala | 57 ++++-------------- 9 files changed, 99 insertions(+), 113 deletions(-) delete mode 100644 src/main/java/org/apache/parquet/tools/read/SimpleParquetRecord.java create mode 100644 src/main/scala/io/druid/indexer/spark/TaskIdGenerator.scala diff --git a/bin/dcos_submit.sh b/bin/dcos_submit.sh index fd31730..dff76c6 100644 --- a/bin/dcos_submit.sh +++ b/bin/dcos_submit.sh @@ -105,7 +105,7 @@ if [[ $submissionSuccess ]]; then echo "$taskMessage" DURATION=$SECONDS curl -s https://metrics-api.librato.com/v1/metrics -u $LIBRATO_USERNAME:$LIBRATO_TOKEN -H 'Content-Type: application/json' \ - -d "{\"gauges\":{\"jenkins.spark_jobs.$SPARK_APP_NAME.duration.s\":{\"value\":$DURATION,\"source\":\"jenkins\"}}}" + -d "{\"gauges\":{\"spark_jobs.$SPARK_APP_NAME.duration.s\":{\"value\":$DURATION,\"source\":\"jenkins\"}}}" exit 0; else echo "Job failed:" diff --git a/build.sbt b/build.sbt index 3b65e19..97834b4 100644 --- a/build.sbt +++ b/build.sbt @@ -28,9 +28,11 @@ val druid_version = "0.11.1-SNAPSHOT" // This is just used here for Path, so anything that doesn't break spark should be fine val hadoop_version = "2.7.3" val spark_version = "2.1.0" -val guava_version = "16.0.1" +val guava_version = "17.0" val mesos_version = "0.25.0" val parquet_version = "1.8.2" +val curator_version = "4.0.0" +val zookeeper_version = "3.4.10" libraryDependencies += "org.apache.parquet" % "parquet-common" % parquet_version exclude("com.google.guava", "guava") libraryDependencies += "org.apache.parquet" % "parquet-encoding" % parquet_version exclude("com.google.guava", "guava") @@ -110,11 +112,13 @@ libraryDependencies += "io.druid.extensions" % "druid-avro-extensions" % druid_v libraryDependencies += "org.joda" % "joda-convert" % "1.8.1" % "provided" // Prevents intellij silliness and sbt warnings -libraryDependencies += "com.google.guava" % "guava" % guava_version % "provided"// Prevents serde problems for guice exceptions +libraryDependencies += "com.google.guava" % "guava" % guava_version libraryDependencies += "com.sun.jersey" % 
"jersey-servlet" % "1.17.1" % "provided" libraryDependencies += "org.apache.mesos" % "mesos" % mesos_version % "provided" classifier "shaded-protobuf" +libraryDependencies += "org.apache.zookeeper" % "zookeeper" % zookeeper_version + releaseCrossBuild := true assemblyMergeStrategy in assembly := { diff --git a/src/main/java/io/druid/indexer/spark/parquet/ParquetInputRowParser.java b/src/main/java/io/druid/indexer/spark/parquet/ParquetInputRowParser.java index f11fb49..ed38d64 100644 --- a/src/main/java/io/druid/indexer/spark/parquet/ParquetInputRowParser.java +++ b/src/main/java/io/druid/indexer/spark/parquet/ParquetInputRowParser.java @@ -2,14 +2,16 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Maps; import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.avro.AvroParsers; import io.druid.data.input.impl.InputRowParser; import io.druid.data.input.impl.ParseSpec; import io.druid.java.util.common.parsers.ObjectFlattener; import org.apache.avro.generic.GenericRecord; -import java.util.List; +import java.util.Map; public class ParquetInputRowParser implements InputRowParser { private final ParseSpec parseSpec; @@ -38,10 +40,16 @@ public ParquetInputRowParser withParseSpec(ParseSpec parseSpec) } @Override - public List parseBatch(GenericRecord input) + public InputRow parse(GenericRecord input) { // We should really create a ParquetBasedInputRow that does not need an intermediate map but accesses // the SimpleRecord directly... - return AvroParsers.parseGenericRecord(input, parseSpec, avroFlattener); + MapBasedInputRow mapBasedInputRow = (MapBasedInputRow) AvroParsers.parseGenericRecord(input, parseSpec, avroFlattener); + Map event = mapBasedInputRow.getEvent(); + Map newMap = Maps.newHashMapWithExpectedSize(event.size()); + for (String key : mapBasedInputRow.getEvent().keySet()) { + newMap.put(key, event.get(key)); + } + return new MapBasedInputRow(mapBasedInputRow.getTimestampFromEpoch(), mapBasedInputRow.getDimensions(), newMap); } } diff --git a/src/main/java/org/apache/parquet/tools/read/SimpleParquetRecord.java b/src/main/java/org/apache/parquet/tools/read/SimpleParquetRecord.java deleted file mode 100644 index 49bbcc3..0000000 --- a/src/main/java/org/apache/parquet/tools/read/SimpleParquetRecord.java +++ /dev/null @@ -1,15 +0,0 @@ -package org.apache.parquet.tools.read; - -import com.google.common.collect.Maps; - -import java.util.Map; - -public class SimpleParquetRecord extends SimpleRecord { - public static Map toJson(SimpleRecord record) { - Map result = Maps.newLinkedHashMap(); - for (NameValue value : record.getValues()) { - result.put(value.getName(), toJsonValue(value.getValue())); - } - return result; - } -} diff --git a/src/main/scala/io/druid/indexer/spark/SparkBatchIndexTask.scala b/src/main/scala/io/druid/indexer/spark/SparkBatchIndexTask.scala index 17a685a..a00afcd 100644 --- a/src/main/scala/io/druid/indexer/spark/SparkBatchIndexTask.scala +++ b/src/main/scala/io/druid/indexer/spark/SparkBatchIndexTask.scala @@ -23,14 +23,15 @@ import java.io.{Closeable, File, IOException, PrintWriter} import java.nio.file.Files import java.util import java.util.{Objects, Properties} + import com.fasterxml.jackson.annotation.{JsonCreator, JsonProperty} import com.google.common.base.{Preconditions, Strings} import com.google.common.collect.Iterables import com.google.common.io.Closer import io.druid.data.input.impl.ParseSpec -import 
io.druid.indexing.common.{TaskStatus, TaskToolbox} -import io.druid.indexing.common.actions.{LockTryAcquireAction, TaskActionClient} +import io.druid.indexing.common.actions.{LockListAction, LockTryAcquireAction, TaskActionClient} import io.druid.indexing.common.task.{AbstractTask, HadoopTask} +import io.druid.indexing.common.{TaskLockType, TaskStatus, TaskToolbox} import io.druid.java.util.common.JodaUtils import io.druid.java.util.common.granularity._ import io.druid.java.util.common.logger.Logger @@ -40,6 +41,7 @@ import io.druid.segment.indexing.DataSchema import io.druid.timeline.DataSegment import org.apache.spark.{SparkConf, SparkContext} import org.joda.time.Interval + import scala.collection.JavaConversions._ @JsonCreator @@ -71,17 +73,11 @@ class SparkBatchIndexTask( @JsonProperty("buildV9Directly") buildV9Directly: Boolean = false ) extends HadoopTask( - if (id == null) { - AbstractTask - .makeId( - null, SparkBatchIndexTask.TASK_TYPE_BASE, dataSchema.getDataSource, - JodaUtils - .umbrellaInterval(JodaUtils.condenseIntervals(Preconditions.checkNotNull(intervals, "%s", "intervals"))) - ) - } - else { - id - }, + TaskIdGenerator.getOrMakeId( + id, SparkBatchIndexTask.TASK_TYPE_BASE, dataSchema.getDataSource, + JodaUtils + .umbrellaInterval(JodaUtils.condenseIntervals(Preconditions.checkNotNull(intervals, "%s", "intervals"))) + ), dataSchema.getDataSource, hadoopDependencyCoordinates, if (context == null) { @@ -147,10 +143,10 @@ class SparkBatchIndexTask( val classLoader = buildClassLoader(toolbox) val task = toolbox.getObjectMapper.writeValueAsString(this) log.debug("Sending task `%s`", task) - + val taskLocks = toolbox.getTaskActionClient.submit(new LockListAction) val result = HadoopTask.invokeForeignLoader[util.ArrayList[String], util.ArrayList[String]]( "io.druid.indexer.spark.Runner", - new util.ArrayList(List(task, Iterables.getOnlyElement(getTaskLocks(toolbox)).getVersion, outputPath)), + new util.ArrayList(List(task, Iterables.getOnlyElement(taskLocks).getVersion, outputPath)), classLoader ) toolbox.publishSegments(result.map(toolbox.getObjectMapper.readValue(_, classOf[DataSegment]))) @@ -196,7 +192,7 @@ class SparkBatchIndexTask( @throws(classOf[Exception]) override def isReady(taskActionClient: TaskActionClient): Boolean = taskActionClient - .submit(new LockTryAcquireAction(lockInterval)) != null + .submit(new LockTryAcquireAction(TaskLockType.EXCLUSIVE, lockInterval)) != null @JsonProperty("id") override def getId = super.getId diff --git a/src/main/scala/io/druid/indexer/spark/SparkDruidIndexer.scala b/src/main/scala/io/druid/indexer/spark/SparkDruidIndexer.scala index 96e1fd8..f1dc9b8 100644 --- a/src/main/scala/io/druid/indexer/spark/SparkDruidIndexer.scala +++ b/src/main/scala/io/druid/indexer/spark/SparkDruidIndexer.scala @@ -48,7 +48,6 @@ import io.druid.segment.column.ColumnConfig import io.druid.segment.incremental.{IncrementalIndex, IncrementalIndexSchema} import io.druid.segment.indexing.DataSchema import io.druid.segment.loading.DataSegmentPusher -import io.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory import io.druid.server.DruidNode import io.druid.timeline.DataSegment import io.druid.timeline.partition.{HashBasedNumberedShardSpec, NoneShardSpec, ShardSpec} @@ -62,7 +61,7 @@ import org.apache.parquet.hadoop.ParquetInputFormat import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{AccumulableInfo, SparkListener, SparkListenerApplicationEnd, SparkListenerStageCompleted} import org.apache.spark.storage.StorageLevel -import 
org.apache.spark.{Partitioner, SparkContext} +import org.apache.spark.{InterruptibleIterator, Partitioner, SparkContext} import org.joda.time.{DateTime, Interval} import scala.collection.JavaConversions._ @@ -235,7 +234,7 @@ object SparkDruidIndexer { ) } - logInfo("Starting uniqes") + logInfo("Starting uniques") val optionalDims: Option[Set[String]] = if (dataSchema.getDelegate.getParser.getParseSpec.getDimensionsSpec.hasCustomDimensions) { val parseSpec = dataSchema.getDelegate.getParser.getParseSpec Some(parseSpec.getDimensionsSpec.getDimensionNames.asScala.toSet) @@ -375,27 +374,6 @@ object SparkDruidIndexer { incrementalIndex }).map( incIndex => { - val progressIndicator = new ProgressIndicator { - override def stop(): Unit = { - logTrace("Stop") - } - - override def stopSection(s: String): Unit = { - logTrace(s"Stop [$s]") - } - - override def progress(): Unit = { - logTrace("Progress") - } - - override def startSection(s: String): Unit = { - logTrace(s"Start [$s]") - } - - override def start(): Unit = { - logTrace("Start") - } - } val adapter = new QueryableIndexIndexableAdapter( closer.register( StaticIndex.INDEX_IO.loadIndex( @@ -404,9 +382,7 @@ object SparkDruidIndexer { incIndex, timeInterval, tmpPersistDir, - indexSpec_passable.getDelegate, - progressIndicator, - OffHeapMemorySegmentWriteOutMediumFactory.instance() + indexSpec_passable.getDelegate ) ) ) @@ -421,7 +397,32 @@ object SparkDruidIndexer { true, aggs.map(_.getDelegate), tmpMergeDir, - indexSpec_passable.getDelegate + indexSpec_passable.getDelegate, + new ProgressIndicator { + override def stop(): Unit = { + logTrace("Stop") + } + + override def stopSection(s: String): Unit = { + logTrace(s"Stop [$s]") + } + + override def progress(): Unit = { + logTrace("Progress") + } + + override def startSection(s: String): Unit = { + logTrace(s"Start [$s]") + } + + override def start(): Unit = { + logTrace("Start") + } + + override def progressSection(s: String, s1: String):Unit = { + logTrace(s"Progress Section [$s] [$s1]") + } + } ) val allDimensions: util.List[String] = indices .map(_.getDimensionNames) @@ -804,10 +805,9 @@ class DateBucketAndHashPartitioner(@transient var gran: Granularity, object StaticIndex { val INDEX_IO = new IndexIO( SerializedJsonStatic.mapper, - OffHeapMemorySegmentWriteOutMediumFactory.instance(), new ColumnConfig { override def columnCacheSizeBytes(): Int = 1000000 }) - val INDEX_MERGER_V9 = new IndexMergerV9(SerializedJsonStatic.mapper, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance()) + val INDEX_MERGER_V9 = new IndexMergerV9(SerializedJsonStatic.mapper, INDEX_IO) } diff --git a/src/main/scala/io/druid/indexer/spark/TaskIdGenerator.scala b/src/main/scala/io/druid/indexer/spark/TaskIdGenerator.scala new file mode 100644 index 0000000..30e9af1 --- /dev/null +++ b/src/main/scala/io/druid/indexer/spark/TaskIdGenerator.scala @@ -0,0 +1,24 @@ +package io.druid.indexer.spark + +import java.util + +import com.google.common.base.Joiner +import io.druid.java.util.common.DateTimes +import org.joda.time.Interval + +object TaskIdGenerator { + private val ID_JOINER = Joiner.on("_") + + def getOrMakeId(id: String, typeName: String, dataSource: String, interval: Interval): String = { + if (id != null) return id + val objects = new util.ArrayList[AnyRef] + objects.add(typeName) + objects.add(dataSource) + if (interval != null) { + objects.add(interval.getStart) + objects.add(interval.getEnd) + } + objects.add(DateTimes.nowUtc.toString) + ID_JOINER.join(objects) + } +} diff --git 
a/src/test/scala/io/druid/indexer/spark/TestScalaBatchIndexTask.scala b/src/test/scala/io/druid/indexer/spark/TestScalaBatchIndexTask.scala index 4bb20b1..d69bac7 100644 --- a/src/test/scala/io/druid/indexer/spark/TestScalaBatchIndexTask.scala +++ b/src/test/scala/io/druid/indexer/spark/TestScalaBatchIndexTask.scala @@ -20,6 +20,7 @@ package io.druid.indexer.spark import java.util.{Collections, Properties} + import com.fasterxml.jackson.core.`type`.TypeReference import com.fasterxml.jackson.databind.ObjectMapper import com.google.inject.{Binder, Module} @@ -40,6 +41,7 @@ import io.druid.segment.indexing.DataSchema import io.druid.segment.indexing.granularity.{GranularitySpec, UniformGranularitySpec} import org.joda.time.Interval import org.scalatest.{FlatSpec, Matchers} + import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -61,7 +63,7 @@ object TestScalaBatchIndexTask val objectMapper = injector.getInstance(classOf[ObjectMapper]) val taskId = "taskId" val dataSource = "defaultDataSource" - val interval = Interval.parse("1992/2019") + val interval = Interval.parse("1992/1999") val dataFiles = Seq("file:/someFile") val parseSpec = new DelimitedParseSpec( new TimestampSpec("l_shipdate", "yyyy-MM-dd", null), diff --git a/src/test/scala/io/druid/indexer/spark/TestSparkDruidIndexer.scala b/src/test/scala/io/druid/indexer/spark/TestSparkDruidIndexer.scala index 4b02bba..6d5b518 100644 --- a/src/test/scala/io/druid/indexer/spark/TestSparkDruidIndexer.scala +++ b/src/test/scala/io/druid/indexer/spark/TestSparkDruidIndexer.scala @@ -26,13 +26,15 @@ import java.util import com.fasterxml.jackson.core.`type`.TypeReference import com.google.common.collect.ImmutableList import com.google.common.io.Closer +import io.druid.data.input.avro.AvroParseSpec import io.druid.data.input.impl.{DimensionsSpec, JSONParseSpec, StringDimensionSchema, TimestampSpec} import io.druid.java.util.common.JodaUtils import io.druid.indexer.spark.parquet.ParquetInputRowParser import io.druid.java.util.common.granularity.Granularities import io.druid.java.util.common.logger.Logger +import io.druid.java.util.common.parsers.{JSONPathFieldSpec, JSONPathSpec} import io.druid.java.util.common.{CompressionUtils, IAE} -import io.druid.query.aggregation.{CountAggregator, CountAggregatorFactory, LongSumAggregatorFactory} +import io.druid.query.aggregation.{CountAggregator, CountAggregatorFactory, DoubleSumAggregatorFactory, LongSumAggregatorFactory} import io.druid.segment.QueryableIndexIndexableAdapter import io.druid.segment.indexing.DataSchema import io.druid.timeline.DataSegment @@ -40,6 +42,7 @@ import org.apache.commons.io.FileUtils import org.apache.spark.{SparkConf, SparkContext} import org.joda.time.{DateTime, Interval} import org.scalatest._ + import scala.collection.JavaConverters._ @@ -476,15 +479,13 @@ class TestSparkDruidIndexer extends FlatSpec with Matchers override def close(): Unit = sc.stop() } ) - val aggName = "sum_val" - val aggregatorFactory = new CountAggregatorFactory(aggName) - val parseSpec = new JSONParseSpec( - new TimestampSpec("l_shipdate", null, null), + val aggName = "l_extendedprice_sum" + val aggregatorFactory = new DoubleSumAggregatorFactory("l_extendedprice_sum", "l_extendedprice") + val parseSpec = new AvroParseSpec( + new TimestampSpec("l_shipdate", "millis", null), new DimensionsSpec(ImmutableList.of(new StringDimensionSchema("l_suppkey")), null, null), - null, null ) - val dataSchema = new DataSchema( dataSource, objectMapper @@ -495,7 +496,7 @@ class 
TestSparkDruidIndexer extends FlatSpec with Matchers val loadResults = SparkDruidIndexer.loadData( data_files, new SerializedJson(dataSchema), - SparkBatchIndexTask.mapToSegmentIntervals(Seq(interval), Granularities.YEAR), + SparkBatchIndexTask.mapToSegmentIntervals(Seq(Interval.parse("1992/1993")), Granularities.YEAR), rowsPerPartition, rowsPerFlush, outDir.toString, @@ -503,7 +504,7 @@ class TestSparkDruidIndexer extends FlatSpec with Matchers buildV9Directly, sc ) - loadResults.length should be(7) + loadResults.length should be(1) loadResults(0) match { case segment => segment.getBinaryVersion should be(9) @@ -512,41 +513,7 @@ class TestSparkDruidIndexer extends FlatSpec with Matchers segment.getInterval.contains(interval) should be(false) segment.getSize should be > 0L segment.getDimensions.asScala.toSet should equal(Set("l_suppkey")) - segment.getMetrics.asScala.toSet should equal(Set("sum_val")) - val file = new File(segment.getLoadSpec.get("path").toString) - file.exists() should be(true) - val segDir = Files.createTempDirectory(outDir.toPath, "loadableSegment-%s" format segment.getIdentifier).toFile - val copyResult = CompressionUtils.unzip(file, segDir) - copyResult.size should be > 0L - copyResult.getFiles.asScala.map(_.getName).toSet should equal( - Set("00000.smoosh", "meta.smoosh", "version.bin", "factory.json") - ) - val index = StaticIndex.INDEX_IO.loadIndex(segDir) - try { - val qindex = new QueryableIndexIndexableAdapter(index) - qindex.getDimensionNames.asScala.toSet should equal(Set("l_suppkey")) - val dimVal = qindex.getDimValueLookup("l_suppkey").asScala - dimVal should not be 'Empty - dimVal should contain allOf("1883", "1989", "2073") - qindex.getMetricNames.asScala.toSet should equal(Set(aggName)) - qindex.getMetricType(aggName) should equal(aggregatorFactory.getTypeName) - qindex.getNumRows should be(11) - qindex.getRows.asScala.head.getMetrics()(0) should be(1) - index.getDataInterval.getEnd.getMillis should not be JodaUtils.MAX_INSTANT - } - finally { - index.close() - } - } - loadResults(1) match { - case segment => - segment.getBinaryVersion should be(9) - segment.getDataSource should equal(dataSource) - interval.contains(segment.getInterval) should be(true) - segment.getInterval.contains(interval) should be(false) - segment.getSize should be > 0L - segment.getDimensions.asScala.toSet should equal(Set("l_suppkey")) - segment.getMetrics.asScala.toSet should equal(Set("sum_val")) + segment.getMetrics.asScala.toSet should equal(Set("l_extendedprice_sum")) val file = new File(segment.getLoadSpec.get("path").toString) file.exists() should be(true) val segDir = Files.createTempDirectory(outDir.toPath, "loadableSegment-%s" format segment.getIdentifier).toFile @@ -565,7 +532,7 @@ class TestSparkDruidIndexer extends FlatSpec with Matchers qindex.getMetricNames.asScala.toSet should equal(Set(aggName)) qindex.getMetricType(aggName) should equal(aggregatorFactory.getTypeName) qindex.getNumRows should be(4) - qindex.getRows.asScala.head.getMetrics()(0) should be(1) + qindex.getRows.asScala.head.getMetrics()(0) should be(61998.31.toFloat) index.getDataInterval.getEnd.getMillis should not be JodaUtils.MAX_INSTANT } finally {