refactor(spark-lesson): 完成实验五

This commit is contained in:
dev_xulongjin 2025-04-22 17:16:30 +08:00
parent c5a46a98e0
commit ffb978cdb5

View File

@ -0,0 +1,58 @@
package date_20250415
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/** 统计每日新增用户 */
object DailyNewUsers {
def main(args: Array[String]): Unit = {
// 创建SparkConf对象存储应用程序的配置信息
val conf = new SparkConf()
.setMaster("local") // 启动本地化计算
.setAppName("Spark-DailyNewUsers") // 设置应用程序名称
// 创建SparkContext对象
val sc = new SparkContext(conf)
// 读取用户访问数据文件
val linesRDD: RDD[String] = sc.textFile("hdfs://hadoop102/input/data/user_logs.txt")
// 将每一行数据转化为 (日期, 用户) 二元组
val dateUserRDD: RDD[(String, String)] = linesRDD.map(line => {
val Array(date, user) = line.split(",")
(date, user)
})
// 获取日期与用户的映射并按照日期排序
val sortedDateUserRDD: RDD[(String, List[String])] = dateUserRDD
.groupByKey()
.mapValues(_.toList)
.sortBy(_._1)
// 使用set保存已出现的用户
var seenUsers: Set[String] = Set()
// 计算每日新增用户
val dailyNewUsersRDD: RDD[(String, List[String])] = sortedDateUserRDD.map { case (date, users) =>
val uniqueUsers = users.toSet // 去重
val newUsers = uniqueUsers.diff(seenUsers) // 筛选新增用户
seenUsers = seenUsers.union(uniqueUsers) // 更新已出现用户集合
(date, newUsers.toList)
}
// 保存结果到HDFS
// dailyNewUsersRDD.saveAsTextFile("hdfs://192.168.182.100:9000/output/dailyNewUsers")
// 打印结果
dailyNewUsersRDD.collect().foreach { case (date, newUsers) =>
println(s"日期:$date")
newUsers.foreach(user => println(s"新增用户:$user"))
println("********************")
}
// 停止SparkContext
sc.stop()
}
}