After the Master receives the RegisterDriver request, it sends LaunchDriver to a Worker, asking it to start a JVM process for the Driver.
When the Driver application begins running in the newly spawned JVM process, it registers itself with the Master by sending RegisterApplication.
The Master then sends LaunchExecutor to Workers, asking them to start the JVM processes that run ExecutorBackend.
Once the ExecutorBackends are up, the Driver application can submit tasks to them for execution, via the LaunchTask message.
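To make the ordering concrete, the handshake above can be sketched as plain data. This is a toy model, not Spark's actual RPC code; the message names follow the text, and the `Step` type is invented for illustration:

```scala
// Toy model of the standalone startup handshake (not Spark's actual RPC code).
case class Step(message: String, from: String, to: String)

val handshake = Seq(
  Step("RegisterDriver",      "Client", "Master"),          // driver submission arrives
  Step("LaunchDriver",        "Master", "Worker"),          // Worker forks the Driver JVM
  Step("RegisterApplication", "Driver", "Master"),          // new Driver registers back
  Step("LaunchExecutor",      "Master", "Worker"),          // Worker forks ExecutorBackend JVMs
  Step("LaunchTask",          "Driver", "ExecutorBackend")  // tasks flow once executors are up
)

handshake.foreach(s => println(s"${s.from} -> ${s.to}: ${s.message}"))
```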
For the code on the submission side, see deploy/Client.scala:
driverArgs.cmd match {
  case "launch" =>
    // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
    //       truncate filesystem paths similar to what YARN does. For now, we just require
    //       people call `addJar` assuming the jar is in the same directory.
    val env = Map[String, String]()
    System.getenv().foreach { case (k, v) => env(k) = v }

    val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

    val classPathConf = "spark.driver.extraClassPath"
    val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
      cp.split(java.io.File.pathSeparator)
    }

    val libraryPathConf = "spark.driver.extraLibraryPath"
    val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
      cp.split(java.io.File.pathSeparator)
    }

    val javaOptionsConf = "spark.driver.extraJavaOptions"
    val javaOpts = sys.props.get(javaOptionsConf)

    val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
      driverArgs.driverOptions, env, classPathEntries, libraryPathEntries, javaOpts)

    val driverDescription = new DriverDescription(
      driverArgs.jarUrl,
      driverArgs.memory,
      driverArgs.cores,
      driverArgs.supervise,
      command)
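One detail worth noting in the snippet above: `sys.props.get(...).toSeq.flatMap` makes the unset case collapse to an empty list, while a set property is split on the platform path separator. The behavior can be re-created standalone (the `entries` helper below is hypothetical, not Spark code):

```scala
// Standalone re-creation of the extraClassPath handling above (not Spark code):
// an unset property yields Nil, a set one is split on the platform separator.
def entries(prop: Option[String]): List[String] =
  prop.toList.flatMap(_.split(java.io.File.pathSeparator).toList)

val sep = java.io.File.pathSeparator  // ":" on Unix, ";" on Windows
println(entries(Some(s"a.jar${sep}b.jar")))  // List(a.jar, b.jar)
println(entries(None))                       // List()
```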
On the Master side, the RequestSubmitDriver message is handled in deploy/master/Master.scala:

case RequestSubmitDriver(description) => {
  if (state != RecoveryState.ALIVE) {
    val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
    sender ! SubmitDriverResponse(false, None, msg)
  } else {
    logInfo("Driver submitted " + description.command.mainClass)
    val driver = createDriver(description)
    persistenceEngine.addDriver(driver)
    waitingDrivers += driver
    drivers.add(driver)
    schedule()

    // TODO: It might be good to instead have the submission client poll the master to determine
    //       the current status of the driver. For now it's simply "fire and forget".

    sender ! SubmitDriverResponse(true, Some(driver.id),
      s"Driver successfully submitted as ${driver.id}")
  }
}
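The important branch here is the state check: only an ALIVE master accepts submissions, which matters once standby masters exist. A simplified, self-contained version of that guard (the types below are hypothetical stand-ins, not Spark's):

```scala
// Simplified, self-contained version of the guard above (hypothetical types).
sealed trait RecoveryState
case object ALIVE extends RecoveryState
case object STANDBY extends RecoveryState

case class SubmitDriverResponse(success: Boolean, driverId: Option[String], message: String)

def handleSubmit(state: RecoveryState, newDriverId: => String): SubmitDriverResponse =
  if (state != ALIVE) {
    SubmitDriverResponse(false, None,
      s"Can only accept driver submissions in ALIVE state. Current state: $state.")
  } else {
    // in Spark this is where the driver is created, persisted and scheduled
    val id = newDriverId
    SubmitDriverResponse(true, Some(id), s"Driver successfully submitted as $id")
  }

println(handleSubmit(STANDBY, "driver-20140401-0001").message)
println(handleSubmit(ALIVE, "driver-20140401-0001").message)
```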
To use ZooKeeper, the Master needs to be started with the following parameters: edit conf/spark-env.sh and add the options below to SPARK_DAEMON_JAVA_OPTS.
System property               Meaning
spark.deploy.recoveryMode     Set to ZOOKEEPER to enable standby Master recovery mode (default: NONE).
spark.deploy.zookeeper.url    The ZooKeeper cluster url (e.g., 192.168.1.100:2181,192.168.1.101:2181).
spark.deploy.zookeeper.dir    The directory in ZooKeeper to store recovery state (default: /spark).
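Putting the three properties together, conf/spark-env.sh on every master node would contain something like the following (the ZooKeeper addresses are placeholders; substitute your own ensemble):

```shell
# conf/spark-env.sh -- example only; the ZooKeeper addresses are placeholders
export SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS \
  -Dspark.deploy.recoveryMode=ZOOKEEPER \
  -Dspark.deploy.zookeeper.url=192.168.1.100:2181,192.168.1.101:2181 \
  -Dspark.deploy.zookeeper.dir=/spark"
```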
How HA is implemented

ZooKeeper provides a leader-election mechanism, and that mechanism is what makes HA possible; see the ZooKeeper recipes documentation for details.
Spark does not call the ZooKeeper API directly; instead it uses Curator, which wraps ZooKeeper and is friendlier to use.
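The election itself boils down to a simple rule: every master candidate races to create the same ephemeral znode; exactly one succeeds and becomes the active master, while the others become standbys that take over when the leader's znode disappears. A toy in-memory model of that rule (plain Scala, not the Curator or ZooKeeper API):

```scala
import java.util.concurrent.atomic.AtomicReference

// Toy in-memory model of ZooKeeper-style leader election (not the Curator API):
// the "znode" is an AtomicReference; whoever creates it first is the leader.
class ElectionPath {
  private val znode = new AtomicReference[String](null)
  def tryLead(id: String): Boolean = znode.compareAndSet(null, id)  // create ephemeral node
  def resign(id: String): Unit = znode.compareAndSet(id, null)      // leader dies, node vanishes
  def leader: Option[String] = Option(znode.get)
}

val path = new ElectionPath
path.tryLead("master-1")   // first master wins the election
path.tryLead("master-2")   // second master becomes a standby
println(path.leader)       // Some(master-1)
path.resign("master-1")    // active master fails; its ephemeral node disappears
path.tryLead("master-2")   // standby is promoted
println(path.leader)       // Some(master-2)
```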
Summary

Step by step, we have walked through how standalone mode uses ZooKeeper to achieve HA. Along the way it should be clear that the standalone Master's two main jobs are resource management and job scheduling. If those two functions sound familiar, they are exactly the problems YARN was built to solve: in essence, standalone mode is a simplified version of YARN.
1. Some people say Spark is just an in-memory version of MapReduce. What do you think?
Spark is:
1). fast
2). a general processing engine
3). compatible with Hadoop data, i.e. it works with products in the Hadoop ecosystem such as Hive and HBase
4). designed to perform both batch processing (similar to MapReduce) and new workloads (streaming, interactive queries, and machine learning)
The points above, taken from the first answer in the Spark FAQ, give a rough idea of what Spark is and what it can do.
There is also a paper on Spark: www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf
Look at the first two sentences of the second paragraph of the Introduction:
Although current frameworks provide numerous abstractions for accessing a cluster’s computational resources, they lack abstractions for leveraging distributed memory. This makes them inefficient for an important class of emerging applications: those that reuse intermediate
results across multiple computations.
In other words, frameworks like MapReduce have no abstraction for keeping an intermediate result in distributed memory so that later computations can reuse it.
So the Hadoop pain point that Spark addresses is this: in Hadoop, computation results are all written to HDFS, i.e. to disk, and an application that needs to reuse that data has to read it back from disk each time, which is naturally slow. Spark's data model is the RDD, which the paper describes in great detail, so I won't ramble about it here.
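The difference is easy to see in miniature. In the sketch below (plain Scala collections, not RDDs; `loadFromDisk` is a made-up stand-in for an HDFS read), the MapReduce-style pipeline re-reads its input for every job, while the cache-style one reads once and reuses the dataset:

```scala
// Stand-in for reading an input dataset from HDFS; counts how often it runs.
var diskReads = 0
def loadFromDisk(): Seq[Int] = { diskReads += 1; 1 to 100 }

// MapReduce style: each job goes back to disk for the same input.
val job1 = loadFromDisk().map(_ * 2).sum
val job2 = loadFromDisk().count(_ % 2 == 0)
println(diskReads)  // 2

// RDD/cache style: read once, keep the intermediate result in memory, reuse it.
val cached = loadFromDisk()
val job3 = cached.map(_ * 2).sum
val job4 = cached.count(_ % 2 == 0)
println(diskReads)  // 3 -- one extra read served both jobs
```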