From b2f5b0e40b56152d10a3dbb166ba12022fea5d85 Mon Sep 17 00:00:00 2001
From: dev_xulongjin
Date: Mon, 12 May 2025 09:07:29 +0800
Subject: [PATCH] feat(hbase-lesson): add new module hbase-20250509
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Add the hbase-20250509 module for the HBase course project
- Implement two MapReduce jobs: traffic (flow) aggregation and word count
- Add the corresponding Mapper, Reducer, and Driver classes
- Create sample input files
- Configure Maven dependencies
---
 hbase-lesson/hbase-20250509/pom.xml            | 44 +++++++++++
 .../main/java/FlowCount/FlowCountDriver.java   | 37 +++++++++
 .../main/java/FlowCount/FlowCountMapper.java   | 25 ++++++
 .../main/java/FlowCount/FlowCountReducer.java  | 20 +++++
 .../java/FlowCount/input/FlowCountSample.txt   |  7 ++
 .../src/main/java/FlowCount2/FlowBean.java     | 77 +++++++++++++++++++
 .../main/java/FlowCount2/FlowCountDriver.java  | 35 +++++++++
 .../main/java/FlowCount2/FlowCountMapper.java  | 26 +++++++
 .../java/FlowCount2/FlowCountReducer.java      | 21 +++++
 .../java/FlowCount2/input/FlowCountSample.txt  |  7 ++
 .../main/java/WordCount/WordCountMapper.java   | 35 +++++++++
 .../main/java/WordCount/WordCountReducer.java  | 30 ++++++++
 .../main/java/WordCount/WorldCountDriver.java  | 41 ++++++++++
 .../java/WordCount/input/wordcountSample.txt   |  4 +
 hbase-lesson/pom.xml                           |  7 ++
 .../java/date_20250509/WordCountMapper.java    | 35 +++++++++
 .../java/date_20250509/WordCountReducer.java   | 30 ++++++++
 .../java/date_20250509/WorldCountDriver.java   | 41 ++++++++++
 .../date_20250509/input/wordcountSample.txt    |  4 +
 19 files changed, 526 insertions(+)
 create mode 100644 hbase-lesson/hbase-20250509/pom.xml
 create mode 100644 hbase-lesson/hbase-20250509/src/main/java/FlowCount/FlowCountDriver.java
 create mode 100644 hbase-lesson/hbase-20250509/src/main/java/FlowCount/FlowCountMapper.java
 create mode 100644 hbase-lesson/hbase-20250509/src/main/java/FlowCount/FlowCountReducer.java
 create mode 100644 hbase-lesson/hbase-20250509/src/main/java/FlowCount/input/FlowCountSample.txt
 create mode 100644 hbase-lesson/hbase-20250509/src/main/java/FlowCount2/FlowBean.java
 create mode 100644 hbase-lesson/hbase-20250509/src/main/java/FlowCount2/FlowCountDriver.java
 create mode 100644 hbase-lesson/hbase-20250509/src/main/java/FlowCount2/FlowCountMapper.java
 create mode 100644 hbase-lesson/hbase-20250509/src/main/java/FlowCount2/FlowCountReducer.java
 create mode 100644 hbase-lesson/hbase-20250509/src/main/java/FlowCount2/input/FlowCountSample.txt
 create mode 100644 hbase-lesson/hbase-20250509/src/main/java/WordCount/WordCountMapper.java
 create mode 100644 hbase-lesson/hbase-20250509/src/main/java/WordCount/WordCountReducer.java
 create mode 100644 hbase-lesson/hbase-20250509/src/main/java/WordCount/WorldCountDriver.java
 create mode 100644 hbase-lesson/hbase-20250509/src/main/java/WordCount/input/wordcountSample.txt
 create mode 100644 hbase-lesson/src/main/java/date_20250509/WordCountMapper.java
 create mode 100644 hbase-lesson/src/main/java/date_20250509/WordCountReducer.java
 create mode 100644 hbase-lesson/src/main/java/date_20250509/WorldCountDriver.java
 create mode 100644 hbase-lesson/src/main/java/date_20250509/input/wordcountSample.txt

diff --git a/hbase-lesson/hbase-20250509/pom.xml b/hbase-lesson/hbase-20250509/pom.xml
new file mode 100644
index 0000000..3be12a0
--- /dev/null
+++ b/hbase-lesson/hbase-20250509/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>cn.vscoder</groupId>
+        <artifactId>hbase-lesson</artifactId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>hbase-20250509</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>3.3.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>3.3.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>3.3.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-common</artifactId>
+            <version>3.3.1</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/hbase-lesson/hbase-20250509/src/main/java/FlowCount/FlowCountDriver.java b/hbase-lesson/hbase-20250509/src/main/java/FlowCount/FlowCountDriver.java
new file mode 100644
index 0000000..d2baaf7
--- /dev/null
+++ b/hbase-lesson/hbase-20250509/src/main/java/FlowCount/FlowCountDriver.java
@@ -0,0 +1,37 @@
+package FlowCount;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import java.io.IOException;
+
+public class FlowCountDriver {
+    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
+        Configuration conf = new Configuration();
+
+        Job job = Job.getInstance(conf, "FlowCount");
+
+        job.setMapperClass(FlowCountMapper.class);
+        job.setReducerClass(FlowCountReducer.class);
+
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(LongWritable.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(LongWritable.class);
+
+        job.setNumReduceTasks(1);
+
+        FileInputFormat.setInputPaths(job, new Path("hbase-lesson/hbase-20250509/src/main/java/FlowCount/input/"));
+        FileOutputFormat.setOutputPath(job, new Path("hbase-lesson/hbase-20250509/src/main/java/FlowCount/output/"));
+
+        job.waitForCompletion(true);
+    }
+}
diff --git a/hbase-lesson/hbase-20250509/src/main/java/FlowCount/FlowCountMapper.java b/hbase-lesson/hbase-20250509/src/main/java/FlowCount/FlowCountMapper.java
new file mode 100644
index 0000000..d63f76f
--- /dev/null
+++ b/hbase-lesson/hbase-20250509/src/main/java/FlowCount/FlowCountMapper.java
@@ -0,0 +1,25 @@
+package FlowCount;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.IOException;
+
+public class FlowCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
+
+    @Override
+    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+        String[] oneLine = value.toString().split("\\s+");
+        try {
+            long upFlow = Long.parseLong(oneLine[1]);
+            long downFlow = Long.parseLong(oneLine[2]);
+            long sumFlow = upFlow + downFlow;
+
+            context.write(new Text(oneLine[0]), new LongWritable(sumFlow));
+        } catch (Exception e) {
+            // Lines that do not parse (e.g. the header row) are logged and skipped.
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/hbase-lesson/hbase-20250509/src/main/java/FlowCount/FlowCountReducer.java b/hbase-lesson/hbase-20250509/src/main/java/FlowCount/FlowCountReducer.java
new file mode 100644
index 0000000..dbf009e
--- /dev/null
+++ b/hbase-lesson/hbase-20250509/src/main/java/FlowCount/FlowCountReducer.java
@@ -0,0 +1,20 @@
+package FlowCount;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+
+public class FlowCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
+
+    @Override
+    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+        long count = 0;
+        for (LongWritable value_i : values) {
+            count += value_i.get();
+        }
+
+        context.write(key, new LongWritable(count));
+    }
+}
\ No newline at end of file
diff --git a/hbase-lesson/hbase-20250509/src/main/java/FlowCount/input/FlowCountSample.txt b/hbase-lesson/hbase-20250509/src/main/java/FlowCount/input/FlowCountSample.txt
new file mode 100644
index 0000000..2c051dc
--- /dev/null
+++ b/hbase-lesson/hbase-20250509/src/main/java/FlowCount/input/FlowCountSample.txt
@@ -0,0 +1,7 @@
+phone_number up_flow down_flow
+13726230501 200 1100
+13396230502 300 1200
+13897230503 400 1300
+13897230503 100 300
+13597230534 500 1400
+13597230534 300 1200
\ No newline at end of file
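For a quick sanity check (not part of the patch): with one reducer and keys in Text sort order, the FlowCount job's output for the sample input above pairs each phone number with the sum of its up and down flow across all of its records:

13396230502	1500
13597230534	3400
13726230501	1300
13897230503	2100
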
diff --git a/hbase-lesson/hbase-20250509/src/main/java/FlowCount2/FlowBean.java b/hbase-lesson/hbase-20250509/src/main/java/FlowCount2/FlowBean.java
new file mode 100644
index 0000000..e1f64b1
--- /dev/null
+++ b/hbase-lesson/hbase-20250509/src/main/java/FlowCount2/FlowBean.java
@@ -0,0 +1,77 @@
+package FlowCount2;
+
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+// "implements" realizes an interface; the interface is the blueprint for how
+// the class must behave. Writable is Hadoop's serialization contract.
+public class FlowBean implements Writable {
+    private long upFlow;
+    private long downFlow;
+    private long sumFlow;
+
+    // Hadoop instantiates Writables by reflection, so a no-arg constructor is required.
+    public FlowBean() {
+    }
+
+    public FlowBean(long upFlow, long downFlow) {
+        this.upFlow = upFlow;
+        this.downFlow = downFlow;
+        this.sumFlow = upFlow + downFlow;
+    }
+
+    public long getUpFlow() {
+        return upFlow;
+    }
+
+    public void setUpFlow(long upFlow) {
+        this.upFlow = upFlow;
+    }
+
+    public long getDownFlow() {
+        return downFlow;
+    }
+
+    public void setDownFlow(long downFlow) {
+        this.downFlow = downFlow;
+    }
+
+    public long getSumFlow() {
+        return sumFlow;
+    }
+
+    public void setSumFlow(long sumFlow) {
+        this.sumFlow = sumFlow;
+    }
+
+    // Serialization: write the Java primitives to the byte stream.
+    public void write(DataOutput out) throws IOException {
+        out.writeLong(this.upFlow);
+        out.writeLong(this.downFlow);
+        out.writeLong(this.sumFlow);
+    }
+
+    // Deserialization: read the primitives back from the byte stream,
+    // in the same order they were written.
+    public void readFields(DataInput in) throws IOException {
+        this.upFlow = in.readLong();
+        this.downFlow = in.readLong();
+        this.sumFlow = in.readLong();
+    }
+
+    // Object.toString() would return ClassName@hashcode, so we override it;
+    // the MapReduce framework calls toString() whenever it needs to render
+    // a FlowBean as text, e.g. when writing reducer output.
+    @Override
+    public String toString() {
+        return "FlowBean{" +
+                "upFlow=" + upFlow +
+                ", downFlow=" + downFlow +
+                ", sumFlow=" + sumFlow +
+                '}';
+    }
+}
\ No newline at end of file
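A minimal round-trip sketch (not part of the patch, assuming the FlowCount2.FlowBean class above is on the classpath): it serializes a FlowBean to bytes and reads it back, which is what Hadoop does when it ships the bean between the map and reduce phases.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowCount2.FlowBean before = new FlowCount2.FlowBean(200, 1100);

        // Serialize: write(DataOutput) appends the three longs to the stream.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        before.write(new DataOutputStream(bytes));

        // Deserialize: readFields(DataInput) reads them back in the same order.
        FlowCount2.FlowBean after = new FlowCount2.FlowBean();
        after.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // Prints: FlowBean{upFlow=200, downFlow=1100, sumFlow=1300}
        System.out.println(after);
    }
}
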
diff --git a/hbase-lesson/hbase-20250509/src/main/java/FlowCount2/FlowCountDriver.java b/hbase-lesson/hbase-20250509/src/main/java/FlowCount2/FlowCountDriver.java
new file mode 100644
index 0000000..41b4350
--- /dev/null
+++ b/hbase-lesson/hbase-20250509/src/main/java/FlowCount2/FlowCountDriver.java
@@ -0,0 +1,35 @@
+package FlowCount2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import java.io.IOException;
+
+public class FlowCountDriver {
+    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
+        Configuration conf = new Configuration();
+
+        Job job = Job.getInstance(conf, "FlowBean");
+
+        job.setMapperClass(FlowCountMapper.class);
+        job.setReducerClass(FlowCountReducer.class);
+
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(FlowBean.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(FlowBean.class);
+
+        job.setNumReduceTasks(1);
+
+        FileInputFormat.setInputPaths(job, new Path("hbase-lesson/hbase-20250509/src/main/java/FlowCount2/input/"));
+        FileOutputFormat.setOutputPath(job, new Path("hbase-lesson/hbase-20250509/src/main/java/FlowCount2/output/"));
+
+        job.waitForCompletion(true);
+    }
+}
diff --git a/hbase-lesson/hbase-20250509/src/main/java/FlowCount2/FlowCountMapper.java b/hbase-lesson/hbase-20250509/src/main/java/FlowCount2/FlowCountMapper.java
new file mode 100644
index 0000000..4bc3da5
--- /dev/null
+++ b/hbase-lesson/hbase-20250509/src/main/java/FlowCount2/FlowCountMapper.java
@@ -0,0 +1,26 @@
+package FlowCount2;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.IOException;
+
+public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
+
+    @Override
+    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+        String[] oneLine = value.toString().split("\\s+");
+        try {
+            long upFlow = Long.parseLong(oneLine[1]);
+            long downFlow = Long.parseLong(oneLine[2]);
+
+            // The FlowBean constructor computes sumFlow itself.
+            context.write(new Text(oneLine[0]), new FlowBean(upFlow, downFlow));
+        } catch (Exception e) {
+            // Lines that do not parse (e.g. the header row) are logged and skipped.
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/hbase-lesson/hbase-20250509/src/main/java/FlowCount2/FlowCountReducer.java b/hbase-lesson/hbase-20250509/src/main/java/FlowCount2/FlowCountReducer.java
new file mode 100644
index 0000000..da7110f
--- /dev/null
+++ b/hbase-lesson/hbase-20250509/src/main/java/FlowCount2/FlowCountReducer.java
@@ -0,0 +1,21 @@
+package FlowCount2;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+
+public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
+
+    @Override
+    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
+        long sumUpFlow = 0;
+        long sumDownFlow = 0;
+        for (FlowBean value_i : values) {
+            sumUpFlow += value_i.getUpFlow();
+            sumDownFlow += value_i.getDownFlow();
+        }
+
+        context.write(key, new FlowBean(sumUpFlow, sumDownFlow));
+    }
+}
\ No newline at end of file
diff --git a/hbase-lesson/hbase-20250509/src/main/java/FlowCount2/input/FlowCountSample.txt b/hbase-lesson/hbase-20250509/src/main/java/FlowCount2/input/FlowCountSample.txt
new file mode 100644
index 0000000..2c051dc
--- /dev/null
+++ b/hbase-lesson/hbase-20250509/src/main/java/FlowCount2/input/FlowCountSample.txt
@@ -0,0 +1,7 @@
+phone_number up_flow down_flow
+13726230501 200 1100
+13396230502 300 1200
+13897230503 400 1300
+13897230503 100 300
+13597230534 500 1400
+13597230534 300 1200
\ No newline at end of file
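As with FlowCount, here is the expected output (not part of the patch) for the sample input above. The reducer emits a FlowBean per phone number, and TextOutputFormat renders it via the overridden toString():

13396230502	FlowBean{upFlow=300, downFlow=1200, sumFlow=1500}
13597230534	FlowBean{upFlow=800, downFlow=2600, sumFlow=3400}
13726230501	FlowBean{upFlow=200, downFlow=1100, sumFlow=1300}
13897230503	FlowBean{upFlow=500, downFlow=1600, sumFlow=2100}
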
diff --git a/hbase-lesson/hbase-20250509/src/main/java/WordCount/WordCountMapper.java b/hbase-lesson/hbase-20250509/src/main/java/WordCount/WordCountMapper.java
new file mode 100644
index 0000000..e373c69
--- /dev/null
+++ b/hbase-lesson/hbase-20250509/src/main/java/WordCount/WordCountMapper.java
@@ -0,0 +1,35 @@
+package WordCount;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.IOException;
+
+public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
+    /**
+     * The map logic: runs once per input KV pair, which in this job means
+     * once per line of input.
+     *
+     * @param key     the input key: the byte offset of the line in the file
+     * @param value   the input value: the line itself, which is what we process
+     * @param context used to emit the output KV pairs
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @Override
+    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+        String[] ws = value.toString().split(" ");
+        for (String word : ws) {
+            // Emit (word, 1) for every token in the line.
+            context.write(new Text(word), new IntWritable(1));
+        }
+    }
+}
\ No newline at end of file
diff --git a/hbase-lesson/hbase-20250509/src/main/java/WordCount/WordCountReducer.java b/hbase-lesson/hbase-20250509/src/main/java/WordCount/WordCountReducer.java
new file mode 100644
index 0000000..93d9d84
--- /dev/null
+++ b/hbase-lesson/hbase-20250509/src/main/java/WordCount/WordCountReducer.java
@@ -0,0 +1,30 @@
+package WordCount;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+
+public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
+    /**
+     * The reduce logic: runs once per key group. If the input contains the
+     * words b and d, that is two groups, so it runs twice; a group looks
+     * like (b, <1,1,1,1,1,1,1,1,1>).
+     *
+     * @param key
+     * @param values
+     * @param context
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @Override
+    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
+        int number = 0;
+        for (IntWritable value : values) {
+            // Summing the values (rather than counting them) stays correct
+            // even if a combiner pre-aggregates the 1s.
+            number += value.get();
+        }
+        context.write(key, new IntWritable(number));
+    }
+}
\ No newline at end of file
diff --git a/hbase-lesson/hbase-20250509/src/main/java/WordCount/WorldCountDriver.java b/hbase-lesson/hbase-20250509/src/main/java/WordCount/WorldCountDriver.java
new file mode 100644
index 0000000..95e8473
--- /dev/null
+++ b/hbase-lesson/hbase-20250509/src/main/java/WordCount/WorldCountDriver.java
@@ -0,0 +1,41 @@
+package WordCount;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import java.io.IOException;
+
+public class WorldCountDriver {
+    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
+        // 1. The configuration object
+        Configuration conf = new Configuration();
+        // 2. Create the job
+        Job job = Job.getInstance(conf, "wordcount");
+        // 2.1 Set the map and reduce task classes
+        job.setMapperClass(WordCountMapper.class);
+        job.setReducerClass(WordCountReducer.class);
+        // 2.2 Set the map and reduce output KV types
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(IntWritable.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+        // 2.3 Set the number of reduce tasks (default 1)
+        job.setNumReduceTasks(2);
+        // 2.4 Set the input and output paths
+
+//        String hdfs_projPath = "hdfs://localhost:9000/user/ccd/HBaseCourseProj/hbase_demo3_mapreduce/";
+        String hdfs_projPath = "hbase-lesson/hbase-20250509/src/main/java/WordCount";
+
+        FileInputFormat.setInputPaths(job, new Path(hdfs_projPath + "/input/"));
+        FileOutputFormat.setOutputPath(job, new Path(hdfs_projPath + "/output/"));
+
+        // 3. Submit the job, wait for it to finish, and exit with its status
+        boolean b = job.waitForCompletion(true);
+        System.exit(b ? 0 : 1);
+    }
+}
diff --git a/hbase-lesson/hbase-20250509/src/main/java/WordCount/input/wordcountSample.txt b/hbase-lesson/hbase-20250509/src/main/java/WordCount/input/wordcountSample.txt
new file mode 100644
index 0000000..c6e345d
--- /dev/null
+++ b/hbase-lesson/hbase-20250509/src/main/java/WordCount/input/wordcountSample.txt
@@ -0,0 +1,4 @@
+a b c d a a a a
+a b c d a a a b
+a b c d a a c c
+a b c d a d d d
\ No newline at end of file
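As a sanity check (not part of the patch): the sample above contains 32 words, and the combined reducer output is a=14, b=5, c=6, d=7. Because the driver sets two reduce tasks, these four result lines are split across part-r-00000 and part-r-00001 according to the default hash partitioner.
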
diff --git a/hbase-lesson/pom.xml b/hbase-lesson/pom.xml
index 40412c5..f486611 100644
--- a/hbase-lesson/pom.xml
+++ b/hbase-lesson/pom.xml
@@ -7,6 +7,10 @@
     <groupId>cn.vscoder</groupId>
     <artifactId>hbase-lesson</artifactId>
     <version>1.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+    <modules>
+        <module>hbase-20250509</module>
+    </modules>
 
     <properties>
         <maven.compiler.source>8</maven.compiler.source>
@@ -14,6 +18,7 @@
         <maven.compiler.target>8</maven.compiler.target>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.hbase</groupId>
@@ -32,6 +37,8 @@
             <version>2.4.17</version>
             <scope>compile</scope>
         </dependency>
     </dependencies>
+
+
 </project>
\ No newline at end of file
diff --git a/hbase-lesson/src/main/java/date_20250509/WordCountMapper.java b/hbase-lesson/src/main/java/date_20250509/WordCountMapper.java
new file mode 100644
index 0000000..6932fbf
--- /dev/null
+++ b/hbase-lesson/src/main/java/date_20250509/WordCountMapper.java
@@ -0,0 +1,35 @@
+package date_20250509;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.IOException;
+
+public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
+    /**
+     * The map logic: runs once per input KV pair, which in this job means
+     * once per line of input.
+     *
+     * @param key     the input key: the byte offset of the line in the file
+     * @param value   the input value: the line itself, which is what we process
+     * @param context used to emit the output KV pairs
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @Override
+    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+        String[] ws = value.toString().split(" ");
+        for (String word : ws) {
+            // Emit (word, 1) for every token in the line.
+            context.write(new Text(word), new IntWritable(1));
+        }
+    }
+}
\ No newline at end of file
diff --git a/hbase-lesson/src/main/java/date_20250509/WordCountReducer.java b/hbase-lesson/src/main/java/date_20250509/WordCountReducer.java
new file mode 100644
index 0000000..f72c01c
--- /dev/null
+++ b/hbase-lesson/src/main/java/date_20250509/WordCountReducer.java
@@ -0,0 +1,30 @@
+package date_20250509;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+
+public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
+    /**
+     * The reduce logic: runs once per key group. If the input contains the
+     * words b and d, that is two groups, so it runs twice; a group looks
+     * like (b, <1,1,1,1,1,1,1,1,1>).
+     *
+     * @param key
+     * @param values
+     * @param context
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @Override
+    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
+        int number = 0;
+        for (IntWritable value : values) {
+            // Summing the values (rather than counting them) stays correct
+            // even if a combiner pre-aggregates the 1s.
+            number += value.get();
+        }
+        context.write(key, new IntWritable(number));
+    }
+}
\ No newline at end of file
diff --git a/hbase-lesson/src/main/java/date_20250509/WorldCountDriver.java b/hbase-lesson/src/main/java/date_20250509/WorldCountDriver.java
new file mode 100644
index 0000000..f78c81f
--- /dev/null
+++ b/hbase-lesson/src/main/java/date_20250509/WorldCountDriver.java
@@ -0,0 +1,41 @@
+package date_20250509;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import java.io.IOException;
+
+public class WorldCountDriver {
+    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
+        // 1. The configuration object
+        Configuration conf = new Configuration();
+        // 2. Create the job
+        Job job = Job.getInstance(conf, "wordcount");
+        // 2.1 Set the map and reduce task classes
+        job.setMapperClass(WordCountMapper.class);
+        job.setReducerClass(WordCountReducer.class);
+        // 2.2 Set the map and reduce output KV types
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(IntWritable.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+        // 2.3 Set the number of reduce tasks (default 1)
+        job.setNumReduceTasks(2);
+        // 2.4 Set the input and output paths
+
+//        String hdfs_projPath = "hdfs://localhost:9000/user/ccd/HBaseCourseProj/hbase_demo3_mapreduce/";
+        String hdfs_projPath = "./";
+
+        FileInputFormat.setInputPaths(job, new Path(hdfs_projPath + "date_20250509/input/"));
+        FileOutputFormat.setOutputPath(job, new Path(hdfs_projPath + "date_20250509/output/"));
+
+        // 3. Submit the job, wait for it to finish, and exit with its status
+        boolean b = job.waitForCompletion(true);
+        System.exit(b ? 0 : 1);
+    }
+}
diff --git a/hbase-lesson/src/main/java/date_20250509/input/wordcountSample.txt b/hbase-lesson/src/main/java/date_20250509/input/wordcountSample.txt
new file mode 100644
index 0000000..c6e345d
--- /dev/null
+++ b/hbase-lesson/src/main/java/date_20250509/input/wordcountSample.txt
@@ -0,0 +1,4 @@
+a b c d a a a a
+a b c d a a a b
+a b c d a a c c
+a b c d a d d d
\ No newline at end of file
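
One possible follow-up, sketched here rather than part of the patch: every driver hard-codes relative input/output paths, which only resolve when the job is launched from a specific working directory. A conventional alternative is to take the paths from the command line via Hadoop's ToolRunner. The class name WordCountTool below is hypothetical.

package WordCount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Usage (jar name assumed): hadoop jar hbase-20250509-1.0-SNAPSHOT.jar WordCount.WordCountTool <in> <out>
public class WordCountTool extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // getConf() carries any -D options parsed by ToolRunner.
        Job job = Job.getInstance(getConf(), "wordcount");
        job.setJarByClass(WordCountTool.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input and output come from the command line instead of being hard-coded.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new WordCountTool(), args));
    }
}

The same pattern would apply to the FlowCount and FlowCount2 drivers; it also makes the jobs runnable unchanged against HDFS paths like the commented-out hdfs://localhost:9000 prefix above.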