From 2ba45b64b3aa770c378cfa9085e180e14377d356 Mon Sep 17 00:00:00 2001
From: dev_xulongjin
Date: Fri, 23 May 2025 09:31:07 +0800
Subject: [PATCH] feat(spark-lesson): add Spark Streaming related code
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Add data file data.txt for Spark Streaming tests
- Implement SparkStreamingTest1.scala, demonstrating word count over data read from a file
- Add StreamingKafka.scala, implementing word count with Spark Streaming and Kafka integration
---
 .../date_20250520/SparkStreamingTest1.scala   | 26 ++++++
 .../scala/date_20250520/StreamingKafka.scala  | 84 +++++++++++++++++++
 .../main/scala/date_20250520/input/data.txt   |  4 +
 3 files changed, 114 insertions(+)
 create mode 100644 spark-lesson/src/main/scala/date_20250520/SparkStreamingTest1.scala
 create mode 100644 spark-lesson/src/main/scala/date_20250520/StreamingKafka.scala
 create mode 100644 spark-lesson/src/main/scala/date_20250520/input/data.txt

diff --git a/spark-lesson/src/main/scala/date_20250520/SparkStreamingTest1.scala b/spark-lesson/src/main/scala/date_20250520/SparkStreamingTest1.scala
new file mode 100644
index 0000000..f4f0a66
--- /dev/null
+++ b/spark-lesson/src/main/scala/date_20250520/SparkStreamingTest1.scala
@@ -0,0 +1,26 @@
+package date_20250520
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+
+object SparkStreamingTest1 {
+  def main(args: Array[String]): Unit = {
+    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
+
+    val ssc = new StreamingContext(conf, Seconds(5))
+
+    // val data: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", port = 9999)
+
+    val data: DStream[String] = ssc.textFileStream("spark-lesson/src/main/scala/date_20250520/input/")
+
+    data
+      .flatMap(_.split(" "))
+      .map(x => (x, 1))
+      .reduceByKey(_ + _)
+      .print()
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
diff --git a/spark-lesson/src/main/scala/date_20250520/StreamingKafka.scala b/spark-lesson/src/main/scala/date_20250520/StreamingKafka.scala
new file mode 100644
index 0000000..2b33cd2
--- /dev/null
+++ b/spark-lesson/src/main/scala/date_20250520/StreamingKafka.scala
@@ -0,0 +1,84 @@
+package date_20250520
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.spark.streaming._
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies}
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.log4j.{Level, Logger}
+import org.apache.spark.streaming.dstream.{DStream, InputDStream}
+import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
+
+/**
+ * Word count with Spark Streaming integrated with Kafka
+ */
+object StreamingKafka {
+  // Log only ERROR-level messages for all packages under org; to quiet another package, add one more line with that package name
+
+  System.setProperty("HADOOP_USER_NAME", "root")
+  Logger.getLogger("org").setLevel(Level.ERROR)
+
+  def main(args: Array[String]): Unit = {
+    val conf = new SparkConf()
+      .setMaster("local[*]")
+      .setAppName("StreamingKafkaWordCount")
+
+    // Create the Spark Streaming context, treating the data received within 1 second as one batch
+    val ssc = new StreamingContext(conf, Seconds(1))
+    // Set the checkpoint directory; the checkpoint stores the accumulated results of previous batches
+    ssc.checkpoint("hdfs://hadoop102:8020/spark-ck")
+
+    // Kafka topics for the input stream; more than one can be given
+    val kafkaTopics = Array("topictest")
+
+    // Kafka configuration properties
+    val kafkaParams = Map[String, Object](
+      // Connection address of the Kafka broker
+      "bootstrap.servers" -> "hadoop102:9092",
+      // Deserializer class for keys, matching the producer's serializer
+      "key.deserializer" -> classOf[StringDeserializer],
+      // Deserializer class for values, matching the producer's serializer
+      "value.deserializer" -> classOf[StringDeserializer],
+      // Consumer group ID; consumers with the same ID belong to the same group
+      "group.id" -> "1",
+      // Do not let Kafka auto-commit offsets (defaults to true); Spark manages them instead
+      "enable.auto.commit" -> (false: java.lang.Boolean)
+    )
+
+    // Create the input DStream
+    val inputStream: InputDStream[ConsumerRecord[String, String]] =
+      KafkaUtils.createDirectStream[String, String](
+        ssc,
+        LocationStrategies.PreferConsistent,
+        Subscribe[String, String](kafkaTopics, kafkaParams)
+      )
+
+    // Unpack the received records into (key, value) pairs
+    val linesDStream = inputStream.map(record => (record.key, record.value))
+    // By default the message content is carried in the value, so extract it
+    val wordsDStream = linesDStream.map(_._2)
+    val word = wordsDStream.flatMap(_.split(" "))
+    val pair = word.map(x => (x, 1))
+
+    // Update the count of each word, accumulating across batches
+    val result: DStream[(String, Int)] = pair.updateStateByKey(updateFunc)
+    // By default, print the first 10 elements of each RDD in the DStream to the console
+    result.print()
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+
+  /**
+   * State update function that accumulates word counts across batches.
+   *
+   * @param values occurrences of a word in the current batch, e.g. Seq(1, 1, 1)
+   * @param state  count accumulated in previous batches; Option because it may be absent
+   */
+  val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+    val currentCount = values.foldLeft(0)(_ + _) // sum the occurrences in the current batch
+    val previousCount = state.getOrElse(0) // count from previous batches, defaulting to 0
+    Some(currentCount + previousCount) // new total
+  }
+
+}
diff --git a/spark-lesson/src/main/scala/date_20250520/input/data.txt b/spark-lesson/src/main/scala/date_20250520/input/data.txt
new file mode 100644
index 0000000..f5ace92
--- /dev/null
+++ b/spark-lesson/src/main/scala/date_20250520/input/data.txt
@@ -0,0 +1,4 @@
+flink kafka
+flink spark
+spark python java
+netcat java
\ No newline at end of file