diff --git a/spark-lesson/pom.xml b/spark-lesson/pom.xml
index 435099f..6b79ccf 100644
--- a/spark-lesson/pom.xml
+++ b/spark-lesson/pom.xml
@@ -13,6 +13,8 @@
         <maven.compiler.source>8</maven.compiler.source>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <scala.version>2.12.2</scala.version>
+        <mysqlconnect.version>8.0.33</mysqlconnect.version>
+
@@ -127,6 +129,13 @@
             <version>2.5.1</version>
+
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>${mysqlconnect.version}</version>
+        </dependency>
+
diff --git a/spark-lesson/src/main/scala/date_20250422/CsvSorter.scala b/spark-lesson/src/main/scala/date_20250422/CsvSorter.scala
new file mode 100644
index 0000000..f2cb034
--- /dev/null
+++ b/spark-lesson/src/main/scala/date_20250422/CsvSorter.scala
@@ -0,0 +1,41 @@
+package date_20250422
+
+import org.apache.spark.sql.SparkSession
+
+object CsvSorter {
+  def main(args: Array[String]): Unit = {
+
+    System.setProperty("HADOOP_USER_NAME", "root")
+
+    // Create the SparkSession
+    val spark = SparkSession.builder()
+      .appName("CSV Sorter")
+      .master("local[*]") // local mode; change to a cluster master URL as needed
+      .getOrCreate()
+
+    // Read the CSV file
+    val csvFilePath = "hdfs://hadoop102:8020/input/data_6/person.csv" // replace with your CSV file path
+    val df = spark.read
+      .option("header", "false") // the CSV file has no header row, so set this to false
+      .option("inferSchema", "true") // infer column data types automatically
+      .csv(csvFilePath)
+      .toDF("id", "name", "age") // assign column names
+
+    // Register a temporary view so the data can be queried with Spark SQL
+    df.createOrReplaceTempView("csv_table")
+
+    // Sort with Spark SQL
+    val sortedDf = spark.sql("SELECT * FROM csv_table ORDER BY name ASC")
+
+    // Show the sorted result
+    sortedDf.show()
+
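+    // Note: save() with no explicit format writes Parquet, Spark's default
+    // data source; add .format(...) or .mode(...) here if other behavior is needed.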
+    sortedDf.select("id", "name", "age")
+      .write.save("hdfs://hadoop102:8020/output/data_6/user_sort")
+
+    // Stop the SparkSession
+    spark.stop()
+  }
+}
diff --git a/spark-lesson/src/main/scala/date_20250422/Dataset_Return_DateF.scala b/spark-lesson/src/main/scala/date_20250422/Dataset_Return_DateF.scala
new file mode 100644
index 0000000..13e5ee0
--- /dev/null
+++ b/spark-lesson/src/main/scala/date_20250422/Dataset_Return_DateF.scala
@@ -0,0 +1,39 @@
+package date_20250422
+
+import org.apache.spark.sql.SparkSession
+
+case class Person(id: Int, name: String, age: Int)
+
+object Dataset_Return_DateF {
+  def main(args: Array[String]): Unit = {
+    // Create or get the SparkSession
+    val spark = SparkSession.builder()
+      .appName("SparkSQLJDBC")
+      .master("local[*]")
+      .config("spark.testing.memory", "2147480000")
+      .getOrCreate()
+
+    val d1 = spark.read.textFile("hdfs://hadoop102:8020/input/data_6/person.txt")
+    import spark.implicits._
+
+    val personDataset = d1.map(line => {
+      val fields = line.split(",")
+      val id = fields(0).toInt
+      val name = fields(1)
+      val age = fields(2).toInt
+      Person(id, name, age)
+    })
+    // Convert the Dataset to a DataFrame
+    val pdf = personDataset.toDF()
+    // Create a temporary view "v_person" on the DataFrame
+    pdf.createTempView("v_person")
+    // Run a SQL query through the SparkSession
+    val result = spark.sql("select * from v_person order by age desc")
+    // Call show() to print the result
+    result.show()
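+    // The same ordering can be expressed with the Dataset API instead of SQL, e.g.:
+    // personDataset.orderBy($"age".desc).show()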
+
+    spark.stop()
+  }
+}
\ No newline at end of file
diff --git a/spark-lesson/src/main/scala/date_20250422/JsonToHdfs.scala b/spark-lesson/src/main/scala/date_20250422/JsonToHdfs.scala
new file mode 100644
index 0000000..f622270
--- /dev/null
+++ b/spark-lesson/src/main/scala/date_20250422/JsonToHdfs.scala
@@ -0,0 +1,38 @@
+package date_20250422
+
+import org.apache.spark.sql.SparkSession
+
+object JsonToHdfs {
+  def main(args: Array[String]): Unit = {
+    System.setProperty("HADOOP_USER_NAME", "root")
+
+    // Create the SparkSession
+    val spark = SparkSession.builder()
+      .appName("JsonToHdfs")
+      .master("local[*]") // run locally on all available cores; adjust or remove as needed
+      .getOrCreate()
+
+    // Read the JSON file
+    val jsonFilePath = "hdfs://hadoop102:8020/input/data_6/people.json" // replace with your JSON file path
+    val df = spark.read.json(jsonFilePath)
+
+    // Register a temporary view so the data can be queried with Spark SQL
+    df.createOrReplaceTempView("json_table")
+
+    // Run a query with Spark SQL
+    val queryResultDf = spark.sql("SELECT * FROM json_table") // adjust the SQL query as needed
+
+    // Show the query result
+    queryResultDf.show()
+
+    // Write the query result to HDFS
+    val hdfsOutputPath = "hdfs://hadoop102:8020/output/data_6/jsonToHdfs" // replace with your HDFS output directory
+    // queryResultDf.write.mode("append").parquet(hdfsOutputPath) // append mode: keeps existing data
+    queryResultDf.write.mode("overwrite").parquet(hdfsOutputPath) // overwrite mode: replaces existing data
+
+    // Stop the SparkSession
+    spark.stop()
+  }
+}
diff --git a/spark-lesson/src/main/scala/date_20250422/ParquetSQLExample.scala b/spark-lesson/src/main/scala/date_20250422/ParquetSQLExample.scala
new file mode 100644
index 0000000..7ae2036
--- /dev/null
+++ b/spark-lesson/src/main/scala/date_20250422/ParquetSQLExample.scala
@@ -0,0 +1,33 @@
+package date_20250422
+
+import org.apache.spark.sql.SparkSession
+
+object ParquetSQLExample {
+  def main(args: Array[String]): Unit = {
+
+    System.setProperty("HADOOP_USER_NAME", "root")
+
+    // Create the SparkSession
+    val spark = SparkSession.builder()
+      .appName("Spark SQL Parquet Example")
+      .master("local[*]") // run locally
+      .getOrCreate()
+
+    // Read the Parquet file
+    val ratingsDF = spark.read.parquet("hdfs://hadoop102:8020/input/data_6/users.parquet")
+
+    ratingsDF.show()
+    // Register a temporary view
+    ratingsDF.createOrReplaceTempView("users")
+
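+    // Example query against the registered view (columns as found in users.parquet):
+    spark.sql("SELECT name, favorite_color FROM users").show()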
+
+    // Write the selected columns to a custom HDFS directory
+    ratingsDF.select("name", "favorite_color", "favorite_numbers")
+      .write.save("hdfs://hadoop102:8020/output/data_6/users_output2")
+
+    // Stop the SparkSession
+    spark.stop()
+  }
+}
\ No newline at end of file
diff --git a/spark-lesson/src/main/scala/date_20250422/SparkSQLJDBC.scala b/spark-lesson/src/main/scala/date_20250422/SparkSQLJDBC.scala
new file mode 100644
index 0000000..eaa3080
--- /dev/null
+++ b/spark-lesson/src/main/scala/date_20250422/SparkSQLJDBC.scala
@@ -0,0 +1,48 @@
+package date_20250422
+
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.{Row, SparkSession}
+
+/** Writes the data in an RDD to MySQL. */
+
+object SparkSQLJDBC {
+
+  def main(args: Array[String]): Unit = {
+    // Create or get the SparkSession
+    val spark = SparkSession.builder()
+      .appName("SparkSQLJDBC")
+      .master("local[*]")
+      .getOrCreate()
+    // Create an RDD holding two student records
+    val studentRDD = spark.sparkContext.parallelize(
+      Array("4 xiaoming 26", "5 xiaogang 27")
+    ).map(_.split(" "))
+    // Specify the schema of each field with StructType
+    val schema = StructType(
+      List(
+        StructField("id", IntegerType, true),
+        StructField("name", StringType, true),
+        StructField("age", IntegerType, true))
+    )
+    // Map studentRDD to rowRDD; each element of rowRDD is a Row object
+    val rowRDD = studentRDD.map(line =>
+      Row(line(0).toInt, line(1).trim, line(2).toInt)
+    )
+    // Tie rowRDD to the schema
+    val studentDF = spark.createDataFrame(rowRDD, schema)
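+    // The target spark_db database and student table must already exist in MySQL;
+    // task.sql in this package contains the matching DDL.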
+    // Append the result to the MySQL student table
+    studentDF.write.mode("append") // append mode adds rows to the existing table
+      .format("jdbc")
+      //.option("url", "jdbc:mysql://192.168.182.100:3307/spark_db")
+      .option("url", "jdbc:mysql://hadoop102:3306/spark_db")
+      .option("driver", "com.mysql.cj.jdbc.Driver")
+      .option("dbtable", "student") // table name
+      .option("user", "root")
+      .option("password", "123456")
+      .save()
+
+    spark.stop()
+  }
+}
diff --git a/spark-lesson/src/main/scala/date_20250422/SparkSQLReadFromMySQL.scala b/spark-lesson/src/main/scala/date_20250422/SparkSQLReadFromMySQL.scala
new file mode 100644
index 0000000..ee554ee
--- /dev/null
+++ b/spark-lesson/src/main/scala/date_20250422/SparkSQLReadFromMySQL.scala
@@ -0,0 +1,39 @@
+package date_20250422
+
+import java.util.Properties
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+object SparkSQLReadFromMySQL {
+  def main(args: Array[String]): Unit = {
+    // Create the SparkSession
+    val spark = SparkSession.builder()
+      .appName("Spark SQL Read from MySQL")
+      .config("spark.master", "local") // run in local mode
+      .getOrCreate()
+
+    // Configure the MySQL connection
+    val jdbcUrl = "jdbc:mysql://hadoop102:3306/spark_db?useSSL=false"
+    val connectionProperties = new Properties()
+    connectionProperties.setProperty("user", "root")
+    connectionProperties.setProperty("password", "123456")
+    connectionProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver") // the MySQL JDBC driver class
+
+    // Read the MySQL table
+    val studentDf: DataFrame = spark.read.jdbc(jdbcUrl, "student", connectionProperties)
+
+    // Show the contents of the student table
+    studentDf.show()
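+    // Filters applied to the DataFrame are pushed down to MySQL where possible, e.g.:
+    // studentDf.where("age > 26").show()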
+
+    // Register a temporary view
+    studentDf.createOrReplaceTempView("student")
+
+    // Query with a SQL statement
+    spark.sql("SELECT * FROM student").show()
+
+    // Stop the SparkSession
+    spark.stop()
+  }
+}
diff --git a/spark-lesson/src/main/scala/date_20250422/task.sql b/spark-lesson/src/main/scala/date_20250422/task.sql
new file mode 100644
index 0000000..e224f5e
--- /dev/null
+++ b/spark-lesson/src/main/scala/date_20250422/task.sql
@@ -0,0 +1,9 @@
+show databases;
+
+create database if not exists spark_db;
+
+use spark_db;
+
+create table student (id int, name varchar(255), age int);
+
+select * from student;
\ No newline at end of file