/ 中存储网

Hadoop2.6.0运行mapreduce之推断(speculative)执行(上)

2016-09-13 13:29:35 来源:中存储网

前言

当一个应用向YARN集群提交作业后,此作业的多个任务由于负载不均衡、资源分布不均等原因都会导致各个任务运行完成的时间不一致,甚至会出现一个任务明显慢于同一作业的其它任务的情况。如果对这种情况不加优化,最慢的任务最终会拖慢整个作业的整体执行进度。好在mapreduce框架提供了任务推断执行机制,当有必要时就启动一个备份任务。最终会采用备份任务和原任务中率先执行完的结果作为最终结果。

由于具体分析推断执行机制,篇幅很长,所以我会分成几篇内容陆续介绍。

推断执行测试

本文在我自己搭建的集群(集群搭建可以参阅《Linux下Hadoop2.6.0集群环境的搭建》一文)上,执行wordcount例子,来验证mapreduce框架的推断机制。我们输入以下命令:

 
  1. hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.maxsize=19 /wordcount/input/ /wordcount/output/result1  


任务划分的信息如下:

我们看到map任务被划分为10:

这一次测试并没有发生推断执行的情况,我们可以再执行一次上面的命令,最后看到的信息如下:

其中看到执行的map数量多了1个,成了11个,而且还出现了Killed map tasks=1的信息,这表示这次执行最终发生了推断执行。其中一个map任务增加了一个备份任务,当备份的map任务和原有的map任务中有一个率先完成了,那么会将另一个慢的map任务杀死。reduce任务也是类似,只不过Hadoop2.6.0的子项目hadoop-mapreduce-examples中自带的wordcount例子中,使用Job.setNumReduceTasks(int)这个API将reduce任务的数量控制为1个。在这里我们看到推断执行有时候会发生,而有时候却不会,这是为什么呢?

在《Hadoop2.6.0运行mapreduce之Uber模式验证》一文中我还简短提到,如果启用了Uber运行模式,推断执行会被取消。关于这些内部的实现原理需要我们从架构设计和源码角度进行剖析,因为我们还需要知道所以然。

mapreduce推断执行设计架构

在mapreduce中设计了Speculator接口作为推断执行的统一规范,DefaultSpeculator作为一种服务在实现了Speculator的同时继承了AbstractService,DefaultSpeculator是mapreduce的默认实现。DefaultSpeculator负责处理SpeculatorEvent事件,目前主要包括四种事件,分别是:

  • JOB_CREATE:作业刚刚被创建时触发的事件,并处理一些初始化工作。
  • ATTEMPT_START:一个任务实例TaskAttemptImpl启动时触发的事件,DefaultSpeculator将会使用内部的推断估算器(默认是LegacyTaskRuntimeEstimator)开启对此任务实例的监控。
  • ATTEMPT_STATUS_UPDATE:当任务实例的状态更新时触发的事件,DefaultSpeculator将会更新推断估算器对任务的监控信息;更新正在运行中的任务(维护在runningTasks中);任务的统计信息(这些统计信息用于跟踪长时间未汇报心跳的任务,并积极主动的进行推断执行,而不是等待任务超时)
  • TASK_CONTAINER_NEED_UPDATE:任务Container数量发生变化时触发的事件。

TaskRuntimeEstimator接口为推断执行提供了计算模型的规范,默认的实现类是LegacyTaskRuntimeEstimator,此外还有ExponentiallySmoothedTaskRuntimeEstimator。这里暂时不对其内容进行深入介绍,在后面会陆续展开。

Speculator的初始化和启动伴随着MRAppMaster的初始化与启动。

接下来我们以Speculator接口的默认实现DefaultSpeculator为例,逐步分析其初始化、启动、推断执行等内容的工作原理。

Speculator的初始化

Speculator是MRAppMaster的子组件、子服务,所以也需要初始化。有经验的Hadoop工程师,想必知道当mapreduce作业提交给ResourceManager后,由RM负责向NodeManger通信启动一个Container用于执行MRAppMaster。启动MRAppMaster实际也是通过调用其main方法,其中会调用MRAppMaster实例的serviceInit方法,其中与Speculator有关的代码实现见代码清单1。

