全方位掌握Apache Spark 2.0七步走(二)

1964 查看

在上一篇普及过Spark的相关概念之后,让我们继续深入研究它的核心结构以及好用的API,本篇视频内容丰富,机(fan)智(qiang)的小伙伴不容错过。

前篇传送门:全方位掌握Apache Spark 2.0七步走(一)

三、Apache Spark的核心结构

为了更好地理解Spark的各个组件是如何交互的,在细节上抓住Spark的核心结构是很有必要的。所有关键词和概念在解释后才会变得生动形象,这篇Spark Summit大会上的培训视频对于你更快开启Spark之旅很有帮助:

视频链接:https://youtu.be/7ooZ4S7Ay6Y

四、DataFrames, Datasets 以及 Spark SQL

在步骤3中,你已经了解到弹性分布式数据集(RDDs)——它们构成了Spark的核心数据抽象概念,是其他所有更高层次数据抽象和API、包括DataFrame和数据集的基础。

在Spark2.0,在RDDs之上的DataFrame和数据集形成了核心的高层和结构化的分布式数据抽象。DataFrame在Spark里被叫做数据列(data column),它们可以执行组织数据的计划,以及数据处理或者描述运算、发布查询。数据集更进一步,提供了一个严格的编译时类型的安全保障,所以特定类型的错误在编译时就会被发现,而不是在运行时。

凭借数据结构和数据类型,Spark可以理解你将如何进行描述运算,哪些指定类型的列或者特定名称的字段将会访问你的数据,以及你将使用哪些特定操作的作用域。然后,Spark将会通过Spark 2.0’s Catalyst optimizer优化你的代码,通过Project Tungsten生成高效的字节代码。

DataFrame和数据集为多种高级编程语言提供了API,让你的代码更易读,以及支持高阶函数比如filter, sum, count, avg, min, max等等。不管你用Spark SQL还是Python、Java、Scala或者R来表达你的计算指令,底层的代码生成是完全一致的,因为所有的执行的计划都是通过同一Catalyst优化器。

例如,Scala的作用域专用代码或者它SQL里对应的相关查询会生成完全相同的代码。比如下方会有一个数据集Scala项目叫做Person,以及一个SQL表格“Person”。

// a dataset object Person with field names fname, lname, age, weight
// access using object notation
val seniorDS = peopleDS.filter(p=>p.age > 55)
// a dataframe with structure with named columns fname, lname, age, weight
// access using col name notation
Val seniorDF = peopleDF.where(peopleDF("age") > 55)
// equivalent Spark SQL code
val seniorDF = spark.sql("SELECT age from person where age > 35")

为什么Spark结构化数据很重要,为什么DataFrame、数据集、Spark SQL提供了一个高效的Spark编码方式,如果你希望了解这些,可以通过链接(https://youtu.be/1a4pgYzeFwE )的视频寻找答案。

五、GraphFrame的图形处理

尽管Spark有一个通用的基于RDD的图形处理库GraphX,可以优化分布式计算以及支持图形算法,它仍有一些挑战——没有Java和Python API,基于低层的RDD API。由于这些问题,它不能通过Project Tungsten 和Catalyst Optimizer享受到最近引入的性能优化。

相比之下,基于DataFrame的图处理库GraphFrames解决了所有问题:它提供了一个类似于GraphX的库但是有着更高的层级,更易读和可读的API,支持Java, Scala 和Python;可以保存和下载图形;利用了Spark2.0的底层性能和查询的优化。此外,它集成了GraphX。这意味着你可以无缝地将图处理库GraphFrames转换成等效的GraphX表示。

在下图中,这些城市有各个机场代号,所有顶点可以表示成DataFrame的排;同样地,所有边也可以看做DataFrame的排,它们有着各自的名字和类型的列。总的来说,这些DataFrame的顶点和边构成了一个图处理库GraphFrames。

// create a Vertices DataFrame
val vertices = spark.createDataFrame(List(("JFK", "New York", "NY"))).toDF("id", "city", "state")
// create a Edges DataFrame
val edges = spark.createDataFrame(List(("JFK", "SEA", 45, 1058923))).toDF("src", "dst", "delay", "tripID")
// create a GraphFrame and use its APIs
val airportGF = GraphFrame(vertices, edges)
// filter all vertices from the GraphFrame with delays greater an 30 mins
val delayDF = airportGF.edges.filter("delay > 30")
// Using PageRank algorithm, determine the Airport ranking of importance
val pageRanksGF = airportGF.pageRank.resetProbability(0.15).maxIter(5).run()
display(pageRanksGF.vertices.orderBy(desc("pagerank")))

使用GraphFrame可以表达三种强大的查询。首先是简单的SQL类型的关于点和边的查询,比如怎么样的路线可能会导致重大延迟。第二,图形类型查询,比如有多少顶点传入有多少边传出。第三,主题查询,通过提供一个结构化的模型或者路径的顶点和边,找到在图形中的数据集的模型。

此外,图处理库GraphFrames可以很轻松地支持GraphX所有图形算法。例如,使用PageRank找到所有重要的点,或者决定从起点到目的地的最短路径,或者执行一个广度优先搜索(BFS),或者为探索联络关系确定强联系的点。

在网络研讨会中(http://go.databricks.com/grap... )中,Spark的社区贡献者Joseph Bradley给大家介绍了使用图处理库GraphFrames进行图像处理的动机和易用性,以及基于DataFrame的API的好处。作为研讨会的一部分,你也将了解到使用图处理库GraphFrames的便捷,以及上述所有类型的查询和算法。

Apache Spark 2.0和许多Spark的组件,包括机器学习MLlib和Streaming,因为性能提升、易用性和高层次的抽象及结构,越来越倾向于提供等效的DataFrame API。在必要或者适合的用例中,你可以选择使用图处理库GraphFrames来代替GraphX。下图是一个GraphX和图处理库GraphFrames之间简洁的总结和比较。

图处理库GraphFrames必会发展得越来越快。新版本的GraphFrame将作为Spark的一个包和Spark2.0兼容。

作者:Jules S. Damji & Sameer Farooqui, Databricks.
文章来源:http://www.kdnuggets.com/2016...