Initialization
7
spark-lesson/src/main/java/org/example/Main.java
Normal file
@@ -0,0 +1,7 @@
package org.example;

public class Main {
    public static void main(String[] args) {
        System.out.println("Hello, World!");
    }
}
22
spark-lesson/src/main/resources/log4j.properties
Executable file
@@ -0,0 +1,22 @@
log4j.rootCategory=ERROR,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell log level to ERROR. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=ERROR

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=ERROR
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
5
spark-lesson/src/main/scala/date_20250401/data/word
Normal file
@@ -0,0 +1,5 @@
hadoop hadoop java
python spark
flink java
flink mysql
java spark
86
spark-lesson/src/main/scala/date_20250401/task.scala
Normal file
@@ -0,0 +1,86 @@
package date_20250401

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object task {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local") // run locally, defaulting to a single CPU core
      .setAppName("testRdd")
    // set the application name

    // every Spark program starts from a SparkContext
    val sc = new SparkContext(conf)

    // 1. map: given the sequence Seq(2, 3, 4, 5, 6), use map to multiply each element by a number (the last two digits of your student ID) and print the result.
    println("----1----")
    // val data1: RDD[Int] = sc.parallelize(Seq(2, 3, 4, 5, 6))
    // data1.map(_ * 28).foreach(println)

    // 2. flatMap: given the sequence Seq("Hello lily", "Hello lucy", "Hello <your name in pinyin>"), use flatMap to split the words on spaces and output a new sequence
    println("----2----")
    val data2: RDD[String] = sc.parallelize(Seq("Hello lily", "Hello lucy", "Hello xulongjin"))
    data2.flatMap(_.split(" ")).foreach(println)

    // 3. filter: given the sequence Seq(4, 5, 62, 31, 4, 50, 6), use filter to keep the elements greater than 10 and output a new sequence
    println("----3----")
    val data3: RDD[Int] = sc.parallelize(Seq(4, 5, 62, 31, 4, 50, 6))
    data3.filter(_ > 10).foreach(println)

    // 4. mapValues: given the sequence Seq(("a",11), ("b",21), ("c",43)), use mapValues to multiply each value by 20 and output a new sequence
    println("----4----")
    val data4: RDD[(String, Int)] = sc.parallelize(Seq(("a", 11), ("b", 21), ("c", 43)))
    data4.mapValues(_ * 20).foreach(println)

    // 5. sample: given the sequence Seq(1,2,3,4,5,6,7,8,9,10), use sample with parameters of your choice and output a new sequence
    println("----5----")
    val data5: RDD[Int] = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data5.sample(withReplacement = true, 0.7, 3).foreach(println)

    // 6. union: define two sequences of your own, then use union and output a new sequence
    println("----6----")
    val data6_1: RDD[Int] = sc.parallelize(Seq(1, 2, 3, 4, 5))
    val data6_2: RDD[Int] = sc.parallelize(Seq(6, 7, 8, 9, 10))

    val data6_re: RDD[Int] = data6_1.union(data6_2)
    data6_re.foreach(println)

    // 7. intersection: define two sequences of your own, then use intersection to compute their intersection and output a new sequence
    println("----7----")
    val data7_1: RDD[Int] = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8))
    val data7_2: RDD[Int] = sc.parallelize(Seq(2, 4, 6, 8, 10))
    data7_1.intersection(data7_2).foreach(println)

    // 8. distinct: define a sequence of your own, then use distinct and output a new sequence
    println("----8----")
    val data8: RDD[Int] = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 2, 4, 6, 8, 10))
    data8.distinct().foreach(println)

    // 9. reduceByKey: given the sequence Seq(("a",1), ("b",1), ("c",1), ("a",1), ("d",1), ("c",1)), use reduceByKey and output a new sequence
    println("----9----")
    val data9: RDD[(String, Int)] = sc.parallelize(Seq(("a", 1), ("b", 1), ("c", 1), ("a", 1), ("d", 1), ("c", 1)))
    data9.reduceByKey(_ + _).foreach(println)

    // 10. groupByKey: given the sequence Seq(("a",1), ("b",1), ("c",1), ("a",1), ("d",1), ("c",1)), use groupByKey and output a new sequence
    println("----10----")
    val data10: RDD[(String, Int)] = sc.parallelize(Seq(("a", 1), ("b", 1), ("c", 1), ("a", 1), ("d", 1), ("c", 1)))
    data10.groupByKey().foreach(println)

    // 11. join: define two key-value sequences of your own, then use join and output a new sequence
    println("----11----")
    val data11_1: RDD[(String, Int)] = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 1)))
    val data11_2: RDD[(String, Int)] = sc.parallelize(Seq(("a", 10), ("a", 11), ("a", 12)))
    data11_1.join(data11_2).foreach(println)

    // 12. sortBy: given the sequence Seq(("e", 4),("a", 3), ("b", 2), ("c", 1), ("d", 5)), use sortBy to sort by key and then by value, and output a new sequence
    println("----12----")
    val data12: RDD[(String, Int)] = sc.parallelize(Seq(("e", 4), ("a", 3), ("b", 2), ("c", 1), ("d", 5)))
    data12.sortBy(_._1).foreach(println)
    println("----------")
    data12.sortBy(_._2, ascending = false).foreach(println)

  }

}
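The exercises above print RDDs with `foreach(println)`, which is fine in `local` mode but on a cluster runs on the executors, so the output never reaches the driver console. A minimal sketch of the usual pattern, collecting before printing and stopping the context afterwards (the `CollectBeforePrint` object and its sample data are illustrative, not part of this commit):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CollectBeforePrint {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("collect-demo"))
    val counts = sc.parallelize(Seq("a", "b", "a"))
      .map((_, 1))
      .reduceByKey(_ + _)
    // collect() brings the results back to the driver, so the println output is
    // visible there even when the job runs on a cluster.
    counts.collect().foreach(println)
    sc.stop() // release resources once the job is done
  }
}
```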
22
spark-lesson/src/main/scala/date_20250401/task2.scala
Normal file
@@ -0,0 +1,22 @@
package date_20250401

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object task2 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("testRdd")

    val sc = new SparkContext(conf)

    val data: RDD[String] = sc.textFile("/Volumes/Data/04CodeData/gcc-project-25-2/spark-lesson/src/main/scala/date_20250401/data/word")

    data.flatMap(_.split(" "))
      .map(x => (x, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
      .collect()
      .foreach(println)

  }
}
30
spark-lesson/src/main/scala/date_20250408/WCLocalFile.scala
Normal file
@@ -0,0 +1,30 @@
package date_20250408

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WCLocalFile {
  def main(args: Array[String]): Unit = {
    /**
     * Initializing a SparkContext requires a SparkConf object,
     * which holds the various configuration parameters of the Spark cluster. */
    val conf = new SparkConf().setMaster("local")
      // run locally, defaulting to a single CPU core
      // .setMaster("local[2]")  // run locally with 2 CPU cores
      // .setMaster("local[*]")  // run locally with all CPU cores
      .setAppName("Spark-WordCount")
    // set the application name
    // every Spark program starts from a SparkContext
    val sc = new SparkContext(conf)
    val data = sc.textFile("spark-lesson/src/main/scala/date_20250408/data/README.md")
    // read a local file
    val result: RDD[(String, Int)] = data.flatMap(_.split(" "))
      // the underscore is a placeholder; flatMap works line by line and splits the input
      .map((_, 1))
      // turn each word into a key-value pair: the word is the key, the value is 1
      .reduceByKey(_ + _)
      // merge the pairs that share the same key by adding their values

    // save the result to a local file
    result.saveAsTextFile("spark-lesson/out/result")
  }
}
31
spark-lesson/src/main/scala/date_20250408/WordCount.scala
Normal file
@@ -0,0 +1,31 @@
package date_20250408

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/** Spark word count program */
object WordCount {

  def main(args: Array[String]): Unit = {
    // create the SparkConf object that holds the application's configuration
    val conf = new SparkConf()
    // set the application name, shown in the Spark Web UI
    conf.setAppName("Spark-WordCount")
    // set the cluster master URL
    // conf.setMaster("local")
    conf.setMaster("spark://hadoop102:7077")

    // create the SparkContext object, the entry point for submitting a Spark application
    val sc = new SparkContext(conf)
    // read the file at the given path (the first program argument) into an RDD
    val linesRDD: RDD[String] = sc.textFile(args(0))
    // split each element on spaces and flatten the results into a new RDD
    val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))
    // pair each word with the number 1, i.e. (word, 1)
    val paresRDD: RDD[(String, Int)] = wordsRDD.map((_, 1))
    // aggregate by key, summing the values of identical keys
    val wordCountsRDD: RDD[(String, Int)] = paresRDD.reduceByKey(_ + _)
    // sort by word count in descending order
    val wordCountsSortRDD: RDD[(String, Int)] = wordCountsRDD.sortBy(_._2, ascending = false)
    // save the result to the given path (the second program argument)
    wordCountsSortRDD.saveAsTextFile(args(1))
    // stop the SparkContext and end the job
    sc.stop()
  }
}
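WordCount reads its input and output paths from `args(0)` and `args(1)`, so running it without both arguments dies with an `ArrayIndexOutOfBoundsException`. A hedged sketch of the same job with a basic argument check (the `WordCountWithArgCheck` name is illustrative and the master URL is left to spark-submit):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object WordCountWithArgCheck {
  def main(args: Array[String]): Unit = {
    // fail fast with a usage hint instead of an ArrayIndexOutOfBoundsException
    if (args.length < 2) {
      System.err.println("Usage: WordCountWithArgCheck <input path> <output path>")
      sys.exit(1)
    }
    val sc = new SparkContext(new SparkConf().setAppName("Spark-WordCount"))
    sc.textFile(args(0))
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
      .saveAsTextFile(args(1))
    sc.stop()
  }
}
```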
32
spark-lesson/src/main/scala/date_20250408/WordCountHdfsFile.scala
Normal file
@@ -0,0 +1,32 @@
package date_20250408

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/** Spark word count program */
object WordCountHdfsFile {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    // create the SparkConf object that holds the application's configuration
    val conf = new SparkConf().setMaster("local")
      // run locally, defaulting to a single CPU core
      .setAppName("Spark-WordCount")
    // create the SparkContext object, the entry point for submitting a Spark application
    val sc = new SparkContext(conf)
    // read the file at the given path into an RDD
    // val linesRDD: RDD[String] = sc.textFile(args(0))
    val linesRDD: RDD[String] = sc.textFile("hdfs://hadoop102:8020/input/wordcount2.txt")
    // split each element on spaces and flatten the results into a new RDD
    val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))
    // pair each word with the number 1, i.e. (word, 1)
    val paresRDD: RDD[(String, Int)] = wordsRDD.map((_, 1))
    // aggregate by key, summing the values of identical keys
    val wordCountsRDD: RDD[(String, Int)] = paresRDD.reduceByKey(_ + _)
    // sort by word count in descending order
    val wordCountsSortRDD: RDD[(String, Int)] = wordCountsRDD.sortBy(_._2, ascending = false)
    // save the result to HDFS
    // wordCountsSortRDD.saveAsTextFile(args(1))
    wordCountsSortRDD.saveAsTextFile("hdfs://hadoop102:8020/output/result1")
    // stop the SparkContext and end the job
    sc.stop()
  }
}
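Rerunning WordCountHdfsFile fails once `hdfs://hadoop102:8020/output/result1` exists, because `saveAsTextFile` refuses to overwrite an existing directory. A minimal sketch of clearing the output path first via the Hadoop `FileSystem` API (the object name and sample data are illustrative, not part of this commit):

```scala
import java.net.URI

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

object CleanOutputBeforeSave {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("clean-output-demo"))
    val output = "hdfs://hadoop102:8020/output/result1"
    // saveAsTextFile throws if the output directory already exists, so delete it first
    val fs = FileSystem.get(new URI(output), sc.hadoopConfiguration)
    val outPath = new Path(output)
    if (fs.exists(outPath)) fs.delete(outPath, true)
    sc.parallelize(Seq("hadoop", "spark", "hadoop"))
      .map((_, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile(output)
    sc.stop()
  }
}
```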
125
spark-lesson/src/main/scala/date_20250408/data/README.md
Executable file
@@ -0,0 +1,125 @@
# Apache Spark

Spark is a unified analytics engine for large-scale data processing. It provides
high-level APIs in Scala, Java, Python, and R, and an optimized engine that
supports general computation graphs for data analysis. It also supports a
rich set of higher-level tools including Spark SQL for SQL and DataFrames,
pandas API on Spark for pandas workloads, MLlib for machine learning, GraphX for graph processing,
and Structured Streaming for stream processing.

<https://spark.apache.org/>

[](https://github.com/apache/spark/actions/workflows/build_main.yml)
[](https://ci.appveyor.com/project/ApacheSoftwareFoundation/spark)
[](https://codecov.io/gh/apache/spark)
[](https://pypi.org/project/pyspark/)

## Online Documentation

You can find the latest Spark documentation, including a programming
guide, on the [project web page](https://spark.apache.org/documentation.html).
This README file only contains basic setup instructions.

## Building Spark

Spark is built using [Apache Maven](https://maven.apache.org/).
To build Spark and its example programs, run:

```bash
./build/mvn -DskipTests clean package
```

(You do not need to do this if you downloaded a pre-built package.)

More detailed documentation is available from the project site, at
["Building Spark"](https://spark.apache.org/docs/latest/building-spark.html).

For general development tips, including info on developing Spark using an IDE, see ["Useful Developer Tools"](https://spark.apache.org/developer-tools.html).

## Interactive Scala Shell

The easiest way to start using Spark is through the Scala shell:

```bash
./bin/spark-shell
```

Try the following command, which should return 1,000,000,000:

```scala
scala> spark.range(1000 * 1000 * 1000).count()
```

## Interactive Python Shell

Alternatively, if you prefer Python, you can use the Python shell:

```bash
./bin/pyspark
```

And run the following command, which should also return 1,000,000,000:

```python
>>> spark.range(1000 * 1000 * 1000).count()
```

## Example Programs

Spark also comes with several sample programs in the `examples` directory.
To run one of them, use `./bin/run-example <class> [params]`. For example:

```bash
./bin/run-example SparkPi
```

will run the Pi example locally.

You can set the MASTER environment variable when running examples to submit
examples to a cluster. This can be a mesos:// or spark:// URL,
"yarn" to run on YARN, and "local" to run
locally with one thread, or "local[N]" to run locally with N threads. You
can also use an abbreviated class name if the class is in the `examples`
package. For instance:

```bash
MASTER=spark://host:7077 ./bin/run-example SparkPi
```

Many of the example programs print usage help if no params are given.

## Running Tests

Testing first requires [building Spark](#building-spark). Once Spark is built, tests
can be run using:

```bash
./dev/run-tests
```

Please see the guidance on how to
[run tests for a module, or individual tests](https://spark.apache.org/developer-tools.html#individual-tests).

There is also a Kubernetes integration test, see resource-managers/kubernetes/integration-tests/README.md

## A Note About Hadoop Versions

Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported
storage systems. Because the protocols have changed in different versions of
Hadoop, you must build Spark against the same version that your cluster runs.

Please refer to the build documentation at
["Specifying the Hadoop Version and Enabling YARN"](https://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version-and-enabling-yarn)
for detailed guidance on building for a particular distribution of Hadoop, including
building for particular Hive and Hive Thriftserver distributions.

## Configuration

Please refer to the [Configuration Guide](https://spark.apache.org/docs/latest/configuration.html)
in the online documentation for an overview on how to configure Spark.

## Contributing

Please review the [Contribution to Spark guide](https://spark.apache.org/contributing.html)
for information on how to get started contributing to the project.
5
spark-lesson/src/main/scala/date_20250408/data/wordcount2.txt
Executable file
@@ -0,0 +1,5 @@
Hadoop Common The common utilities that support the other Hadoop modules
Hadoop Distributed File System HDFS A distributed file system that provides high-throughput access to application data
Hadoop YARN A framework for job scheduling and cluster resource management
Hadoop MapReduce A YARN-based system for parallel processing of large data sets
Who Uses Hadoop
6
spark-lesson/src/main/scala/test.scala
Normal file
@@ -0,0 +1,6 @@
object test {
  def main(args: Array[String]): Unit = {
    println("Hello world")
  }

}