代码清单1 MRAppMaster的serviceInit方法中创建Speculator的代码
  1. if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)  
  2.     || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {  
  3.   //optional service to speculate on task attempts' progress  
  4.   speculator = createSpeculator(conf, context);  
  5.   addIfService(speculator);  
  6. }  
  7. speculatorEventDispatcher = new SpeculatorEventDispatcher(conf);  
  8. dispatcher.register(Speculator.EventType.class,  
  9.     speculatorEventDispatcher);  

代码清单1所示代码的执行步骤如下:

  1. 当启用map任务推断(这里的MRJobConfig.MAP_SPECULATIVE实际由参数mapreduce.map.speculative控制,默认是true)或者启用reduce任务推断(这里的MRJobConfig.REDUCE_SPECULATIVE实际由参数mapreduce.reduce.speculative控制,默认是true)时调用createSpeculator方法创建推断服务。最后将Speculator添加为MRAppMaster的子服务。
  2. 向调度器dispatcher注册推断事件与推断事件的处理器SpeculatorEventDispatcher,以便触发了推断事件后交由SpeculatorEventDispatcher作进一步处理。

createSpeculator方法(见代码清单2)创建的推断服务的实现类默认是DefaultSpeculator,用户也可以通过参数yarn.app.mapreduce.am.job.speculator.class(即MRJobConfig.MR_AM_JOB_SPECULATOR)指定推断服务的实现类。

代码清单2 创建推断器

  1. protected Speculator createSpeculator(Configuration conf,  
  2.     final AppContext context) {  
  3.   return callWithJobClassLoader(conf, new Action<Speculator>() {  
  4.     public Speculator call(Configuration conf) {  
  5.       Class<? extends Speculator> speculatorClass;  
  6.       try {  
  7.         speculatorClass  
  8.             // "yarn.mapreduce.job.speculator.class"  
  9.             = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR,  
  10.                             DefaultSpeculator.class,  
  11.                             Speculator.class);  
  12.         Constructor<? extends Speculator> speculatorConstructor  
  13.             = speculatorClass.getConstructor  
  14.                  (Configuration.class, AppContext.class);  
  15.         Speculator result = speculatorConstructor.newInstance(conf, context);  
  16.         return result;  
  17.       } catch (InstantiationException ex) {  
  18.         LOG.error("Can't make a speculator -- check "  
  19.             + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);  
  20.         throw new YarnRuntimeException(ex);  
  21.       } catch (IllegalAccessException ex) {  
  22.         LOG.error("Can't make a speculator -- check "  
  23.             + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);  
  24.         throw new YarnRuntimeException(ex);  
  25.       } catch (InvocationTargetException ex) {  
  26.         LOG.error("Can't make a speculator -- check "  
  27.             + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);  
  28.         throw new YarnRuntimeException(ex);  
  29.       } catch (NoSuchMethodException ex) {  
  30.         LOG.error("Can't make a speculator -- check "  
  31.             + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);  
  32.         throw new YarnRuntimeException(ex);  
  33.       }  
  34.     }  
  35.   });  
  36. }  

根据代码清单2,我们知道createSpeculator方法通过反射调用了DefaultSpeculator的构造器来实例化任务推断器。DefaultSpeculator的构造器如下:

  1. public DefaultSpeculator(Configuration conf, AppContext context) {  
  2.   this(conf, context, context.getClock());  
  3. }  
  4. public DefaultSpeculator(Configuration conf, AppContext context, Clock clock) {  
  5.   this(conf, context, getEstimator(conf, context), clock);  
  6. }  


上述第一个构造器调用了第二个构造器,而第二个构造器中首先调用了getEstimator方法(见代码清单3)用于获取推断估算器。

