键值对
Spark为键值对RDD提供一些转有方法如join()
,reduceByKey()
等。因此需要将数据转换成为键值对RDD才能应用上述方法。
创建
部分数据格式在读取时会直接返回为键值对,否则使用map()
函数传入自定义方法或者lambda表达式来将数据转换成为键值对(Python中原生支持元组,Scala和Java中需要使用Tuple*
类来创建n元组)
转化操作
行动操作
分区
控制数据分布以获得最小的网络开销可以提高整体性能。
Spark中所有的键值对RDD都可以进行分区。允许通过自定义的Partitioner
对象来控制RDD分区方式。
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://") //读取文件并转化为Pair RDD
.partitionBy(new HashPartitioner(100)) //构造100分区
.persist() //持久化
partitionBy()
是转化操作,因此应对其输出进行持久化。
通过RDD的partitioner
属性来获取RDD的分区方式
基本所有Map,GroupBy操作都能从分区中受益
数据读取与保存
文件
支持存储在本地文件或者分布式文件系统中的数据,支持以下种类:
– 文本文件
– JSON
– CSV
– SequenceFiles 一种用于键值对的常见Hadoop文件格式
– Protocol buffers 一种快速,节约空间的跨语言格式
– 对象文件 将Spark中作业数据存储下来让共享的代码读取,依赖于Java序列化
结构化数据
Apache Hive
需要将Hive的配置文件复制到Spark的conf目录下。创建HiveContext对象并传入SparkContext,即可用Hql查询Hive数据,并以行组成的RDD形式返回数据。
import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc) //sc 即 SparkContext
val rows = hiveCtx.sql("SELECT * FROM XXX")
val firstRow = rows.first() //返回RDD
数据库
Spark可以从任何支持JDBC的数据库中获取数据。构建一个org.apache.spark.rdd.jdbcRDD
,将sc传入即可。
Cassandra
HBase
Elasticsearch