feat(spark-lesson): spark实验五部分内容,添加新的 Spark RDD 操作示例

- 新增了 6 个 Spark RDD 操作示例程序
- 包括求平均分、低于 60 分的成绩、最低2 个成绩、二次排序等操作
- 示例程序展示了 Spark RDD 的基本操作和常用算子的使用方法
This commit is contained in:
dev_xulongjin 2025-04-20 14:42:17 +08:00
parent 630e102a52
commit 2840970cb0
7 changed files with 313 additions and 9 deletions

View File

@ -11,8 +11,9 @@ import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Arrays;
public class task_DML_select {
public static void main(String[] args) throws Exception {
@ -27,7 +28,11 @@ public class task_DML_select {
Table table = conn.getTable(TableName.valueOf("ideaPro_space:students"));
scanRows(table);
// scanRows(table);
// getKeyValue(table,"000000");
getFamily(table);
// 关闭连接
table.close();
@ -40,20 +45,20 @@ public class task_DML_select {
*
* @param table
*/
public static void getKeyValue(Table table) throws IOException {
public static void getKeyValue(Table table,String rowKey) throws IOException {
String rowKey = "000010";
// String rowKey = "000010";
Get getParam = new Get(Bytes.toBytes(rowKey));
getParam.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("age"));
getParam.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("salary"));
Result result = table.get(getParam);
byte[] value = result.getValue(Bytes.toBytes("base_info"), Bytes.toBytes("age"));
byte[] value = result.getValue(Bytes.toBytes("extra_info"), Bytes.toBytes("salary"));
int age = Bytes.toInt(value);
String age = Bytes.toString(value);
System.out.println(age);
@ -70,10 +75,10 @@ public class task_DML_select {
*/
public static void getFamily(Table table) throws IOException {
String rowKey = "000010";
String rowKey = "000000";
Get getParam = new Get(Bytes.toBytes(rowKey));
getParam.addFamily(Bytes.toBytes("base_info"));
getParam.addFamily(Bytes.toBytes("extra_info"));
Result result = table.get(getParam);
CellScanner cellScanner = result.cellScanner();

View File

@ -0,0 +1,65 @@
package date_20250415
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/** 计算学生成绩平均分 */
object RDDAverageScore {
def main(args: Array[String]): Unit = {
// 创建SparkConf对象存储应用程序的配置信息
val conf = new SparkConf()
.setMaster("local") // 启动本地化计算
.setAppName("Spark-AverageScore") // 设置应用程序名称
// 创建SparkContext对象
val sc = new SparkContext(conf)
// 读取每个科目的成绩文件
val mathRDD: RDD[String] = sc.textFile("hdfs://192.168.182.100:9000/data/math.txt")
val chineseRDD: RDD[String] = sc.textFile("hdfs://192.168.182.100:9000/data/chinese.txt")
val englishRDD: RDD[String] = sc.textFile("hdfs://192.168.182.100:9000/data/english.txt")
// 处理每个科目RDD将每一行数据转化为 (姓名, 分数) 二元组
val mathScores: RDD[(String, Int)] = mathRDD.map(line => {
val Array(name, score) = line.split(" ")
(name, score.toInt)
})
val chineseScores: RDD[(String, Int)] = chineseRDD.map(line => {
val Array(name, score) = line.split(" ")
(name, score.toInt)
})
val englishScores: RDD[(String, Int)] = englishRDD.map(line => {
val Array(name, score) = line.split(" ")
(name, score.toInt)
})
// 合并三个RDD
val allScores: RDD[(String, Int)] = mathScores.union(chineseScores).union(englishScores)
// 对每个学生的所有分数进行聚合 (姓名, (总分, 科目数))
val studentScores: RDD[(String, (Int, Int))] = allScores.补充
.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
(sum1 + sum2, count1 + count2)
}
// 计算平均分 (姓名, 平均分)
val averageScores: RDD[(String, Int)] = studentScores.mapValues { case (sum, count) =>
sum / count
}
// 保存结果到HDFS
// averageScores.saveAsTextFile("hdfs://192.168.182.100:9000/output/averageScore")
// 打印平均分结果
averageScores.collect().foreach { case (name, avgScore) =>
println(s"$name $avgScore")
}
// 停止SparkContext
sc.stop()
}
}

View File

@ -0,0 +1,57 @@
package date_20250415
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/** Spark求学生低于60分的成绩程序 */
object RDDBelow60Scores {
def main(args: Array[String]): Unit = {
// 创建SparkConf对象存储应用程序的配置信息
val conf = new SparkConf()
.setMaster("local") // 启动本地化计算
.setAppName("Spark-Below60Scores") // 设置应用程序名称
// 创建SparkContext对象
val sc = new SparkContext(conf)
// 读取指定路径中的文件内容生成一个RDD集合
val linesRDD: RDD[String] = sc.textFile("hdfs://hadoop102:8020/input/data/students_scores.txt")
// 将每一行数据转化为 (姓名, 成绩) 二元组
val tupleRDD: RDD[(String, Int)] = linesRDD.map(line => {
val Array(name, score) = line.split(",")
(name, score.toInt)
})
// 过滤出每个学生低于60分的成绩
val below60: RDD[(String, Iterable[Int])] = tupleRDD
.filter(_._2 < 60)
.groupByKey()
// 对姓名进行排序并输出结果
val sortedBelow60: RDD[(String, Iterable[Int])] = below60.sortBy(_._1) // 按名字进行排序
sortedBelow60
.collect()
.foreach(
x => {
println("姓名:" + x._1)
x._2.foreach(
y => {
println("成绩:" + y)
}
)
println("*" * 20)
}
)
// 保存结果到HDFS
sortedBelow60.saveAsTextFile("hdfs://hadoop102:8020/output/below60Scores")
// 停止SparkContext
sc.stop()
}
}

View File

@ -0,0 +1,55 @@
package date_20250415
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/** Spark求学生成绩Top N程序 */
object RDDGroupTopN {
def main(args: Array[String]): Unit = {
// 创建SparkConf对象存储应用程序的配置信息
val conf: SparkConf = new SparkConf()
.setMaster("local") // 启动本地化计算
.setAppName("Spark-GroupTopN") // 设置应用程序名称
// 创建SparkContext对象
val sc = new SparkContext(conf)
// 读取指定路径中的文件内容生成一个RDD集合
val linesRDD: RDD[String] = sc.textFile("hdfs://hadoop102:8020/input/data/score.txt")
// 将每一行数据转化为 (姓名, 成绩) 二元组
val tupleRDD: RDD[(String, Int)] = linesRDD.map(line => {
val Array(name, score) = line.split(",")
(name, score.toInt)
})
// 对每个学生分数进行分组排序后取前3个成绩
val top3: RDD[(String, List[Int])] = tupleRDD.groupByKey().map { case (name, scores) =>
val scoreTop3: List[Int] = scores.toList.sortWith(_ > _).take(3)
(name, scoreTop3)
}
// 对姓名进行排序并输出结果
val sortedTop3: RDD[(String, List[Int])] = top3.sortBy(_._1) // 按名字进行排序
// 保存结果到HDFS
sortedTop3.collect().foreach(
x => {
println("姓名:" + x._1)
x._2.foreach(
y => {
println("成绩:" + y)
}
)
println("*" * 20)
}
)
sortedTop3.saveAsTextFile("hdfs://hadoop102:8020/output/result9")
// 停止SparkContext
sc.stop()
}
}

View File

@ -0,0 +1,57 @@
package date_20250415
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/** Spark求学生最低2个成绩程序 */
object RDDLowest2Scores {
def main(args: Array[String]): Unit = {
// 创建SparkConf对象存储应用程序的配置信息
val conf = new SparkConf()
.setMaster("local") // 启动本地化计算
.setAppName("Spark-Lowest2Scores") // 设置应用程序名称
// 创建SparkContext对象
val sc = new SparkContext(conf)
// 读取指定路径中的文件内容生成一个RDD集合
val linesRDD: RDD[String] = sc.textFile("hdfs://hadoop102:8020/input/data/students_scores.txt")
// 将每一行数据转化为 (姓名, 成绩) 二元组
val tupleRDD: RDD[(String, Int)] = linesRDD.map(line => {
val Array(name, score) = line.split(",")
(name, score.toInt)
})
// 对每个学生分数进行分组排序后取最低的前2个成绩
val sortedLowest2: RDD[(String, List[Int])] = tupleRDD
.groupByKey()
.map { case (x, y) =>
val lastTop2: List[Int] = y.toList.sortWith(_ < _).take(2)
(x, lastTop2)
}
.sortBy(_._1)
sortedLowest2
.collect()
.foreach(
x => {
println("姓名:" + x._1)
x._2.foreach(
y => {
println("成绩:" + y)
}
)
println("*" * 20)
}
)
// 保存结果到HDFS
sortedLowest2.saveAsTextFile("hdfs://hadoop102:8020/output/lowest2Scores")
// 停止SparkContext
sc.stop()
}
}

View File

@ -0,0 +1,40 @@
package date_20250415
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/** Spark二次排序程序 */
object RDDSecondarySort {
def main(args: Array[String]): Unit = {
// 创建SparkConf对象存储应用程序的配置信息
val conf = new SparkConf()
.setMaster("local") // 启动本地化计算
.setAppName("Spark-SecondarySort") // 设置应用程序名称
// 创建SparkContext对象
val sc = new SparkContext(conf)
// 读取指定路径中的文件内容生成一个RDD集合
val linesRDD: RDD[String] = sc.textFile("hdfs://hadoop102:8020/input/data/sort_data.txt")
// 将每一行数据转化为 (第一列, 第二列) 二元组
val tupleRDD: RDD[(Int, Int)] = linesRDD.map(line => {
val Array(col1, col2) = line.split(" ").map(_.toInt)
(col1, col2)
})
// 进行二次排序首先按第一列升序如果第一列相同按第二列降序
val sortedRDD: RDD[(Int, Int)] = tupleRDD.sortBy(x => (x._1, -x._2))
// 保存排序结果到HDFS
sortedRDD.saveAsTextFile("hdfs://hadoop102:8020/output/secondarySort")
// 打印排序后的结果
sortedRDD.collect().foreach(x => println(x._1 + " " + x._2))
// 停止SparkContext
sc.stop()
}
}

View File

@ -0,0 +1,25 @@
package date_20250415
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object task2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local")
.setAppName("Spark-WordCount")
val sc = new SparkContext(conf)
val linesRDD: RDD[String] = sc.textFile("hdfs://hadoop102:8020/input/data/sort_data.txt")
linesRDD
.map(x => {
val arr: Array[String] = x.split(" ")
(arr(0).toInt, arr(1).toInt)
})
.sortBy(_._1)
.collect()
.foreach(println)
}
}