带有状态的SparkStreaming单词计数程序

2/10/2017来源:ASP.NET技巧人气:1352

在另外一篇《SparkStreaming的入门级程序:WordCount》文章中,只是统计每一个批次的数据,是不带状态的单词计数程序,使用的是reduceByKey()方法,它只能统计当前批次的单词个数,而不会累加上一个批次的单词个数;而带有状态的单词计数程序会累加上个批次的单词个数,它使用的则是updateStateByKey()方法。

在pom.xml文件中引入一下依赖:

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.10.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>
</dependencies>

StateFulSteamingWordCount.scala代码如下:

package streams.test

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf}
import streams.LoggerLevels


/**
  * Created by SYJ on 2017/2/7.
  */
object TestStateFulStreamingWordCount {

  /**
    * 当前定义的函数,接收一个迭代器类型的参数。
    * 对Iterator[(String ,Seq[Int], Option[Int])]中泛型的解释:
    * SparkStreaming每隔一段时间就会产生一个批次的RDD,
    * 在调用该函数之前,会对当前批次的数据安装key进行分组,
    * 所以第一参数String指的就是分组的key,在本例中指的就是单词,
    * 而第二个参数Seq[Int]就是key所对应的value,在本例中指的就是
    * 由很多的数字1组成的集合。
    * 而第三个参数Option[Int]表示历史数据,Option本身就表示可能有
    * 数据,也可能没有数据,第一个批次是没有历史数据的,所以数据是0,
    * 从第二个批次开始就有数据,也就是每两个批次进行叠加后的值,
    * 所以Option[Int]表示初始值或者叠加后的值。
    */
  val updateFunc = (it: Iterator[(String, Seq[Int], Option[Int])]) => {
    /**
      * map方法接收一个元组t,
      * 其中t._1表示分组的key,这个key不变,
      * t._2.sum表示对Seq[Int]中的值求和,
      * t._3.getOrElse(0)表示获取每两个批
      * 次叠加后的值,初始值为0;
      */
    //it.map(t => (t._1, t._2.sum, t._3.getOrElse(0)))

    /**
      * 上面的方式不太好,因为t._1、t._2和t._3不太好识别,
      * 我们可以使用模式匹配,注意如果使用模式匹配的话,
      * map后面就不能使用小括号了,必须使用大括号。
      */
    it.map { case (x, y, z) => (x, y.sum + z.getOrElse(0)) }
  }

  def main(args: Array[String]) {
    LoggerLevels.setStreamingLogLevels()
    val conf = new SparkConf().setAppName("TestStateFulStreamingWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    /**
      * 必须设置checkpoint目录,
      * 是为了避免历史数据丢失而导致
      * 新的数据和旧的数据无法做汇总或者聚合操作;
      * 如果程序在Spark集群上跑,通常保存到HDFS中,
      * 由于这里只是单机程序,所以就保存到本地磁盘上.
      */
    ssc.checkpoint("c://ck3")

    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop01", 8888)
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))

    /**
      * 如果要实现对每个批次的单词数量进行累加,
      * 可以使用updateStateByKey方法,它可以根据
      * key来更新状态,可以累加以前的数据。
      * 这个updateStateByKey方法要求传进去3个参数:
      * 第一个参数是自定义的累加函数,告诉它我们的累加逻辑;
      * 第二个参数是一个分区器,可以使用HashPartitioner,
      * 也可以定义自己的分区器;定义分区器的目的是为了
      * 避免数据倾斜(数据都集中到某些机器上面去了);
      * 第三个参数是一个Boolean值,表示其他的计算也要使用该分区器;
      */
    val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(
      updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)

    result.PRint()
    ssc.start()
    ssc.awaitTermination()
  }
}


由于SparkStreaming程序在运行的时候会在控制台打印很多info日志,影响我们对于实时统计结果的观察,所以我们在程序中通过如下代码来设置日志的输出级别:

package streams

import org.apache.log4j.{Logger, Level}
import org.apache.spark.Logging

object LoggerLevels extends Logging {

  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}

启动SparkStreaming程序。

使用nc启动一个SocketServer,监听8888端口,输入几行单词试试:

观察控制台的SparkStreaming程序实时输出的单词计数结果: