sck940210 发表于 2015-10-28 21:26

你不知道的Spark,将来会替代Hadoop?

获奖详情:
http://bbs.chinaunix.net/thread-4203439-1-1.html

随着大数据概念的普及以及大数据技术的逐渐成熟,越来越多来自不同领域的大小企业开始拥抱大数据。大数据时代真正即将到来。而选择大数据将遭遇的第一个问题就是技术选型。面对由Hadoop、Spark、Hive、HBase、Storm、Kafka、Flume等等众多开源工具构成的生态系统,技术人员往往感到迷茫。
Spark作为数据处理技术中的新贵,因其性能高、开发效率高、高容错等优点越来越受到技术人员的关注,而且Spark为批处理(Spark Core)、交互式(Spark SQL)、流式处理(Spark Streaming)、机器学习(MLlib)、图计算(GraphX)提供了一个统一的数据处理平台。但是Spark也存在一些缺点,例如还不够稳定、任务调度还不够完善、scala语言学习成本较高等等。关于Spark,你有什么看法呢?


讨论话题:(欢迎大家贴图讨论,能用代码讲话的就别用汉字了:em02:)
1. 有人说Spark就是内存版的MapReduce,对此你怎么看?
2. 有人说Spark将来会替代Hadoop,你又怎么看?

讨论时间:2015年10月29日——11月29日


奖品设置:
活动结束后,我们将会选取4位讨论精彩的兄弟,送《Spark快速大数据分析》图书一本。


作者:[美] Holden Karau Andy Konwinski
      Patrick Wendell
   [加] Matei Zaharia   
译者:王道远
出版时间:2015年 9 月第 1 版


内 容 提 要:
  本书由Spark开发者及核心成员共同打造,讲解了网络大数据时代应运而生的、能高效迅捷地分析处理数据的工具——Spark,它带领读者快速掌握用Spark收集、计算、简化和保存海量数据的方法,学会交互、迭代和增量式分析,解决分区、数据本地化和自定义序列化等问题。

试读样章:




yybmsrs 发表于 2015-10-29 18:58

本帖最后由 yybmsrs 于 2015-10-29 18:59 编辑

1. 有人说Spark就是内存版的MapReduce,对此你怎么看?
差不多,shuffle的时候中间数据放到内存,放不下才写文件。同时spark的抽象程度更高,不仅仅只有map和reduce。

2. 有人说Spark将来会替代Hadoop,你又怎么看?
spark是个生态系统,hadoop也是一个生态系统,不存在谁取代谁,只能说spark会取代mr。其他如hdfs,yarn都是
通用的。


另外,这两个问题实在是没什么可以多说的,虽然我很想要这本书:em16: :em16: :em16:

wenhq 发表于 2015-10-29 19:14

1. 有人说Spark就是内存版的MapReduce,对此你怎么看?
这个我赞同,因为spark属于那种内存计算型的架构,是对mapreduce不足的改进。
2. 有人说Spark将来会替代Hadoop,你又怎么看?
我觉得不会,Spark框架的底层存储可以选用HDFS,也可以用其他的。但是Spark 运行的模式里有Standalone,Yarn,Mesos。 其中Yarn也是Hadoop的组件。
而且Hadoop组件很多。不是那么随便可以替代的。。

demilich 发表于 2015-10-30 09:05

1. 有人说Spark就是内存版的MapReduce,对此你怎么看?
- 基本同意
- Hadoop,主要用于离线大数据分析;那么Spark主要用于准实时性的大数据分析;而Storm用于实时大数据分析

2. 有人说Spark将来会替代Hadoop,你又怎么看?
- 目前看起来不会,Hadoop是一个体系,一个生态系统,里面包含了很多子系统。而且两者的基础也不完全一样,想完全替代,目前看不出这个趋势 ..

heguangwu 发表于 2015-10-30 10:04

本帖最后由 heguangwu 于 2015-10-30 15:44 编辑