代码清单3 获取推断估算器
  1. static private TaskRuntimeEstimator getEstimator  
  2.     (Configuration conf, AppContext context) {  
  3.   TaskRuntimeEstimator estimator;  
  4.     
  5.   try {  
  6.     // "yarn.mapreduce.job.task.runtime.estimator.class"  
  7.     Class<? extends TaskRuntimeEstimator> estimatorClass  
  8.         = conf.getClass(MRJobConfig.MR_AM_TASK_ESTIMATOR,  
  9.                         LegacyTaskRuntimeEstimator.class,  
  10.                         TaskRuntimeEstimator.class);  
  11.     Constructor<? extends TaskRuntimeEstimator> estimatorConstructor  
  12.         = estimatorClass.getConstructor();  
  13.     estimator = estimatorConstructor.newInstance();  
  14.     estimator.contextualize(conf, context);  
  15.   } catch (InstantiationException ex) {  
  16.     LOG.error("Can't make a speculation runtime estimator", ex);  
  17.     throw new YarnRuntimeException(ex);  
  18.   } catch (IllegalAccessException ex) {  
  19.     LOG.error("Can't make a speculation runtime estimator", ex);  
  20.     throw new YarnRuntimeException(ex);  
  21.   } catch (InvocationTargetException ex) {  
  22.     LOG.error("Can't make a speculation runtime estimator", ex);  
  23.     throw new YarnRuntimeException(ex);  
  24.   } catch (NoSuchMethodException ex) {  
  25.     LOG.error("Can't make a speculation runtime estimator", ex);  
  26.     throw new YarnRuntimeException(ex);  
  27.   }  
  28.     
  29. return estimator;  
  30. }  

根据代码清单3可以看出推断估算器可以通过参数yarn.app.mapreduce.am.job.task.estimator.class(即MRJobConfig.MR_AM_TASK_ESTIMATOR)进行指定,如果没有指定,则默认使用LegacyTaskRuntimeEstimator。实例化LegacyTaskRuntimeEstimator后,还调用其父类StartEndTimesBase的contextualize方法(见代码清单4)进行上下文的初始化,实际就是将当前作业添加到map任务统计列表、reduce任务统计列表,并设置作业与其慢任务阈值(mapreduce.job.speculative.slowtaskthreshold)之间的映射关系。

代码清单4 StartEndTimesBase的初始化

  1. @Override  
  2. public void contextualize(Configuration conf, AppContext context) {  
  3.   this.context = context;  
  4.   Map<JobId, Job> allJobs = context.getAllJobs();  
  5.   for (Map.Entry<JobId, Job> entry : allJobs.entrySet()) {  
  6.     final Job job = entry.getValue();  
  7.     mapperStatistics.put(job, new DataStatistics());  
  8.     reducerStatistics.put(job, new DataStatistics());  
  9.     slowTaskRelativeTresholds.put  
  10.         (job, conf.getFloat(MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD,1.0f));  
  11.   }  
  12. }  


创建并初始化推断估算器后,会再次调用DefaultSpeculator的最后一个构造器,实现如下:

  1. public DefaultSpeculator  
  2.     (Configuration conf, AppContext context,  
  3.      TaskRuntimeEstimator estimator, Clock clock) {  
  4.   super(DefaultSpeculator.class.getName());  
  5.   this.conf = conf;  
  6.   this.context = context;  
  7.   this.estimator = estimator;  
  8.   this.clock = clock;  
  9.   this.eventHandler = context.getEventHandler();  
  10. }  

至此,我们介绍完了DefaultSpeculator的初始化过程。

Speculator的启动

MRAppMaster在启动的时候,会调用其serviceStart方法,其中涉及启动Speculator的部分见代码清单5。

代码清单5 启动MRAppMaster时涉及Speculator的部分

  1. if (job.isUber()) {  
  2.   speculatorEventDispatcher.disableSpeculation();  
  3.   LOG.info("MRAppMaster uberizing job " + job.getID()  
  4.       + " in local container ("uber-AM") on node "  
  5.       + nmHost + ":" + nmPort + ".");  
  6. } else {  
  7.   // send init to speculator only for non-uber jobs.   
  8.   // This won't yet start as dispatcher isn't started yet.  
  9.   dispatcher.getEventHandler().handle(  
  10.       new SpeculatorEvent(job.getID(), clock.getTime()));  
  11.   LOG.info("MRAppMaster launching normal, non-uberized, multi-container "  
  12.       + "job " + job.getID() + ".");  
  13. }  

分析代码清单5,其中与任务推断有关的逻辑如下:

  • 如果采用了Uber运行模式,则会调用SpeculatorEventDispatcher的disableSpeculation方法(见代码清单6),使得任务推断失效。
  • 如果未采用Uber运行模式,则会向调度器主动发送一个SpeculatorEvent事件。此处构造SpeculatorEvent事件的代码如下;
  1. public SpeculatorEvent(JobId jobID, long timestamp) {  
  2.   super(Speculator.EventType.JOB_CREATE, timestamp);  
  3.   this.jobID = jobID;  
  4. }  
