diff --git a/bin/dcos_submit.sh b/bin/dcos_submit.sh new file mode 100644 index 0000000..dff76c6 --- /dev/null +++ b/bin/dcos_submit.sh @@ -0,0 +1,121 @@ +#!/bin/bash + +set -x + +# Define target environment +if [ "$environment" = "staging" ]; then SUFFIX="-staging"; fi + + +# SPARK_DCOS_URL the spark service url +# SPARK_MASTER spark master url, including port +# SPARK_ARGS spark arguments, as a json array's content +# SPARK_JARS list of urls where to download jars from +# SPARK_CLASS_NAME class name of the spark app +# SPARK_APP_NAME spark app name for display +# SPARK_DRIVER_CORES driver cores +# SPARK_DRIVER_MEMORY driver memory +# SPARK_CORES_MAX max available cores on mesos +# SPARK_EXECUTOR_MEMORY executor memory +# SPARK_CASSANDRA_HOST cassandra host name +# SPARK_CASSANDRA_USER cassandra user name +# SPARK_CASSANDRA_PASSWORD cassandra password +# SPARK_POSTGRES_HOST postgres host +# SPARK_POSTGRES_USER postgres user +# SPARK_POSTGRES_PASSWORD postgres password + +SPARK_DCOS_URL="mydcosurl" +SPARK_MASTER="mysparkmaster" +SPARK_ARGS="" +SPARK_JARS="myjarurl" +SPARK_CLASS_NAME="$jobclass" +SPARK_APP_NAME="SparkJobs$SUFFIX" +SPARK_DRIVER_CORES="2" +SPARK_DRIVER_MEMORY="1024" +SPARK_CORES_MAX="10" +SPARK_EXECUTOR_MEMORY="2g" +SPARK_CASSANDRA_HOST="xxx" +DCOS_ACS_TOKEN="read acs token" +SPARK_DOCKER_IMAGE="mesosphere/spark:1.1.0-2.1.1-hadoop-2.6" + +if [ "$environment" = "staging" ]; then +SPARK_CASSANDRA_USER="xxx" +SPARK_CASSANDRA_PASSWORD="xxxx" +SPARK_POSTGRES_HOST="xxxx" +SPARK_POSTGRES_USER="xxxx" +SPARK_POSTGRES_PASSWORD="xxxx" +else +SPARK_CASSANDRA_USER="xxx" +SPARK_CASSANDRA_PASSWORD="xxxx" +SPARK_POSTGRES_HOST="xxxx" +SPARK_POSTGRES_USER="xxxx" +SPARK_POSTGRES_PASSWORD="xxxx" +fi + +generate_payload() { + cat <<-EOF +{ + "action" : "CreateSubmissionRequest", + "appArgs" : [ $SPARK_ARGS ], + "appResource" : "$SPARK_JARS", + "clientSparkVersion" : "2.1.1", + "environmentVariables" : { + "SPARK_ENV_LOADED" : "1" + }, + "mainClass" : "$SPARK_CLASS_NAME", + "sparkProperties" : { + "spark.jars" : "$SPARK_JARS", + "spark.driver.supervise" : "false", + "spark.app.name" : "$SPARK_APP_NAME", + "spark.eventLog.enabled": "true", + "spark.submit.deployMode" : "cluster", + "spark.master" : "spark://${SPARK_MASTER}", + "spark.driver.cores" : "${SPARK_DRIVER_CORES}", + "spark.driver.memory" : "${SPARK_DRIVER_MEMORY}", + "spark.cores.max" : "${SPARK_CORES_MAX}", + "spark.executor.memory" : "${SPARK_EXECUTOR_MEMORY}", + "spark.postgres.hostname" : "${SPARK_POSTGRES_HOST}", + "spark.postgres.username" : "${SPARK_POSTGRES_USER}", + "spark.postgres.password" : "${SPARK_POSTGRES_PASSWORD}", + "spark.mesos.executor.docker.image": "${SPARK_DOCKER_IMAGE}", + "spark.driver.supervise": false, + "spark.ssl.noCertVerification": true + } +} +EOF +} + +acsHeader="Authorization: token=$DCOS_ACS_TOKEN" + +submission=$(curl -sS -X POST "${SPARK_DCOS_URL}/v1/submissions/create" --header "$acsHeader" --header "Content-Type:application/json;charset=UTF-8" --data "$(generate_payload)") +submissionSuccess=$(echo $submission | jq -r 'select(.success)') + +if [[ $submissionSuccess ]]; then + SECONDS=0 + + submissionId=$(echo $submission | jq -rj '.submissionId') + # Wait for the job to finish + until test "$(curl --header "$acsHeader" -sS $SPARK_DCOS_URL/v1/submissions/status/$submissionId | jq -r '.driverState')" != "RUNNING"; do sleep 10; echo "Waiting..."; done + lastStatus=$(curl --header "$acsHeader" -sS $SPARK_DCOS_URL/v1/submissions/status/$submissionId | jq -r '.') + echo $lastStatus + taskState=$(echo 
$lastStatus | jq -r '.message' | grep state | cut -d " " -f 2) + taskMessage=$(echo $lastStatus | jq -r '.message' | grep message | cut -d " " -f 2) + + if [ "$taskState" = "TASK_FINISHED" ] || [ "$taskState" = "TASK_OK" ]; then + echo "Job succeeded." + echo "$taskMessage" + DURATION=$SECONDS + curl -s https://metrics-api.librato.com/v1/metrics -u $LIBRATO_USERNAME:$LIBRATO_TOKEN -H 'Content-Type: application/json' \ + -d "{\"gauges\":{\"spark_jobs.$SPARK_APP_NAME.duration.s\":{\"value\":$DURATION,\"source\":\"jenkins\"}}}" + exit 0; + else + echo "Job failed:" + echo "$taskMessage" + # TODO: notify people! + exit 1; + fi +else + echo "Submission failed ! Exiting..." + echo $submission | jq -r '.message' + exit 1 +fi + diff --git a/build.sbt b/build.sbt index 8ff15ae..97834b4 100644 --- a/build.sbt +++ b/build.sbt @@ -24,12 +24,21 @@ homepage := Some(url("https://github.com/metamx/druid-spark-batch")) crossScalaVersions := Seq("2.11.7", "2.10.6") releaseIgnoreUntrackedFiles := true -val druid_version = "0.11.0-SNAPSHOT" +val druid_version = "0.11.1-SNAPSHOT" // This is just used here for Path, so anything that doesn't break spark should be fine val hadoop_version = "2.7.3" val spark_version = "2.1.0" -val guava_version = "16.0.1" +val guava_version = "17.0" val mesos_version = "0.25.0" +val parquet_version = "1.8.2" +val curator_version = "4.0.0" +val zookeeper_version = "3.4.10" + +libraryDependencies += "org.apache.parquet" % "parquet-common" % parquet_version exclude("com.google.guava", "guava") +libraryDependencies += "org.apache.parquet" % "parquet-encoding" % parquet_version exclude("com.google.guava", "guava") +libraryDependencies += "org.apache.parquet" % "parquet-column" % parquet_version exclude("com.google.guava", "guava") +libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % parquet_version exclude("com.google.guava", "guava") +libraryDependencies += "org.apache.parquet" % "parquet-avro" % parquet_version exclude("com.google.guava", "guava") val sparkDep = ("org.apache.spark" %% "spark-core" % spark_version exclude("org.roaringbitmap", "RoaringBitmap") @@ -99,13 +108,17 @@ libraryDependencies += "io.druid" % "druid-processing" % druid_version % "provid libraryDependencies += "io.druid" % "druid-server" % druid_version % "provided" libraryDependencies += "io.druid" % "druid-indexing-service" % druid_version % "provided" libraryDependencies += "io.druid" % "druid-indexing-hadoop" % druid_version % "provided" +libraryDependencies += "io.druid.extensions" % "druid-avro-extensions" % druid_version % "provided" + libraryDependencies += "org.joda" % "joda-convert" % "1.8.1" % "provided" // Prevents intellij silliness and sbt warnings -libraryDependencies += "com.google.guava" % "guava" % guava_version % "provided"// Prevents serde problems for guice exceptions +libraryDependencies += "com.google.guava" % "guava" % guava_version libraryDependencies += "com.sun.jersey" % "jersey-servlet" % "1.17.1" % "provided" libraryDependencies += "org.apache.mesos" % "mesos" % mesos_version % "provided" classifier "shaded-protobuf" +libraryDependencies += "org.apache.zookeeper" % "zookeeper" % zookeeper_version + releaseCrossBuild := true assemblyMergeStrategy in assembly := { @@ -143,7 +156,8 @@ resolvers += "JitPack.IO" at "https://jitpack.io" publishMavenStyle := true //TODO: remove this before moving to druid.io -publishTo := Some("central-local" at "https://metamx.artifactoryonline.com/metamx/libs-releases-local") +//publishTo := Some("central-local" at 
"https://metamx.artifactoryonline.com/metamx/libs-releases-local") +publishTo := Some(Resolver.file("file", new File(Path.userHome.absolutePath+"/.m2/repository"))) pomIncludeRepository := { _ => false } pomExtra := ( @@ -163,5 +177,5 @@ pomExtra := ( testOptions += Tests.Argument(TestFrameworks.JUnit, "-Duser.timezone=UTC") // WTF SBT? -javaOptions in Test += "-Duser.timezone=UTC" +javaOptions in Test += "-Duser.timezone=UTC" fork in Test := true diff --git a/project/plugins.sbt b/project/plugins.sbt index 652b260..6ac98bb 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -4,3 +4,5 @@ addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.2") addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.3") addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0") addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1") +addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.2") + diff --git a/src/main/java/io/druid/indexer/spark/parquet/ParquetInputRowParser.java b/src/main/java/io/druid/indexer/spark/parquet/ParquetInputRowParser.java new file mode 100644 index 0000000..ed38d64 --- /dev/null +++ b/src/main/java/io/druid/indexer/spark/parquet/ParquetInputRowParser.java @@ -0,0 +1,55 @@ +package io.druid.indexer.spark.parquet; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Maps; +import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedInputRow; +import io.druid.data.input.avro.AvroParsers; +import io.druid.data.input.impl.InputRowParser; +import io.druid.data.input.impl.ParseSpec; +import io.druid.java.util.common.parsers.ObjectFlattener; +import org.apache.avro.generic.GenericRecord; + +import java.util.Map; + +public class ParquetInputRowParser implements InputRowParser { + private final ParseSpec parseSpec; + private final ObjectFlattener avroFlattener; + + @JsonCreator + public ParquetInputRowParser( + @JsonProperty("parseSpec") ParseSpec parseSpec + ) + { + this.parseSpec = parseSpec; + this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false, false); + } + + @JsonProperty + @Override + public ParseSpec getParseSpec() + { + return parseSpec; + } + + @Override + public ParquetInputRowParser withParseSpec(ParseSpec parseSpec) + { + return new ParquetInputRowParser(parseSpec); + } + + @Override + public InputRow parse(GenericRecord input) + { + // We should really create a ParquetBasedInputRow that does not need an intermediate map but accesses + // the SimpleRecord directly... 
+ MapBasedInputRow mapBasedInputRow = (MapBasedInputRow) AvroParsers.parseGenericRecord(input, parseSpec, avroFlattener); + Map event = mapBasedInputRow.getEvent(); + Map newMap = Maps.newHashMapWithExpectedSize(event.size()); + for (String key : mapBasedInputRow.getEvent().keySet()) { + newMap.put(key, event.get(key)); + } + return new MapBasedInputRow(mapBasedInputRow.getTimestampFromEpoch(), mapBasedInputRow.getDimensions(), newMap); + } +} diff --git a/src/main/scala/io/druid/indexer/spark/SparkBatchIndexTask.scala b/src/main/scala/io/druid/indexer/spark/SparkBatchIndexTask.scala index 17a685a..a00afcd 100644 --- a/src/main/scala/io/druid/indexer/spark/SparkBatchIndexTask.scala +++ b/src/main/scala/io/druid/indexer/spark/SparkBatchIndexTask.scala @@ -23,14 +23,15 @@ import java.io.{Closeable, File, IOException, PrintWriter} import java.nio.file.Files import java.util import java.util.{Objects, Properties} + import com.fasterxml.jackson.annotation.{JsonCreator, JsonProperty} import com.google.common.base.{Preconditions, Strings} import com.google.common.collect.Iterables import com.google.common.io.Closer import io.druid.data.input.impl.ParseSpec -import io.druid.indexing.common.{TaskStatus, TaskToolbox} -import io.druid.indexing.common.actions.{LockTryAcquireAction, TaskActionClient} +import io.druid.indexing.common.actions.{LockListAction, LockTryAcquireAction, TaskActionClient} import io.druid.indexing.common.task.{AbstractTask, HadoopTask} +import io.druid.indexing.common.{TaskLockType, TaskStatus, TaskToolbox} import io.druid.java.util.common.JodaUtils import io.druid.java.util.common.granularity._ import io.druid.java.util.common.logger.Logger @@ -40,6 +41,7 @@ import io.druid.segment.indexing.DataSchema import io.druid.timeline.DataSegment import org.apache.spark.{SparkConf, SparkContext} import org.joda.time.Interval + import scala.collection.JavaConversions._ @JsonCreator @@ -71,17 +73,11 @@ class SparkBatchIndexTask( @JsonProperty("buildV9Directly") buildV9Directly: Boolean = false ) extends HadoopTask( - if (id == null) { - AbstractTask - .makeId( - null, SparkBatchIndexTask.TASK_TYPE_BASE, dataSchema.getDataSource, - JodaUtils - .umbrellaInterval(JodaUtils.condenseIntervals(Preconditions.checkNotNull(intervals, "%s", "intervals"))) - ) - } - else { - id - }, + TaskIdGenerator.getOrMakeId( + id, SparkBatchIndexTask.TASK_TYPE_BASE, dataSchema.getDataSource, + JodaUtils + .umbrellaInterval(JodaUtils.condenseIntervals(Preconditions.checkNotNull(intervals, "%s", "intervals"))) + ), dataSchema.getDataSource, hadoopDependencyCoordinates, if (context == null) { @@ -147,10 +143,10 @@ class SparkBatchIndexTask( val classLoader = buildClassLoader(toolbox) val task = toolbox.getObjectMapper.writeValueAsString(this) log.debug("Sending task `%s`", task) - + val taskLocks = toolbox.getTaskActionClient.submit(new LockListAction) val result = HadoopTask.invokeForeignLoader[util.ArrayList[String], util.ArrayList[String]]( "io.druid.indexer.spark.Runner", - new util.ArrayList(List(task, Iterables.getOnlyElement(getTaskLocks(toolbox)).getVersion, outputPath)), + new util.ArrayList(List(task, Iterables.getOnlyElement(taskLocks).getVersion, outputPath)), classLoader ) toolbox.publishSegments(result.map(toolbox.getObjectMapper.readValue(_, classOf[DataSegment]))) @@ -196,7 +192,7 @@ class SparkBatchIndexTask( @throws(classOf[Exception]) override def isReady(taskActionClient: TaskActionClient): Boolean = taskActionClient - .submit(new LockTryAcquireAction(lockInterval)) != null + .submit(new 
LockTryAcquireAction(TaskLockType.EXCLUSIVE, lockInterval)) != null @JsonProperty("id") override def getId = super.getId diff --git a/src/main/scala/io/druid/indexer/spark/SparkDruidIndexer.scala b/src/main/scala/io/druid/indexer/spark/SparkDruidIndexer.scala index c5e16b6..f1dc9b8 100644 --- a/src/main/scala/io/druid/indexer/spark/SparkDruidIndexer.scala +++ b/src/main/scala/io/druid/indexer/spark/SparkDruidIndexer.scala @@ -36,6 +36,7 @@ import io.druid.data.input.impl._ import io.druid.guice.annotations.{Json, Self} import io.druid.guice.{GuiceInjectors, JsonConfigProvider} import io.druid.indexer.HadoopyStringInputRowParser +import io.druid.indexer.spark.parquet.ParquetInputRowParser import io.druid.initialization.Initialization import io.druid.java.util.common.{IAE, ISE} import io.druid.java.util.common.granularity.Granularity @@ -50,13 +51,17 @@ import io.druid.segment.loading.DataSegmentPusher import io.druid.server.DruidNode import io.druid.timeline.DataSegment import io.druid.timeline.partition.{HashBasedNumberedShardSpec, NoneShardSpec, ShardSpec} +import org.apache.avro.generic.GenericRecord import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.Job +import org.apache.parquet.avro.AvroReadSupport +import org.apache.parquet.hadoop.ParquetInputFormat import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{AccumulableInfo, SparkListener, SparkListenerApplicationEnd, SparkListenerStageCompleted} import org.apache.spark.storage.StorageLevel -import org.apache.spark.{Partitioner, SparkContext} +import org.apache.spark.{InterruptibleIterator, Partitioner, SparkContext} import org.joda.time.{DateTime, Interval} import scala.collection.JavaConversions._ @@ -130,58 +135,106 @@ object SparkDruidIndexer { logInfo(s"Splitting [$totalGZSize] gz bytes into [$startingPartitions] partitions") + // Hadoopify the data so it doesn't look so silly in Spark's DAG - val baseData = sc.textFile(dataFiles mkString ",") - // Input data is probably in gigantic files, so redistribute - .filter( - s => { - val row = dataSchema.getDelegate.getParser match { - case x: StringInputRowParser => x.parse(s) - case x: HadoopyStringInputRowParser => x.parse(s) - case x => - logTrace( - "Could not figure out how to handle class " + - s"[${x.getClass.getCanonicalName}]. " + - "Hoping it can handle string input" + val baseData = + dataSchema.getDelegate.getParser match { + case x: ParquetInputRowParser => + val job = new Job() + ParquetInputFormat.setReadSupportClass(job, classOf[AvroReadSupport[GenericRecord]]) + sc.newAPIHadoopFile(dataFiles.mkString(","), classOf[ParquetInputFormat[GenericRecord]], + classOf[Void], classOf[GenericRecord], job.getConfiguration).values.filter( + s => { + val row = dataSchema.getDelegate.getParser match { + case x: ParquetInputRowParser => x.parse(s) + case x => + logTrace( + "Could not figure out how to handle class " + + s"[${x.getClass.getCanonicalName}]. 
" + + "Hoping it can handle string input" + ) + x.asInstanceOf[InputRowParser[Any]].parse(s) + } + passableIntervals.exists(_.contains(row.getTimestamp)) + }).repartition(startingPartitions) + // Persist the strings only rather than the event map + // We have to do the parsing twice this way, but serde of the map is killer as well + .persist(StorageLevel.DISK_ONLY) + .mapPartitions( + it => { + val i = dataSchema.getDelegate.getParser match { + case x: ParquetInputRowParser => it.map(x.parse) + case x => + logTrace( + "Could not figure out how to handle class " + + s"[${x.getClass.getCanonicalName}]. " + + "Hoping it can handle string input" + ) + it.map(x.asInstanceOf[InputRowParser[Any]].parse) + } + val queryGran = dataSchema.getDelegate.getGranularitySpec.getQueryGranularity + val segmentGran = dataSchema.getDelegate.getGranularitySpec.getSegmentGranularity + i.map( + r => { + var k: Long = queryGran.bucketStart(new DateTime(r.getTimestampFromEpoch)).getMillis + if (k < 0) { + // Example: AllGranularity + k = segmentGran.bucketStart(new DateTime(r.getTimestampFromEpoch)).getMillis + } + k -> r.asInstanceOf[MapBasedInputRow].getEvent + } + ) + } ) - x.asInstanceOf[InputRowParser[Any]].parse(s) - } - passableIntervals.exists(_.contains(row.getTimestamp)) - } - ) - .repartition(startingPartitions) - // Persist the strings only rather than the event map - // We have to do the parsing twice this way, but serde of the map is killer as well - .persist(StorageLevel.DISK_ONLY) - .mapPartitions( - it => { - val i = dataSchema.getDelegate.getParser match { - case x: StringInputRowParser => it.map(x.parse) - case x: HadoopyStringInputRowParser => it.map(x.parse) - case x => - logTrace( - "Could not figure out how to handle class " + - s"[${x.getClass.getCanonicalName}]. " + - "Hoping it can handle string input" - ) - it.map(x.asInstanceOf[InputRowParser[Any]].parse) - } - val queryGran = dataSchema.getDelegate.getGranularitySpec.getQueryGranularity - val segmentGran = dataSchema.getDelegate.getGranularitySpec.getSegmentGranularity - i.map( - r => { - var k: Long = queryGran.bucketStart(new DateTime(r.getTimestampFromEpoch)).getMillis - if (k < 0) { - // Example: AllGranularity - k = segmentGran.bucketStart(new DateTime(r.getTimestampFromEpoch)).getMillis + case x => + sc.textFile(dataFiles mkString ",").filter( + s => { + val row = dataSchema.getDelegate.getParser match { + case x: StringInputRowParser => x.parse(s) + case x: HadoopyStringInputRowParser => x.parse(s) + case x => + logTrace( + "Could not figure out how to handle class " + + s"[${x.getClass.getCanonicalName}]. " + + "Hoping it can handle string input" + ) + x.asInstanceOf[InputRowParser[Any]].parse(s) } - k -> r.asInstanceOf[MapBasedInputRow].getEvent - } - ) - } - ) + passableIntervals.exists(_.contains(row.getTimestamp)) + }).repartition(startingPartitions) + // Persist the strings only rather than the event map + // We have to do the parsing twice this way, but serde of the map is killer as well + .persist(StorageLevel.DISK_ONLY) + .mapPartitions( + it => { + val i = dataSchema.getDelegate.getParser match { + case x: StringInputRowParser => it.map(x.parse) + case x: HadoopyStringInputRowParser => it.map(x.parse) + case x => + logTrace( + "Could not figure out how to handle class " + + s"[${x.getClass.getCanonicalName}]. 
" + + "Hoping it can handle string input" + ) + it.map(x.asInstanceOf[InputRowParser[Any]].parse) + } + val queryGran = dataSchema.getDelegate.getGranularitySpec.getQueryGranularity + val segmentGran = dataSchema.getDelegate.getGranularitySpec.getSegmentGranularity + i.map( + r => { + var k: Long = queryGran.bucketStart(new DateTime(r.getTimestampFromEpoch)).getMillis + if (k < 0) { + // Example: AllGranularity + k = segmentGran.bucketStart(new DateTime(r.getTimestampFromEpoch)).getMillis + } + k -> r.asInstanceOf[MapBasedInputRow].getEvent + } + ) + } + ) + } - logInfo("Starting uniqes") + logInfo("Starting uniques") val optionalDims: Option[Set[String]] = if (dataSchema.getDelegate.getParser.getParseSpec.getDimensionsSpec.hasCustomDimensions) { val parseSpec = dataSchema.getDelegate.getParser.getParseSpec Some(parseSpec.getDimensionsSpec.getDimensionNames.asScala.toSet) @@ -338,6 +391,7 @@ object SparkDruidIndexer { adapter } ).toList + val file = finalStaticIndexer.merge( indices, true, @@ -361,13 +415,13 @@ object SparkDruidIndexer { logTrace(s"Start [$s]") } - override def progressSection(s: String, s1: String): Unit = { - logTrace(s"Progress [$s]:[$s1]") - } - override def start(): Unit = { logTrace("Start") } + + override def progressSection(s: String, s1: String):Unit = { + logTrace(s"Progress Section [$s] [$s1]") + } } ) val allDimensions: util.List[String] = indices @@ -749,7 +803,9 @@ class DateBucketAndHashPartitioner(@transient var gran: Granularity, } object StaticIndex { - val INDEX_IO = new IndexIO(SerializedJsonStatic.mapper, new ColumnConfig { + val INDEX_IO = new IndexIO( + SerializedJsonStatic.mapper, + new ColumnConfig { override def columnCacheSizeBytes(): Int = 1000000 }) diff --git a/src/main/scala/io/druid/indexer/spark/SparkDruidIndexerModule.scala b/src/main/scala/io/druid/indexer/spark/SparkDruidIndexerModule.scala index 5c7176f..ddc1d8e 100644 --- a/src/main/scala/io/druid/indexer/spark/SparkDruidIndexerModule.scala +++ b/src/main/scala/io/druid/indexer/spark/SparkDruidIndexerModule.scala @@ -25,6 +25,9 @@ import com.fasterxml.jackson.databind.module.SimpleModule import com.google.inject.Binder import io.druid.initialization.DruidModule import java.util + +import io.druid.indexer.spark.parquet.ParquetInputRowParser + import scala.collection.JavaConverters._ class SparkDruidIndexerModule extends DruidModule @@ -32,6 +35,9 @@ class SparkDruidIndexerModule extends DruidModule override def getJacksonModules: util.List[_ <: Module] = { List( new SimpleModule("SparkDruidIndexer") + .registerSubtypes( + new NamedType(classOf[ParquetInputRowParser], "parquet") + ) .registerSubtypes( new NamedType(classOf[SparkBatchIndexTask], "index_spark") ) diff --git a/src/main/scala/io/druid/indexer/spark/TaskIdGenerator.scala b/src/main/scala/io/druid/indexer/spark/TaskIdGenerator.scala new file mode 100644 index 0000000..30e9af1 --- /dev/null +++ b/src/main/scala/io/druid/indexer/spark/TaskIdGenerator.scala @@ -0,0 +1,24 @@ +package io.druid.indexer.spark + +import java.util + +import com.google.common.base.Joiner +import io.druid.java.util.common.DateTimes +import org.joda.time.Interval + +object TaskIdGenerator { + private val ID_JOINER = Joiner.on("_") + + def getOrMakeId(id: String, typeName: String, dataSource: String, interval: Interval): String = { + if (id != null) return id + val objects = new util.ArrayList[AnyRef] + objects.add(typeName) + objects.add(dataSource) + if (interval != null) { + objects.add(interval.getStart) + objects.add(interval.getEnd) + } + 
objects.add(DateTimes.nowUtc.toString) + ID_JOINER.join(objects) + } +} diff --git a/src/test/resources/index_spark_2.10_parquet_spec.json b/src/test/resources/index_spark_2.10_parquet_spec.json new file mode 100644 index 0000000..a6a22a9 --- /dev/null +++ b/src/test/resources/index_spark_2.10_parquet_spec.json @@ -0,0 +1,136 @@ +{ + "type": "index_spark", + "id": "taskId", + "dataSchema": { + "dataSource": "defaultDataSource", + "parser": { + "type": "string", + "parseSpec": { + "format": "parquet", + "timestampSpec": { + "column": "l_shipdate", + "format": "yyyy-MM-dd", + "missingValue": null + }, + "dimensionsSpec": { + "dimensions": [ + "l_orderkey", + "l_partkey", + "l_suppkey", + "l_linenumber", + "l_returnflag", + "l_linestatus", + "l_shipinstruct", + "l_shipmode", + "l_comment" + ], + "dimensionExclusions": [ + "l_commitdate", + "l_receiptdate", + "l_tax", + "l_quantity", + "count", + "l_extendedprice", + "l_shipdate", + "l_discount" + ], + "spatialDimensions": [] + }, + "delimiter": "|", + "listDelimiter": ",", + "columns": [ + "l_orderkey", + "l_partkey", + "l_suppkey", + "l_linenumber", + "l_quantity", + "l_extendedprice", + "l_discount", + "l_tax", + "l_returnflag", + "l_linestatus", + "l_shipdate", + "l_commitdate", + "l_receiptdate", + "l_shipinstruct", + "l_shipmode", + "l_comment" + ] + }, + "encoding": "UTF-8" + }, + "metricsSpec": [ + { + "type": "count", + "name": "count" + }, + { + "type": "longSum", + "name": "L_QUANTITY_longSum", + "fieldName": "l_quantity" + }, + { + "type": "doubleSum", + "name": "L_EXTENDEDPRICE_doubleSum", + "fieldName": "l_extendedprice" + }, + { + "type": "doubleSum", + "name": "L_DISCOUNT_doubleSum", + "fieldName": "l_discount" + }, + { + "type": "doubleSum", + "name": "L_TAX_doubleSum", + "fieldName": "l_tax" + } + ], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "YEAR", + "queryGranularity": { + "type": "duration", + "duration": 86400000, + "origin": "1970-01-01T00:00:00.000Z" + }, + "intervals": [ + "1992-01-01T00:00:00.000Z/1999-01-01T00:00:00.000Z" + ] + } + }, + "intervals": [ + "1992-01-01T00:00:00.000Z/1999-01-01T00:00:00.000Z" + ], + "paths": [ + "file:/someFile" + ], + "targetPartitionSize": 8139, + "maxRowsInMemory": 389, + "properties": { + "some.property": "someValue", + "java.util.logging.manager": "org.apache.logging.log4j.jul.LogManager", + "user.timezone": "UTC", + "org.jboss.logging.provider": "log4j2", + "file.encoding": "UTF-8", + "druid.processing.columnCache.sizeBytes": "1000000000" + }, + "master": "local[999]", + "context": {}, + "indexSpec": { + "bitmap": { + "type": "concise" + }, + "dimensionCompression": null, + "metricCompression": null + }, + "classpathPrefix": "somePrefix.jar", + "hadoopDependencyCoordinates": [ + "org.apache.spark:spark-core_2.10:1.6.1-mmx0" + ], + "groupId": "taskId", + "dataSource": "defaultDataSource", + "resource": { + "availabilityGroup": "taskId", + "requiredCapacity": 1 + } +} diff --git a/src/test/resources/lineitems.parquet b/src/test/resources/lineitems.parquet new file mode 100644 index 0000000..84becd9 Binary files /dev/null and b/src/test/resources/lineitems.parquet differ diff --git a/src/test/scala/io/druid/indexer/spark/TestScalaBatchIndexTask.scala b/src/test/scala/io/druid/indexer/spark/TestScalaBatchIndexTask.scala index 4d03b82..d69bac7 100644 --- a/src/test/scala/io/druid/indexer/spark/TestScalaBatchIndexTask.scala +++ b/src/test/scala/io/druid/indexer/spark/TestScalaBatchIndexTask.scala @@ -20,6 +20,7 @@ package io.druid.indexer.spark import 
java.util.{Collections, Properties} + import com.fasterxml.jackson.core.`type`.TypeReference import com.fasterxml.jackson.databind.ObjectMapper import com.google.inject.{Binder, Module} @@ -40,6 +41,7 @@ import io.druid.segment.indexing.DataSchema import io.druid.segment.indexing.granularity.{GranularitySpec, UniformGranularitySpec} import org.joda.time.Interval import org.scalatest.{FlatSpec, Matchers} + import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -114,9 +116,7 @@ object TestScalaBatchIndexTask "l_shipmode", "l_comment" ) - ), - false, - 0 + ) ) val outPath = "file:/tmp/foo" val rowsPerPartition: Long = 8139L diff --git a/src/test/scala/io/druid/indexer/spark/TestSparkDruidIndexer.scala b/src/test/scala/io/druid/indexer/spark/TestSparkDruidIndexer.scala index b69b0f1..6d5b518 100644 --- a/src/test/scala/io/druid/indexer/spark/TestSparkDruidIndexer.scala +++ b/src/test/scala/io/druid/indexer/spark/TestSparkDruidIndexer.scala @@ -22,19 +22,27 @@ package io.druid.indexer.spark import java.io.{Closeable, File} import java.nio.file.Files import java.util + +import com.fasterxml.jackson.core.`type`.TypeReference import com.google.common.collect.ImmutableList import com.google.common.io.Closer +import io.druid.data.input.avro.AvroParseSpec import io.druid.data.input.impl.{DimensionsSpec, JSONParseSpec, StringDimensionSchema, TimestampSpec} import io.druid.java.util.common.JodaUtils +import io.druid.indexer.spark.parquet.ParquetInputRowParser import io.druid.java.util.common.granularity.Granularities import io.druid.java.util.common.logger.Logger +import io.druid.java.util.common.parsers.{JSONPathFieldSpec, JSONPathSpec} import io.druid.java.util.common.{CompressionUtils, IAE} -import io.druid.query.aggregation.LongSumAggregatorFactory +import io.druid.query.aggregation.{CountAggregator, CountAggregatorFactory, DoubleSumAggregatorFactory, LongSumAggregatorFactory} import io.druid.segment.QueryableIndexIndexableAdapter +import io.druid.segment.indexing.DataSchema +import io.druid.timeline.DataSegment import org.apache.commons.io.FileUtils import org.apache.spark.{SparkConf, SparkContext} import org.joda.time.{DateTime, Interval} import org.scalatest._ + import scala.collection.JavaConverters._ @@ -437,6 +445,105 @@ class TestSparkDruidIndexer extends FlatSpec with Matchers } ) should be(2) } + + it should "return proper DataSegments from parquet" in { + val data_files = Seq(this.getClass.getResource("/lineitems.parquet").toString) + val closer = Closer.create() + val outDir = Files.createTempDirectory("segments").toFile + (outDir.mkdirs() || outDir.exists()) && outDir.isDirectory should be(true) + closer.register( + new Closeable() + { + override def close(): Unit = FileUtils.deleteDirectory(outDir) + } + ) + try { + val conf = new SparkConf() + .setAppName("Simple Application") + .setMaster("local[4]") + .set("user.timezone", "UTC") + .set("file.encoding", "UTF-8") + .set("java.util.logging.manager", "org.apache.logging.log4j.jul.LogManager") + .set("org.jboss.logging.provider", "slf4j") + .set("druid.processing.columnCache.sizeBytes", "1000000000") + .set("spark.driver.host", "localhost") + .set("spark.executor.userClassPathFirst", "true") + .set("spark.driver.userClassPathFirst", "true") + .set("spark.kryo.referenceTracking", "false") + .registerKryoClasses(SparkBatchIndexTask.getKryoClasses()) + + val sc = new SparkContext(conf) + closer.register( + new Closeable + { + override def close(): Unit = sc.stop() + } + ) + val aggName = "l_extendedprice_sum" 
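+      // The parquet fixture is parsed with the new ParquetInputRowParser over an Avro parse spec;
+      // the only metric is a doubleSum of l_extendedprice.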
+ val aggregatorFactory = new DoubleSumAggregatorFactory("l_extendedprice_sum", "l_extendedprice") + val parseSpec = new AvroParseSpec( + new TimestampSpec("l_shipdate", "millis", null), + new DimensionsSpec(ImmutableList.of(new StringDimensionSchema("l_suppkey")), null, null), + null + ) + val dataSchema = new DataSchema( + dataSource, + objectMapper + .convertValue(new ParquetInputRowParser(parseSpec), new TypeReference[java.util.Map[String, Any]]() {}), + Seq(aggregatorFactory).toArray, + granSpec, + objectMapper) + val loadResults = SparkDruidIndexer.loadData( + data_files, + new SerializedJson(dataSchema), + SparkBatchIndexTask.mapToSegmentIntervals(Seq(Interval.parse("1992/1993")), Granularities.YEAR), + rowsPerPartition, + rowsPerFlush, + outDir.toString, + indexSpec, + buildV9Directly, + sc + ) + loadResults.length should be(1) + loadResults(0) match { + case segment => + segment.getBinaryVersion should be(9) + segment.getDataSource should equal(dataSource) + interval.contains(segment.getInterval) should be(true) + segment.getInterval.contains(interval) should be(false) + segment.getSize should be > 0L + segment.getDimensions.asScala.toSet should equal(Set("l_suppkey")) + segment.getMetrics.asScala.toSet should equal(Set("l_extendedprice_sum")) + val file = new File(segment.getLoadSpec.get("path").toString) + file.exists() should be(true) + val segDir = Files.createTempDirectory(outDir.toPath, "loadableSegment-%s" format segment.getIdentifier).toFile + val copyResult = CompressionUtils.unzip(file, segDir) + copyResult.size should be > 0L + copyResult.getFiles.asScala.map(_.getName).toSet should equal( + Set("00000.smoosh", "meta.smoosh", "version.bin", "factory.json") + ) + val index = StaticIndex.INDEX_IO.loadIndex(segDir) + try { + val qindex = new QueryableIndexIndexableAdapter(index) + qindex.getDimensionNames.asScala.toSet should equal(Set("l_suppkey")) + val dimVal = qindex.getDimValueLookup("l_suppkey").asScala + dimVal should not be 'Empty + dimVal should contain allOf("1807", "2150", "5133", "5405") + qindex.getMetricNames.asScala.toSet should equal(Set(aggName)) + qindex.getMetricType(aggName) should equal(aggregatorFactory.getTypeName) + qindex.getNumRows should be(4) + qindex.getRows.asScala.head.getMetrics()(0) should be(61998.31.toFloat) + index.getDataInterval.getEnd.getMillis should not be JodaUtils.MAX_INSTANT + } + finally { + index.close() + } + } + } + finally { + closer.close() + } + } } object StaticTestSparkDruidIndexer
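A minimal usage sketch of the new "parquet" parser type, separate from the diff above: it wires up a ParquetInputRowParser from an AvroParseSpec exactly as the test does, then parses a hand-built Avro GenericRecord standing in for a record decoded by ParquetInputFormat/AvroReadSupport. The lineitem-style schema, the sample values, and the object name ParquetParserSketch are illustrative assumptions, not taken from the repository.

import com.google.common.collect.ImmutableList
import io.druid.data.input.avro.AvroParseSpec
import io.druid.data.input.impl.{DimensionsSpec, StringDimensionSchema, TimestampSpec}
import io.druid.indexer.spark.parquet.ParquetInputRowParser
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericRecordBuilder

// Illustrative only: a tiny lineitem-style record parsed the same way the indexer's
// Parquet branch parses records handed back by AvroReadSupport.
object ParquetParserSketch {
  def main(args: Array[String]): Unit = {
    val parseSpec = new AvroParseSpec(
      new TimestampSpec("l_shipdate", "millis", null),
      new DimensionsSpec(ImmutableList.of(new StringDimensionSchema("l_suppkey")), null, null),
      null
    )
    val parser = new ParquetInputRowParser(parseSpec)

    // Hand-built Avro record; in the indexer this comes from ParquetInputFormat.
    val schema = SchemaBuilder.record("lineitem").fields()
      .requiredLong("l_shipdate")
      .requiredString("l_suppkey")
      .endRecord()
    val record = new GenericRecordBuilder(schema)
      .set("l_shipdate", 694224000000L) // 1992-01-01T00:00:00Z in epoch millis
      .set("l_suppkey", "1807")
      .build()

    val row = parser.parse(record)
    println(s"${row.getTimestamp} -> ${row.getDimension("l_suppkey")}")
  }
}

The same AvroParsers-based flattening is what SparkDruidIndexer's Parquet branch applies per partition, so a record that parses here should also survive the indexing path.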