From ffb978cdb53dba0a644fdaa336891ef76836d23e Mon Sep 17 00:00:00 2001
From: dev_xulongjin
Date: Tue, 22 Apr 2025 17:16:30 +0800
Subject: [PATCH] refactor(spark-lesson): complete Experiment 5
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../scala/date_20250415/DailyNewUsers.scala | 56 ++++++++++++++++++
 1 file changed, 56 insertions(+)
 create mode 100755 spark-lesson/src/main/scala/date_20250415/DailyNewUsers.scala

diff --git a/spark-lesson/src/main/scala/date_20250415/DailyNewUsers.scala b/spark-lesson/src/main/scala/date_20250415/DailyNewUsers.scala
new file mode 100755
index 0000000..dfb1525
--- /dev/null
+++ b/spark-lesson/src/main/scala/date_20250415/DailyNewUsers.scala
@@ -0,0 +1,56 @@
+package date_20250415
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{SparkConf, SparkContext}
+
+/** Count the users that appear for the first time on each date. */
+object DailyNewUsers {
+
+  def main(args: Array[String]): Unit = {
+
+    // Create a SparkConf object holding the application's configuration
+    val conf = new SparkConf()
+      .setMaster("local") // run locally
+      .setAppName("Spark-DailyNewUsers") // set the application name
+
+    // Create the SparkContext
+    val sc = new SparkContext(conf)
+
+    // Read the user-access log file
+    val linesRDD: RDD[String] = sc.textFile("hdfs://hadoop102/input/data/user_logs.txt")
+
+    // Parse each line ("date,user") into a (date, user) pair
+    val dateUserRDD: RDD[(String, String)] = linesRDD.map(line => {
+      val Array(date, user) = line.split(",")
+      (date, user)
+    })
+
+    // A user is "new" on the earliest date they appear. Mutating a driver-side
+    // var inside a transformation does not work: each task only mutates its own
+    // deserialized copy of the set. Instead, invert to (user, date) and keep
+    // each user's first date; yyyy-MM-dd dates compare correctly as strings.
+    val userFirstDateRDD: RDD[(String, String)] = dateUserRDD
+      .map { case (date, user) => (user, date) }
+      .reduceByKey((d1, d2) => if (d1 <= d2) d1 else d2)
+
+    // Regroup by first date to get each day's list of new users, sorted by date
+    val dailyNewUsersRDD: RDD[(String, List[String])] = userFirstDateRDD
+      .map { case (user, firstDate) => (firstDate, user) }
+      .groupByKey()
+      .mapValues(_.toList)
+      .sortByKey()
+
+    // Save the result to HDFS
+//    dailyNewUsersRDD.saveAsTextFile("hdfs://192.168.182.100:9000/output/dailyNewUsers")
+
+    // Print the result
+    dailyNewUsersRDD.collect().foreach { case (date, newUsers) =>
+      println(s"Date: $date")
+      newUsers.foreach(user => println(s"New user: $user"))
+      println("********************")
+    }
+
+    // Stop the SparkContext
+    sc.stop()
+  }
+}
\ No newline at end of file
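
Review note (appended after the patch body, not part of the applied diff): a minimal
spark-shell sketch of the first-date technique used above, so the change can be checked
without HDFS. The sample records are hypothetical; `sc` is the SparkContext that
spark-shell predefines.

    // hypothetical sample data standing in for user_logs.txt
    val logs = sc.parallelize(Seq(
      ("2025-02-01", "alice"), ("2025-02-01", "bob"),
      ("2025-02-02", "alice"), ("2025-02-02", "carol")))

    val newUsersByDay = logs
      .map { case (date, user) => (user, date) }                 // invert to (user, date)
      .reduceByKey((d1, d2) => if (d1 <= d2) d1 else d2)         // keep each user's first date
      .map { case (user, firstDate) => (firstDate, user) }       // regroup by first date
      .groupByKey()
      .sortByKey()

    newUsersByDay.collect().foreach(println)
    // (2025-02-01,CompactBuffer(alice, bob))
    // (2025-02-02,CompactBuffer(carol))

alice appears on both days but is counted only on 2025-02-01, which is the behavior the
driver-side var was trying (and failing across tasks) to achieve; the order of users
within a single day is not guaranteed by groupByKey.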