feat(date_20250429): complete experiment 7
- Add SparkAggFuntion.scala, implementing the daily distinct-user count
- Add sparksqlkeywords.scala, handling the keyword statistics
- Add sparksqlwordcount.scala, completing the word-frequency count
parent ac7ab26d72
commit 6ca57600c0
SparkAggFuntion.scala
@@ -0,0 +1,47 @@
package date_20250429

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object SparkAggFuntion {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("sparkAggFuntion")
      .master("local[*]")
      .getOrCreate()

    import org.apache.spark.sql.functions._

    // Each record is "date,userid".
    val arr = Array(
      "2024-11-01,0001",
      "2024-11-01,0002",
      "2024-11-01,0001",
      "2024-11-02,0003",
      "2024-11-02,0003",
      "2024-11-02,0004",
      "2024-11-02,0001",
      "2024-11-02,0001",
      "2024-11-02,0004",
      "2024-11-03,0002",
      "2024-11-03,0003",
      "2024-11-03,0001",
      "2024-11-03,0005",
      "2024-11-03,0003",
      "2024-11-03,0003",
      "2024-11-04,0004"
    )

    // Split each line into (date, userid) and wrap it in a Row.
    val rowRDD: RDD[Row] = spark.sparkContext.makeRDD(arr)
      .map(line => Row(line.split(",")(0), line.split(",")(1).toInt))

    val structType: StructType = StructType(Array(
      StructField("date", StringType, true),
      StructField("userid", IntegerType, true)))

    val df = spark.createDataFrame(rowRDD, structType)

    // Count the number of distinct users per day.
    df.groupBy("date")
      .agg(countDistinct("userid") as "count")
      .show()
  }
}
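As a point of comparison, here is a minimal sketch of the same daily distinct-user count, assuming the same local[*] setup but building the DataFrame with toDF on tuples instead of a hand-written Row/StructType schema; the object name, app name, and inline sample rows are hypothetical.

// A minimal sketch, assuming the same local[*] setup; names and data are hypothetical.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.countDistinct

object SparkAggFuntionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("sparkAggFuntionSketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // toDF infers the schema (date: String, userid: Int) from the tuple types.
    val df = Seq(
      ("2024-11-01", 1), ("2024-11-01", 2), ("2024-11-01", 1),
      ("2024-11-02", 3), ("2024-11-02", 4)
    ).toDF("date", "userid")

    // countDistinct collapses repeated userids inside each date group,
    // so both sample dates here count 2 distinct users.
    df.groupBy("date")
      .agg(countDistinct("userid") as "count")
      .show()

    spark.stop()
  }
}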
sparksqlkeywords.scala
@@ -0,0 +1,73 @@
package date_20250429

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
import scala.collection.mutable.ListBuffer

object sparksqlkeywords {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("sparksqlkeywords")
      .master("local[*]")
      .getOrCreate()

    // Each line is expected to be "date,user,keyword".
    val linesRDD: RDD[String] = spark.sparkContext
      .textFile("hdfs://hadoop102:8020/input/data_7/keywords.txt")

    // Key by (date, keyword); the value is the user who searched it.
    val tupleRDD: RDD[((String, String), String)] = linesRDD.map(line => {
      val date = line.split(",")(0)
      val user = line.split(",")(1)
      val keyword = line.split(",")(2)

      ((date, keyword), user)
    })

    // Gather all users who searched a keyword on a given date.
    val groupedRDD: RDD[((String, String), Iterable[String])] = tupleRDD.groupByKey()

    // UV = number of distinct users per (date, keyword).
    val uvRDD: RDD[((String, String), Int)] = groupedRDD.map(line => {
      val dateAndKeyword: (String, String) = line._1
      val users: Iterator[String] = line._2.iterator

      // De-duplicate the users manually with a ListBuffer.
      val distinctUsers = new ListBuffer[String]()
      while (users.hasNext) {
        val user = users.next()
        if (!distinctUsers.contains(user)) {
          distinctUsers += user
        }
      }

      val uv = distinctUsers.size
      (dateAndKeyword, uv)
    })

    // Flatten the nested tuples into Rows matching the schema below.
    val rowRDD: RDD[Row] = uvRDD.map(line => {
      Row(
        line._1._1,
        line._1._2,
        line._2.toInt
      )
    })
    val structType: StructType = StructType(Array(
      StructField("date", StringType, true),
      StructField("keyword", StringType, true),
      StructField("uv", IntegerType, true)
    ))

    val df = spark.createDataFrame(rowRDD, structType)

    df.createTempView("date_keyword_uv")

    spark.sql(
      """
        |SELECT * FROM date_keyword_uv
        |""".stripMargin).show()

    spark.close()
  }
}
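The ListBuffer loop above is a hand-rolled distinct count over each (date, keyword) group. A minimal sketch of the same UV figure, assuming the same "date,user,keyword" line format, can lean on countDistinct directly; the object name and the inline sample lines below are hypothetical stand-ins for keywords.txt.

// A minimal sketch, assuming "date,user,keyword" lines; names and data are hypothetical.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.countDistinct

object sparksqlkeywordsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("sparksqlkeywordsSketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Inline sample data standing in for the HDFS file.
    val lines = Seq(
      "2024-11-01,u1,spark",
      "2024-11-01,u1,spark", // same user and keyword twice: counted once
      "2024-11-01,u2,spark"
    ).toDS()

    // Split each line into (date, user, keyword) columns.
    val df = lines.map { line =>
      val f = line.split(",")
      (f(0), f(1), f(2))
    }.toDF("date", "user", "keyword")

    // UV = number of distinct users per (date, keyword); expected row: (2024-11-01, spark, 2).
    df.groupBy("date", "keyword")
      .agg(countDistinct("user") as "uv")
      .show()

    spark.stop()
  }
}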
sparksqlwordcount.scala
@@ -0,0 +1,35 @@
package date_20250429

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object sparksqlwordcount {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("WordCount")
      .master("local[*]")
      .getOrCreate()

    // Read the input file as a Dataset[String], one element per line.
    val lines: Dataset[String] =
      session.read.textFile("hdfs://hadoop102:8020/input/data_7/wordcount03.txt")

    import session.implicits._
    // Split each line on spaces to get one word per row.
    val words: Dataset[String] = lines.flatMap(_.split(" "))

    // The default column of a Dataset[String] is "value"; rename it to "word".
    val df: DataFrame = words.withColumnRenamed("value", "word")

    df.createTempView("v_words")
    val result: DataFrame =
      session.sql(
        """
          |SELECT word,count(1) num
          |FROM v_words
          |GROUP BY word
          |ORDER BY num DESC
          |""".stripMargin)

    result.show()
    session.close()
  }
}
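The SQL above groups by word and orders by the count. Below is a minimal sketch of the same word count written against the DataFrame API instead of a temp view, assuming the same space-separated input; the object name and the inline sample lines are hypothetical stand-ins for wordcount03.txt.

// A minimal sketch, assuming space-separated lines; names and data are hypothetical.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.desc

object sparksqlwordcountSketch {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("WordCountSketch")
      .master("local[*]")
      .getOrCreate()
    import session.implicits._

    // Inline sample lines standing in for wordcount03.txt.
    val lines = Seq("hello spark", "hello sql").toDS()

    lines.flatMap(_.split(" "))            // one word per row, column "value"
      .withColumnRenamed("value", "word")
      .groupBy("word")
      .count()                             // adds a "count" column, like COUNT(1)
      .withColumnRenamed("count", "num")
      .orderBy(desc("num"))
      .show()

    session.stop()
  }
}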