1. 有人说Spark就是内存版的MapReduce,对此你怎么看?
    Spark技术来自超算,主要的核心思想是RDD,通过记录每一个步骤之间的计算关系而不是像MapReduce那样保存实际的数据来实现故障恢复,所以Spark的故障恢复其实是要重算的,其计算过程确实有点像MapReduce,MapReduce在经过一个阶段的MapReduce后会将数据保存到HDFS,相当于做了一次checkpoint,这其实是一个很大的消耗,也是性能的一大瓶颈,但好处是恢复起来非常快,Spark也借鉴了这种思想,中间其实也可以做持久化的RDD,等同于checkpoint,但多数情况下不需要这个步骤,后面可以直接继续跟计算操作,这样看Spark无论从容灾还是计算模型都不是MapReduce,只是借鉴了MapReduce很多思想,所以不能说Spark仅仅是内存版的MapReduce。在改进MapReduce模型上,而Hadoop的其它计算框架如Tez/impala是改进mapreduce的阶段划分的单一性(只有map和reduce),将计算拓扑图做成DAG的方式(这个本质是是借鉴了并行数据库的思想),虽然故障恢复很麻烦,但速度快了不少。我们看一下spark的协议栈,从下图可以看出,spark其实不仅仅是一种简单的计算模型,而是一个通用计算平台,上面开发各种库可以实现不同的技术模型,如流计算、机器学习

   而很久之前MapReduce也想做一个通用的计算平台(事实上也有很多人在上面做了一些如机器学习等),但其计算模型的简单低效不适合做成通用平台。从另外一个方面来说,spark也是mapreduce的一种重要补充,而不是简单的替代,毕竟现在内存资源还是比较宝贵的,MapReduce消耗的磁盘资源相对而言廉价多了,而且有些计算并不那么着急,5个小时计算出来和10个小时计算出来结果影响不大这种就适合MR


2. 有人说Spark将来会替代Hadoop,你又怎么看?
   其实我上面也讲述了部分我的观点,现在不是谁替换谁,而是spark是mapreduce的一种重要补充,最后的情况不是one size fit all,而是不同场景会采用不同的计算模型,如要求低时延的会采用storm,对吞吐量要求高的会采用spark stream,重要的实时计算采用spark,而中小数据交互式计算可能会采用Tez,超大规模的非实时的计算还是会采用MapReduce,正如下面这个图一样

      最后贴一个hadoop ecosystem的介绍的ppt,虽然是有点老,但思路非常清晰,个人也比较认同这个观点:

nail78 发表于 2015-10-30 10:27

1. 有人说Spark就是内存版的MapReduce,对此你怎么看?
    这样说有点简单,spark提供了集群的分布式内存抽象,也就是所说的RDD,spark提供了RDD的两类操作,转换和动作,转换包括map,flatMap,filter,union,sample,join,groupByKey,reduceByKey,sortByKey等等,动作是返回结果,包括collect,reduce,count等,抽象层次更高,功能更多,调用更灵活。所处理的数据都是放在内存中,速度更快。
    mapreduce则抽象层次比较低,只有map,reduce两个基本功能。

2. 有人说Spark将来会替代Hadoop,你又怎么看?
    hadoop是一个生态系统,主要包括HDFS,mapredeuce,适合处理海量离线数据,他的分布是基于磁盘和IO的。
    spark的分布处理是基于内存的,速度更快。
    spark的出现,解决方案又多了一种选择,spark是可以架在hadoop和yarn上的,hadoop的生态中有很多部分,spark可以替代hadoop的一些功能,二者是可并存的。

jieforest 发表于 2015-10-30 12:58

不错,支持

renxiao2003 发表于 2015-10-30 16:09

新东西太多了。

hiyachen 发表于 2015-10-30 20:52

1. 有人说Spark就是内存版的MapReduce,对此你怎么看?
    (1)spark与以MapReduce为出发点的hadoop有几乎同样的效果,绝不是同样的处理方式。以下简单介绍不同之处。   
    Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。
Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Scala 能够紧密集成,其中的 Scala 可以像操作本地集合对象一样轻松地操作分布式数据集。
尽管创建 Spark 是为了支持分布式数据集上的迭代作业,但是实际上它是对 Hadoop 的补充,可以在 Hadoop 文件系统中并行运行。通过名为 Mesos 的第三方集群框架可以支持此行为。Spark 由加州大学伯克利分校 AMP 实验室 (Algorithms, Machines, and People Lab) 开发,可用来构建大型的、低延迟的数据分析应用程序。
   

   (2) 对于spark的部署,请参见我的博文:http://blog.chinaunix.net/uid-7374279-id-5200921.html
以下是部分介绍:
先从比较简单的说起,所谓的没有ha是指master节点没有ha。

组成cluster的两大元素即Master和Worker。slave worker可以有1到多个,这些worker都处于active状态。

Driver Application可以运行在Cluster之内,也可以在cluster之外运行,先从简单的讲起即Driver Application独立于Cluster。那么这样的整体框架如下图所示,由driver,master和多个slave worker来共同组成整个的运行环境。

执行顺序
步骤1 运行master

$SPARK_HOME/sbin/start_master.sh