由此可见,在启动MRAppMaster的阶段,创建的SpeculatorEvent事件的类型是Speculator.EventType.JOB_CREATE。SpeculatorEventDispatcher的handle方法会被调度器执行,用以处理SpeculatorEvent事件,其代码实现见代码清单6。
代码清单6 SpeculatorEventDispatcher的实现
  1. private class SpeculatorEventDispatcher implements  
  2.     EventHandler<SpeculatorEvent> {  
  3.   private final Configuration conf;  
  4.   private volatile boolean disabled;  
  5.   public SpeculatorEventDispatcher(Configuration config) {  
  6.     this.conf = config;  
  7.   }  
  8.   @Override  
  9.   public void handle(final SpeculatorEvent event) {  
  10.     if (disabled) {  
  11.       return;  
  12.     }  
  13.     TaskId tId = event.getTaskID();  
  14.     TaskType tType = null;  
  15.     /* event's TaskId will be null if the event type is JOB_CREATE or 
  16.      * ATTEMPT_STATUS_UPDATE 
  17.      */  
  18.     if (tId != null) {  
  19.       tType = tId.getTaskType();   
  20.     }  
  21.     boolean shouldMapSpec =  
  22.             conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false);  
  23.     boolean shouldReduceSpec =  
  24.             conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);  
  25.     /* The point of the following is to allow the MAP and REDUCE speculative 
  26.      * config values to be independent: 
  27.      * IF spec-exec is turned on for maps AND the task is a map task 
  28.      * OR IF spec-exec is turned on for reduces AND the task is a reduce task 
  29.      * THEN call the speculator to handle the event. 
  30.      */  
  31.     if ( (shouldMapSpec && (tType == null || tType == TaskType.MAP))  
  32.       || (shouldReduceSpec && (tType == null || tType == TaskType.REDUCE))) {  
  33.       // Speculator IS enabled, direct the event to there.  
  34.       callWithJobClassLoader(conf, new Action<Void>() {  
  35.         public Void call(Configuration conf) {  
  36.           speculator.handle(event);  
  37.           return null;  
  38.         }  
  39.       });  
  40.     }  
  41.   }  
  42.   public void disableSpeculation() {  
  43.     disabled = true;  
  44.   }  
  45. }  

SpeculatorEventDispatcher的实现告诉我们当启用map或者reduce任务推断时,将异步调用Speculator的handle方法处理SpeculatorEvent事件。以默认的DefaultSpeculator的handle方法为例,来看看其实现,代码如下:

  1. @Override  
  2. public void handle(SpeculatorEvent event) {  
  3.   processSpeculatorEvent(event);  
  4. }  


上述代码实际代理执行了processSpeculatorEvent方法(见代码清单7)

代码清单7 DefaultSpeculator处理SpeculatorEvent事件的实现

  1. private synchronized void processSpeculatorEvent(SpeculatorEvent event) {  
  2.   switch (event.getType()) {  
  3.     case ATTEMPT_STATUS_UPDATE:  
  4.       statusUpdate(event.getReportedStatus(), event.getTimestamp());  
  5.       break;  
  6.     case TASK_CONTAINER_NEED_UPDATE:  
  7.     {  
  8.       AtomicInteger need = containerNeed(event.getTaskID());  
  9.       need.addAndGet(event.containersNeededChange());  
  10.       break;  
  11.     }  
  12.     case ATTEMPT_START:  
  13.     {  
  14.       LOG.info("ATTEMPT_START " + event.getTaskID());  
  15.       estimator.enrollAttempt  
  16.           (event.getReportedStatus(), event.getTimestamp());  
  17.       break;  
  18.     }  
  19.       
  20.     case JOB_CREATE:  
  21.     {  
  22.       LOG.info("JOB_CREATE " + event.getJobID());  
  23.       estimator.contextualize(getConfig(), context);  
  24.       break;  
  25.     }  
  26.   }  
  27. }  

当DefaultSpeculator收到类型为JOB_CREATE的SpeculatorEvent事件时会匹配执行以下代码:

  1. case JOB_CREATE:  
  2. {  
  3.   LOG.info("JOB_CREATE " + event.getJobID());  
  4.   estimator.contextualize(getConfig(), context);  
  5.   break;  
  6. }  

