Spark:2、Spark 集群部署

集群角色

从物理部署层面上来看,Spark主要分为两种类型的节点,Master节点和Worker节点,Master节点主要运行集群管理器的中心化部分,所承载的作用是分配Application到Worker节点,维护Worker节点,Driver,Application的状态。Worker节点负责具体的业务运行。

从Spark程序运行的层面来看,Spark主要分为驱动器节点和执行器节点。

Spark的部署模式有Local、Local-Cluster、Standalone、Yarn、Mesos。

安装部署Standalone模式

1) 解压spark

$ tar -zxvf /opt/software/installations/spark-2.2.0-bin-hadoop2.6.0.tgz -C /opt/software/

2) 进入spark安装目录下的conf目录中,重命名“.template”结尾的文件

$ mv docker.properties.template docker.properties
$ mv log4j.properties.template log4j.properties
$ mv metrics.properties.template metrics.properties
$ mv slaves.template slaves
$ mv spark-defaults.conf.template spark-defaults.conf
$ mv spark-env.sh.template spark-env.sh

3) 修改slaves

hadoop001
hadoop002
hadoop003

4) 修改spark-default.conf,用于配置Job History Server

spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://hadoop001:9000/directory
spark.eventLog.compress          true

5) 修改spark-env.sh

SPARK_MASTER_HOST= hadoop001
SPARK_MASTER_PORT=7077

export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=4000
-Dspark.history.retainedApplications=3
-Dspark.history.fs.logDirectory=hdfs://hadoop001:9000/directory"

6) 分发配置好的spark安装包

$ scp -r spark-2.2.0-bin-hadoop2.6.0/ hadoop002:/opt/software/
$ scp -r spark-2.2.0-bin-hadoop2.6.0/ hadoop003:/opt/software/
$ sbin/start-all.sh

7) 启动spark(启动spark之前,确保HDFS已经启动)

$ bin/spark-shell \
--master spark://hadoop001:7077 \
--executor-memory 2g \
--total-executor-cores 2

Spark的高可用

Spark-Master的高可用,需要借助于Zookeeper,所以先确保Zookeeper启动完成。

高可用架构图:

1) 修改spark-env.sh文件(注释掉之前的:SPARK_MASTER_HOST=hadoop001)

# SPARK_MASTER_HOST=hadoop001
SPARK_MASTER_PORT=7077

export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=4000
-Dspark.history.retainedApplications=3
-Dspark.history.fs.logDirectory=hdfs://hadoop001:9000/directory"

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hadoop001:2181,hadoop002:2181,hadoop003:2181 -Dspark.deploy.zookeeper.dir=/spark"

2) 分发配置

$ scp -r spark-2.2.0-bin-hadoop2.6.0/conf hadoop002:/opt/software/spark-2.2.0-bin-hadoop2.6.0/
$ scp -r spark-2.2.0-bin-hadoop2.6.0/conf hadoop003:/opt/software/spark-2.2.0-bin-hadoop2.6.0

3) 按照如下规则执行启动脚本

第一台master机器

$ sbin/start-all.sh

第二台master机器

$ sbin/start-master.sh

4) client连接高可用的spark集群

$ bin/spark-shell –master spark://hadoop001:7077,hadoop002:7077

安装部署Yarn模式

1) 修改yarn-site.xml

<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>

2) 修改spark-evn.sh

HADOOP_CONF_DIR=/opt/software/hadoop-2.6.0/etc/hadoop
YARN_CONF_DIR=/opt/software/hadoop-2.6.0/etc/hadoop

运行Spark任务

Standalone上运行

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop001:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
/opt/software/spark-2.2.0-bin-hadoop2.6.0/examples/jars/spark-examples_2.11-2.2.0.jar \
100

参数说明

--master spark://hadoop001:7077 指定Master的地址
--executor-memory 1G 指定每个executor可用内存为1G
--total-executor-cores 2 指定每个executor使用的cup核数为2个
--jars 添加任务所需的其他依赖

Yarn上运行

/opt/software/spark-2.2.0-bin-hadoop2.6.0/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
/opt/software/spark-2.2.0-bin-hadoop2.6.0/examples/jars/spark-examples_2.11-2.2.0.jar \
100

应用提交的方式

1) 打包完成后,可以使用bin/spark-submit脚本来提交应用,格式如下

bin/spark-submit \
--class <main-class>
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]

2) 解释

其中–master可以有如下URL形式

Spark-Shell交互式编程

启动Spark-Shell

bin/spark-shell \
--master spark://hadoop001:7077 \
--executor-memory 2g \
--total-executor-cores 2

如果启动spark shell时没有指定master地址,但是也可以正常启动spark shell和执行spark shell中的程序,其实是启动了spark的local模式,该模式仅在本机启动一个进程,没有与集群建立联系。


Spark Shell中已经默认将SparkContext类初始化为对象sc。用户代码如果需要用到,则直接应用sc即可。

运行WordCount程序

随意创建一个单词本,例如words.txt,然后上传至HDFS中,使用spark进行操作。

举例

sc.textFile("hdfs://hadoop001:9000/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://hadoop001:9000/output")

Spark概念总结

每个Spark应用都由一个驱动器程序(driver program)来发起集群上的各种 并行操作。驱动器程序包含应用的 main 函数,并且定义了集群上的分布式数据集,还对这 些分布式数据集应用了相关操作。

驱动器程序通过一个 SparkContext 对象来访问 Spark。这个对象代表对计算集群的一个连 接。shell 启动时已经自动创建了一个 SparkContext 对象,是一个叫作 sc 的变量。

驱动器程序一般要管理多个执行器(executor)节点。

您的支持将鼓励我继续创作!
-------------本文结束感谢您的阅读-------------