在 start_master.sh 中最关键的一句就是

"$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT

检测Master的jvm进程

root 23438 1 67 22:57 pts/0 00:00:05 /opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip localhost --port 7077 --webui-port 8080

Master的日志在$SPARK_HOME/logs目录下
步骤2 运行worker,可以启动多个

./bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077

worker运行时,需要注册到指定的master url,这里就是spark://localhost:7077.

Master侧收到RegisterWorker通知,其处理代码如下

case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
    {
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
      workerHost, workerPort, cores, Utils.megabytesToString(memory))) if (state == RecoveryState.STANDBY) { // ignore, don't send response } else if (idToWorker.contains(id)) {
      sender ! RegisterWorkerFailed("Duplicate worker ID")
      } else {
      val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
          sender, workerUiPort, publicAddress) if (registerWorker(worker)) {
          persistenceEngine.addWorker(worker)
          sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
          schedule()
      } else {
          val workerAddress = worker.actor.path.address
          logWarning("Worker registration failed. Attempted to re-register worker at same " + "address: " + workerAddress)
          sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: " + workerAddress)
      }
      }
    }

步骤3 运行Spark-shell

MASTER=spark://localhost:7077 $SPARK_HOME/bin/spark-shell

spark-shell属于application,有关appliation的运行日志存储在 $SPARK_HOME/works 目录下

spark-shell作为application,在Master侧其处理的分支是RegisterApplication,具体处理代码如下。

case RegisterApplication(description) => { if (state == RecoveryState.STANDBY) { // ignore, don't send response } else {
      logInfo("Registering app " + description.name)
      val app = createApplication(description, sender)
      registerApplication(app)
      logInfo("Registered app " + description.name + " with ID " + app.id)
      persistenceEngine.addApplication(app)
      sender ! RegisteredApplication(app.id, masterUrl)
      schedule()
      }
    }

每当有新的application注册到master,master都要调度schedule函数将application发送到相应的 worker,在对应的worker启动相应的ExecutorBackend. 具体代码请参考Master.scala中的schedule函数,代码就不再列出。
步骤4 结果检测

/opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip localhost --port 7077 --webui-port 8080 root 23752 23745 21 23:00 pts/0 00:00:25 /opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.repl.Main
root 23986 23938 25 23:02 pts/2 00:00:03 /opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://localhost:7077 root 24047 23986 34 23:02 pts/2 00:00:04 /opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@localhost:40053/user/CoarseGrainedScheduler 0 localhost 4 akka.tcp://sparkWorker@localhost:53568/user/Worker app-20140511230059-0000

从运行的进程之间的关系可以看出,worker和master之间的连接建立完毕之后,如果有新的driver application连接上master,master会要求worker启动相应的ExecutorBackend进程。此后若有什么Task需要运 行,则会运行在这些Executor之上。可以从以下的日志信息得出此结论,当然看源码亦可。

14/05/11 23:02:36 INFO Worker: Asked to launch executor app-20140511230059-0000/0 for Spark shell 14/05/11 23:02:36 INFO ExecutorRunner: Launch command: "/opt/java/bin/java" "-cp" ":/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar" "-Xms512M" "-Xmx512M" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "akka.tcp://spark@localhost:40053/user/CoarseGrainedScheduler" "0" "localhost" "4" "akka.tcp://sparkWorker@localhost:53568/user/Worker" "app-20140511230059-0000"

worker中启动exectuor的相关源码见worker中的receive函数,相关代码如下

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) => if (masterUrl != activeMasterUrl) {
      logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else { try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
          val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, self, workerId, host,
            appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
            workDir, akkaUrl, ExecutorState.RUNNING)
          executors(appId + "/" + execId) = manager
          manager.start()
          coresUsed += cores_
          memoryUsed += memory_
          masterLock.synchronized {
            master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
          }
      } catch { case e: Exception => {
            logError("Failed to launch exector %s/%d for %s".format(appId, execId, appDesc.name)) if (executors.contains(appId + "/" + execId)) {
            executors(appId + "/" + execId).kill()
            executors -= appId + "/" + execId
            }
            masterLock.synchronized {
            master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
            }
          }
      }
      }

关于standalone的部署,需要详细研究的源码文件如下所列。

    deploy/master/Master.scala
    deploy/worker/worker.scala
    executor/CoarseGrainedExecutorBackend.scala

查看进程之间的父子关系,请用 "pstree"

使用下图来小结单Master的部署情况。

类的动态加载和反射