这里实际也调用了StartEndTimesBase的contextualize方法(见代码清单4),不再赘述。

由于DefaultSpeculator也是MRAppMaster的子组件之一,所以在启动MRAppMaster(调用MRAppMaster的serviceStart)的过程中,也会调用DefaultSpeculator的serviceStart方法(见代码清单8)启动DefaultSpeculator。

代码清单8 启动DefaultSpeculator

  1. protected void serviceStart() throws Exception {  
  2.   Runnable speculationBackgroundCore  
  3.       = new Runnable() {  
  4.           @Override  
  5.           public void run() {  
  6.             while (!stopped && !Thread.currentThread().isInterrupted()) {  
  7.               long backgroundRunStartTime = clock.getTime();  
  8.               try {  
  9.                 int speculations = computeSpeculations();  
  10.                 long mininumRecomp  
  11.                     = speculations > 0 ? SOONEST_RETRY_AFTER_SPECULATE  
  12.                                        : SOONEST_RETRY_AFTER_NO_SPECULATE;  
  13.                 long wait = Math.max(mininumRecomp,  
  14.                       clock.getTime() - backgroundRunStartTime);  
  15.                 if (speculations > 0) {  
  16.                   LOG.info("We launched " + speculations  
  17.                       + " speculations.  Sleeping " + wait + " milliseconds.");  
  18.                 }  
  19.                 Object pollResult  
  20.                     = scanControl.poll(wait, TimeUnit.MILLISECONDS);  
  21.               } catch (InterruptedException e) {  
  22.                 if (!stopped) {  
  23.                   LOG.error("Background thread returning, interrupted", e);  
  24.                 }  
  25.                 return;  
  26.               }  
  27.             }  
  28.           }  
  29.         };  
  30.   speculationBackgroundThread = new Thread  
  31.       (speculationBackgroundCore, "DefaultSpeculator background processing");  
  32.   speculationBackgroundThread.start();  
  33.   super.serviceStart();  
  34. }  

启动DefaultSpeculator的主要目的是启动一个线程不断推断执行进行估算,步骤如下:

  1. 创建了匿名的实现类speculationBackgroundCore,用于在单独的线程中对推断执行进行估算。
  2. 创建Thread并启动线程。
speculationBackgroundCore中调用的computeSpeculations方法用于计算推断调度执行的map和reduce任务数量,其实现如下:
  1. private int computeSpeculations() {  
  2.   // We'll try to issue one map and one reduce speculation per job per run  
  3.   return maybeScheduleAMapSpeculation() + maybeScheduleAReduceSpeculation();  
  4. }  

computeSpeculations方法返回的结果等于maybeScheduleAMapSpeculation方法(用于推断需要重新分配Container的map任务数量)和maybeScheduleAReduceSpeculation方法(用于推断需要重新分配Container的reduce任务数量)返回值的和。maybeScheduleAMapSpeculation的实现如下:

  1. private int maybeScheduleAMapSpeculation() {  
  2.   return maybeScheduleASpeculation(TaskType.MAP);  
  3. }  
  4. private int maybeScheduleAReduceSpeculation() {  
  5.   return maybeScheduleASpeculation(TaskType.REDUCE);  
  6. }  
