Chap 3 Spark计算模型
Spark 的编程范型在处理大数据时显得简单有效,<key, value> 的数据处理与传输模式也大获全胜。
Spark 依靠 Scala 的
函数式编程
Actor 通信模式
闭包
容器
泛型
借助统一资源分配调度框架 Mesos, 融合 MapReduce 和 Dryad 而产生
产生高效的大数据分布式处理框架
spark 尤其适合 迭代型 和 交互型 任务。
3.1 Spark 程序模型
了解 Spark 计算模型, by示例
1). SparkContext 中的 textFile函数 从 HDFS 读取 日志文件,输出变量file。
val file=sc.textFile("hdfs://xxx")
2). RDD 中的 filter函数 过滤带 “ERROR” 的行,输出 errors(errors也是一个RDD)。
val errors=file.filter(line=>line.contains("ERROR")
3). RDD 的 count函数 返回 “ERROR” 的行数:errors.count()。
RDD操作起来 与 Scala集合 类型没有太大差别,这就是 Spark追求的目标:像编写单机程序一样编写分布式程序,但它们的数据和运行模型有很大的不同,用户需要具备更强的系统把控能力和分布式系统知识。
从RDD的转换和存储角度看这个过程 :
用户程序对RDD通过多个函数进行操作,将RDD进行转换。
Block-Manager 管理 RDD的物理分区,每个Block就是节点上对应的一个数据块,可以存储在内存或者磁盘。
RDD 中的 partition是一个逻辑数据块,对应相应的 物理块Block。
RDD 在代码中相当于是数据的一个元数据结构,存储着数据分区及其逻辑结构映射关系,存储着RDD之前的依赖转换关系。
3.2 RDD
3.2.1 RDD 简介
resilient distributed dataset : 弹性分布式数据集 -- RDD
分布式数据结构--RDD, 逻辑集中的实体,在集群中的多台机器上进行了数据分区。通过对多台机器上不同RDD分区的控制,就能够减少机器之间的数据重排(data shuffling)。
Spark提供了“partitionBy”运算符,能够通过集群中多台机器之间对 原始RDD 进行数据再分配来创建一个新的RDD。RDD是Spark的核心数据结构,通过RDD的依赖关系形成Spark的调度顺序。通过对RDD的操作形成整个Spark程序。
RDD 3 种创建方式
从外部存储创建RDD; -- 从 Hadoop文件系统 (Hadoop、Hive、Hbase)输入(如HDFS)创建。
从集合中创建RD; -- Spark主要提供了两中函数:parallelize 和 makeRDD。
从其他RDD创建; -- 从 父RDD 转换得到 新RDD / 更改 RDD 的 persistence。(例如 cache() 函数 )
RDD 2 种操作算子
Transformation -- (从一个 RDD 转换成 另一个 RDD, 需要等到有 Actions 操作时, 才真正触发运算)
Action -- (Action 算子 会触发 Spark 提交作业(Job), 并将数据输出到 Spark 系统)
RDD 内部属性
分区列表
计算每个分片的函数
对 父RDD 的依赖列表
对 Key-Value对 数据类型 RDD 的分区器, 控制分区策略和分区数
每个数据分区的地址列表 (如 HDFS 上的数据块的地址)
3.2.2 Spark 的数据存储
Spark 数据存储的核心是(RDD)。RDD 可被理解为一个大的数组(Array),但是这个数组是分布在集群上的。
逻辑上 RDD 的每个分区叫一个 Partition。
Spark 执行中,RDD 经历一个个的 Transfomation算子 之后,最后通过Action算子进行触发操作。
变换的输入和输出都是RDD。RDD 会被划分成很多的分区分布到集群的多个节点中。
分区是个逻辑概念。如果要迭代使用数据,可以调cache()函数缓存数据。
RDD的数据存储模型。
在物理上,RDD 对象实质是一个 元数据信息, 存储着 Block、Node 等映射关系。
RDD的每个分区对应的就是一个Block,Block可以存储在内存,当内存不够时可以存储到磁盘上。
每个 Block中 存储着 RDD所有数据项的一个子集,暴露给用户的可以是一个Block的迭代器(例如,用户可以通过mapPartitions获得分区迭代器进行操作),也可以是一个数据项(例如,通过map函数对每个数据项并行计算)。
如果是从 HDFS 等外部存储作为输入数据源,数据按照 HDFS中的数据 分布策略进行数据分区,HDFS中的一个Block对应Spark的一个分区。同时Spark支持重分区。 例如,支持Hash分区 和 Range分区(等分区策略。
3.3 Spark 算子分类及功能
Spark 算子的作用
算子 是 RDD 中定义的函数,可以对 RDD 中的数据 进行 转换 和 操作。
1)输入:-- 数据从外部数据空间输入Spark
在Spark程序运行中,数据从外部数据空间(如分布式存储:textFile读取HDFS等,parallelize方法输入Scala集合或数据)输入Spark,数据进入Spark运行时数据空间,转化为Spark中的数据块,通过BlockManager进行管理。
2)运行:
在Spark数据输入形成RDD后便可以通过变换算子,如fliter等,对数据进行操作并将RDD转化为新的RDD,通过Action算子,触发Spark提交作业。如果数据需要复用,可以通过Cache算子,将数据缓存到内存。
3)输出:-- 存储到分布式存储中(如saveAsTextFile输出到HDFS),或Scala数据或集合
程序运行结束数据会输出 Spark运行时空间,存储到分布式存储中(如saveAsTextFile输出到HDFS),或Scala数据或集合中(collect输出到Scala集合,count返回Scala int型数据)。
Spark的核心数据模型是RDD,但RDD是个抽象类,具体由各子类实现,如MappedRDD、ShuffledRDD等子类。
Spark将常用的大数据操作都转化成为RDD的子类。
3 大类 算子
Value 数据类型 的 Transformation算子
KeyValue数据类型 的 Transfromation算子
Action算子,这类算子会触发SparkContext提交Job作业。
3.3.1 Value trans..算子
Value型算子可据 RDD变换算子的 输入分区 与 输出分区 关系分为以下几种类型:
1)输入分区与输出分区一对一型
2)输入分区与输出分区多对一型
3)输入分区与输出分区多对多型
4)输出分区为输入分区子集型
5)还有一种特殊的输入与输出分区一对一的算子类型:Cache型。 Cache算子对RDD分区进行缓存
一、一对一
(1). map -- 将原来RDD的每个数据项通过map中的用户自定义函数f映射转变为一个新的元素。
将原来RDD的每个数据项通过map中的用户自定义函数f映射转变为一个新的元素。源码中的map算子相当于初始化一个RDD,新RDD叫作MapPartitionsRDD(this,sc.clean(f))。
(2). flatMap
将原来RDD中的每个元素通过函数f转换为新的元素,并将生成的RDD的每个集合中的元素合并为一个集合。 内部创建FlatMappedRDD(this,sc.clean(f))
(3). mapPartitions
mapPartitions函数获取到每个分区的迭代器,在函数中通过这个分区整体的迭代器对整个分区的元素进行操作。 内部实现是生成MapPartitionsRDD。
二、多对一
(1) union 函数 保存所有元素。如果想去重,可以使用distinct()。++符号相当于uion函数操作。
(2) certesian
对两个RDD内的所有元素进行笛卡尔积操作。操作后,内部实现返回CartesianRDD。
三、多对多
groupBy
四、输入分区为输出分区子集
filter
distinct
subtract
sample
takeSample
五、Cache型
cache -- cache 将 RDD element 从磁盘缓存到内存
persist --
参见 : http://www.jianshu.com/p/c8c71cdc163c
3.3.2 Key-V trans..算子
输入分区与输出分区一对一、聚集、连接操作。
一、一对一
mapValues
mapValues:针对(Key,Value)型数据中的Value进行Map操作,而不对Key进行处理。
二、单个RDD或两个RDD聚集
(1)combineByKey
(2)reduceByKey
(3)partitionBy
(4)cogroup
三、连接
(1)join
join对两个需要连接的RDD进行cogroup函数操作。cogroup操作之后形成的新RDD,对每个key下的元素进行笛卡尔积操作。
(2)leftOuterJoin和rightOuterJoin
LeftOuterJoin(左外连接)和RightOuterJoin(右外连接)相当于在join的基础上先判断一侧的RDD元素是否为空,如果为空,则填充为空。 如果不为空,则将数据进行连接运算,并返回结果。
/**
* Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
* pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
* partition the output RDD.
*/
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
this.cogroup(other, partitioner).flatMapValues { pair =>
if (pair._2.isEmpty) {
pair._1.iterator.map(v => (v, None))
} else {
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
}
}
}
参见 : http://www.jianshu.com/p/ff7f016bf738
3.3.3 Actions 算子
在Actions算子中通过SparkContext执行提交作业的runJob操作,触发了RDD DAG的执行。
据Action算子的输出空间将Action算子进行分类:无输出、 HDFS、 Scala集合/数据类型。
无输出
foreach -- 对RDD中的每个元素都应用f函数操作,不返回RDD和Array,而是返回Uint。
foreach算子通过用户自定义函数对每个数据项进行操作。 本例中自定义函数为println。
/**
* Applies a function f to all elements of this RDD.
*/
def foreach(f: T => Unit) {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
HDFS
(1)saveAsTextFile
(2)saveAsObjectFile
Scala集合和数据类型
(1) collect
collect相当于toArray,toArray已经过时不推荐使用,collect将分布式的RDD返回为一个单机的scala Array数组。 在这个数组上运用scala的函数式操作。
(2) collectAsMap
collectAsMap对(K,V)型的RDD数据返回一个单机HashMap。对于重复K的RDD元素,后面的元素覆盖前面的元素。
(3) reduceByKeyLocally
实现的是先reduce再collectAsMap的功能,先对RDD的整体进行reduce操作,然后再收集所有结果返回为一个HashMap。
(4) lookup
Lookup函数对(Key,Value)型的RDD操作,返回指定Key对应的元素形成的Seq
(5) count
count返回整个RDD的元素个数。
(6) top
top可返回最大的k个元素。
(7) reduce
reduce函数相当于对RDD中的元素进行reduceLeft函数的操作。
reduceLeft先对两个元素<K,V>进行reduce函数操作,然后将结果和迭代器取出的下一个元素<k,V>进行reduce函数操作,直到迭代器遍历完所有元素,得到最后结果。
在RDD中,先对每个分区中的所有元素<K,V>的集合分别进行reduceLeft。每个分区形成的结果相当于一个元素<K,V>,再对这个结果集合进行reduceleft操作。
(8) fold
fold 和 reduce的原理相同,但是与reduce不同,相当于每个reduce时,迭代器取的第一个元素是zeroValue。
(9) aggregate
aggregate先对每个分区的所有元素进行aggregate操作,再对分区的结果进行fold操作。
aggreagate与fold和reduce的不同之处在于,aggregate相当于采用归并的方式进行数据聚集,这种聚集是并行化的。 而在fold和reduce函数的运算过程中,每个分区中需要进行串行处理,每个分区串行计算完结果,结果再按之前的方式进行聚集,并返回最终聚集结果。
Spark 将应用程序整体翻译为一个 DAG 进行调度和执行.