键值对

Spark为键值对RDD提供一些转有方法如join(),reduceByKey()等。因此需要将数据转换成为键值对RDD才能应用上述方法。

创建

部分数据格式在读取时会直接返回为键值对,否则使用map()函数传入自定义方法或者lambda表达式来将数据转换成为键值对(Python中原生支持元组,Scala和Java中需要使用Tuple*类来创建n元组)

转化操作

行动操作

分区

控制数据分布以获得最小的网络开销可以提高整体性能。

Spark中所有的键值对RDD都可以进行分区。允许通过自定义的Partitioner对象来控制RDD分区方式。

val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://")    //读取文件并转化为Pair RDD
                        .partitionBy(new HashPartitioner(100))    //构造100分区
                        .persist()      //持久化

partitionBy()是转化操作,因此应对其输出进行持久化。

通过RDD的partitioner属性来获取RDD的分区方式

基本所有Map,GroupBy操作都能从分区中受益

数据读取与保存

文件

支持存储在本地文件或者分布式文件系统中的数据,支持以下种类:
– 文本文件
– JSON
– CSV
– SequenceFiles 一种用于键值对的常见Hadoop文件格式
– Protocol buffers 一种快速,节约空间的跨语言格式
– 对象文件 将Spark中作业数据存储下来让共享的代码读取,依赖于Java序列化

结构化数据

Apache Hive

需要将Hive的配置文件复制到Spark的conf目录下。创建HiveContext对象并传入SparkContext,即可用Hql查询Hive数据,并以行组成的RDD形式返回数据。

import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc) //sc 即 SparkContext
val rows = hiveCtx.sql("SELECT * FROM XXX")
val firstRow = rows.first() //返回RDD

数据库

Spark可以从任何支持JDBC的数据库中获取数据。构建一个org.apache.spark.rdd.jdbcRDD,将sc传入即可。

Cassandra
HBase
Elasticsearch