From 2840970cb030847515be4cbef98d95b65e60feb0 Mon Sep 17 00:00:00 2001
From: dev_xulongjin
Date: Sun, 20 Apr 2025 14:42:17 +0800
Subject: [PATCH] feat(spark-lesson): add new Spark RDD operation examples for
 part five of the Spark lab
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Add 6 new Spark RDD example programs
- Cover computing average scores, scores below 60, the lowest 2 scores,
  secondary sort, and related operations
- The examples demonstrate basic Spark RDD operations and common operators

---
 .../java/date_20250411/task_DML_select.java   | 23 ++++---
 .../scala/date_20250415/RDDAverageScore.scala | 65 +++++++++++++++++++
 .../date_20250415/RDDBelow60Scores.scala      | 57 ++++++++++++++++
 .../scala/date_20250415/RDDGroupTopN.scala    | 55 ++++++++++++++++
 .../date_20250415/RDDLowest2Scores.scala      | 57 ++++++++++++++++
 .../date_20250415/RDDSecondarySort.scala      | 40 ++++++++++++
 .../src/main/scala/date_20250415/task2.scala  | 25 +++++++
 7 files changed, 313 insertions(+), 9 deletions(-)
 create mode 100755 spark-lesson/src/main/scala/date_20250415/RDDAverageScore.scala
 create mode 100755 spark-lesson/src/main/scala/date_20250415/RDDBelow60Scores.scala
 create mode 100755 spark-lesson/src/main/scala/date_20250415/RDDGroupTopN.scala
 create mode 100755 spark-lesson/src/main/scala/date_20250415/RDDLowest2Scores.scala
 create mode 100755 spark-lesson/src/main/scala/date_20250415/RDDSecondarySort.scala
 create mode 100644 spark-lesson/src/main/scala/date_20250415/task2.scala

diff --git a/hbase-lesson/src/main/java/date_20250411/task_DML_select.java b/hbase-lesson/src/main/java/date_20250411/task_DML_select.java
index c3bd8a1..d101b39 100644
--- a/hbase-lesson/src/main/java/date_20250411/task_DML_select.java
+++ b/hbase-lesson/src/main/java/date_20250411/task_DML_select.java
@@ -11,8 +11,9 @@ import org.apache.hadoop.hbase.util.Bytes;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Iterator;
+import java.util.Arrays;
+
 
 public class task_DML_select {
     public static void main(String[] args) throws Exception {
@@ -27,7 +28,11 @@ public class task_DML_select {
 
         Table table = conn.getTable(TableName.valueOf("ideaPro_space:students"));
 
-        scanRows(table);
+//        scanRows(table);
+
+//        getKeyValue(table, "000000");
+
+        getFamily(table);
 
         // Close the connection
         table.close();
@@ -40,20 +45,20 @@ public class task_DML_select {
      *
      * @param table
      */
-    public static void getKeyValue(Table table) throws IOException {
+    public static void getKeyValue(Table table, String rowKey) throws IOException {
 
-        String rowKey = "000010";
+//        String rowKey = "000010";
 
         Get getParam = new Get(Bytes.toBytes(rowKey));
 
-        getParam.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("age"));
+        getParam.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("salary"));
 
         Result result = table.get(getParam);
 
-        byte[] value = result.getValue(Bytes.toBytes("base_info"), Bytes.toBytes("age"));
+        byte[] value = result.getValue(Bytes.toBytes("extra_info"), Bytes.toBytes("salary"));
 
-        int age = Bytes.toInt(value);
+        String salary = Bytes.toString(value);
 
-        System.out.println(age);
+        System.out.println(salary);
@@ -70,10 +75,10 @@ public class task_DML_select {
      */
     public static void getFamily(Table table) throws IOException {
 
-        String rowKey = "000010";
+        String rowKey = "000000";
 
         Get getParam = new Get(Bytes.toBytes(rowKey));
-        getParam.addFamily(Bytes.toBytes("base_info"));
+        getParam.addFamily(Bytes.toBytes("extra_info"));
 
         Result result = table.get(getParam);
         CellScanner cellScanner = result.cellScanner();
diff --git a/spark-lesson/src/main/scala/date_20250415/RDDAverageScore.scala b/spark-lesson/src/main/scala/date_20250415/RDDAverageScore.scala
new file mode 100755
index 0000000..8a09672
--- /dev/null
+++ b/spark-lesson/src/main/scala/date_20250415/RDDAverageScore.scala
@@ -0,0 +1,65 @@
+package date_20250415
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{SparkConf, SparkContext}
+
+/** Compute each student's average score across subjects */
+object RDDAverageScore {
+
+  def main(args: Array[String]): Unit = {
+
+    // Create a SparkConf object to hold the application configuration
+    val conf = new SparkConf()
+      .setMaster("local") // Run the computation locally
+      .setAppName("Spark-AverageScore") // Set the application name
+
+    // Create the SparkContext
+    val sc = new SparkContext(conf)
+
+    // Read the score file of each subject
+    val mathRDD: RDD[String] = sc.textFile("hdfs://192.168.182.100:9000/data/math.txt")
+    val chineseRDD: RDD[String] = sc.textFile("hdfs://192.168.182.100:9000/data/chinese.txt")
+    val englishRDD: RDD[String] = sc.textFile("hdfs://192.168.182.100:9000/data/english.txt")
+
+    // Turn each line of each subject RDD into a (name, score) pair
+    val mathScores: RDD[(String, Int)] = mathRDD.map(line => {
+      val Array(name, score) = line.split(" ")
+      (name, score.toInt)
+    })
+
+    val chineseScores: RDD[(String, Int)] = chineseRDD.map(line => {
+      val Array(name, score) = line.split(" ")
+      (name, score.toInt)
+    })
+
+    val englishScores: RDD[(String, Int)] = englishRDD.map(line => {
+      val Array(name, score) = line.split(" ")
+      (name, score.toInt)
+    })
+
+    // Union the three RDDs into one
+    val allScores: RDD[(String, Int)] = mathScores.union(chineseScores).union(englishScores)
+
+    // Aggregate each student's scores into (name, (total, subjectCount))
+    val studentScores: RDD[(String, (Int, Int))] = allScores
+      .mapValues(score => (score, 1))
+      .reduceByKey { case ((sum1, count1), (sum2, count2)) =>
+        (sum1 + sum2, count1 + count2)
+      }
+
+    // Compute the average as (name, averageScore)
+    val averageScores: RDD[(String, Double)] = studentScores.mapValues { case (sum, count) =>
+      sum.toDouble / count
+    }
+
+    // Save the result to HDFS
+//    averageScores.saveAsTextFile("hdfs://192.168.182.100:9000/output/averageScore")
+
+    // Print the averages
+    averageScores.collect().foreach { case (name, avgScore) =>
+      println(s"$name $avgScore")
+    }
+
+    // Stop the SparkContext
+    sc.stop()
+  }
+}
\ No newline at end of file
diff --git a/spark-lesson/src/main/scala/date_20250415/RDDBelow60Scores.scala b/spark-lesson/src/main/scala/date_20250415/RDDBelow60Scores.scala
new file mode 100755
index 0000000..9d75348
--- /dev/null
+++ b/spark-lesson/src/main/scala/date_20250415/RDDBelow60Scores.scala
@@ -0,0 +1,57 @@
+package date_20250415
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{SparkConf, SparkContext}
+
+/** Find each student's scores that are below 60 */
+object RDDBelow60Scores {
+
+  def main(args: Array[String]): Unit = {
+
+    // Create a SparkConf object to hold the application configuration
+    val conf = new SparkConf()
+      .setMaster("local") // Run the computation locally
+      .setAppName("Spark-Below60Scores") // Set the application name
+
+    // Create the SparkContext
+    val sc = new SparkContext(conf)
+
+    // Read the file at the given path into an RDD of lines
+    val linesRDD: RDD[String] = sc.textFile("hdfs://hadoop102:8020/input/data/students_scores.txt")
+
+    // Turn each line into a (name, score) pair
+    val tupleRDD: RDD[(String, Int)] = linesRDD.map(line => {
+      val Array(name, score) = line.split(",")
+      (name, score.toInt)
+    })
+
+    // Keep only scores below 60 and group them by student
+    val below60: RDD[(String, Iterable[Int])] = tupleRDD
+      .filter(_._2 < 60)
+      .groupByKey()
+
+    // Sort by name before printing
+    val sortedBelow60: RDD[(String, Iterable[Int])] = below60.sortBy(_._1)
+
+    sortedBelow60
+      .collect()
+      .foreach(x => {
+        println("Name: " + x._1)
+        x._2.foreach(y => println("Score: " + y))
+        println("*" * 20)
+      })
+
+    // Save the result to HDFS
+    sortedBelow60.saveAsTextFile("hdfs://hadoop102:8020/output/below60Scores")
+
+    // Stop the SparkContext
+    sc.stop()
+  }
+}
\ No newline at end of file
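
[Editor's note] The original RDDAverageScore.scala left the aggregation step as a
literal placeholder (`allScores.补充`, i.e. "fill in"). The hunk above completes it
with the usual pair-and-reduce pattern; that completion is an assumption about the
author's intent, not recovered code. A minimal self-contained sketch of the same
pattern, using an in-memory RDD so it runs without the HDFS paths above:

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch of the (sum, count) aggregation used in RDDAverageScore,
    // on an in-memory RDD so it can run without HDFS.
    object AverageScoreSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("AverageScoreSketch"))
        val scores = sc.parallelize(Seq(("Alice", 90), ("Bob", 58), ("Alice", 70), ("Bob", 80)))
        val averages = scores
          .mapValues(s => (s, 1))                      // (name, (score, 1))
          .reduceByKey { case ((s1, c1), (s2, c2)) =>  // merge partial (sum, count) pairs
            (s1 + s2, c1 + c2)
          }
          .mapValues { case (sum, count) => sum.toDouble / count }
        averages.collect().foreach(println)            // (Alice,80.0), (Bob,69.0)
        sc.stop()
      }
    }

reduceByKey merges the partial (sum, count) pairs map-side before the shuffle,
which is why this pattern is preferred over grouping all scores and averaging locally.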
diff --git a/spark-lesson/src/main/scala/date_20250415/RDDGroupTopN.scala b/spark-lesson/src/main/scala/date_20250415/RDDGroupTopN.scala
new file mode 100755
index 0000000..4eda5dd
--- /dev/null
+++ b/spark-lesson/src/main/scala/date_20250415/RDDGroupTopN.scala
@@ -0,0 +1,55 @@
+package date_20250415
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{SparkConf, SparkContext}
+
+/** Find each student's top N scores */
+object RDDGroupTopN {
+
+  def main(args: Array[String]): Unit = {
+
+    // Create a SparkConf object to hold the application configuration
+    val conf: SparkConf = new SparkConf()
+      .setMaster("local") // Run the computation locally
+      .setAppName("Spark-GroupTopN") // Set the application name
+
+    // Create the SparkContext
+    val sc = new SparkContext(conf)
+
+    // Read the file at the given path into an RDD of lines
+    val linesRDD: RDD[String] = sc.textFile("hdfs://hadoop102:8020/input/data/score.txt")
+
+    // Turn each line into a (name, score) pair
+    val tupleRDD: RDD[(String, Int)] = linesRDD.map(line => {
+      val Array(name, score) = line.split(",")
+      (name, score.toInt)
+    })
+
+    // Group the scores by student, sort descending, and keep the top 3
+    val top3: RDD[(String, List[Int])] = tupleRDD.groupByKey().map { case (name, scores) =>
+      val scoreTop3: List[Int] = scores.toList.sortWith(_ > _).take(3)
+      (name, scoreTop3)
+    }
+
+    // Sort by name before printing
+    val sortedTop3: RDD[(String, List[Int])] = top3.sortBy(_._1)
+
+    // Print the result
+    sortedTop3.collect().foreach(x => {
+      println("Name: " + x._1)
+      x._2.foreach(y => println("Score: " + y))
+      println("*" * 20)
+    })
+
+    // Save the result to HDFS
+    sortedTop3.saveAsTextFile("hdfs://hadoop102:8020/output/result9")
+
+    // Stop the SparkContext
+    sc.stop()
+  }
+}
\ No newline at end of file
diff --git a/spark-lesson/src/main/scala/date_20250415/RDDLowest2Scores.scala b/spark-lesson/src/main/scala/date_20250415/RDDLowest2Scores.scala
new file mode 100755
index 0000000..06ad439
--- /dev/null
+++ b/spark-lesson/src/main/scala/date_20250415/RDDLowest2Scores.scala
@@ -0,0 +1,57 @@
+package date_20250415
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{SparkConf, SparkContext}
+
+/** Find each student's 2 lowest scores */
+object RDDLowest2Scores {
+
+  def main(args: Array[String]): Unit = {
+
+    // Create a SparkConf object to hold the application configuration
+    val conf = new SparkConf()
+      .setMaster("local") // Run the computation locally
+      .setAppName("Spark-Lowest2Scores") // Set the application name
+
+    // Create the SparkContext
+    val sc = new SparkContext(conf)
+
+    // Read the file at the given path into an RDD of lines
+    val linesRDD: RDD[String] = sc.textFile("hdfs://hadoop102:8020/input/data/students_scores.txt")
+
+    // Turn each line into a (name, score) pair
+    val tupleRDD: RDD[(String, Int)] = linesRDD.map(line => {
+      val Array(name, score) = line.split(",")
+      (name, score.toInt)
+    })
+
+    // Group the scores by student, sort ascending, and keep the lowest 2
+    val sortedLowest2: RDD[(String, List[Int])] = tupleRDD
+      .groupByKey()
+      .map { case (name, scores) =>
+        val lowest2: List[Int] = scores.toList.sortWith(_ < _).take(2)
+        (name, lowest2)
+      }
+      .sortBy(_._1) // Sort by name
+
+    // Print the result
+    sortedLowest2.collect().foreach(x => {
+      println("Name: " + x._1)
+      x._2.foreach(y => println("Score: " + y))
+      println("*" * 20)
+    })
+
+    // Save the result to HDFS
+    sortedLowest2.saveAsTextFile("hdfs://hadoop102:8020/output/lowest2Scores")
+
+    // Stop the SparkContext
+    sc.stop()
+  }
+}
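
[Editor's note] Both RDDGroupTopN and RDDLowest2Scores build the complete per-student
list with groupByKey before sorting and taking N. That is fine for lesson-sized files,
but it pulls every score for a key into memory at once. A sketch of a bounded
alternative (not part of this patch) that keeps at most n values per key with
aggregateByKey, assuming scores: RDD[(String, Int)] as built above:

    import org.apache.spark.rdd.RDD

    // Per-key top-N without materializing all values: each partition's
    // accumulator holds at most n scores, so memory stays bounded.
    def topNPerKey(scores: RDD[(String, Int)], n: Int): RDD[(String, List[Int])] =
      scores.aggregateByKey(List.empty[Int])(
        (acc, score) => (score :: acc).sorted(Ordering.Int.reverse).take(n), // fold one score in
        (a, b) => (a ++ b).sorted(Ordering.Int.reverse).take(n)              // merge partition results
      )

For the lowest-2 variant, replace Ordering.Int.reverse with Ordering.Int.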
diff --git a/spark-lesson/src/main/scala/date_20250415/RDDSecondarySort.scala b/spark-lesson/src/main/scala/date_20250415/RDDSecondarySort.scala
new file mode 100755
index 0000000..f71ce12
--- /dev/null
+++ b/spark-lesson/src/main/scala/date_20250415/RDDSecondarySort.scala
@@ -0,0 +1,40 @@
+package date_20250415
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{SparkConf, SparkContext}
+
+/** Secondary sort: first column ascending, second column descending */
+object RDDSecondarySort {
+
+  def main(args: Array[String]): Unit = {
+
+    // Create a SparkConf object to hold the application configuration
+    val conf = new SparkConf()
+      .setMaster("local") // Run the computation locally
+      .setAppName("Spark-SecondarySort") // Set the application name
+
+    // Create the SparkContext
+    val sc = new SparkContext(conf)
+
+    // Read the file at the given path into an RDD of lines
+    val linesRDD: RDD[String] = sc.textFile("hdfs://hadoop102:8020/input/data/sort_data.txt")
+
+    // Turn each line into a (firstColumn, secondColumn) pair
+    val tupleRDD: RDD[(Int, Int)] = linesRDD.map(line => {
+      val Array(col1, col2) = line.split(" ").map(_.toInt)
+      (col1, col2)
+    })
+
+    // Secondary sort: ascending by the first column; ties broken by the
+    // second column in descending order (negating the value reverses the order)
+    val sortedRDD: RDD[(Int, Int)] = tupleRDD.sortBy(x => (x._1, -x._2))
+
+    // Save the sorted result to HDFS
+    sortedRDD.saveAsTextFile("hdfs://hadoop102:8020/output/secondarySort")
+
+    // Print the sorted result
+    sortedRDD.collect().foreach(x => println(x._1 + " " + x._2))
+
+    // Stop the SparkContext
+    sc.stop()
+  }
+}
\ No newline at end of file
diff --git a/spark-lesson/src/main/scala/date_20250415/task2.scala b/spark-lesson/src/main/scala/date_20250415/task2.scala
new file mode 100644
index 0000000..457612c
--- /dev/null
+++ b/spark-lesson/src/main/scala/date_20250415/task2.scala
@@ -0,0 +1,25 @@
+package date_20250415
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{SparkConf, SparkContext}
+
+/** Sort (Int, Int) pairs by the first column in ascending order */
+object task2 {
+  def main(args: Array[String]): Unit = {
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("Spark-Task2")
+
+    val sc = new SparkContext(conf)
+    val linesRDD: RDD[String] = sc.textFile("hdfs://hadoop102:8020/input/data/sort_data.txt")
+
+    linesRDD
+      .map(x => {
+        val arr: Array[String] = x.split(" ")
+        (arr(0).toInt, arr(1).toInt)
+      })
+      .sortBy(_._1)
+      .collect()
+      .foreach(println)
+
+    sc.stop()
+  }
+}
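
[Editor's note] The negation trick in RDDSecondarySort (sortBy(x => (x._1, -x._2)))
is compact, but it overflows for Int.MinValue and does not extend to non-numeric
columns. A sketch of the same ordering expressed through sortBy's implicit Ordering
parameter instead, assuming tupleRDD is the RDD[(Int, Int)] built above:

    import scala.reflect.classTag
    import org.apache.spark.rdd.RDD

    // Secondary sort via an explicit composite Ordering instead of
    // negating the second column: col1 ascending, col2 descending.
    def secondarySort(tupleRDD: RDD[(Int, Int)]): RDD[(Int, Int)] =
      tupleRDD.sortBy(identity)(
        Ordering.Tuple2(Ordering.Int, Ordering.Int.reverse),
        classTag[(Int, Int)]
      )

The composite Ordering generalizes directly, e.g. to (String, Int) keys, where
negation is not available.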