From 05bbe2809a9e87dcf2f1b2c973176fd586bfb3e1 Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Tue, 3 Mar 2015 09:06:17 +0000 Subject: [PATCH 01/18] Initial implementation of idle task tracker monitoring Conflicts: src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java --- .gitignore | 1 + configuration.md | 18 +++ .../apache/hadoop/mapred/MesosExecutor.java | 111 ++++++++++++++++-- .../apache/hadoop/mapred/MesosScheduler.java | 3 + .../apache/hadoop/mapred/MesosTracker.java | 76 +++++++++++- .../apache/hadoop/mapred/ResourcePolicy.java | 100 ++++++++-------- .../java/org/apache/mesos/hadoop/Utils.java | 20 +++- 7 files changed, 261 insertions(+), 68 deletions(-) diff --git a/.gitignore b/.gitignore index fc8468b..0a21ece 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ target .vimrc .idea/ *.iml +*-pom.xml diff --git a/configuration.md b/configuration.md index 24f9bfa..8990d58 100644 --- a/configuration.md +++ b/configuration.md @@ -161,6 +161,24 @@ default values. + + + mapred.mesos.tracker.idle.interval + 5 + + Internal (in seconds) to check for TaskTrackers that have idle + slots. Default is 5 seconds. + + + + mapred.mesos.tracker.idle.checks + 5 + + After this many successful idle checks (meaning all slots *are* idle) the + slots will be revoked from the TaskTracker. + + + mapred.mesos.metrics.enabled diff --git a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java index 79b8b28..1054de4 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java @@ -11,11 +11,21 @@ import java.io.*; +import java.lang.reflect.Field; +import java.lang.ReflectiveOperationException; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + public class MesosExecutor implements Executor { public static final Log LOG = LogFactory.getLog(MesosExecutor.class); private SlaveInfo slaveInfo; private TaskTracker taskTracker; + protected final ScheduledExecutorService timerScheduler = + Executors.newScheduledThreadPool(1); + public static void main(String[] args) { MesosExecutorDriver driver = new MesosExecutorDriver(new MesosExecutor()); System.exit(driver.run() == Status.DRIVER_STOPPED ? 0 : 1); @@ -37,10 +47,8 @@ private JobConf configure(final TaskInfo task) { conf.writeXml(writer); writer.flush(); String xml = writer.getBuffer().toString(); - String xmlFormatted = - org.apache.mesos.hadoop.Utils.formatXml(xml); LOG.info("XML Configuration received:\n" + - xmlFormatted); + org.apache.mesos.hadoop.Utils.formatXml(xml)); } catch (Exception e) { LOG.warn("Failed to output configuration as XML.", e); } @@ -123,14 +131,16 @@ public void run() { } @Override - public void killTask(ExecutorDriver driver, TaskID taskId) { + public void killTask(final ExecutorDriver driver, final TaskID taskId) { LOG.info("Killing task : " + taskId.getValue()); - try { - taskTracker.shutdown(); - } catch (IOException e) { - LOG.error("Failed to shutdown TaskTracker", e); - } catch (InterruptedException e) { - LOG.error("Failed to shutdown TaskTracker", e); + if (taskTracker != null) { + LOG.info("Revoking task tracker map/reduce slots"); + revokeSlots(); + + driver.sendStatusUpdate(TaskStatus.newBuilder() + .setTaskId(taskId) + .setState(TaskState.TASK_FINISHED) + .build()); } } @@ -159,4 +169,85 @@ public void error(ExecutorDriver d, String message) { public void shutdown(ExecutorDriver d) { LOG.info("Executor asked to shutdown"); } + + public void revokeSlots() { + if (taskTracker == null) { + LOG.error("Task tracker is not initialized"); + return; + } + + int mapSlotsToRevoke = taskTracker.getJobConf().getInt("mapred.tasktracker.map.tasks.revoke", 0); + int reduceSlotsToRevoke = taskTracker.getJobConf().getInt("mapred.tasktracker.reduce.tasks.revoke", 0); + + int maxMapSlots = taskTracker.getMaxCurrentMapTasks() - mapSlotsToRevoke; + int maxReduceSlots = taskTracker.getMaxCurrentReduceTasks() - reduceSlotsToRevoke; + + // TODO(tarnfeld): Sanity check that it's safe for us to change the slots. + // Be sure there's nothing running and nothing in the launcher queue. + + // If we expect to have no slots, let's go ahead and terminate the task launchers + if (maxMapSlots == 0) { + try { + Field launcherField = taskTracker.getClass().getDeclaredField("mapLauncher"); + launcherField.setAccessible(true); + + // Kill the current map task launcher + ((TaskTracker.TaskLauncher) launcherField.get(taskTracker)).interrupt(); + } catch (ReflectiveOperationException e) { + LOG.fatal("Failed updating map slots due to error with reflection", e); + } + } + + if (maxReduceSlots == 0) { + try { + Field launcherField = taskTracker.getClass().getDeclaredField("reduceLauncher"); + launcherField.setAccessible(true); + + // Kill the current reduce task launcher + ((TaskTracker.TaskLauncher) launcherField.get(taskTracker)).interrupt(); + } catch (ReflectiveOperationException e) { + LOG.fatal("Failed updating reduce slots due to error with reflection", e); + } + } + + // Configure the new slot counts on the task tracker + taskTracker.setMaxMapSlots(maxMapSlots); + taskTracker.setMaxReduceSlots(maxReduceSlots); + + // If we have zero slots left, commit suicide when no jobs are running + if (maxMapSlots + maxReduceSlots == 0) { + scheduleSuicideTimer(); + } + } + + protected void scheduleSuicideTimer() { + timerScheduler.schedule(new Runnable() { + @Override + public void run() { + if (taskTracker == null) { + return; + } + + LOG.info("Checking to see if TaskTracker has no running jobs"); + int runningJobs = taskTracker.runningJobs.size(); + + // Check to see if the number of running jobs on the task tracker is zero + if (runningJobs == 0) { + LOG.warn("TaskTracker has zero jobs running, terminating"); + + try { + taskTracker.shutdown(); + } catch (IOException e) { + LOG.error("Failed to shutdown TaskTracker", e); + } catch (InterruptedException e) { + LOG.error("Failed to shutdown TaskTracker", e); + } + } + else { + LOG.info("TaskTracker has " + runningJobs + " jobs running"); + scheduleSuicideTimer(); + } + } + }, 1000, TimeUnit.MILLISECONDS); + } } diff --git a/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java b/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java index 3f42774..92c2a22 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java @@ -45,6 +45,9 @@ public class MesosScheduler extends TaskScheduler implements Scheduler { // giving up. public static final long LAUNCH_TIMEOUT_MS = 300000; // 5 minutes public static final long PERIODIC_MS = 300000; // 5 minutes + public static final long DEFAULT_IDLE_CHECK_INTERVAL = 5; // 5 seconds + // Destroy task trackers after being idle for N idle checks + public static final long DEFAULT_IDLE_REVOCATION_CHECKS = 5; private SchedulerDriver driver; protected TaskScheduler taskScheduler; diff --git a/src/main/java/org/apache/hadoop/mapred/MesosTracker.java b/src/main/java/org/apache/hadoop/mapred/MesosTracker.java index dbc696f..73f5fb7 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosTracker.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosTracker.java @@ -22,7 +22,12 @@ public class MesosTracker { public TaskID taskId; public long mapSlots; public long reduceSlots; + public volatile long idleCounter = 0; + public volatile long idleCheckInterval = 0; + public volatile long idleCheckMax = 0; public volatile boolean active = false; // Set once tracked by the JobTracker. + public volatile boolean stopped = false; + public volatile boolean killed = false; public volatile MesosScheduler scheduler; // Tracks Hadoop jobs running on the tracker. public Set jobs = Collections.newSetFromMap(new ConcurrentHashMap()); @@ -35,11 +40,20 @@ public MesosTracker(HttpHost host, TaskID taskId, long mapSlots, this.mapSlots = mapSlots; this.reduceSlots = reduceSlots; this.scheduler = scheduler; + if (scheduler.metrics != null) { this.context = scheduler.metrics.trackerTimer.time(); } + this.idleCheckInterval = scheduler.conf.getLong("mapred.mesos.tracker.idle.interval", + MesosScheduler.DEFAULT_IDLE_CHECK_INTERVAL); + this.idleCheckMax = scheduler.conf.getLong("mapred.mesos.tracker.idle.checks", + MesosScheduler.DEFAULT_IDLE_REVOCATION_CHECKS); + scheduleStartupTimer(); + if (this.idleCheckInterval > 0 && this.idleCheckMax > 0) { + scheduleIdleCheck(); + } } protected void scheduleStartupTimer() { @@ -50,7 +64,6 @@ public void run() { // If the tracker activated while we were awaiting to acquire the // lock, start the periodic cleanup timer and return. schedulePeriodic(); - return; } @@ -59,7 +72,7 @@ public void run() { // Here we do a final check with the JobTracker to make sure this // TaskTracker is really not there before we kill it. final Collection taskTrackers = - MesosTracker.this.scheduler.jobTracker.taskTrackers(); + MesosTracker.this.scheduler.jobTracker.taskTrackers(); for (TaskTrackerStatus status : taskTrackers) { HttpHost host = new HttpHost(status.getHost(), status.getHttpPort()); @@ -79,6 +92,62 @@ public void run() { }, MesosScheduler.LAUNCH_TIMEOUT_MS, TimeUnit.MILLISECONDS); } + protected void scheduleIdleCheck() { + scheduler.scheduleTimer(new Runnable() { + @Override + public void run() { + // We're not interested if the task tracker has been stopped or slots + // have already been revoked. + if (MesosTracker.this.stopped || MesosTracker.this.killed) { + return; + } + + // If the task tracker isn't active, wait until it is active. + // TODO(tarnfeld): Do this based on some kind of lock/wait? + if (!MesosTracker.this.active) { + scheduleIdleCheck(); + return; + } + + boolean trackerIsIdle = false; + + // We're only interested in TaskTrackers which have jobs assigned to them + // but are completely idle. The MesosScheduler is in charge of destroying + // task trackers that are not handling any jobs, so we can leave those alone. + if (MesosTracker.this.idleCounter >= MesosTracker.this.idleCheckMax) { + MesosTracker.this.scheduler.killTracker(MesosTracker.this); + return; + } + + long idleMapSlots = 0; + long idleReduceSlots = 0; + + Collection taskTrackers = scheduler.jobTracker.taskTrackers(); + for (TaskTrackerStatus status : taskTrackers) { + HttpHost host = new HttpHost(status.getHost(), status.getHttpPort()); + if (host.toString().equals(MesosTracker.this.host.toString())) { + idleMapSlots += status.getAvailableMapSlots(); + idleReduceSlots += status.getAvailableReduceSlots(); + break; + } + } + + trackerIsIdle = idleMapSlots == MesosTracker.this.mapSlots && + idleReduceSlots == MesosTracker.this.reduceSlots; + + if (trackerIsIdle) { + LOG.info("TaskTracker appears idle right now: " + MesosTracker.this.host); + MesosTracker.this.idleCounter += 1; + } else { + LOG.debug("TaskTracker is no longer idle: " + MesosTracker.this.host); + MesosTracker.this.idleCounter = 0; + } + + scheduleIdleCheck(); + } + }, MesosTracker.this.idleCheckInterval, TimeUnit.SECONDS); + } + protected void schedulePeriodic() { scheduler.scheduleTimer(new Runnable() { @Override @@ -104,7 +173,8 @@ public void run() { } public void stop() { - active = true; + active = false; + stopped = true; if (context != null) { context.stop(); } diff --git a/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java b/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java index 2895563..acfefc0 100644 --- a/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java +++ b/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java @@ -1,6 +1,5 @@ package org.apache.hadoop.mapred; -import com.google.protobuf.ByteString; import org.apache.commons.httpclient.HttpHost; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -11,6 +10,7 @@ import org.apache.mesos.Protos.*; import org.apache.mesos.Protos.TaskID; import org.apache.mesos.SchedulerDriver; +import com.google.protobuf.ByteString; import java.io.*; import java.util.*; @@ -323,10 +323,6 @@ public void resourceOffers(SchedulerDriver schedulerDriver, LOG.info("Launching task " + taskId.getValue() + " on " + httpAddress.toString() + " with mapSlots=" + mapSlots + " reduceSlots=" + reduceSlots); - // Add this tracker to Mesos tasks. - scheduler.mesosTrackers.put(httpAddress, new MesosTracker(httpAddress, taskId, - mapSlots, reduceSlots, scheduler)); - List defaultJvmOpts = Arrays.asList( "-XX:+UseConcMarkSweepGC", "-XX:+CMSParallelRemarkEnabled", @@ -450,50 +446,58 @@ public void resourceOffers(SchedulerDriver schedulerDriver, overrides.set("mapred.task.tracker.report.address", reportAddress.getHostName() + ':' + reportAddress.getPort()); - overrides.setLong("mapred.tasktracker.map.tasks.maximum", - mapSlots); + overrides.setLong("mapred.tasktracker.map.tasks.maximum", mapSlots); + overrides.setLong("mapred.tasktracker.reduce.tasks.maximum", reduceSlots); - overrides.setLong("mapred.tasktracker.reduce.tasks.maximum", - reduceSlots); - - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try { - overrides.write(new DataOutputStream(baos)); - baos.flush(); - } catch (IOException e) { - LOG.warn("Failed to serialize configuration.", e); - System.exit(1); - } - - byte[] bytes = baos.toByteArray(); - - TaskInfo info = TaskInfo + // Build up the executor info + ExecutorInfo executor = ExecutorInfo .newBuilder() - .setName(taskId.getValue()) - .setTaskId(taskId) - .setSlaveId(offer.getSlaveId()) + .setExecutorId(ExecutorID.newBuilder().setValue( + "executor_" + taskId.getValue())) + .setName("Hadoop TaskTracker") + .setSource(taskId.getValue()) .addResources( Resource .newBuilder() .setName("cpus") .setType(Value.Type.SCALAR) .setRole(cpuRole) - .setScalar(Value.Scalar.newBuilder().setValue(taskCpus - containerCpus))) + .setScalar(Value.Scalar.newBuilder().setValue(containerCpus))) .addResources( Resource .newBuilder() .setName("mem") .setType(Value.Type.SCALAR) .setRole(memRole) - .setScalar(Value.Scalar.newBuilder().setValue(taskMem - containerMem))) + .setScalar(Value.Scalar.newBuilder().setValue(taskMem))) .addResources( Resource .newBuilder() .setName("disk") .setType(Value.Type.SCALAR) .setRole(diskRole) - .setScalar(Value.Scalar.newBuilder().setValue(taskDisk - containerDisk))) + .setScalar(Value.Scalar.newBuilder().setValue(containerDisk))) + .setCommand(commandInfo.build()) + .build(); + + ByteString taskData; + + try { + taskData = org.apache.mesos.hadoop.Utils.confToBytes(overrides); + } catch (IOException e) { + LOG.error("Caught exception serializing configuration"); + + // Skip this offer completely + schedulerDriver.declineOffer(offer.getId()); + continue; + } + + // Create the TaskTracker TaskInfo + TaskInfo trackerTaskInfo = TaskInfo + .newBuilder() + .setName("tasktracker_" + taskId.getValue()) + .setTaskId(taskId) + .setSlaveId(offer.getSlaveId()) .addResources( Resource .newBuilder() @@ -509,33 +513,23 @@ public void resourceOffers(SchedulerDriver schedulerDriver, .addRange(Value.Range.newBuilder() .setBegin(reportAddress.getPort()) .setEnd(reportAddress.getPort())))) - .setExecutor( - ExecutorInfo + .addResources( + Resource .newBuilder() - .setExecutorId(ExecutorID.newBuilder().setValue( - "executor_" + taskId.getValue())) - .setName("Hadoop TaskTracker") - .setSource(taskId.getValue()) - .addResources( - Resource - .newBuilder() - .setName("cpus") - .setType(Value.Type.SCALAR) - .setRole(cpuRole) - .setScalar(Value.Scalar.newBuilder().setValue( - (containerCpus)))) - .addResources( - Resource - .newBuilder() - .setName("mem") - .setType(Value.Type.SCALAR) - .setRole(memRole) - .setScalar(Value.Scalar.newBuilder().setValue(containerMem))) - .setCommand(commandInfo.build())) - .setData(ByteString.copyFrom(bytes)) + .setName("cpus") + .setType(Value.Type.SCALAR) + .setRole(cpuRole) + .setScalar(Value.Scalar.newBuilder().setValue(taskCpus - containerCpus))) + .setData(taskData) + .setExecutor(executor) .build(); - schedulerDriver.launchTasks(Arrays.asList(offer.getId()), Arrays.asList(info)); + // Add this tracker to Mesos tasks. + scheduler.mesosTrackers.put(httpAddress, new MesosTracker(httpAddress, taskId, + mapSlots, reduceSlots, scheduler)); + + // Launch the task + schedulerDriver.launchTasks(Arrays.asList(offer.getId()), Arrays.asList(trackerTaskInfo)); neededMapSlots -= mapSlots; neededReduceSlots -= reduceSlots; diff --git a/src/main/java/org/apache/mesos/hadoop/Utils.java b/src/main/java/org/apache/mesos/hadoop/Utils.java index b8c325c..823a0e7 100644 --- a/src/main/java/org/apache/mesos/hadoop/Utils.java +++ b/src/main/java/org/apache/mesos/hadoop/Utils.java @@ -1,21 +1,37 @@ + package org.apache.mesos.hadoop; import javax.xml.transform.*; import javax.xml.transform.stream.StreamResult; import javax.xml.transform.stream.StreamSource; -import java.io.StringReader; -import java.io.StringWriter; +import java.io.*; + +import com.google.protobuf.ByteString; +import org.apache.hadoop.conf.Configuration; public class Utils { + public static String formatXml(String source) throws TransformerException { Source xmlInput = new StreamSource(new StringReader(source)); StringWriter stringWriter = new StringWriter(); StreamResult xmlOutput = new StreamResult(stringWriter); + TransformerFactory transformerFactory = TransformerFactory.newInstance(); transformerFactory.setAttribute("indent-number", 2); + Transformer transformer = transformerFactory.newTransformer(); transformer.setOutputProperty(OutputKeys.INDENT, "yes"); transformer.transform(xmlInput, xmlOutput); + return xmlOutput.getWriter().toString(); } + + public static ByteString confToBytes(Configuration conf) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + conf.write(new DataOutputStream(baos)); + baos.flush(); + + byte[] bytes = baos.toByteArray(); + return ByteString.copyFrom(bytes); + } } From df3ae2954e851a478b9163385f2651c7479b2b38 Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Fri, 17 Oct 2014 17:25:02 +0100 Subject: [PATCH 02/18] Don't send duplicate task updates --- src/main/java/org/apache/hadoop/mapred/MesosExecutor.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java index 1054de4..d95aa5e 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java @@ -136,11 +136,6 @@ public void killTask(final ExecutorDriver driver, final TaskID taskId) { if (taskTracker != null) { LOG.info("Revoking task tracker map/reduce slots"); revokeSlots(); - - driver.sendStatusUpdate(TaskStatus.newBuilder() - .setTaskId(taskId) - .setState(TaskState.TASK_FINISHED) - .build()); } } From 1e73dd7afb04f27952d40847b0c32c3287784da1 Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Fri, 17 Oct 2014 17:25:08 +0100 Subject: [PATCH 03/18] Remove configurables --- src/main/java/org/apache/hadoop/mapred/MesosExecutor.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java index d95aa5e..06a7c29 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java @@ -171,11 +171,8 @@ public void revokeSlots() { return; } - int mapSlotsToRevoke = taskTracker.getJobConf().getInt("mapred.tasktracker.map.tasks.revoke", 0); - int reduceSlotsToRevoke = taskTracker.getJobConf().getInt("mapred.tasktracker.reduce.tasks.revoke", 0); - - int maxMapSlots = taskTracker.getMaxCurrentMapTasks() - mapSlotsToRevoke; - int maxReduceSlots = taskTracker.getMaxCurrentReduceTasks() - reduceSlotsToRevoke; + int maxMapSlots = 0; + int maxReduceSlots = 0; // TODO(tarnfeld): Sanity check that it's safe for us to change the slots. // Be sure there's nothing running and nothing in the launcher queue. From 5c3b91a5eba85afb3ef7543a9946dcf8c38d7779 Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Fri, 17 Oct 2014 17:33:33 +0100 Subject: [PATCH 04/18] Don't try and schedule tasks on a killed task tracker --- .../org/apache/hadoop/mapred/MesosScheduler.java | 15 ++++++++++++++- .../org/apache/hadoop/mapred/MesosTracker.java | 6 ++---- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java b/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java index 92c2a22..4203659 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java @@ -248,6 +248,19 @@ public List assignTasks(TaskTracker taskTracker) LOG.info("Unknown/exited TaskTracker: " + tracker + ". "); return null; } + + MesosTracker mesosTracker = mesosTrackers.get(tracker); + + // Make sure we're not asked to assign tasks to any task trackers that have + // been stopped. This could happen while the task tracker has not been + // removed from the cluster e.g still in the heartbeat timeout period. + synchronized (this) { + if (mesosTracker.stopped) { + LOG.info("Asked to assign tasks to stopped tracker " + tracker + "."); + return null; + } + } + // Let the underlying task scheduler do the actual task scheduling. List tasks = taskScheduler.assignTasks(taskTracker); @@ -258,7 +271,7 @@ public List assignTasks(TaskTracker taskTracker) // Keep track of which TaskTracker contains which tasks. for (Task task : tasks) { - mesosTrackers.get(tracker).jobs.add(task.getJobID()); + mesosTracker.jobs.add(task.getJobID()); } return tasks; diff --git a/src/main/java/org/apache/hadoop/mapred/MesosTracker.java b/src/main/java/org/apache/hadoop/mapred/MesosTracker.java index 73f5fb7..8213e09 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosTracker.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosTracker.java @@ -27,7 +27,6 @@ public class MesosTracker { public volatile long idleCheckMax = 0; public volatile boolean active = false; // Set once tracked by the JobTracker. public volatile boolean stopped = false; - public volatile boolean killed = false; public volatile MesosScheduler scheduler; // Tracks Hadoop jobs running on the tracker. public Set jobs = Collections.newSetFromMap(new ConcurrentHashMap()); @@ -96,9 +95,8 @@ protected void scheduleIdleCheck() { scheduler.scheduleTimer(new Runnable() { @Override public void run() { - // We're not interested if the task tracker has been stopped or slots - // have already been revoked. - if (MesosTracker.this.stopped || MesosTracker.this.killed) { + // We're not interested if the task tracker has been stopped. + if (MesosTracker.this.stopped) { return; } From a5a04e1aca83d67425b42b8ca29156669e5eced6 Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Fri, 17 Oct 2014 17:48:53 +0100 Subject: [PATCH 05/18] Disk defaults --- src/main/java/org/apache/hadoop/mapred/MesosScheduler.java | 3 ++- src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java b/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java index 4203659..3f1e63f 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java @@ -36,8 +36,9 @@ public class MesosScheduler extends TaskScheduler implements Scheduler { public static final double SLOT_CPUS_DEFAULT = 1; // 1 cores. public static final int SLOT_DISK_DEFAULT = 1024; // 1 GB. public static final int SLOT_JVM_HEAP_DEFAULT = 1024; // 1024MB. - public static final double TASKTRACKER_CPUS = 1.0; // 1 core. + public static final double TASKTRACKER_CPUS_DEFAULT = 1.0; // 1 core. public static final int TASKTRACKER_MEM_DEFAULT = 1024; // 1 GB. + public static final int TASKTRACKER_DISK_DEFAULT = 1024; // 1 GB. // The default behavior in Hadoop is to use 4 slots per TaskTracker: public static final int MAP_SLOTS_DEFAULT = 2; public static final int REDUCE_SLOTS_DEFAULT = 2; diff --git a/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java b/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java index acfefc0..2695517 100644 --- a/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java +++ b/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java @@ -64,11 +64,11 @@ public ResourcePolicy(MesosScheduler scheduler) { (MesosScheduler.JVM_MEM_OVERHEAD_PERCENT_DEFAULT + 1)); containerCpus = scheduler.conf.getFloat("mapred.mesos.tasktracker.cpus", - (float) MesosScheduler.TASKTRACKER_CPUS); + (float) MesosScheduler.TASKTRACKER_CPUS_DEFAULT); + containerDisk = scheduler.conf.getInt("mapred.mesos.tasktracker.disk", + MesosScheduler.TASKTRACKER_DISK_DEFAULT); containerMem = tasktrackerMem; - containerDisk = 0; - } public void computeNeededSlots(List jobsInProgress, From d7bc8fd975c019773b9a1e6eedf6423404f1785d Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Fri, 17 Oct 2014 17:48:58 +0100 Subject: [PATCH 06/18] Remove the tasktracker_ prefix --- src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java b/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java index 2695517..05cf90b 100644 --- a/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java +++ b/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java @@ -495,7 +495,7 @@ public void resourceOffers(SchedulerDriver schedulerDriver, // Create the TaskTracker TaskInfo TaskInfo trackerTaskInfo = TaskInfo .newBuilder() - .setName("tasktracker_" + taskId.getValue()) + .setName(taskId.getValue()) .setTaskId(taskId) .setSlaveId(offer.getSlaveId()) .addResources( From aacabf47ef1e4e8055f9d1c08a085b01a3e9560d Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Fri, 17 Oct 2014 19:28:29 +0100 Subject: [PATCH 07/18] Add in some logging around killing task trackers --- src/main/java/org/apache/hadoop/mapred/MesosTracker.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/org/apache/hadoop/mapred/MesosTracker.java b/src/main/java/org/apache/hadoop/mapred/MesosTracker.java index 8213e09..b983753 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosTracker.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosTracker.java @@ -113,6 +113,7 @@ public void run() { // but are completely idle. The MesosScheduler is in charge of destroying // task trackers that are not handling any jobs, so we can leave those alone. if (MesosTracker.this.idleCounter >= MesosTracker.this.idleCheckMax) { + LOG.info("Killing idle tasktracker: " + MesosTracker.this.host); MesosTracker.this.scheduler.killTracker(MesosTracker.this); return; } From bd8f55cddeb084b280372ed0baaa6bdef7d2396f Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Fri, 17 Oct 2014 19:28:43 +0100 Subject: [PATCH 08/18] Ensure we have enough slots for job cleanup --- src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java b/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java index 05cf90b..72481ed 100644 --- a/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java +++ b/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java @@ -78,6 +78,7 @@ public void computeNeededSlots(List jobsInProgress, int pendingReduces = 0; int runningMaps = 0; int runningReduces = 0; + for (JobInProgress progress : jobsInProgress) { // JobStatus.pendingMaps/Reduces may return the wrong value on // occasion. This seems to be safer. @@ -85,6 +86,12 @@ public void computeNeededSlots(List jobsInProgress, pendingReduces += scheduler.getPendingTasks(progress.getTasks(TaskType.REDUCE)); runningMaps += progress.runningMaps(); runningReduces += progress.runningReduces(); + + // If the task is waiting to launch the cleanup task, let us make sure we have + // capacity to run the task. + if (!progress.isCleanupLaunched()) { + pendingMaps += scheduler.getPendingTasks(progress.getTasks(TaskType.JOB_CLEANUP)); + } } // Mark active (heartbeated) TaskTrackers and compute idle slots. From 11c78f088af73250435afae7b5a94ebc99272701 Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Fri, 17 Oct 2014 19:32:15 +0100 Subject: [PATCH 09/18] Send a TASK_FINISHED update once the slots are revoked We need to free up the resources (CPU) assigned to the task, so lets do that now. --- .../org/apache/hadoop/mapred/MesosExecutor.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java index 06a7c29..04b4380 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java @@ -134,8 +134,19 @@ public void run() { public void killTask(final ExecutorDriver driver, final TaskID taskId) { LOG.info("Killing task : " + taskId.getValue()); if (taskTracker != null) { - LOG.info("Revoking task tracker map/reduce slots"); - revokeSlots(); + LOG.info("Revoking task tracker map/reduce slots"); + revokeSlots(); + + // Send the TASK_FINISHED status + new Thread("TaskFinishedUpdate") { + @Override + public void run() { + driver.sendStatusUpdate(TaskStatus.newBuilder() + .setTaskId(taskId) + .setState(TaskState.TASK_FINISHED) + .build()); + } + }.start(); } } From cdbeef050129ca25ca1aabe6cec335b44d40fc61 Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Mon, 20 Oct 2014 19:34:58 +0100 Subject: [PATCH 10/18] Only log out if the tracker wasn't idle before --- src/main/java/org/apache/hadoop/mapred/MesosTracker.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/apache/hadoop/mapred/MesosTracker.java b/src/main/java/org/apache/hadoop/mapred/MesosTracker.java index b983753..dd97828 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosTracker.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosTracker.java @@ -138,7 +138,9 @@ public void run() { LOG.info("TaskTracker appears idle right now: " + MesosTracker.this.host); MesosTracker.this.idleCounter += 1; } else { - LOG.debug("TaskTracker is no longer idle: " + MesosTracker.this.host); + if (MesosTracker.this.idleCounter > 0) { + LOG.info("TaskTracker is no longer idle: " + MesosTracker.this.host); + } MesosTracker.this.idleCounter = 0; } From 12c88d94979ad0b0eb50e40d4e59ecd1318f6f3a Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Fri, 14 Nov 2014 01:03:30 +0000 Subject: [PATCH 11/18] Ensure we trigger another scheduleIdleCheck(); after killing the tracker --- src/main/java/org/apache/hadoop/mapred/MesosTracker.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/org/apache/hadoop/mapred/MesosTracker.java b/src/main/java/org/apache/hadoop/mapred/MesosTracker.java index dd97828..021e0ea 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosTracker.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosTracker.java @@ -115,6 +115,7 @@ public void run() { if (MesosTracker.this.idleCounter >= MesosTracker.this.idleCheckMax) { LOG.info("Killing idle tasktracker: " + MesosTracker.this.host); MesosTracker.this.scheduler.killTracker(MesosTracker.this); + scheduleIdleCheck(); return; } From 9d17b13b820c38205f0a2f0d994bfe08aef88179 Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Wed, 7 Jan 2015 17:29:01 +0000 Subject: [PATCH 12/18] Switch to checking for running tasks, not jobs In some cases when jobs fail, the runningJobs map is not updated correctly. --- .../java/org/apache/hadoop/mapred/MesosExecutor.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java index 04b4380..6992dfd 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java @@ -231,12 +231,12 @@ public void run() { return; } - LOG.info("Checking to see if TaskTracker has no running jobs"); - int runningJobs = taskTracker.runningJobs.size(); + LOG.info("Checking to see if TaskTracker has no running tasks"); + int runningTasks = taskTracker.runningTasks.size(); // Check to see if the number of running jobs on the task tracker is zero - if (runningJobs == 0) { - LOG.warn("TaskTracker has zero jobs running, terminating"); + if (runningTasks == 0) { + LOG.warn("TaskTracker has zero tasks running, terminating"); try { taskTracker.shutdown(); @@ -247,7 +247,7 @@ public void run() { } } else { - LOG.info("TaskTracker has " + runningJobs + " jobs running"); + LOG.info("TaskTracker has " + runningTasks + " jobs running"); scheduleSuicideTimer(); } } From 078d6b90b0e54f14788fe7fc32803abc04bc2ee4 Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Mon, 12 Jan 2015 14:30:51 +0000 Subject: [PATCH 13/18] Revert "Switch to checking for running tasks, not jobs" This reverts commit b4f9556b721d7017e2f996c9682628304715d93f. --- .../java/org/apache/hadoop/mapred/MesosExecutor.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java index 6992dfd..04b4380 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java @@ -231,12 +231,12 @@ public void run() { return; } - LOG.info("Checking to see if TaskTracker has no running tasks"); - int runningTasks = taskTracker.runningTasks.size(); + LOG.info("Checking to see if TaskTracker has no running jobs"); + int runningJobs = taskTracker.runningJobs.size(); // Check to see if the number of running jobs on the task tracker is zero - if (runningTasks == 0) { - LOG.warn("TaskTracker has zero tasks running, terminating"); + if (runningJobs == 0) { + LOG.warn("TaskTracker has zero jobs running, terminating"); try { taskTracker.shutdown(); @@ -247,7 +247,7 @@ public void run() { } } else { - LOG.info("TaskTracker has " + runningTasks + " jobs running"); + LOG.info("TaskTracker has " + runningJobs + " jobs running"); scheduleSuicideTimer(); } } From e7eb4fb9d4b567f21ae860d1cd7da10fdf228fb0 Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Sat, 28 Feb 2015 23:02:51 +0000 Subject: [PATCH 14/18] Make sure we call notifySlots() before killing the task launcher --- src/main/java/org/apache/hadoop/mapred/MesosExecutor.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java index 04b4380..e6e5e32 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java @@ -195,7 +195,9 @@ public void revokeSlots() { launcherField.setAccessible(true); // Kill the current map task launcher - ((TaskTracker.TaskLauncher) launcherField.get(taskTracker)).interrupt(); + TaskTracker.TaskLauncher launcher = ((TaskTracker.TaskLauncher) launcherField.get(taskTracker)); + launcher.notifySlots(); + launcher.interrupt(); } catch (ReflectiveOperationException e) { LOG.fatal("Failed updating map slots due to error with reflection", e); } @@ -207,7 +209,9 @@ public void revokeSlots() { launcherField.setAccessible(true); // Kill the current reduce task launcher - ((TaskTracker.TaskLauncher) launcherField.get(taskTracker)).interrupt(); + TaskTracker.TaskLauncher launcher = ((TaskTracker.TaskLauncher) launcherField.get(taskTracker)); + launcher.notifySlots(); + launcher.interrupt(); } catch (ReflectiveOperationException e) { LOG.fatal("Failed updating reduce slots due to error with reflection", e); } From 3fd9c2a6469908b7c59bd9b56ecc5c2535bd64e3 Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Sat, 28 Feb 2015 23:03:08 +0000 Subject: [PATCH 15/18] Clean up brackets --- src/main/java/org/apache/hadoop/mapred/MesosExecutor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java index e6e5e32..4566271 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java @@ -222,7 +222,7 @@ public void revokeSlots() { taskTracker.setMaxReduceSlots(maxReduceSlots); // If we have zero slots left, commit suicide when no jobs are running - if (maxMapSlots + maxReduceSlots == 0) { + if ((maxMapSlots + maxReduceSlots) == 0) { scheduleSuicideTimer(); } } From c3f9540d3cf9a11e60283c72a9f5643e3ecedd71 Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Sat, 28 Feb 2015 23:03:32 +0000 Subject: [PATCH 16/18] Push memory limits onto the tasks This now means that when slots are freed by the framework, not only will the CPU become available but so will some of the memory. --- .../java/org/apache/hadoop/mapred/ResourcePolicy.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java b/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java index 72481ed..3a52888 100644 --- a/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java +++ b/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java @@ -476,7 +476,7 @@ public void resourceOffers(SchedulerDriver schedulerDriver, .setName("mem") .setType(Value.Type.SCALAR) .setRole(memRole) - .setScalar(Value.Scalar.newBuilder().setValue(taskMem))) + .setScalar(Value.Scalar.newBuilder().setValue(containerMem))) .addResources( Resource .newBuilder() @@ -527,6 +527,13 @@ public void resourceOffers(SchedulerDriver schedulerDriver, .setType(Value.Type.SCALAR) .setRole(cpuRole) .setScalar(Value.Scalar.newBuilder().setValue(taskCpus - containerCpus))) + .addResources( + Resource + .newBuilder() + .setName("mem") + .setType(Value.Type.SCALAR) + .setRole(memRole) + .setScalar(Value.Scalar.newBuilder().setValue(taskMem - containerCpus))) .setData(taskData) .setExecutor(executor) .build(); From aa996536004ac123d4122278d43a5407eff588d8 Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Sat, 28 Feb 2015 23:04:34 +0000 Subject: [PATCH 17/18] Make use of the isIdle() method on the TaskTracker Previously we would check the number of running jobs, however that sometimes returend incorrect values especially when dealing with failed jobs on the cluster. The result being some TaskTrackers never commit suicide. --- .../apache/hadoop/mapred/MesosExecutor.java | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java index 4566271..8ab2a4f 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java @@ -14,6 +14,7 @@ import java.lang.reflect.Field; import java.lang.ReflectiveOperationException; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -235,12 +236,12 @@ public void run() { return; } - LOG.info("Checking to see if TaskTracker has no running jobs"); - int runningJobs = taskTracker.runningJobs.size(); + LOG.info("Checking to see if TaskTracker is idle"); - // Check to see if the number of running jobs on the task tracker is zero - if (runningJobs == 0) { - LOG.warn("TaskTracker has zero jobs running, terminating"); + // If the task tracker is idle, all tasks have finished and task output + // has been cleaned up. + if (taskTracker.isIdle()) { + LOG.warn("TaskTracker is idle, terminating"); try { taskTracker.shutdown(); @@ -251,7 +252,17 @@ public void run() { } } else { - LOG.info("TaskTracker has " + runningJobs + " jobs running"); + try { + Field field = taskTracker.getClass().getDeclaredField("tasksToCleanup"); + field.setAccessible(true); + BlockingQueue tasksToCleanup = ((BlockingQueue) field.get(taskTracker)); + LOG.info("TaskTracker has " + taskTracker.tasks.size() + + " running tasks and " + tasksToCleanup + + " tasks to clean up."); + } catch (ReflectiveOperationException e) { + LOG.fatal("Failed to get task counts from TaskTracker", e); + } + scheduleSuicideTimer(); } } From 8028724e320e75fda94eb112c1325623be707c06 Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Sat, 28 Mar 2015 13:38:00 +0000 Subject: [PATCH 18/18] Bump to 0.1.0 --- README.md | 11 +++++------ pom.xml | 2 +- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 38396b8..a9a4390 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ Hadoop on Mesos #### Overview #### -To run _Hadoop on Mesos_ you need to add the `hadoop-mesos-0.0.9.jar` +To run _Hadoop on Mesos_ you need to add the `hadoop-mesos-0.1.0.jar` library to your Hadoop distribution (any distribution that uses protobuf > 2.5.0) and set some new configuration properties. Read on for details. @@ -23,13 +23,13 @@ install `libsnappy`. The [`snappy-java`][snappy-java] package also includes a b #### Build #### -You can build `hadoop-mesos-0.0.9.jar` using Maven: +You can build `hadoop-mesos-0.1.0.jar` using Maven: ```shell mvn package ``` -If successful, the JAR will be at `target/hadoop-mesos-0.0.9.jar`. +If successful, the JAR will be at `target/hadoop-mesos-0.1.0.jar`. > NOTE: If you want to build against a different version of Mesos than > the default you'll need to update `mesos-version` in `pom.xml`. @@ -51,10 +51,10 @@ tar zxf hadoop-2.5.0-cdh5.2.0.tar.gz > **Take note**, the extracted directory is `hadoop-2.5.0-cdh5.2.0`. -Now copy `hadoop-mesos-0.0.9.jar` into the `share/hadoop/common/lib` folder. +Now copy `hadoop-mesos-0.1.0.jar` into the `share/hadoop/common/lib` folder. ```shell -cp /path/to/hadoop-mesos-0.0.9.jar hadoop-2.5.0-cdh5.2.0/share/hadoop/common/lib/ +cp /path/to/hadoop-mesos-0.1.0.jar hadoop-2.5.0-cdh5.2.0/share/hadoop/common/lib/ ``` Since CDH5 includes both MRv1 and MRv2 (YARN) and is configured for YARN by @@ -183,5 +183,4 @@ This feature can be especially useful if your hadoop jobs have software dependen _Please email user@mesos.apache.org with questions!_ - ---------- diff --git a/pom.xml b/pom.xml index c8dd048..f9b0832 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ org.apache.mesos hadoop-mesos - 0.0.9 + 0.1.0 UTF-8