maybeScheduleAMapSpeculation和maybeScheduleAReduceSpeculation实际都调用了maybeScheduleASpeculation方法,其实现见代码清单9。
代码清单9 maybeScheduleASpeculation用于计算map或者reduce任务推断调度的可能性
  1. private int maybeScheduleASpeculation(TaskType type) {  
  2.   int successes = 0;  
  3.   long now = clock.getTime();  
  4.   ConcurrentMap<JobId, AtomicInteger> containerNeeds  
  5.       = type == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;  
  6.   for (ConcurrentMap.Entry<JobId, AtomicInteger> jobEntry : containerNeeds.entrySet()) {  
  7.     // This race conditon is okay.  If we skip a speculation attempt we  
  8.     //  should have tried because the event that lowers the number of  
  9.     //  containers needed to zero hasn't come through, it will next time.  
  10.     // Also, if we miss the fact that the number of containers needed was  
  11.     //  zero but increased due to a failure it's not too bad to launch one  
  12.     //  container prematurely.  
  13.     if (jobEntry.getValue().get() > 0) {  
  14.       continue;  
  15.     }  
  16.     int numberSpeculationsAlready = 0;  
  17.     int numberRunningTasks = 0;  
  18.     // loop through the tasks of the kind  
  19.     Job job = context.getJob(jobEntry.getKey());  
  20.     Map<TaskId, Task> tasks = job.getTasks(type);  
  21.     int numberAllowedSpeculativeTasks  
  22.         = (int) Math.max(MINIMUM_ALLOWED_SPECULATIVE_TASKS,  
  23.                          PROPORTION_TOTAL_TASKS_SPECULATABLE * tasks.size());  
  24.     TaskId bestTaskID = null;  
  25.     long bestSpeculationValue = -1L;  
  26.     // this loop is potentially pricey.  
  27.     // TODO track the tasks that are potentially worth looking at  
  28.     for (Map.Entry<TaskId, Task> taskEntry : tasks.entrySet()) {  
  29.       long mySpeculationValue = speculationValue(taskEntry.getKey(), now);  
  30.       if (mySpeculationValue == ALREADY_SPECULATING) {  
  31.         ++numberSpeculationsAlready;  
  32.       }  
  33.       if (mySpeculationValue != NOT_RUNNING) {  
  34.         ++numberRunningTasks;  
  35.       }  
  36.       if (mySpeculationValue > bestSpeculationValue) {  
  37.         bestTaskID = taskEntry.getKey();  
  38.         bestSpeculationValue = mySpeculationValue;  
  39.       }  
  40.     }  
  41.     numberAllowedSpeculativeTasks  
  42.         = (int) Math.max(numberAllowedSpeculativeTasks,  
  43.                          PROPORTION_RUNNING_TASKS_SPECULATABLE * numberRunningTasks);  
  44.     // If we found a speculation target, fire it off  
  45.     if (bestTaskID != null  
  46.         && numberAllowedSpeculativeTasks > numberSpeculationsAlready) {  
  47.       addSpeculativeAttempt(bestTaskID);  
  48.       ++successes;  
  49.     }  
  50.   }  
  51.   return successes;  
  52. }  
maybeScheduleASpeculation方法首先根据当前Task的类型(map或reduce)获取相应类型任务的需要分配Container数量的缓存containerNeeds,然后遍历containerNeeds。

遍历containerNeeds的执行步骤如下:

  1. 如果当前Job依然有未分配Container的Task,那么跳过当前循环,继续下一次循环。这说明如果当前Job的某一类型的Task依然存在未分配Container的,则不会进行任务推断;
  2. 从当前应用的上下文AppContext中获取Job,并获取此Job的所有的Task(map或者reduce);
  3. 计算允许执行推断的Task数量numberAllowedSpeculativeTasks(map或者reduce)。其中MINIMUM_ALLOWED_SPECULATIVE_TASKS的值是10,PROPORTION_TOTAL_TASKS_SPECULATABLE的值是0.01。numberAllowedSpeculativeTasks取MINIMUM_ALLOWED_SPECULATIVE_TASKS与PROPORTION_TOTAL_TASKS_SPECULATABLE*任务数量之积之间的最大值。因此我们知道,当Job的某一类型(map或者reduce)的Task的数量小于1100时,计算得到的numberAllowedSpeculativeTasks等于10,如果Job的某一类型(map或者reduce)的Task的数量大于等于1100时,numberAllowedSpeculativeTasks才会大于10。numberAllowedSpeculativeTasks变量可以有效防止大量任务同时启动备份任务所造成的资源浪费。
  4. 遍历Job对应的map任务或者reduce任务集合,调用speculationValue方法获取每一个Task的推断值。并在迭代完所有的map任务或者reduce任务后,获取这一任务集合中的推断值bestSpeculationValue最大的任务ID。
  5. 再次计算numberAllowedSpeculativeTasks,其中PROPORTION_RUNNING_TASKS_SPECULATABLE的值等于0.1,numberRunningTasks是处于运行中的Task。numberAllowedSpeculativeTasks取numberAllowedSpeculativeTasks与PROPORTION_RUNNING_TASKS_SPECULATABLE*numberRunningTasks之积之间的最大值。因此我们知道当Job的某一类型(map或者reduce)的正在运行中的Task的数量小于110时(假设第3步得到的numberAllowedSpeculativeTasks等于10),计算得到的numberAllowedSpeculativeTasks等于10,如果Job的某一类型(map或者reduce)的正在运行中的Task的数量大于等于110时,numberAllowedSpeculativeTasks才会大于10。
  6. 如果numberAllowedSpeculativeTasks大于numberSpeculationsAlready(已经推断执行过的Task数量),则调用addSpeculativeAttempt方法(见代码清单10)将第4步中选出的任务的任务ID添加到推断尝试中。
