Implement parquet simple record as a source for the druid spark indexer #111

Closed · wants to merge 6 commits
121 changes: 121 additions & 0 deletions bin/dcos_submit.sh
@@ -0,0 +1,121 @@
#!/usr/bin/env bash

set -x

# Define target environment
if [ "$environment" = "staging" ]; then SUFFIX="-staging"; fi


# SPARK_DCOS_URL the spark service url
# SPARK_MASTER spark master url, including port
# SPARK_ARGS spark arguments, as a json array's content
# SPARK_JARS list of urls where to download jars from
# SPARK_CLASS_NAME class name of the spark app
# SPARK_APP_NAME spark app name for display
# SPARK_DRIVER_CORES driver cores
# SPARK_DRIVER_MEMORY driver memory
# SPARK_CORES_MAX max available cores on mesos
# SPARK_EXECUTOR_MEMORY executor memory
# SPARK_CASSANDRA_HOST cassandra host name
# SPARK_CASSANDRA_USER cassandra user name
# SPARK_CASSANDRA_PASSWORD cassandra password
# SPARK_POSTGRES_HOST postgres host
# SPARK_POSTGRES_USER postgres user
# SPARK_POSTGRES_PASSWORD postgres password

SPARK_DCOS_URL="mydcosurl"
SPARK_MASTER="mysparkmaster"
SPARK_ARGS=""
SPARK_JARS="myjarurl"
SPARK_CLASS_NAME="$jobclass"
SPARK_APP_NAME="SparkJobs$SUFFIX"
SPARK_DRIVER_CORES="2"
SPARK_DRIVER_MEMORY="1024"
SPARK_CORES_MAX="10"
SPARK_EXECUTOR_MEMORY="2g"
SPARK_CASSANDRA_HOST="xxx"
DCOS_ACS_TOKEN="read acs token"
SPARK_DOCKER_IMAGE="mesosphere/spark:1.1.0-2.1.1-hadoop-2.6"

if [ "$environment" = "staging" ]; then
SPARK_CASSANDRA_USER="xxx"
SPARK_CASSANDRA_PASSWORD="xxxx"
SPARK_POSTGRES_HOST="xxxx"
SPARK_POSTGRES_USER="xxxx"
SPARK_POSTGRES_PASSWORD="xxxx"
else
SPARK_CASSANDRA_USER="xxx"
SPARK_CASSANDRA_PASSWORD="xxxx"
SPARK_POSTGRES_HOST="xxxx"
SPARK_POSTGRES_USER="xxxx"
SPARK_POSTGRES_PASSWORD="xxxx"
fi

generate_payload() {
  cat <<-EOF
{
  "action": "CreateSubmissionRequest",
  "appArgs": [ $SPARK_ARGS ],
  "appResource": "$SPARK_JARS",
  "clientSparkVersion": "2.1.1",
  "environmentVariables": {
    "SPARK_ENV_LOADED": "1"
  },
  "mainClass": "$SPARK_CLASS_NAME",
  "sparkProperties": {
    "spark.jars": "$SPARK_JARS",
    "spark.driver.supervise": "false",
    "spark.app.name": "$SPARK_APP_NAME",
    "spark.eventLog.enabled": "true",
    "spark.submit.deployMode": "cluster",
    "spark.master": "spark://${SPARK_MASTER}",
    "spark.driver.cores": "${SPARK_DRIVER_CORES}",
    "spark.driver.memory": "${SPARK_DRIVER_MEMORY}",
    "spark.cores.max": "${SPARK_CORES_MAX}",
    "spark.executor.memory": "${SPARK_EXECUTOR_MEMORY}",
    "spark.postgres.hostname": "${SPARK_POSTGRES_HOST}",
    "spark.postgres.username": "${SPARK_POSTGRES_USER}",
    "spark.postgres.password": "${SPARK_POSTGRES_PASSWORD}",
    "spark.mesos.executor.docker.image": "${SPARK_DOCKER_IMAGE}",
    "spark.ssl.noCertVerification": "true"
  }
}
EOF
}

acsHeader="Authorization: token=$DCOS_ACS_TOKEN"

submission=$(curl -sS -X POST "${SPARK_DCOS_URL}/v1/submissions/create" --header "$acsHeader" --header "Content-Type:application/json;charset=UTF-8" --data "$(generate_payload)")
submissionSuccess=$(echo $submission | jq -r 'select(.success)')

if [[ $submissionSuccess ]]; then
  SECONDS=0

  submissionId=$(echo $submission | jq -rj '.submissionId')
  # Wait for the job to finish
  until test "$(curl --header "$acsHeader" -sS $SPARK_DCOS_URL/v1/submissions/status/$submissionId | jq -r '.driverState')" != "RUNNING"; do sleep 10; echo "Waiting..."; done
  lastStatus=$(curl --header "$acsHeader" -sS $SPARK_DCOS_URL/v1/submissions/status/$submissionId | jq -r '.')
  echo $lastStatus
  taskState=$(echo $lastStatus | jq -r '.message' | grep state | cut -d " " -f 2)
  taskMessage=$(echo $lastStatus | jq -r '.message' | grep message | cut -d " " -f 2)

  if [ "$taskState" = "TASK_FINISHED" ] || [ "$taskState" = "TASK_OK" ]; then
    echo "Job succeeded."
    echo "$taskMessage"
    DURATION=$SECONDS
    curl -s https://metrics-api.librato.com/v1/metrics -u $LIBRATO_USERNAME:$LIBRATO_TOKEN -H 'Content-Type: application/json' \
      -d "{\"gauges\":{\"spark_jobs.$SPARK_APP_NAME.duration.s\":{\"value\":$DURATION,\"source\":\"jenkins\"}}}"
    exit 0
  else
    echo "Job failed:"
    echo "$taskMessage"
    # TODO: notify people!
    exit 1
  fi
else
  echo "Submission failed! Exiting..."
  echo $submission | jq -r '.message'
  exit 1
fi

24 changes: 19 additions & 5 deletions build.sbt
@@ -24,12 +24,21 @@ homepage := Some(url("https://github.com/metamx/druid-spark-batch"))
crossScalaVersions := Seq("2.11.7", "2.10.6")
releaseIgnoreUntrackedFiles := true

val druid_version = "0.11.0-SNAPSHOT"
val druid_version = "0.11.1-SNAPSHOT"
// This is just used here for Path, so anything that doesn't break spark should be fine
val hadoop_version = "2.7.3"
val spark_version = "2.1.0"
val guava_version = "16.0.1"
val guava_version = "17.0"
val mesos_version = "0.25.0"
val parquet_version = "1.8.2"
val curator_version = "4.0.0"
val zookeeper_version = "3.4.10"

libraryDependencies += "org.apache.parquet" % "parquet-common" % parquet_version exclude("com.google.guava", "guava")
libraryDependencies += "org.apache.parquet" % "parquet-encoding" % parquet_version exclude("com.google.guava", "guava")
libraryDependencies += "org.apache.parquet" % "parquet-column" % parquet_version exclude("com.google.guava", "guava")
libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % parquet_version exclude("com.google.guava", "guava")
libraryDependencies += "org.apache.parquet" % "parquet-avro" % parquet_version exclude("com.google.guava", "guava")

val sparkDep = ("org.apache.spark" %% "spark-core" % spark_version
exclude("org.roaringbitmap", "RoaringBitmap")
@@ -99,13 +108,17 @@ libraryDependencies += "io.druid" % "druid-processing" % druid_version % "provided"
libraryDependencies += "io.druid" % "druid-server" % druid_version % "provided"
libraryDependencies += "io.druid" % "druid-indexing-service" % druid_version % "provided"
libraryDependencies += "io.druid" % "druid-indexing-hadoop" % druid_version % "provided"
libraryDependencies += "io.druid.extensions" % "druid-avro-extensions" % druid_version % "provided"

libraryDependencies +=
"org.joda" % "joda-convert" % "1.8.1" % "provided" // Prevents intellij silliness and sbt warnings
libraryDependencies += "com.google.guava" % "guava" % guava_version % "provided"// Prevents serde problems for guice exceptions
libraryDependencies += "com.google.guava" % "guava" % guava_version
libraryDependencies += "com.sun.jersey" % "jersey-servlet" % "1.17.1" % "provided"

libraryDependencies += "org.apache.mesos" % "mesos" % mesos_version % "provided" classifier "shaded-protobuf"

libraryDependencies += "org.apache.zookeeper" % "zookeeper" % zookeeper_version

releaseCrossBuild := true

assemblyMergeStrategy in assembly := {
@@ -143,7 +156,8 @@ resolvers += "JitPack.IO" at "https://jitpack.io"
publishMavenStyle := true

//TODO: remove this before moving to druid.io
publishTo := Some("central-local" at "https://metamx.artifactoryonline.com/metamx/libs-releases-local")
//publishTo := Some("central-local" at "https://metamx.artifactoryonline.com/metamx/libs-releases-local")
publishTo := Some(Resolver.file("file", new File(Path.userHome.absolutePath+"/.m2/repository")))
Review comment on the publishTo change: Just putting a note here as a reminder to correct this before merge.

pomIncludeRepository := { _ => false }

pomExtra := (
@@ -163,5 +177,5 @@ pomExtra := (

testOptions += Tests.Argument(TestFrameworks.JUnit, "-Duser.timezone=UTC")
// WTF SBT?
javaOptions in Test += "-Duser.timezone=UTC"
javaOptions in Test += "-Duser.timezone=UTC"
fork in Test := true
2 changes: 2 additions & 0 deletions project/plugins.sbt
@@ -4,3 +4,5 @@ addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.2")
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.3")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.2")

55 changes: 55 additions & 0 deletions src/main/java/io/druid/indexer/spark/parquet/ParquetInputRowParser.java
@@ -0,0 +1,55 @@
package io.druid.indexer.spark.parquet;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Maps;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.avro.AvroParsers;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.ParseSpec;
import io.druid.java.util.common.parsers.ObjectFlattener;
import org.apache.avro.generic.GenericRecord;

import java.util.Map;

public class ParquetInputRowParser implements InputRowParser<GenericRecord> {
  private final ParseSpec parseSpec;
  private final ObjectFlattener<GenericRecord> avroFlattener;

  @JsonCreator
  public ParquetInputRowParser(
      @JsonProperty("parseSpec") ParseSpec parseSpec
  )
  {
    this.parseSpec = parseSpec;
    this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false, false);
  }

  @JsonProperty
  @Override
  public ParseSpec getParseSpec()
  {
    return parseSpec;
  }

  @Override
  public ParquetInputRowParser withParseSpec(ParseSpec parseSpec)
  {
    return new ParquetInputRowParser(parseSpec);
  }

  @Override
  public InputRow parse(GenericRecord input)
  {
    // We should really create a ParquetBasedInputRow that does not need an intermediate map but accesses
    // the SimpleRecord directly...
    MapBasedInputRow mapBasedInputRow = (MapBasedInputRow) AvroParsers.parseGenericRecord(input, parseSpec, avroFlattener);
    Map<String, Object> event = mapBasedInputRow.getEvent();
    // Copy the flattened event into a fresh HashMap before handing it to the new MapBasedInputRow.
    Map<String, Object> newMap = Maps.newHashMapWithExpectedSize(event.size());
    for (String key : event.keySet()) {
      newMap.put(key, event.get(key));
    }
    return new MapBasedInputRow(mapBasedInputRow.getTimestampFromEpoch(), mapBasedInputRow.getDimensions(), newMap);
  }
}
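
For context outside the diff, a minimal usage sketch of the new parser in Scala. This is an illustration under assumptions, not part of this PR: it assumes the TimeAndDimsParseSpec, TimestampSpec and DimensionsSpec constructors of the Druid 0.11 line, and the column names ("timestamp", "dim1", "dim2") are placeholders.

import java.util.Arrays

import io.druid.data.input.InputRow
import io.druid.data.input.impl.{DimensionsSpec, TimeAndDimsParseSpec, TimestampSpec}
import io.druid.indexer.spark.parquet.ParquetInputRowParser
import org.apache.avro.generic.GenericRecord

// Illustrative only: spec values and column names are assumptions, not taken from this PR.
def parseRecord(record: GenericRecord): InputRow = {
  val parseSpec = new TimeAndDimsParseSpec(
    new TimestampSpec("timestamp", "auto", null),
    new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("dim1", "dim2")), null, null)
  )
  new ParquetInputRowParser(parseSpec).parse(record)
}

As the comment in parse() notes, the intermediate map copy could later be avoided by an InputRow implementation that reads the Parquet record directly.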
28 changes: 12 additions & 16 deletions src/main/scala/io/druid/indexer/spark/SparkBatchIndexTask.scala
@@ -23,14 +23,15 @@ import java.io.{Closeable, File, IOException, PrintWriter}
import java.nio.file.Files
import java.util
import java.util.{Objects, Properties}

import com.fasterxml.jackson.annotation.{JsonCreator, JsonProperty}
import com.google.common.base.{Preconditions, Strings}
import com.google.common.collect.Iterables
import com.google.common.io.Closer
import io.druid.data.input.impl.ParseSpec
import io.druid.indexing.common.{TaskStatus, TaskToolbox}
import io.druid.indexing.common.actions.{LockTryAcquireAction, TaskActionClient}
import io.druid.indexing.common.actions.{LockListAction, LockTryAcquireAction, TaskActionClient}
import io.druid.indexing.common.task.{AbstractTask, HadoopTask}
import io.druid.indexing.common.{TaskLockType, TaskStatus, TaskToolbox}
import io.druid.java.util.common.JodaUtils
import io.druid.java.util.common.granularity._
import io.druid.java.util.common.logger.Logger
@@ -40,6 +41,7 @@ import io.druid.segment.indexing.DataSchema
import io.druid.timeline.DataSegment
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.Interval

import scala.collection.JavaConversions._

@JsonCreator
@@ -71,17 +73,11 @@ class SparkBatchIndexTask(
@JsonProperty("buildV9Directly")
buildV9Directly: Boolean = false
) extends HadoopTask(
if (id == null) {
AbstractTask
.makeId(
null, SparkBatchIndexTask.TASK_TYPE_BASE, dataSchema.getDataSource,
JodaUtils
.umbrellaInterval(JodaUtils.condenseIntervals(Preconditions.checkNotNull(intervals, "%s", "intervals")))
)
}
else {
id
},
TaskIdGenerator.getOrMakeId(
id, SparkBatchIndexTask.TASK_TYPE_BASE, dataSchema.getDataSource,
JodaUtils
.umbrellaInterval(JodaUtils.condenseIntervals(Preconditions.checkNotNull(intervals, "%s", "intervals")))
),
dataSchema.getDataSource,
hadoopDependencyCoordinates,
if (context == null) {
@@ -147,10 +143,10 @@ val classLoader = buildClassLoader(toolbox)
val classLoader = buildClassLoader(toolbox)
val task = toolbox.getObjectMapper.writeValueAsString(this)
log.debug("Sending task `%s`", task)

val taskLocks = toolbox.getTaskActionClient.submit(new LockListAction)
val result = HadoopTask.invokeForeignLoader[util.ArrayList[String], util.ArrayList[String]](
"io.druid.indexer.spark.Runner",
new util.ArrayList(List(task, Iterables.getOnlyElement(getTaskLocks(toolbox)).getVersion, outputPath)),
new util.ArrayList(List(task, Iterables.getOnlyElement(taskLocks).getVersion, outputPath)),
classLoader
)
toolbox.publishSegments(result.map(toolbox.getObjectMapper.readValue(_, classOf[DataSegment])))
@@ -196,7 +192,7 @@

@throws(classOf[Exception])
override def isReady(taskActionClient: TaskActionClient): Boolean = taskActionClient
.submit(new LockTryAcquireAction(lockInterval)) != null
.submit(new LockTryAcquireAction(TaskLockType.EXCLUSIVE, lockInterval)) != null

@JsonProperty("id")
override def getId = super.getId
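
The constructor hunk above delegates task-id creation to a TaskIdGenerator helper that is not shown in this diff. A plausible sketch of such a helper, assuming it simply preserves the previous fallback to AbstractTask.makeId when no explicit id is supplied (the real implementation in this PR may differ):

import io.druid.indexing.common.task.AbstractTask
import org.joda.time.Interval

// Hypothetical reconstruction, mirroring the inline id logic removed in this diff.
object TaskIdGenerator {
  def getOrMakeId(id: String, typeName: String, dataSource: String, interval: Interval): String =
    if (id == null) {
      AbstractTask.makeId(null, typeName, dataSource, interval)
    } else {
      id
    }
}

Factoring the fallback out keeps the HadoopTask constructor call readable and lets the same id policy be reused by other task types.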