feat(hbase-lesson): 添加 HBase DML数据操作功能
- 新增数据生成器,生成模拟学生数据 - 实现 HBase 客户端连接和数据插入功能- 添加 HBase 数据查询功能,包括单行查询、列族查询和扫描查询 - 集成过滤器功能,支持行键前缀过滤和列值过滤
This commit is contained in:
parent
99161410f3
commit
86b4d22c7d
100000
hbase-lesson/src/main/java/date_20250411/data/students.txt
Normal file
100000
hbase-lesson/src/main/java/date_20250411/data/students.txt
Normal file
File diff suppressed because it is too large
Load Diff
49
hbase-lesson/src/main/java/date_20250411/data_gen.java
Normal file
49
hbase-lesson/src/main/java/date_20250411/data_gen.java
Normal file
@ -0,0 +1,49 @@
|
||||
package date_20250411;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.apache.commons.lang3.RandomUtils;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.FileWriter;
|
||||
|
||||
public class data_gen {
|
||||
public static void main(String[] args) throws Exception {
|
||||
BufferedWriter bw =
|
||||
new BufferedWriter(new FileWriter("c"));
|
||||
|
||||
for (int i = 0; i < 100000; i++) {
|
||||
//000001,zhangsan,18,male,beijing,18000,2018-07-01
|
||||
String stu_id = StringUtils.leftPad(i + "", 6, "0");
|
||||
String name = RandomStringUtils.randomAlphabetic(6);
|
||||
int age = RandomUtils.nextInt(18, 38);
|
||||
String gender = null;
|
||||
if (RandomUtils.nextInt() % 2 == 0) {
|
||||
gender = "male";
|
||||
} else {
|
||||
gender = "female";
|
||||
}
|
||||
|
||||
|
||||
String city = null;
|
||||
int flag = RandomUtils.nextInt() % 3;
|
||||
if (flag == 0) {
|
||||
city = "北京";
|
||||
} else if (flag == 1) {
|
||||
city = "上海";
|
||||
} else {
|
||||
city = "南京";
|
||||
}
|
||||
int salary = RandomUtils.nextInt(12000, 32000);
|
||||
|
||||
String dt = RandomUtils.nextInt(2010, 2020) + "-" + StringUtils.leftPad(RandomUtils.nextInt(1, 13) + "", 2, "0") + "-" + StringUtils.leftPad(RandomUtils.nextInt(1, 31) + "", 2, "0");
|
||||
|
||||
bw.write(stu_id + "," + name + "," + age + "," + gender + "," + city + "," + salary + "," + dt);
|
||||
bw.newLine();
|
||||
}
|
||||
|
||||
|
||||
bw.flush();
|
||||
bw.close();
|
||||
}
|
||||
}
|
@ -0,0 +1,74 @@
|
||||
package date_20250411;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.FileReader;
|
||||
import java.util.ArrayList;
|
||||
|
||||
public class task_DML_insert {
|
||||
public static void main(String[] args) throws Exception {
|
||||
// create方法会自动去加载运行时的classpath中的hbase-site.xml等配置文件
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.set("hbase.zookeeper.quorum", "hadoop102:2181");
|
||||
|
||||
// 创建一个hbase的客户端连接
|
||||
Connection conn = ConnectionFactory.createConnection(conf);
|
||||
|
||||
// 拿到一个DDL的操作工具
|
||||
Table table = conn.getTable(TableName.valueOf("ideaPro_space:students"));
|
||||
|
||||
// 读数据源
|
||||
BufferedReader br = new BufferedReader(new FileReader("hbase-lesson/src/main/java/date_20250411/data/students.txt"));
|
||||
String line = null;
|
||||
|
||||
ArrayList<Put> puts = new ArrayList<Put>();
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
while ((line = br.readLine()) != null) {
|
||||
String[] split = line.split(",");
|
||||
|
||||
// 000001,zhangsan,18,male,beijing,18000,2018-07-01
|
||||
String rowkey = split[0];
|
||||
|
||||
// 构造一个用于封装KeyValue数据的put对象
|
||||
Put put = new Put(Bytes.toBytes(rowkey));
|
||||
put.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("name"), Bytes.toBytes(split[1]));
|
||||
put.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("age"), Bytes.toBytes(split[2]));
|
||||
put.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("gender"), Bytes.toBytes(split[3]));
|
||||
put.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("city"), Bytes.toBytes(split[4]));
|
||||
put.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("salary"), Bytes.toBytes(split[5]));
|
||||
put.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("graduate"), Bytes.toBytes(split[6]));
|
||||
|
||||
// 把封装好的一行数据,先暂存在一个list中
|
||||
puts.add(put);
|
||||
|
||||
// 判断list中的条数是否满足批次大小(10条一批)
|
||||
if (puts.size() >= 10) {
|
||||
// 插入数据
|
||||
table.put(puts);
|
||||
// 清空list
|
||||
puts.clear();
|
||||
}
|
||||
}
|
||||
|
||||
// 最后一批
|
||||
if (puts.size() > 0) {
|
||||
table.put(puts);
|
||||
}
|
||||
|
||||
long end = System.currentTimeMillis();
|
||||
System.out.println("总耗时: " + (end - start));
|
||||
|
||||
// 关闭连接
|
||||
table.close();
|
||||
conn.close();
|
||||
}
|
||||
}
|
197
hbase-lesson/src/main/java/date_20250411/task_DML_select.java
Normal file
197
hbase-lesson/src/main/java/date_20250411/task_DML_select.java
Normal file
@ -0,0 +1,197 @@
|
||||
package date_20250411;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.client.*;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.FilterList;
|
||||
import org.apache.hadoop.hbase.filter.PrefixFilter;
|
||||
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
|
||||
public class task_DML_select {
|
||||
public static void main(String[] args) throws Exception {
|
||||
// create方法会自动去加载运行时的classpath中的hbase-site.xml等配置文件
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.set("hbase.zookeeper.quorum", "hadoop102:2181");
|
||||
|
||||
// 创建一个hbase的客户端连接
|
||||
Connection conn = ConnectionFactory.createConnection(conf);
|
||||
|
||||
// 拿到一个DDL的操作工具
|
||||
Table table = conn.getTable(TableName.valueOf("ideaPro_space:students"));
|
||||
|
||||
|
||||
scanRows(table);
|
||||
|
||||
// 关闭连接
|
||||
table.close();
|
||||
conn.close();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 取一个指定的KeyValue
|
||||
*
|
||||
* @param table
|
||||
*/
|
||||
public static void getKeyValue(Table table) throws IOException {
|
||||
|
||||
|
||||
String rowKey = "000010";
|
||||
|
||||
Get getParam = new Get(Bytes.toBytes(rowKey));
|
||||
getParam.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("age"));
|
||||
|
||||
|
||||
Result result = table.get(getParam);
|
||||
|
||||
byte[] value = result.getValue(Bytes.toBytes("base_info"), Bytes.toBytes("age"));
|
||||
|
||||
int age = Bytes.toInt(value);
|
||||
|
||||
System.out.println(age);
|
||||
|
||||
table.close();
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* get 单行中的指定列族的所有KeyValue数据
|
||||
*
|
||||
* @param table
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void getFamily(Table table) throws IOException {
|
||||
|
||||
String rowKey = "000010";
|
||||
|
||||
Get getParam = new Get(Bytes.toBytes(rowKey));
|
||||
getParam.addFamily(Bytes.toBytes("base_info"));
|
||||
|
||||
Result result = table.get(getParam);
|
||||
CellScanner cellScanner = result.cellScanner();
|
||||
while (cellScanner.advance()) {
|
||||
// 一个cell就代表一个KeyValue
|
||||
Cell cell = cellScanner.current();
|
||||
|
||||
byte[] rowKeyBytes = CellUtil.cloneRow(cell);
|
||||
byte[] familyBytes = CellUtil.cloneFamily(cell);
|
||||
byte[] qualifierBytes = CellUtil.cloneQualifier(cell);
|
||||
byte[] valueBytes = CellUtil.cloneValue(cell);
|
||||
|
||||
String qualifier = Bytes.toString(qualifierBytes);
|
||||
String value = Bytes.toString(valueBytes);
|
||||
System.out.println(Bytes.toString(rowKeyBytes) + ","
|
||||
+ Bytes.toString(familyBytes) + ","
|
||||
+ qualifier + ","
|
||||
+ value
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
table.close();
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 扫描指定行范围的数据
|
||||
*
|
||||
* @param table
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void scanRows(Table table) throws IOException {
|
||||
|
||||
Scan scan = new Scan();
|
||||
String startMd5 = "001000";
|
||||
String stopMd5 = "002000";
|
||||
// 设置了扫描的起始行和结束行(默认含首不含尾)
|
||||
scan.withStartRow(Bytes.toBytes(startMd5));
|
||||
//scan.withStopRow(Bytes.toBytes(stopMd5),false);
|
||||
|
||||
// 设置总共要扫描的行数
|
||||
scan.setLimit(100000);
|
||||
|
||||
ResultScanner scanner = table.getScanner(scan);
|
||||
// 扫描时的 行 迭代器
|
||||
Iterator<Result> iterator = scanner.iterator();
|
||||
printData(iterator);
|
||||
|
||||
table.close();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 扫描多行数据,并带过滤条件
|
||||
*
|
||||
* @param table
|
||||
*/
|
||||
public static void scanRowsWithFilter(Table table) throws IOException {
|
||||
Scan scan = new Scan();
|
||||
|
||||
scan.withStartRow(Bytes.toBytes("000001"), true);
|
||||
scan.setLimit(10);
|
||||
|
||||
// 行键前缀过滤器
|
||||
Filter filter1 = new PrefixFilter(Bytes.toBytes("cd")); // 行键前缀过滤器
|
||||
|
||||
// 列值过滤器(匹配到条件的行,整行数据都将返回)
|
||||
SingleColumnValueFilter filter2 = new SingleColumnValueFilter(Bytes.toBytes("extra_info"), Bytes.toBytes("city"), CompareOperator.EQUAL, Bytes.toBytes("南京"));
|
||||
|
||||
ArrayList<Filter> filterList = new ArrayList<Filter>();
|
||||
filterList.add(filter1);
|
||||
filterList.add(filter2);
|
||||
|
||||
// 将上面的2个过滤器,组成一个 MUST_PASS_ALL 关系的过滤器组
|
||||
FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL, filterList);
|
||||
//FilterListWithAND filters = new FilterListWithAND(filterList);
|
||||
|
||||
|
||||
// 给scan参数设置过滤器条件
|
||||
scan.setFilter(filters);
|
||||
|
||||
|
||||
ResultScanner scanner = table.getScanner(scan);
|
||||
Iterator<Result> iterator = scanner.iterator(); // 拿到行迭代器
|
||||
|
||||
printData(iterator); // 迭代,并打印数据
|
||||
|
||||
table.close();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 用于打印 scan结果的工具方法
|
||||
*
|
||||
* @param iterator
|
||||
* @throws IOException
|
||||
*/
|
||||
private static void printData(Iterator<Result> iterator) throws IOException {
|
||||
while (iterator.hasNext()) { // 行迭代
|
||||
// 迭代到一行
|
||||
Result result = iterator.next();
|
||||
// 拿到行中的cell单元格的迭代器
|
||||
CellScanner cellScanner = result.cellScanner();
|
||||
// 用单元格迭代器,迭代行中的每一个cell(单元格,KeyValue)
|
||||
while (cellScanner.advance()) { // 行内的keyValue迭代
|
||||
Cell cell = cellScanner.current();
|
||||
|
||||
String rowKey = Bytes.toString(CellUtil.cloneRow(cell));
|
||||
String family = Bytes.toString(CellUtil.cloneFamily(cell));
|
||||
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
|
||||
String value = Bytes.toString(CellUtil.cloneValue(cell));
|
||||
System.out.println(rowKey + "," + family + "," + qualifier + "," + value);
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user