From 5a6475fb1460250c678fc5e948eb41adc4714f6f Mon Sep 17 00:00:00 2001
From: dev_xulongjin
Date: Wed, 23 Apr 2025 09:14:47 +0800
Subject: [PATCH] refactor(spark-lesson): complete Experiment 6
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 spark-lesson/pom.xml                          |  9 ++++
 .../main/scala/date_20250422/CsvSorter.scala  | 39 ++++++++++++++++
 .../date_20250422/Dataset_Return_DateF.scala  | 35 +++++++++++++++
 .../main/scala/date_20250422/JsonToHdfs.scala | 36 +++++++++++++++
 .../date_20250422/ParquetSQLExample.scala     | 31 +++++++++++++
 .../scala/date_20250422/SparkSQLJDBC.scala    | 44 +++++++++++++++++++
 .../date_20250422/SparkSQLReadFromMySQL.scala | 37 ++++++++++++++++
 .../src/main/scala/date_20250422/task.sql     |  9 ++++
 8 files changed, 240 insertions(+)
 create mode 100644 spark-lesson/src/main/scala/date_20250422/CsvSorter.scala
 create mode 100644 spark-lesson/src/main/scala/date_20250422/Dataset_Return_DateF.scala
 create mode 100644 spark-lesson/src/main/scala/date_20250422/JsonToHdfs.scala
 create mode 100644 spark-lesson/src/main/scala/date_20250422/ParquetSQLExample.scala
 create mode 100644 spark-lesson/src/main/scala/date_20250422/SparkSQLJDBC.scala
 create mode 100644 spark-lesson/src/main/scala/date_20250422/SparkSQLReadFromMySQL.scala
 create mode 100644 spark-lesson/src/main/scala/date_20250422/task.sql

diff --git a/spark-lesson/pom.xml b/spark-lesson/pom.xml
index 435099f..6b79ccf 100644
--- a/spark-lesson/pom.xml
+++ b/spark-lesson/pom.xml
@@ -13,6 +13,8 @@
         <maven.compiler.target>8</maven.compiler.target>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <scala.version>2.12.2</scala.version>
+        <mysqlconnect.version>8.0.33</mysqlconnect.version>
+
     </properties>
 
     <dependencies>
@@ -127,6 +129,13 @@
             <version>2.5.1</version>
         </dependency>
 
+
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>${mysqlconnect.version}</version>
+        </dependency>
+
     </dependencies>
 
 </project>
diff --git a/spark-lesson/src/main/scala/date_20250422/CsvSorter.scala b/spark-lesson/src/main/scala/date_20250422/CsvSorter.scala
new file mode 100644
index 0000000..f2cb034
--- /dev/null
+++ b/spark-lesson/src/main/scala/date_20250422/CsvSorter.scala
@@ -0,0 +1,39 @@
+package date_20250422
+
+import org.apache.spark.sql.SparkSession
+
+object CsvSorter {
+  def main(args: Array[String]): Unit = {
+
+    System.setProperty("HADOOP_USER_NAME", "root")
+
+    // Create the SparkSession
+    val spark = SparkSession.builder()
+      .appName("CSV Sorter")
+      .master("local[*]") // Local mode; change to a cluster master URL as needed
+      .getOrCreate()
+
+    // Read the CSV file
+    val csvFilePath = "hdfs://hadoop102:8020/input/data_6/person.csv" // Replace with your CSV file path
+    val df = spark.read
+      .option("header", "false") // The CSV file has no header row
+      .option("inferSchema", "true") // Infer column types automatically
+      .csv(csvFilePath)
+      .toDF("id", "name", "age") // Assign column names
+
+    // Register a temporary view for Spark SQL
+    df.createOrReplaceTempView("csv_table")
+
+    // Sort with Spark SQL
+    val sortedDf = spark.sql("SELECT * FROM csv_table ORDER BY name ASC")
+
+    // Show the sorted result
+    sortedDf.show()
+
+    sortedDf.select("id", "name", "age")
+      .write.save("hdfs://hadoop102:8020/output/data_6/user_sort")
+
+    // Stop the SparkSession
+    spark.stop()
+  }
+}
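
A note on CsvSorter's final write: `write.save(path)` with no explicit format
writes Parquet (Spark's default source), not CSV, and it fails if the output
directory already exists. A minimal sketch of the same write as CSV with an
explicit save mode — the `csv` format and `mode("overwrite")` are assumptions,
not part of the patch:

    // Sketch only: save the sorted rows as CSV instead of default Parquet.
    // mode("overwrite") is assumed so reruns do not fail when
    // /output/data_6/user_sort already exists on HDFS.
    sortedDf.select("id", "name", "age")
      .write
      .mode("overwrite")
      .option("header", "false") // match the header-less input file
      .csv("hdfs://hadoop102:8020/output/data_6/user_sort")
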
.config("spark.testing.memory", "2147480000") + .getOrCreate() + + val d1 = spark.read.textFile("hdfs://hadoop102:8020/input/data_6/person.txt") + import spark.implicits._ + + val personDataset = d1.map(line => { + val fields = line.split(",") + val id = fields(0).toInt + val name = fields(1) + val age = fields(2).toInt + Person(id, name, age) + }) + // 将Dataset转为DataFrame + val pdf = personDataset.toDF() + // 在DataFrame上创建一个临时视图“v_person” + pdf.createTempView("v_person") + // 使用SparkSession对象执行SQL查询 + val result = spark.sql("select * from v_person order by age desc") + // 调用show()方法输出结果数据 + result.show() + } +} \ No newline at end of file diff --git a/spark-lesson/src/main/scala/date_20250422/JsonToHdfs.scala b/spark-lesson/src/main/scala/date_20250422/JsonToHdfs.scala new file mode 100644 index 0000000..f622270 --- /dev/null +++ b/spark-lesson/src/main/scala/date_20250422/JsonToHdfs.scala @@ -0,0 +1,36 @@ +package date_20250422 + +import org.apache.spark.sql.SparkSession + +object JsonToHdfs { + def main(args: Array[String]): Unit = { + System.setProperty("HADOOP_USER_NAME", "root") + + // 创建 SparkSession + val spark = SparkSession.builder() + .appName("JsonToHdfs") + .master("local[*]") // 设置本地运行,使用所有可用核心。根据需要调整或移除。 + .getOrCreate() + + // 读取 JSON 文件 + val jsonFilePath = "hdfs://hadoop102:8020/input/data_6/people.json" // 替换为你的 JSON 文件路径 + val df = spark.read.json(jsonFilePath) + + // 注册临时视图以便使用 Spark SQL + df.createOrReplaceTempView("json_table") + + // 使用 Spark SQL 执行查询 + val queryResultDf = spark.sql("SELECT * FROM json_table") // 这里可以根据需要修改 SQL 查询 + + // 显示查询结果 + queryResultDf.show() + + // 将查询结果写入 HDFS + val hdfsOutputPath = "hdfs://hadoop102:8020/output/data_6/jsonToHdfs" // 替换为你的 HDFS 输出目录 +// queryResultDf.write.mode("append").parquet(hdfsOutputPath) // 使用 append 模式写入,避免覆盖现有数据 + queryResultDf.write.mode("overwrite").parquet(hdfsOutputPath) // 使用 overwrite 模式写入,覆盖现有数据 + + // 停止 SparkSession + spark.stop() + } +} diff --git a/spark-lesson/src/main/scala/date_20250422/ParquetSQLExample.scala b/spark-lesson/src/main/scala/date_20250422/ParquetSQLExample.scala new file mode 100644 index 0000000..7ae2036 --- /dev/null +++ b/spark-lesson/src/main/scala/date_20250422/ParquetSQLExample.scala @@ -0,0 +1,31 @@ +package date_20250422 + +import org.apache.spark.sql.SparkSession + +object ParquetSQLExample { + def main(args: Array[String]): Unit = { + + System.setProperty("HADOOP_USER_NAME","root") + + // 创建 SparkSession + val spark = SparkSession.builder() + .appName("Spark SQL Parquet Example") + .master("local[*]") // 本地运行 + .getOrCreate() + + // 读取 Parquet 文件 + val ratingsDF = spark.read.parquet("hdfs://hadoop102:8020/input/data_6/users.parquet") + + ratingsDF.show() + // 注册临时视图 + ratingsDF.createOrReplaceTempView("users") + + + // 将查询结果写入到 HDFS 自定义目录 + ratingsDF.select("name", "favorite_color", "favorite_numbers") + .write.save("hdfs://hadoop102:8020/output/data_6/users_output2") + + // 停止 SparkSession + spark.stop() + } +} \ No newline at end of file diff --git a/spark-lesson/src/main/scala/date_20250422/SparkSQLJDBC.scala b/spark-lesson/src/main/scala/date_20250422/SparkSQLJDBC.scala new file mode 100644 index 0000000..eaa3080 --- /dev/null +++ b/spark-lesson/src/main/scala/date_20250422/SparkSQLJDBC.scala @@ -0,0 +1,44 @@ +package date_20250422 + +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} +import org.apache.spark.sql.{Row, SparkSession} + +/** 将RDD中的数据写入到MySQL* */ + +object SparkSQLJDBC { + + def main(args: Array[String]): Unit = { + 
diff --git a/spark-lesson/src/main/scala/date_20250422/SparkSQLJDBC.scala b/spark-lesson/src/main/scala/date_20250422/SparkSQLJDBC.scala
new file mode 100644
index 0000000..eaa3080
--- /dev/null
+++ b/spark-lesson/src/main/scala/date_20250422/SparkSQLJDBC.scala
@@ -0,0 +1,44 @@
+package date_20250422
+
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.{Row, SparkSession}
+
+/** Writes data from an RDD into a MySQL table. */
+
+object SparkSQLJDBC {
+
+  def main(args: Array[String]): Unit = {
+    // Create or obtain the SparkSession
+    val spark = SparkSession.builder()
+      .appName("SparkSQLJDBC")
+      .master("local[*]")
+      .getOrCreate()
+    // Create an RDD holding two student records
+    val studentRDD = spark.sparkContext.parallelize(
+      Array("4 xiaoming 26", "5 xiaogang 27")
+    ).map(_.split(" "))
+    // Specify the schema of each field with StructType
+    val schema = StructType(
+      List(
+        StructField("id", IntegerType, true),
+        StructField("name", StringType, true),
+        StructField("age", IntegerType, true))
+    )
+    // Map studentRDD to rowRDD; every element becomes a Row object
+    val rowRDD = studentRDD.map(line =>
+      Row(line(0).toInt, line(1).trim, line(2).toInt)
+    )
+    // Bind rowRDD to the schema
+    val studentDF = spark.createDataFrame(rowRDD, schema)
+    // Append the result to the MySQL student table
+    studentDF.write.mode("append") // append mode: add rows to the existing table
+      .format("jdbc")
+      //.option("url", "jdbc:mysql://192.168.182.100:3307/spark_db")
+      .option("url", "jdbc:mysql://hadoop102:3306/spark_db")
+      .option("driver", "com.mysql.cj.jdbc.Driver")
+      .option("dbtable", "student") // table name
+      .option("user", "root")
+      .option("password", "123456")
+      .save()
+  }
+}
diff --git a/spark-lesson/src/main/scala/date_20250422/SparkSQLReadFromMySQL.scala b/spark-lesson/src/main/scala/date_20250422/SparkSQLReadFromMySQL.scala
new file mode 100644
index 0000000..ee554ee
--- /dev/null
+++ b/spark-lesson/src/main/scala/date_20250422/SparkSQLReadFromMySQL.scala
@@ -0,0 +1,37 @@
+package date_20250422
+
+import java.util.Properties
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+object SparkSQLReadFromMySQL {
+  def main(args: Array[String]): Unit = {
+    // Create the SparkSession
+    val spark = SparkSession.builder()
+      .appName("Spark SQL Read from MySQL")
+      .config("spark.master", "local") // Run in local mode
+      .getOrCreate()
+
+    // Configure the MySQL connection
+    val jdbcUrl = "jdbc:mysql://hadoop102:3306/spark_db?useSSL=false"
+    val connectionProperties = new Properties()
+    connectionProperties.setProperty("user", "root")
+    connectionProperties.setProperty("password", "123456")
+    connectionProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver") // MySQL JDBC driver
+
+    // Read the MySQL table
+    val studentDf: DataFrame = spark.read.jdbc(jdbcUrl, "student", connectionProperties)
+
+    // Show the contents of the student table
+    studentDf.show()
+
+    // Register a temporary view
+    studentDf.createOrReplaceTempView("student")
+
+    // Query with a SQL statement
+    spark.sql("SELECT * FROM student").show()
+
+    // Stop the SparkSession
+    spark.stop()
+  }
+}
diff --git a/spark-lesson/src/main/scala/date_20250422/task.sql b/spark-lesson/src/main/scala/date_20250422/task.sql
new file mode 100644
index 0000000..e224f5e
--- /dev/null
+++ b/spark-lesson/src/main/scala/date_20250422/task.sql
@@ -0,0 +1,9 @@
+show databases;
+
+create database if not exists spark_db;
+
+use spark_db;
+
+create table student (id int, name varchar(225), age int);
+
+select * from student;
\ No newline at end of file
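
A note on the two JDBC programs: SparkSQLJDBC appends two rows, and
SparkSQLReadFromMySQL reads the whole `student` table back over a single JDBC
connection. For larger tables, `DataFrameReader.jdbc` also has an overload
that takes a numeric partition column with bounds, so the scan is split into
parallel queries. A sketch reusing `jdbcUrl` and `connectionProperties` from
SparkSQLReadFromMySQL — the bounds and partition count are assumptions for
illustration:

    // Sketch only: partitioned JDBC read over the numeric id column.
    // lowerBound/upperBound only decide how the id range is split into
    // numPartitions parallel queries; they do not filter rows.
    val partitionedDf = spark.read.jdbc(
      jdbcUrl,
      "student",
      "id", // partition column (assumed numeric)
      1L,   // lowerBound (assumption)
      100L, // upperBound (assumption)
      4,    // numPartitions (assumption)
      connectionProperties)
    partitionedDf.show()
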