累加器
提供将工作节点中的值聚合到驱动器城区的语法。累加器的常见用途是调试时对时间进行技术。累加器用法如下:
– 调用SparkContext.accumulator(initialValue)
创造一个初始值的累加器。返回一个org.apache.spark.Accumulator[T]
对象,其中T是初始值的类型。
– Spark闭包里的执行器代码可以使用累加器代码(+=
)来改变值
– 驱动器程序可以调用累加器的value属性。(通过value()
或者setValue()
来方位)
在行动操作中绝对可靠,在转化操作中可能会多次更新导致错误(1.2版本)
自定义累加器
只要操作满足交换律和结合律,就可以任何操作来代替数值上的加法。
广播变量
可以将高效的将只读值广播到各个工作节点上,如特征向量。
广播变量是类型为spark.broadcast.Broadcast[T]
的对象。变量只会被发送到工作节点上一次,采用BT相同的P2P机制。
val s = sc.broadcast()
- 通过对一个类型T的对象调用SparkContext.broadcast创建出一个Broadcast[T]对象(适用于任何可序列化对象)
- 通过value属性访问对象
- 该对象为只读值
在工作节点上执行修改操作时,仅影响在本工作节点上的这个变量
广播的优化
选择序列化速度快的对象,或者换用更快的序列化库
基于分区操作
基于分区操作可以避免为每个数据元素进行重复配置。如可以新建一个连接池同时供个各个工作节点使用
val contactsContactLists = validSigns.distinct().mapPartitions{
signs =>
val mapper = createMapper()
val client = new HttpClient()
client.start() //创建HTTP请求
signs.map {sign =>
createExchangeForSign(sign) // 获取响应
}.map{ case (signm exchange) =>
(sign, readExchangeCallLog(mapper, exchange))
}.filter(x => x._2 != null) //删除空值
}
基于分区操作RDD时,Spark会为函数提供该分区中的元素的迭代器,返回值也是迭代器。
管道
Spark 在RDD上提供pipe()
方法允许从Unix标准流中获取数据输出。
val distScript = './src/xxx' //脚本位置
val distScriptName = 'xxx' //脚本名称
sc.addFile(distScript) //向SparkContext中添加文件允许其在工作节点被加载
val distance = contantsContactLists.values.flatMap(x => x.map(
y => s"y.a,y.b,y.c,y.d" //使格式能被脚本解析
)).pipe(Seq(
SparkFiles.get(distScriptName)
))
println(distances.collect().toList)
数值RDD的操作
方法 | 含义 |
---|---|
count() | RDD中元素的个数 |
mean() | 元素的平均值 |
sum() | 总和 |
max(),min() | 最大最小值 |
variance() | 方差 |
sampleVariance() | 采样中计算出的方差 |
stdev() | 标准差 |
sampleStdev | 采样的标准差 |