refactor(spark-lesson): 完成实验六

This commit is contained in:
dev_xulongjin 2025-04-23 09:14:47 +08:00
parent ffb978cdb5
commit 5a6475fb14
8 changed files with 240 additions and 0 deletions

View File

@ -13,6 +13,8 @@
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<scala.version>2.12.2</scala.version>
<mysqlconnect.version>8.0.33</mysqlconnect.version>
</properties>
<repositories>
@ -127,6 +129,13 @@
<version>2.5.1</version>
</dependency>
<!-- mysql-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysqlconnect.version}</version>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,39 @@
package date_20250422
import org.apache.spark.sql.SparkSession
object CsvSorter {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("CSV Sorter")
.master("local[*]") // 使用本地模式根据需要可以更改成集群模式
.getOrCreate()
// 读取 CSV 文件
val csvFilePath = "hdfs://hadoop102:8020/input/data_6/person.csv" // 替换为你的 CSV 文件路径
val df = spark.read
.option("header", "false") // CSV 文件没有头部因此设置为 false
.option("inferSchema", "true") // 自动推断列的数据类型
.csv(csvFilePath)
.toDF("id", "name", "age") // 指定列名
// 注册临时视图以便使用 Spark SQL
df.createOrReplaceTempView("csv_table")
// 使用 Spark SQL 进行排序
val sortedDf = spark.sql("SELECT * FROM csv_table ORDER BY name ASC")
// 显示排序后的结果
sortedDf.show()
sortedDf.select("id", "name", "age")
.write.save("hdfs://hadoop102:8020/output/data_6/user_sort")
// 停止 SparkSession
spark.stop()
}
}

View File

@ -0,0 +1,35 @@
package date_20250422
import org.apache.spark.sql.SparkSession
case class Person(id: Int, name: String, age: Int)
object Dataset_Return_DateF {
def main(args: Array[String]): Unit = {
//创建或得到SparkSession
val spark = SparkSession.builder()
.appName("SparkSQLJDBC")
.master("local[*]")
.config("spark.testing.memory", "2147480000")
.getOrCreate()
val d1 = spark.read.textFile("hdfs://hadoop102:8020/input/data_6/person.txt")
import spark.implicits._
val personDataset = d1.map(line => {
val fields = line.split(",")
val id = fields(0).toInt
val name = fields(1)
val age = fields(2).toInt
Person(id, name, age)
})
// 将Dataset转为DataFrame
val pdf = personDataset.toDF()
// 在DataFrame上创建一个临时视图v_person
pdf.createTempView("v_person")
// 使用SparkSession对象执行SQL查询
val result = spark.sql("select * from v_person order by age desc")
// 调用show()方法输出结果数据
result.show()
}
}

View File

@ -0,0 +1,36 @@
package date_20250422
import org.apache.spark.sql.SparkSession
object JsonToHdfs {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("JsonToHdfs")
.master("local[*]") // 设置本地运行使用所有可用核心根据需要调整或移除
.getOrCreate()
// 读取 JSON 文件
val jsonFilePath = "hdfs://hadoop102:8020/input/data_6/people.json" // 替换为你的 JSON 文件路径
val df = spark.read.json(jsonFilePath)
// 注册临时视图以便使用 Spark SQL
df.createOrReplaceTempView("json_table")
// 使用 Spark SQL 执行查询
val queryResultDf = spark.sql("SELECT * FROM json_table") // 这里可以根据需要修改 SQL 查询
// 显示查询结果
queryResultDf.show()
// 将查询结果写入 HDFS
val hdfsOutputPath = "hdfs://hadoop102:8020/output/data_6/jsonToHdfs" // 替换为你的 HDFS 输出目录
// queryResultDf.write.mode("append").parquet(hdfsOutputPath) // 使用 append 模式写入避免覆盖现有数据
queryResultDf.write.mode("overwrite").parquet(hdfsOutputPath) // 使用 overwrite 模式写入覆盖现有数据
// 停止 SparkSession
spark.stop()
}
}

View File

@ -0,0 +1,31 @@
package date_20250422
import org.apache.spark.sql.SparkSession
object ParquetSQLExample {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME","root")
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("Spark SQL Parquet Example")
.master("local[*]") // 本地运行
.getOrCreate()
// 读取 Parquet 文件
val ratingsDF = spark.read.parquet("hdfs://hadoop102:8020/input/data_6/users.parquet")
ratingsDF.show()
// 注册临时视图
ratingsDF.createOrReplaceTempView("users")
// 将查询结果写入到 HDFS 自定义目录
ratingsDF.select("name", "favorite_color", "favorite_numbers")
.write.save("hdfs://hadoop102:8020/output/data_6/users_output2")
// 停止 SparkSession
spark.stop()
}
}

View File

@ -0,0 +1,44 @@
package date_20250422
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
/** 将RDD中的数据写入到MySQL* */
object SparkSQLJDBC {
def main(args: Array[String]): Unit = {
//创建或得到SparkSession
val spark = SparkSession.builder()
.appName("SparkSQLJDBC")
.master("local[*]")
.getOrCreate()
//创建存放两条学生信息的RDD
val studentRDD = spark.sparkContext.parallelize(
Array("4 xiaoming 26", "5 xiaogang 27")
).map(_.split(" "))
//通过StructType指定每个字段的schema
val schema = StructType(
List(
StructField("id", IntegerType, true),
StructField("name", StringType, true),
StructField("age", IntegerType, true))
)
//将studentRDD映射为rowRDDrowRDD中的每个元素都为一个Row对象
val rowRDD = studentRDD.map(line =>
Row(line(0).toInt, line(1).trim, line(2).toInt)
)
//建立rowRDD和schema之间的对应关系
val studentDF = spark.createDataFrame(rowRDD, schema)
//将结果追加到MySQL的student表中
studentDF.write.mode("append") //保存模式为追加即在原来的表中追加数据
.format("jdbc")
//.option("url", "jdbc:mysql://192.168.182.100:3307/spark_db")
.option("url", "jdbc:mysql://hadoop102:3306/spark_db")
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("dbtable", "student") //表名
.option("user", "root")
.option("password", "123456")
.save()
}
}

View File

@ -0,0 +1,37 @@
package date_20250422
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}
object SparkSQLReadFromMySQL {
def main(args: Array[String]): Unit = {
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("Spark SQL Read from MySQL")
.config("spark.master", "local") // 使用 local 模式运行
.getOrCreate()
// 配置 MySQL 连接信息
val jdbcUrl = "jdbc:mysql://hadoop102:3306/spark_db?useSSL=false"
val connectionProperties = new Properties()
connectionProperties.setProperty("user", "root")
connectionProperties.setProperty("password", "123456")
connectionProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver") // 添加 MySQL JDBC 驱动
// 读取 MySQL
val studentDf: DataFrame = spark.read.jdbc(jdbcUrl, "student", connectionProperties)
// 显示 student 表的内容
studentDf.show()
// 注册临时视图
studentDf.createOrReplaceTempView("student")
// 使用 SQL 语句查询
spark.sql("SELECT * FROM student").show()
// 停止 SparkSession
spark.stop()
}
}

View File

@ -0,0 +1,9 @@
show databases;
create database if not exists spark_db;
use spark_db;
create table student (id int,name varchar(225),age int);
select * from student;