在谈部署Driver到Cluster上之前,我们先回顾一下java的一大特性“类的动态加载和反射机制”。本人不是一直写java代码出身,所以好多东西都是边用边学,难免挂一漏万。

所谓的反射,其实就是要解决在运行期实现类的动态加载。

来个简单的例子

package test; public class Demo {public Demo() {   System.out.println("Hi!");}@SuppressWarnings("unchecked")public static void main(String[] args) throws Exception {   Class clazz = Class.forName("test.Demo");   Demo demo = (Demo) clazz.newInstance();}
}

谈到这里,就自然想到了一个面试题,“谈一谈Class.forName和ClassLoader.loadClass的区别"。说到面试,我总是很没有信心,面试官都很屌的, :)。
在cluster中运行Driver Application

上一节之所以写到类的动态加载与反射都是为了谈这一节的内容奠定基础。

将Driver application部署到Cluster中,启动的时序大体如下图所示。

   首先启动Master,然后启动Worker
    使用”deploy.Client"将Driver Application提交到Cluster中

./bin/spark-class org.apache.spark.deploy.Client launch \
      \
   

    Master在收到RegisterDriver的请求之后,会发送LaunchDriver给worker,要求worker启动一个Driver的jvm process
    Driver Application在新生成的JVM进程中运行开始时会注册到master中,发送RegisterApplication给Master
    Master发送LaunchExecutor给Worker,要求Worker启动执行ExecutorBackend的JVM Process
    一当ExecutorBackend启动完毕,Driver Application就可以将任务提交到ExecutorBackend上面执行,即LaunchTask指令

提交侧的代码,详见deploy/Client.scala

driverArgs.cmd match { case "launch" => // TODO: We could add an env variable here and intercept it in `sc.addJar` that would //       truncate filesystem paths similar to what YARN does. For now, we just require //       people call `addJar` assuming the jar is in the same directory. val env = Map()
      System.getenv().foreach{case (k, v) => env(k) = v}

      val mainClass = "org.apache.spark.deploy.worker.DriverWrapper" val classPathConf = "spark.driver.extraClassPath" val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
          cp.split(java.io.File.pathSeparator)
      }

      val libraryPathConf = "spark.driver.extraLibraryPath" val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
          cp.split(java.io.File.pathSeparator)
      }

      val javaOptionsConf = "spark.driver.extraJavaOptions" val javaOpts = sys.props.get(javaOptionsConf)
      val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
          driverArgs.driverOptions, env, classPathEntries, libraryPathEntries, javaOpts)

      val driverDescription = new DriverDescription(
          driverArgs.jarUrl,
          driverArgs.memory,
          driverArgs.cores,
          driverArgs.supervise,
          command)

      masterActor ! RequestSubmitDriver(driverDescription)

接收侧

从Deploy.client发送出来的消息被谁接收呢?答案比较明显,那就是Master。 Master.scala中的receive函数有专门针对RequestSubmitDriver的处理,具体代码如下

case RequestSubmitDriver(description) => { if (state != RecoveryState.ALIVE) {
      val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state." sender ! SubmitDriverResponse(false, None, msg)
      } else {
      logInfo("Driver submitted " + description.command.mainClass)
      val driver = createDriver(description)
      persistenceEngine.addDriver(driver)
      waitingDrivers += driver
      drivers.add(driver)
      schedule() // TODO: It might be good to instead have the submission client poll the master to determine //       the current status of the driver. For now it's simply "fire and forget". sender ! SubmitDriverResponse(true, Some(driver.id),
          s"Driver successfully submitted as ${driver.id}")
      }
    }

SparkEnv

SparkEnv对于整个Spark的任务来说非常关键,不同的role在创建SparkEnv时传入的参数是不相同的,如Driver和Executor则存在重要区别。

在Executor.scala中,创建SparkEnv的代码如下所示

private val env = { if (!isLocal) {
      val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
      isDriver = false, isLocal = false)
      SparkEnv.set(_env)
      _env.metricsSystem.registerSource(executorSource)
      _env
    } else {
      SparkEnv.get }
}

Driver Application则会创建SparkContext,在SparkContext创建过程中,比较重要的一步就是生成SparkEnv,其代码如下

private val env = SparkEnv.create(conf,"",conf.get("spark.driver.host"),conf.get("spark.driver.port").toInt,isDriver = true,isLocal = isLocal,listenerBus = listenerBus)
SparkEnv.set(env)

Standalone模式下HA的实现

Spark在standalone模式下利用zookeeper来实现了HA机制,这里所说的HA是专门针对Master节点的,因为上面所有的分析可以看出Master是整个cluster中唯一可能出现单点失效的节点。