代码清单10 添加推断执行的尝试
  1. //Add attempt to a given Task.  
  2. protected void addSpeculativeAttempt(TaskId taskID) {  
  3.   LOG.info  
  4.       ("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID);  
  5.   eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_ADD_SPEC_ATTEMPT));  
  6.   mayHaveSpeculated.add(taskID);  
  7. }  

根据代码清单10,我们看到推断执行尝试是通过发送类型为TaskEventType.T_ADD_SPEC_ATTEMPT的TaskEvent事件完成的。

估算任务的推断值

在分析代码清单9时,我故意跳过了speculationValue方法的分析。speculationValue方法(见代码清单11)主要用于估算每个任务的推断值。

代码清单11 估算任务的推断值

  1. private long speculationValue(TaskId taskID, long now) {  
  2.   Job job = context.getJob(taskID.getJobId());  
  3.   Task task = job.getTask(taskID);  
  4.   Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();  
  5.   long acceptableRuntime = Long.MIN_VALUE;  
  6.   long result = Long.MIN_VALUE;  
  7.   if (!mayHaveSpeculated.contains(taskID)) {  
  8.     acceptableRuntime = estimator.thresholdRuntime(taskID);  
  9.     if (acceptableRuntime == Long.MAX_VALUE) {  
  10.       return ON_SCHEDULE;  
  11.     }  
  12.   }  
  13.   TaskAttemptId runningTaskAttemptID = null;  
  14.   int numberRunningAttempts = 0;  
  15.   for (TaskAttempt taskAttempt : attempts.values()) {  
  16.     if (taskAttempt.getState() == TaskAttemptState.RUNNING  
  17.         || taskAttempt.getState() == TaskAttemptState.STARTING) {  
  18.       if (++numberRunningAttempts > 1) {  
  19.         return ALREADY_SPECULATING;  
  20.       }  
  21.       runningTaskAttemptID = taskAttempt.getID();  
  22.       long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID);  
  23.       long taskAttemptStartTime  
  24.           = estimator.attemptEnrolledTime(runningTaskAttemptID);  
  25.       if (taskAttemptStartTime > now) {  
  26.         // This background process ran before we could process the task  
  27.         //  attempt status change that chronicles the attempt start  
  28.         return TOO_NEW;  
  29.       }  
  30.       long estimatedEndTime = estimatedRunTime + taskAttemptStartTime;  
  31.       long estimatedReplacementEndTime  
  32.           = now + estimator.estimatedNewAttemptRuntime(taskID);  
  33.       float progress = taskAttempt.getProgress();  
  34.       TaskAttemptHistoryStatistics data =  
  35.           runningTaskAttemptStatistics.get(runningTaskAttemptID);  
  36.       if (data == null) {  
  37.         runningTaskAttemptStatistics.put(runningTaskAttemptID,  
  38.           new TaskAttemptHistoryStatistics(estimatedRunTime, progress, now));  
  39.       } else {  
  40.         if (estimatedRunTime == data.getEstimatedRunTime()  
  41.             && progress == data.getProgress()) {  
  42.           // Previous stats are same as same stats  
  43.           if (data.notHeartbeatedInAWhile(now)) {  
  44.             // Stats have stagnated for a while, simulate heart-beat.  
  45.             TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();  
  46.             taskAttemptStatus.id = runningTaskAttemptID;  
  47.             taskAttemptStatus.progress = progress;  
  48.             taskAttemptStatus.taskState = taskAttempt.getState();  
  49.             // Now simulate the heart-beat  
  50.             handleAttempt(taskAttemptStatus);  
  51.           }  
  52.         } else {  
  53.           // Stats have changed - update our data structure  
  54.           data.setEstimatedRunTime(estimatedRunTime);  
  55.           data.setProgress(progress);  
  56.           data.resetHeartBeatTime(now);  
  57.         }  
  58.       }  
  59.       if (estimatedEndTime < now) {  
  60.         return PROGRESS_IS_GOOD;  
  61.       }  
  62.       if (estimatedReplacementEndTime >= estimatedEndTime) {  
  63.         return TOO_LATE_TO_SPECULATE;  
  64.       }  
  65.       result = estimatedEndTime - estimatedReplacementEndTime;  
  66.     }  
  67.   }  
  68.   // If we are here, there's at most one task attempt.  
  69.   if (numberRunningAttempts == 0) {  
  70.     return NOT_RUNNING;  
  71.   }  
  72.   if (acceptableRuntime == Long.MIN_VALUE) {  
  73.     acceptableRuntime = estimator.thresholdRuntime(taskID);  
  74.     if (acceptableRuntime == Long.MAX_VALUE) {  
  75.       return ON_SCHEDULE;  
  76.     }  
  77.   }  
  78.   return result;  
  79. }  

