累加器

提供将工作节点中的值聚合到驱动器城区的语法。累加器的常见用途是调试时对时间进行技术。累加器用法如下:
– 调用SparkContext.accumulator(initialValue)创造一个初始值的累加器。返回一个org.apache.spark.Accumulator[T]对象,其中T是初始值的类型。
– Spark闭包里的执行器代码可以使用累加器代码(+=)来改变值
– 驱动器程序可以调用累加器的value属性。(通过value()或者setValue()来方位)

在行动操作中绝对可靠,在转化操作中可能会多次更新导致错误(1.2版本)

自定义累加器

只要操作满足交换律和结合律,就可以任何操作来代替数值上的加法。

广播变量

可以将高效的将只读值广播到各个工作节点上,如特征向量。

广播变量是类型为spark.broadcast.Broadcast[T]的对象。变量只会被发送到工作节点上一次,采用BT相同的P2P机制。

val s = sc.broadcast()
  1. 通过对一个类型T的对象调用SparkContext.broadcast创建出一个Broadcast[T]对象(适用于任何可序列化对象)
  2. 通过value属性访问对象
  3. 该对象为只读值

在工作节点上执行修改操作时,仅影响在本工作节点上的这个变量

广播的优化

选择序列化速度快的对象,或者换用更快的序列化库

基于分区操作

基于分区操作可以避免为每个数据元素进行重复配置。如可以新建一个连接池同时供个各个工作节点使用

val contactsContactLists = validSigns.distinct().mapPartitions{
    signs =>
    val mapper = createMapper()
    val client = new HttpClient()
    client.start() //创建HTTP请求
    signs.map {sign =>
        createExchangeForSign(sign) // 获取响应
    }.map{ case (signm exchange) =>
        (sign, readExchangeCallLog(mapper, exchange))
    }.filter(x => x._2 != null) //删除空值
}

基于分区操作RDD时,Spark会为函数提供该分区中的元素的迭代器,返回值也是迭代器。

管道

Spark 在RDD上提供pipe()方法允许从Unix标准流中获取数据输出。

val distScript = './src/xxx' //脚本位置
val distScriptName = 'xxx' //脚本名称
sc.addFile(distScript) //向SparkContext中添加文件允许其在工作节点被加载

val distance = contantsContactLists.values.flatMap(x => x.map(
    y => s"y.a,y.b,y.c,y.d" //使格式能被脚本解析
)).pipe(Seq(
    SparkFiles.get(distScriptName)
))
println(distances.collect().toList)

数值RDD的操作

方法 含义
count() RDD中元素的个数
mean() 元素的平均值
sum() 总和
max(),min() 最大最小值
variance() 方差
sampleVariance() 采样中计算出的方差
stdev() 标准差
sampleStdev 采样的标准差