采用zookeeper之后,整个cluster的组成如下图所示。

为了使用zookeeper,Master在启动的时候需要指定如下的参数,修改conf/spark-env.sh, SPARK_DAEMON_JAVA_OPTS中添加如下选项。
System property         Meaning
spark.deploy.recoveryMode         Set to ZOOKEEPER to enable standby Master recovery mode (default: NONE).
spark.deploy.zookeeper.url         The ZooKeeper cluster url (e.g., 192.168.1.100:2181,192.168.1.101:2181).
spark.deploy.zookeeper.dir         The directory in ZooKeeper to store recovery state (default: /spark).
实现HA的原理
zookeeper提供了一个Leader Election机制,利用这个机制,可以实现HA功能,具体请参考 zookeeper recipes
在Spark中没有直接使用zookeeper的api,而是使用了 curator ,curator对zookeeper做了相应的封装,在使用上更为友好。
小结
步步演进讲到在standalone模式下,如何利用zookeeper来实现ha。从中可以看出standalone master一个最主要的任务就是resource management和job scheduling,看到这两个主要功能的时候,您也许会想到这不就是YARN要解决的问题。对了,从本质上来说standalone是yarn的一个 简化版本。
      

2. 有人说Spark将来会替代Hadoop,你又怎么看?
    未来会的,但现在还早。hadoop有些笨重,但历史悠久。许多企业投入精力不会轻易放弃的。
   

hiyachen 发表于 2015-10-30 20:58

接上篇:

Spark是基于内存的迭代计算框架,适用于需要多次操作特定数据集的应用场合,如pageRank、K-Means等算法就非常适合内存迭代计算。Spark整个生态体系正逐渐完善中,GraphX 、 SparkSQL、 SparkStreaming 、 MLlib,等到Spark有了自己的数据仓库后,那就完全能与Hadoop生态体系相媲美。
Spark框架采用函数式编程语言Scala,Scala语言的面向对象、函数式、高并发模型等特点,使得Spark拥有了更高的灵活性及性能。如果你学过java,可能会对scala中的一些新概念表示陌生,如隐式转换、模式匹配、伴生类等,但一旦入门,你会感觉scala语言的简洁与强大。
Spark暂时不会取代Hadoop替代者有以下原因:
两者的侧重点不同,使用场景不同。Spark更适合于迭代运算比较多的ML和DM运算。因为在Spark里面,有RDD的概念。RDD可以cache到内存中,那么每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作。但是,我们也要看到spark的限制:内存。我认为Hadoop虽然费时,但是在OLAP等大规模数据的应用场景,还是受欢迎的。目前Hadoop涵盖了从数据收集、到分布式存储,再到分布式计算的各个领域,在各领域都有自己独特优势。
作为一种内存的迭代计算框架,Spark适用以下场景:
适用于迭代次数比较多的场景。迭代次数多的机器学习算法等。如pageRank、K-Means等。
Spark On Mesos环境
目前在Spark On Mesos环境中,用户可选择两种调度模式之一运行自己的应用程序
粗粒度模式(Coarse-grained Mode):每个应用程序的运行环境由一个Dirver和若干个Executor组成,其中,每个Executor占用若干资源,内部可运行多个Task(对应多少个“slot”)。应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。举个例子,比如你提交应用程序时,指定使用5个executor运行你的应用程序,每个executor占用5GB内存和5个CPU,每个executor内部设置了5个slot,则Mesos需要先为executor分配资源并启动它们,之后开始调度任务。另外,在程序运行过程中,mesos的master和slave并不知道executor内部各个task的运行情况,executor直接将任务状态通过内部的通信机制汇报给Driver,从一定程度上可以认为,每个应用程序利用mesos搭建了一个虚拟集群自己使用。
细粒度模式(Fine-grained Mode):鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配。与粗粒度模式一样,应用程序启动时,先会启动executor,但每个executor占用资源仅仅是自己运行所需的资源,不需要考虑将来要运行的任务,之后,mesos会为每个executor动态分配资源,每分配一些,便可以运行一个新任务,单个Task运行完之后可以马上释放对应的资源。每个Task会汇报状态给Mesos slave和Mesos Master,便于更加细粒度管理和容错,这种调度模式类似于MapReduce调度模式,每个Task完全独立,优点是便于资源控制和隔离,但缺点也很明显,短作业运行延迟大。
页: [1] 2 3
查看完整版本: 你不知道的Spark,将来会替代Hadoop?