feat(hbase-lesson): add new module hbase-20250509

- Add the hbase-20250509 module for the HBase course project
- Implement two MapReduce jobs: traffic (flow) statistics and word count
- Add the corresponding Mapper, Reducer, and Driver classes
- Create sample input files
- Configure the Maven dependencies
dev_xulongjin 2025-05-12 09:07:29 +08:00
parent 6ca57600c0
commit b2f5b0e40b
19 changed files with 526 additions and 0 deletions

hbase-20250509/pom.xml

@@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.vscoder</groupId>
<artifactId>hbase-lesson</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>hbase-20250509</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>3.3.1</version>
</dependency>
</dependencies>
</project>
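
The module pins Hadoop 3.3.1 (common, HDFS, and the MapReduce client artifacts) in compile scope, which should be enough to run the drivers below with the local job runner, no cluster required. A minimal build of just this module from the repository root might look like the following (sketch only; afterwards a driver can be launched from the IDE with the working directory matching the relative paths hard-coded in each Driver class):

mvn -pl hbase-20250509 -am package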

hbase-20250509/src/main/java/FlowCount/FlowCountDriver.java

@@ -0,0 +1,37 @@
package FlowCount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "FlowCount");
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setNumReduceTasks(1);
FileInputFormat.setInputPaths(job, new Path("hbase-lesson/hbase-20250509/src/main/java/FlowCount/input/"));
FileOutputFormat.setOutputPath(job, new Path("hbase-lesson/hbase-20250509/src/main/java/FlowCount/output/"));
job.waitForCompletion(true);
}
}
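
One caveat with this driver: FileOutputFormat throws an exception if the output directory already exists, so a second run fails until FlowCount/output/ is deleted. A small sketch of how the driver could clear it before submission (hypothetical addition, not part of this commit; it also needs import org.apache.hadoop.fs.FileSystem):

Path output = new Path("hbase-lesson/hbase-20250509/src/main/java/FlowCount/output/");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(output)) {
    fs.delete(output, true); // recursively remove output left over from a previous run
}
FileOutputFormat.setOutputPath(job, output);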

hbase-20250509/src/main/java/FlowCount/FlowCountMapper.java

@@ -0,0 +1,25 @@
package FlowCount;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] oneLine = value.toString().split("\\s+");
try {
long upFlow = Long.parseLong(oneLine[1]);
long downFlow = Long.parseLong(oneLine[2]);
long sumFlow = upFlow + downFlow;
context.write(new Text(oneLine[0]), new LongWritable(sumFlow));
} catch (Exception e) {
e.printStackTrace(); // lines that cannot be parsed as numbers (e.g. the header row) are logged and skipped
}
}
}

hbase-20250509/src/main/java/FlowCount/FlowCountReducer.java

@@ -0,0 +1,20 @@
package FlowCount;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0;
for (LongWritable value_i : values) {
count += value_i.get();
}
context.write(key, new LongWritable(count));
}
}

hbase-20250509/src/main/java/FlowCount/input/ (sample input file)

@@ -0,0 +1,7 @@
手机号 上行流量 下行流量
13726230501 200 1100
13396230502 300 1200
13897230503 400 1300
13897230503 100 300
13597230534 500 1400
13597230534 300 1200
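
The first line is a header (phone number, upstream traffic, downstream traffic); the mapper's catch block skips it because its columns cannot be parsed as longs. For reference, with this sample the FlowCount job should emit per-phone totals along these lines (illustrative part-r-00000 contents, tab-separated):

13396230502	1500
13597230534	3400
13726230501	1300
13897230503	2100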

hbase-20250509/src/main/java/FlowCount2/FlowBean.java

@@ -0,0 +1,77 @@
package FlowCount2;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
// The implements keyword is used to implement an interface; an interface is a blueprint of the operations a class must provide
public class FlowBean implements Writable {
private long upFlow;
private long downFlow;
private long sumFlow;
// Hadoop requires a public no-argument constructor so it can create FlowBean instances via reflection during deserialization
public FlowBean(){
}
public FlowBean(long upFlow, long downFlow){
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
// Serialization: write the Java primitive fields to the output byte stream
public void write(DataOutput out) throws IOException {
out.writeLong(this.upFlow);
out.writeLong(this.downFlow);
out.writeLong(this.sumFlow);
}
// Deserialization: read the primitive values back from the byte stream, in the same order they were written
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
// Object's default toString() returns ClassName@hashCode
// Override toString() so the bean prints its fields instead
// The MapReduce framework calls this toString() whenever it writes a FlowBean value to the text output
@Override
public String toString() {
return "FlowBean{" +
"upFlow=" + upFlow +
", downFlow=" + downFlow +
", sumFlow=" + sumFlow +
'}';
}
}
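
readFields must consume the fields in exactly the same order write emits them; if the two ever drift apart, the bean silently deserializes into wrong values. A standalone round-trip sketch of that contract (illustrative test class, not a file in this commit; assumes it sits next to FlowBean in the FlowCount2 package):

package FlowCount2;
import java.io.*;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean(200, 1100);

        // serialize the bean the same way Hadoop does between map and reduce
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // deserialize into a fresh instance and compare the printed fields
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(original); // FlowBean{upFlow=200, downFlow=1100, sumFlow=1300}
        System.out.println(copy);     // should print the same values
    }
}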

hbase-20250509/src/main/java/FlowCount2/FlowCountDriver.java

@@ -0,0 +1,35 @@
package FlowCount2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "FlowBean");
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
job.setNumReduceTasks(1);
FileInputFormat.setInputPaths(job, new Path("hbase-lesson/hbase-20250509/src/main/java/FlowCount2/input/"));
FileOutputFormat.setOutputPath(job, new Path("hbase-lesson/hbase-20250509/src/main/java/FlowCount2/output/"));
job.waitForCompletion(true);
}
}

hbase-20250509/src/main/java/FlowCount2/FlowCountMapper.java

@@ -0,0 +1,26 @@
package FlowCount2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] oneLine = value.toString().split("\\s+");
try {
long upFlow = Long.parseLong(oneLine[1]);
long downFlow = Long.parseLong(oneLine[2]);
long sumFlow = upFlow + downFlow;
context.write(new Text(oneLine[0]), new FlowBean(upFlow, downFlow));
} catch (Exception e) {
e.printStackTrace();
}
}
}

hbase-20250509/src/main/java/FlowCount2/FlowCountReducer.java

@@ -0,0 +1,21 @@
package FlowCount2;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
long sumUpFlow = 0;
long sumDownFlow = 0;
for (FlowBean value_i : values) {
sumUpFlow += value_i.getUpFlow();
sumDownFlow += value_i.getDownFlow();
}
context.write(key, new FlowBean(sumUpFlow, sumDownFlow));
}
}

hbase-20250509/src/main/java/FlowCount2/input/ (sample input file)

@@ -0,0 +1,7 @@
手机号 上行流量 下行流量
13726230501 200 1100
13396230502 300 1200
13897230503 400 1300
13897230503 100 300
13597230534 500 1400
13597230534 300 1200
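
Same sample data as the FlowCount job; because the reduce output value here is a FlowBean, the text output goes through the overridden toString(), so the totals would look roughly like:

13396230502	FlowBean{upFlow=300, downFlow=1200, sumFlow=1500}
13597230534	FlowBean{upFlow=800, downFlow=2600, sumFlow=3400}
13726230501	FlowBean{upFlow=200, downFlow=1100, sumFlow=1300}
13897230503	FlowBean{upFlow=500, downFlow=1600, sumFlow=2100}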

hbase-20250509/src/main/java/WordCount/WordCountMapper.java

@@ -0,0 +1,35 @@
package WordCount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/**
 * The map logic: runs once per input key/value pair, i.e. once per line in this job.
 *
 * @param key     the byte offset of the line within the input split
 * @param value   the line of text to process
 * @param context used to emit the intermediate (word, 1) pairs
 * @throws IOException
 * @throws InterruptedException
 */
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] ws = value.toString().split(" ");
for (String word : ws) {
// emit (word, 1) once for every word in the line
context.write(new Text(word), new IntWritable(1));
}
}
}

hbase-20250509/src/main/java/WordCount/WordCountReducer.java

@@ -0,0 +1,30 @@
package WordCount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
/**
 * The reduce logic: runs once per key, i.e. once per distinct word.
 * For example, with the words "b" and "d" there are two groups, so reduce runs twice;
 * for word "b" the framework passes all of its 1s, e.g. b -> <1,1,1,1,1,1,1,1,1>.
 *
 * @param key     the word
 * @param values  the counts emitted by the mappers for this word
 * @param context used to emit the final (word, count) pair
 * @throws IOException
 * @throws InterruptedException
 */
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int number = 0;
// every value is a 1 emitted by the mapper (no combiner is configured), so counting the elements gives the total
for (IntWritable value : values) {
number++;
}
context.write(key, new IntWritable(number));
}
}

hbase-20250509/src/main/java/WordCount/WorldCountDriver.java

@@ -0,0 +1,41 @@
package WordCount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WorldCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1. Create the configuration object
Configuration conf = new Configuration();
// 2. Create the job object
Job job = Job.getInstance(conf, "wordcount");
// 2.1 Set the mapper and reducer classes
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 2.2 Set the map and reduce output key/value types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 2.3 Set the number of reduce tasks (the default is 1)
job.setNumReduceTasks(2);
// 2.4 Set the input and output paths
// String hdfs_projPath = "hdfs://localhost:9000/user/ccd/HBaseCourseProj/hbase_demo3_mapreduce/";
String hdfs_projPath = "hbase-lesson/hbase-20250509/src/main/java/WordCount";
FileInputFormat.setInputPaths(job, new Path(hdfs_projPath + "/input/"));
FileOutputFormat.setOutputPath(job, new Path(hdfs_projPath + "/output/"));
// 3. Submit the job and wait for completion; the return value indicates success
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
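
The input and output locations are hard-coded to paths inside the source tree, which ties the job to one working directory. A common alternative (not used in this commit) is to read them from the command line and to set the job jar, so the same class also works when submitted to a cluster:

// e.g. hadoop jar hbase-20250509.jar WordCount.WorldCountDriver <input> <output>
job.setJarByClass(WorldCountDriver.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));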

hbase-20250509/src/main/java/WordCount/input/ (sample input file)

@@ -0,0 +1,4 @@
a b c d a a a a
a b c d a a a b
a b c d a a c c
a b c d a d d d
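
Across these four lines the combined counts are a=14, b=5, c=6, d=7. Because the driver sets setNumReduceTasks(2), the result is partitioned by key hash across two files (part-r-00000 and part-r-00001) instead of landing in one sorted file.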

pom.xml

@@ -7,6 +7,10 @@
<groupId>cn.vscoder</groupId>
<artifactId>hbase-lesson</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>hbase-20250509</module>
</modules>
<properties>
<maven.compiler.source>8</maven.compiler.source>
@@ -14,6 +18,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
@@ -32,6 +37,8 @@
<version>2.4.17</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

hbase-20250509/src/main/java/date_20250509/WordCountMapper.java

@@ -0,0 +1,35 @@
package date_20250509;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/**
 * The map logic: runs once per input key/value pair, i.e. once per line in this job.
 *
 * @param key     the byte offset of the line within the input split
 * @param value   the line of text to process
 * @param context used to emit the intermediate (word, 1) pairs
 * @throws IOException
 * @throws InterruptedException
 */
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] ws = value.toString().split(" ");
for (String word : ws) {
// emit (word, 1) once for every word in the line
context.write(new Text(word), new IntWritable(1));
}
}
}

hbase-20250509/src/main/java/date_20250509/WordCountReducer.java

@@ -0,0 +1,30 @@
package date_20250509;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
/**
 * The reduce logic: runs once per key, i.e. once per distinct word.
 * For example, with the words "b" and "d" there are two groups, so reduce runs twice;
 * for word "b" the framework passes all of its 1s, e.g. b -> <1,1,1,1,1,1,1,1,1>.
 *
 * @param key     the word
 * @param values  the counts emitted by the mappers for this word
 * @param context used to emit the final (word, count) pair
 * @throws IOException
 * @throws InterruptedException
 */
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int number = 0;
// every value is a 1 emitted by the mapper (no combiner is configured), so counting the elements gives the total
for (IntWritable value : values) {
number++;
}
context.write(key, new IntWritable(number));
}
}

hbase-20250509/src/main/java/date_20250509/WorldCountDriver.java

@@ -0,0 +1,41 @@
package date_20250509;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WorldCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1. Create the configuration object
Configuration conf = new Configuration();
// 2. Create the job object
Job job = Job.getInstance(conf, "wordcount");
// 2.1 Set the mapper and reducer classes
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 2.2 Set the map and reduce output key/value types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 2.3 Set the number of reduce tasks (the default is 1)
job.setNumReduceTasks(2);
// 2.4 Set the input and output paths
// String hdfs_projPath = "hdfs://localhost:9000/user/ccd/HBaseCourseProj/hbase_demo3_mapreduce/";
String hdfs_projPath = "./";
FileInputFormat.setInputPaths(job, new Path(hdfs_projPath + "date_20250509/input/"));
FileOutputFormat.setOutputPath(job, new Path(hdfs_projPath + "date_20250509/output/"));
// 3. Submit the job and wait for completion; the return value indicates success
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}

hbase-20250509/src/main/java/date_20250509/input/ (sample input file)

@@ -0,0 +1,4 @@
a b c d a a a a
a b c d a a a b
a b c d a a c c
a b c d a d d d