speculationValue方法的执行步骤如下:

  1. 如果任务还没有被推断执行,那么调用estimator的thresholdRuntime方法获取任务可以接受的运行时长acceptableRuntime。如果acceptableRuntime等于Long.MAX_VALUE,则将ON_SCHEDULE作为返回值,ON_SCHEDULE的值是Long.MIN_VALUE,以此表示当前任务的推断值很小,即被推断尝试的可能最小。
  2. 如果任务的运行实例数大于1,则说明此任务已经发生了推断执行,因此返回ALREADY_SPECULATING。ALREADY_SPECULATING等于Long.MIN_VALUE + 1。
  3. 调用estimator的estimatedRuntime方法获取任务运行实例的估算运行时长estimatedRunTime。
  4. 调用estimator的attemptEnrolledTime方法获取任务实例开始运行的时间,此时间即为startTimes中缓存的start。这个值是在任务实例启动时导致DefaultSpeculator的processSpeculatorEvent方法处理Speculator.EventType.ATTEMPT_START类型的SpeculatorEvent事件时保存的。
  5. estimatedEndTime表示估算任务实例的运行结束时间,estimatedEndTime = estimatedRunTime + taskAttemptStartTime。
  6. 调用estimator的estimatedNewAttemptRuntime方法估算如果此时重新为任务启动一个实例,此实例运行结束的时间estimatedReplacementEndTime。
  7. 如果缓存中没有任务实例的历史统计信息,那么将estimatedRunTime、任务实例进度progress,当前时间封装为历史统计信息缓存起来。
  8. 如果缓存中存在任务实例的历史统计信息,如果缓存的estimatedRunTime和本次估算的estimatedRunTime一样并且缓存的实例进度progress和本次获取的任务实例进度progress一样,说明有一段时间没有收到心跳了,则模拟一次心跳。如果缓存的estimatedRunTime和本次估算的estimatedRunTime不一样或者缓存的实例进度progress和本次获取的任务实例进度progress不一样,那么将estimatedRunTime、任务实例进度progress,当前时间更新到任务实例的历史统计信息中。
  9. 如果estimatedEndTime小于当前时间,则说明任务实例的进度良好,返回PROGRESS_IS_GOOD,PROGRESS_IS_GOOD等于Long.MIN_VALUE + 3。
  10. 如果estimatedReplacementEndTime大于等于estimatedEndTime,则说明即便启动备份任务实例也无济于事,因为它的结束时间达不到节省作业总运行时长的作用。
  11. 计算本次估算的结果值result,它等于estimatedEndTime - estimatedReplacementEndTime,当这个差值越大表示备份任务实例运行后比原任务实例的结束时间就越早,因此调度执行的价值越大。
  12. 如果numberRunningAttempts等于0,则表示当前任务还没有启动任务实例,返回NOT_RUNNING,NOT_RUNNING等于Long.MIN_VALUE + 4。
  13. 重新计算acceptableRuntime,处理方式与第1步相同。
  14. 返回result。

总结

根据源码分析,我们知道DefaultSpeculator启动的线程会不时去计算作业的各个任务的推断值,即speculationValue方法计算的结果。从所有任务的推断值中选择值最大,也就是说价值最高的,为其配备一个备份任务。这里有个问题,Estimator推算用的各种统计和监控数据是从哪里来的呢?请看《Hadoop2.6.0运行mapreduce之推断(speculative)执行(下)》。
 作者博客:http://blog.csdn.net/beliefer/article/details/51249119