From 6ca57600c0b2389f1be9ffad1613866c7422ed23 Mon Sep 17 00:00:00 2001
From: dev_xulongjin
Date: Tue, 29 Apr 2025 17:17:15 +0800
Subject: [PATCH] feat(date_20250429): complete experiment 7

- Add SparkAggFuntion.scala, implementing the user-count statistics
- Add sparksqlkeywords.scala, handling the keyword statistics
- Add sparksqlwordcount.scala, completing the word-frequency count
---
 .../scala/date_20250429/SparkAggFuntion.scala | 47 ++++++++++++
 .../date_20250429/sparksqlkeywords.scala      | 73 +++++++++++++++++++
 .../date_20250429/sparksqlwordcount.scala     | 35 +++++++++
 3 files changed, 155 insertions(+)
 create mode 100644 spark-lesson/src/main/scala/date_20250429/SparkAggFuntion.scala
 create mode 100644 spark-lesson/src/main/scala/date_20250429/sparksqlkeywords.scala
 create mode 100644 spark-lesson/src/main/scala/date_20250429/sparksqlwordcount.scala

diff --git a/spark-lesson/src/main/scala/date_20250429/SparkAggFuntion.scala b/spark-lesson/src/main/scala/date_20250429/SparkAggFuntion.scala
new file mode 100644
index 0000000..4c7f2f5
--- /dev/null
+++ b/spark-lesson/src/main/scala/date_20250429/SparkAggFuntion.scala
@@ -0,0 +1,47 @@
+package date_20250429
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.{Row, SparkSession}
+
+object SparkAggFuntion {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession.builder()
+      .appName("sparkAggFuntion")
+      .master("local[*]")
+      .getOrCreate()
+
+    import org.apache.spark.sql.functions._
+
+    // Sample visit log: one "date,userid" record per line.
+    val arr = Array(
+      "2024-11-01,0001",
+      "2024-11-01,0002",
+      "2024-11-01,0001",
+      "2024-11-02,0003",
+      "2024-11-02,0003",
+      "2024-11-02,0004",
+      "2024-11-02,0001",
+      "2024-11-02,0001",
+      "2024-11-02,0004",
+      "2024-11-03,0002",
+      "2024-11-03,0003",
+      "2024-11-03,0001",
+      "2024-11-03,0005",
+      "2024-11-03,0003",
+      "2024-11-03,0003",
+      "2024-11-04,0004"
+    )
+
+    val rowRDD: RDD[Row] = spark.sparkContext
+      .makeRDD(arr)
+      .map(line => Row(line.split(",")(0), line.split(",")(1).toInt))
+    val structType: StructType = StructType(Array(StructField("date", StringType, true),
+      StructField("userid", IntegerType, true)))
+    val df = spark.createDataFrame(rowRDD, structType)
+
+    df.groupBy("date")
+      .agg(countDistinct("userid") as "count")
+      .show()
+  }
+}
\ No newline at end of file
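
Side note on SparkAggFuntion.scala: the explicit RDD[Row] + StructType construction works, but the same per-date distinct-user count can be written more compactly with toDF and a single split per line. A minimal sketch under those assumptions; the object name, the abbreviated sample data, and the spark.stop() call are illustrative additions, not part of the patch:

package date_20250429

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.countDistinct

// Sketch only: same per-date UV as SparkAggFuntion, built with toDF
// instead of an explicit RDD[Row] + StructType.
object SparkAggFuntionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("sparkAggFuntionSketch")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // Abbreviated sample of the "date,userid" records used in the patch.
    val arr = Array("2024-11-01,0001", "2024-11-01,0001", "2024-11-02,0003")

    val df = spark.sparkContext.makeRDD(arr)
      .map { line =>
        val fields = line.split(",") // split once, reuse both fields
        (fields(0), fields(1).toInt)
      }
      .toDF("date", "userid")

    // countDistinct collapses repeated userids within each date; it is
    // the DataFrame counterpart of SQL's COUNT(DISTINCT userid).
    df.groupBy("date")
      .agg(countDistinct("userid") as "count")
      .show()

    spark.stop()
  }
}
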
diff --git a/spark-lesson/src/main/scala/date_20250429/sparksqlkeywords.scala b/spark-lesson/src/main/scala/date_20250429/sparksqlkeywords.scala
new file mode 100644
index 0000000..830be39
--- /dev/null
+++ b/spark-lesson/src/main/scala/date_20250429/sparksqlkeywords.scala
@@ -0,0 +1,73 @@
+package date_20250429
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.types._
+import scala.collection.mutable.ListBuffer
+
+object sparksqlkeywords {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession.builder()
+      .appName("sparksqlkeywords")
+      .master("local[*]")
+      .getOrCreate()
+
+    val linesRDD: RDD[String] = spark.sparkContext
+      .textFile("hdfs://hadoop102:8020/input/data_7/keywords.txt")
+
+    val tupleRDD: RDD[((String, String), String)] = linesRDD.map(line => {
+      val fields = line.split(",")
+      val date = fields(0)
+      val user = fields(1)
+      val keyword = fields(2)
+      // Key by (date, keyword); the value is the visiting user.
+      ((date, keyword), user)
+    })
+
+    val groupedRDD: RDD[((String, String), Iterable[String])] = tupleRDD.groupByKey()
+
+    val uvRDD: RDD[((String, String), Int)] = groupedRDD.map(line => {
+      val dateAndKeyword: (String, String) = line._1
+
+      val users: Iterator[String] = line._2.iterator
+
+      // Collect each user at most once; the buffer size is the UV.
+      val distinctUsers = new ListBuffer[String]()
+      while (users.hasNext) {
+        val user = users.next()
+        if (!distinctUsers.contains(user)) {
+          distinctUsers += user
+        }
+      }
+
+      val uv = distinctUsers.size
+
+      (dateAndKeyword, uv)
+    })
+
+    // Turn each ((date, keyword), uv) tuple into a Row for the DataFrame.
+    val rowRDD: RDD[Row] = uvRDD.map(line => {
+      Row(
+        line._1._1,
+        line._1._2,
+        line._2
+      )
+    })
+    val structType: StructType = StructType(Array(
+      StructField("date", StringType, true),
+      StructField("keyword", StringType, true),
+      StructField("uv", IntegerType, true)
+    ))
+
+    val df = spark.createDataFrame(rowRDD, structType)
+
+    df.createTempView("date_keyword_uv")
+
+    spark.sql(
+      """
+        |SELECT * FROM date_keyword_uv
+        |""".stripMargin).show()
+
+    spark.close()
+  }
+}
\ No newline at end of file
diff --git a/spark-lesson/src/main/scala/date_20250429/sparksqlwordcount.scala b/spark-lesson/src/main/scala/date_20250429/sparksqlwordcount.scala
new file mode 100644
index 0000000..37335ab
--- /dev/null
+++ b/spark-lesson/src/main/scala/date_20250429/sparksqlwordcount.scala
@@ -0,0 +1,35 @@
+package date_20250429
+
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+
+object sparksqlwordcount {
+  def main(args: Array[String]): Unit = {
+    val session = SparkSession.builder()
+      .appName("WordCount")
+      .master("local[*]")
+      .getOrCreate()
+
+    val lines: Dataset[String] =
+      session.read.textFile("hdfs://hadoop102:8020/input/data_7/wordcount03.txt")
+
+    import session.implicits._
+    // flatMap yields a one-column Dataset; the column is named "value" by default.
+    val words: Dataset[String] = lines.flatMap(_.split(" "))
+
+    val df: DataFrame = words.withColumnRenamed("value", "word")
+
+    df.createTempView("v_words")
+    val result: DataFrame =
+      session.sql(
+        """
+          |SELECT word, count(1) AS num
+          |FROM v_words
+          |GROUP BY word
+          |ORDER BY num DESC
+          |""".stripMargin)
+
+    result.show()
+    session.close()
+
+  }
+}
\ No newline at end of file
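
Side note on sparksqlkeywords.scala: the groupByKey + ListBuffer pass re-implements a distinct count, and contains on a ListBuffer makes it quadratic in the number of users per (date, keyword) key. A minimal sketch of the same (date, keyword, uv) table computed with COUNT(DISTINCT ...) in Spark SQL, assuming the same input path and the date,user,keyword column layout; the object name, view name, and ORDER BY clause are illustrative:

package date_20250429

import org.apache.spark.sql.SparkSession

// Sketch only: per-(date, keyword) UV via COUNT(DISTINCT user) instead
// of a hand-rolled groupByKey + ListBuffer de-duplication.
object sparksqlkeywordsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("sparksqlkeywordsSketch")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // Same input layout as the patch: date,user,keyword per line.
    val df = spark.sparkContext
      .textFile("hdfs://hadoop102:8020/input/data_7/keywords.txt")
      .map { line =>
        val fields = line.split(",")
        (fields(0), fields(1), fields(2))
      }
      .toDF("date", "user", "keyword")

    df.createTempView("date_user_keyword")

    // COUNT(DISTINCT user) performs the de-duplication that the patch
    // does manually with a ListBuffer.
    spark.sql(
      """
        |SELECT date, keyword, COUNT(DISTINCT user) AS uv
        |FROM date_user_keyword
        |GROUP BY date, keyword
        |ORDER BY date, uv DESC
        |""".stripMargin).show()

    spark.stop()
  }
}

Either version should produce the same table; pushing the distinct count into the SQL aggregation also avoids materializing each key's full user list the way groupByKey does.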