项目源码下载:demo-boot-hbase2.zip(9987)
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.3</version>
</dependency>
/**
 * Configuration properties bound from the {@code hbase} prefix.
 * <p>
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
//@Component
@ConfigurationProperties(prefix = "hbase")
public class HbaseProperties {
/**
 * Free-form client settings, keyed by property name.
 */
private Map<String, String> config;
/**
 * Hadoop home directory.
 * Not required when the HADOOP_HOME environment variable is already set on the system.
 */
private String hadoopHome;
}
hbase:
  config:
    hbase:
      master: 192.168.79.132:16000
      zookeeper:
        property:
          clientPort: 2181
        quorum: 192.168.79.132
    zookeeper:
      znode:
        parent: /hbase
  hadoop-home: D:\software\HADOOP_HOME
hadoop-home 配置目录:Windows 用户请下载源码,找到里面的 hadoop-common-2.7.0-bin.zip,解压后得到的目录就是这里要配置的目录。
/**
 * Spring configuration that exposes the HBase client {@link Configuration} and a
 * {@link Connection} built from it as beans, driven by {@link HbaseProperties}.
 */
@org.springframework.context.annotation.Configuration
@EnableConfigurationProperties(HbaseProperties.class)
public class HbaseConfig {

    private final HbaseProperties prop;

    public HbaseConfig(HbaseProperties properties) {
        this.prop = properties;
    }

    /**
     * Builds the HBase client configuration.
     * <p>
     * When a Hadoop home is configured it is published as the {@code hadoop.home.dir}
     * system property first. Every entry of {@link HbaseProperties#getConfig()} is then
     * copied onto a configuration obtained from {@link HBaseConfiguration#create()}.
     *
     * @return the client configuration
     */
    @Bean
    public Configuration configuration() {
        if (StringUtils.isNotBlank(prop.getHadoopHome())) {
            System.setProperty("hadoop.home.dir", prop.getHadoopHome());
        }
        Configuration configuration = HBaseConfiguration.create();
        Map<String, String> config = prop.getConfig();
        // The map is null when no hbase.config.* property is present; keep the defaults then
        // instead of failing application start-up with a NullPointerException.
        if (config != null) {
            config.forEach(configuration::set);
        }
        return configuration;
    }

    /**
     * Creates the HBase connection from {@link #configuration()}.
     *
     * @return a new connection
     * @throws IOException if the connection cannot be created
     */
    @Bean
    public Connection getConnection() throws IOException {
        return ConnectionFactory.createConnection(configuration());
    }
}
/**
 * Table administration and row-level read/write operations on HBase.
 */
public interface IHbaseService {

    /**
     * Creates a table.
     *
     * @param tableName    table name
     * @param columnFamily column family names
     */
    void createTable(String tableName, List<String> columnFamily);

    /**
     * Creates a pre-split table.
     *
     * @param tableName    table name
     * @param columnFamily column family names
     * @param splitKeys    region split keys
     */
    void createTableBySplitKeys(String tableName, List<String> columnFamily, byte[][] splitKeys);

    /**
     * Builds region split keys from the given string keys.
     *
     * @param keys split keys as strings
     * @return the split keys as byte arrays
     */
    byte[][] createSplitKeys(String[] keys);

    /**
     * Builds hexadecimal region split keys for a key range.
     *
     * @param startKey  first key of the range, in hexadecimal
     * @param endKey    last key of the range, in hexadecimal
     * @param regionNum number of regions
     * @return the split keys as byte arrays
     */
    byte[][] createHexSplitKeys(String startKey, String endKey, int regionNum);

    /**
     * Deletes a table.
     *
     * @param tableName table name
     */
    void deleteTable(String tableName);

    /**
     * Lists all tables.
     *
     * @return the table names
     */
    List<String> listTables();

    /**
     * Writes several columns of one row; {@code columns[i]} receives {@code values[i]}.
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family name
     * @param columns      column (qualifier) names
     * @param values       column values, parallel to {@code columns}
     */
    void insertOrUpdate(String tableName, String rowKey, String columnFamily, String[] columns, String[] values);

    /**
     * Writes several columns of one row from a column-name-to-value map.
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family name
     * @param map          column (qualifier) name mapped to its value
     */
    void insertOrUpdate(String tableName, String rowKey, String columnFamily, LinkedHashMap<String, String> map);

    /**
     * Reads one row by row key.
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family name
     * @return column (qualifier) name mapped to its value
     */
    LinkedHashMap<String, String> getRowData(String tableName, String rowKey, String columnFamily);

    /**
     * Reads one row by row key, restricted to the given columns.
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family name
     * @param columns      qualifiers of the columns to return
     * @return column (qualifier) name mapped to its value
     */
    LinkedHashMap<String, String> getRowData(String tableName, String rowKey, String columnFamily, List<byte[]> columns);

    /**
     * Deletes a row.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @return whether the operation was reported as successful
     */
    boolean deleteRow(String tableName, String rowKey);

    /**
     * Writes a single column value.
     *
     * @param tableName  table name
     * @param rowKey     row key
     * @param familyName column family name
     * @param column     column (qualifier) name
     * @param value      value to store
     */
    void setColumnValue(String tableName, String rowKey, String familyName, String column, String value);

    /**
     * Deletes a column family from a row.
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family name
     */
    void deleteColumnFamily(String tableName, String rowKey, String columnFamily);

    /**
     * Deletes a column from a row.
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family name
     * @param columnName   column (qualifier) name
     * @return whether the operation was reported as successful
     */
    boolean deleteColumn(String tableName, String rowKey, String columnFamily, String columnName);

    /**
     * Reads every row of a table.
     *
     * @param tableName table name
     * @return a two-level map: the outer key is the row key, the inner key is
     *         {@code columnFamily:columnName} and the inner value is the cell value
     */
    Map<String, Map<String, String>> findAll(String tableName);

    /**
     * Reads the rows between a start and a stop row key.
     *
     * @param tableName   table name
     * @param startRowKey start row key
     * @param stopRowKey  stop row key
     * @return a two-level map: the outer key is the row key, the inner key is
     *         {@code columnFamily:columnName} and the inner value is the cell value
     */
    Map<String, Map<String, String>> findList(String tableName, String startRowKey, String stopRowKey);

    /**
     * Reads the rows between a start and a stop row key, restricted to one column family.
     *
     * @param tableName    table name
     * @param columnFamily column family name
     * @param startRowKey  start row key
     * @param stopRowKey   stop row key
     * @return a two-level map: the outer key is the row key, the inner key is
     *         {@code columnFamily:columnName} and the inner value is the cell value
     */
    Map<String, Map<String, String>> findList(String tableName, String columnFamily, String startRowKey, String stopRowKey);
}
@Slf4j
@Service
public class HbaseServiceImpl implements IHbaseService {
@Resource
Connection connection;
/**
 * Creates a table with the given column families unless it already exists.
 * <p>
 * When the table exists a warning is logged and nothing is changed. Any failure is
 * logged and swallowed, so callers are not told about errors.
 *
 * @param tableName    table name
 * @param columnFamily column family names; one descriptor is built per entry
 */
public void createTable(String tableName, List<String> columnFamily) {
    TableName table = TableName.valueOf(tableName);
    // Work against the Admin interface; downcasting to the HBaseAdmin implementation
    // class breaks as soon as the connection hands out a different Admin type.
    try (Admin admin = connection.getAdmin()) {
        if (admin.tableExists(table)) {
            log.warn("表[{}]已存在!", tableName);
            return;
        }
        List<ColumnFamilyDescriptor> cfDesc = new ArrayList<>(columnFamily.size());
        for (String cf : columnFamily) {
            cfDesc.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build());
        }
        TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(table)
                .setColumnFamilies(cfDesc)
                .build();
        admin.createTable(tableDesc);
    } catch (Exception e) {
        log.error("创建表错误:", e);
    }
}
/**
 * Creates a pre-split table with the given column families unless it already exists.
 * <p>
 * When the table exists a warning is logged and nothing is changed. Any failure is
 * logged and swallowed, so callers are not told about errors.
 *
 * @param tableName    table name
 * @param columnFamily column family names; one descriptor is built per entry
 * @param splitKeys    region split keys handed to the admin call unchanged
 */
@Override
public void createTableBySplitKeys(String tableName, List<String> columnFamily, byte[][] splitKeys) {
    TableName table = TableName.valueOf(tableName);
    // Work against the Admin interface; downcasting to the HBaseAdmin implementation
    // class breaks as soon as the connection hands out a different Admin type.
    try (Admin admin = connection.getAdmin()) {
        if (admin.tableExists(table)) {
            log.warn("表[{}]已存在!", tableName);
            return;
        }
        List<ColumnFamilyDescriptor> cfDesc = new ArrayList<>(columnFamily.size());
        for (String cf : columnFamily) {
            cfDesc.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build());
        }
        TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(table)
                .setColumnFamilies(cfDesc)
                .build();
        admin.createTable(tableDesc, splitKeys);
    } catch (Exception e) {
        log.error("创建表错误:", e);
    }
}
/**
 * Converts string keys into a sorted, duplicate-free array of split keys.
 *
 * @param keys split keys as strings; when {@code null} the keys "1", "2" and "3" are used
 * @return the distinct keys converted with {@code Bytes.toBytes}, in ascending order of
 *         {@code Bytes.BYTES_COMPARATOR}
 */
@Override
public byte[][] createSplitKeys(String[] keys) {
    String[] source = keys != null ? keys : new String[] {"1", "2", "3"};
    // The sorted set both orders the keys and removes duplicates.
    TreeSet<byte[]> rows = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    for (String key : source) {
        rows.add(Bytes.toBytes(key));
    }
    // Size the result from the set, not from the input: with duplicate keys an
    // input-sized array would end in null entries.
    return rows.toArray(new byte[0][]);
}
@Override
// Splits the hexadecimal key range startKey..endKey into regionNum equally sized steps and
// returns the regionNum - 1 boundaries between them. Each boundary is formatted with
// "%016x" (lower-case hex, zero-padded to at least 16 characters) and encoded as ASCII.
// The step is the integer quotient (endKey - startKey) / regionNum.
// Throws IllegalArgumentException when regionNum is below 1, and NumberFormatException
// when startKey or endKey is not a valid hexadecimal number.
public byte[][] createHexSplitKeys(String startKey, String endKey, int regionNum) {
    if (regionNum < 1) {
        // Fail with a readable message instead of a bare NegativeArraySizeException.
        throw new IllegalArgumentException("regionNum must be at least 1, got " + regionNum);
    }
    BigInteger lowestKey = new BigInteger(startKey, 16);
    BigInteger highestKey = new BigInteger(endKey, 16);
    BigInteger regionIncrement = highestKey.subtract(lowestKey).divide(BigInteger.valueOf(regionNum));
    byte[][] splits = new byte[regionNum - 1][];
    BigInteger boundary = lowestKey;
    for (int i = 0; i < splits.length; i++) {
        boundary = boundary.add(regionIncrement);
        // Explicit charset: the result must not depend on the platform default encoding.
        splits[i] = String.format("%016x", boundary)
                .getBytes(java.nio.charset.StandardCharsets.US_ASCII);
    }
    return splits;
}
/**
 * Deletes a table, disabling it first when it is still enabled.
 * <p>
 * A missing table is reported through the log and otherwise ignored. Any failure is
 * logged and swallowed, so callers are not told about errors.
 *
 * @param tableName table name
 */
public void deleteTable(String tableName) {
    TableName tName = TableName.valueOf(tableName);
    // Work against the Admin interface rather than the HBaseAdmin implementation class.
    try (Admin admin = connection.getAdmin()) {
        if (!admin.tableExists(tName)) {
            log.error("表 {} 不存在!", tableName);
            return;
        }
        // Disabling an already disabled table throws, which previously aborted the
        // delete; only disable when the table is currently enabled.
        if (admin.isTableEnabled(tName)) {
            admin.disableTable(tName);
        }
        admin.deleteTable(tName);
    } catch (Exception e) {
        log.error("删除表错误:", e);
    }
}
/**
 * Lists the names of all tables reported by the HBase admin.
 *
 * @return the table names; an empty list when the lookup fails (the failure is logged)
 */
public List<String> listTables() {
    // Work against the Admin interface rather than the HBaseAdmin implementation class.
    try (Admin admin = connection.getAdmin()) {
        TableName[] tableNames = admin.listTableNames();
        List<String> tables = new ArrayList<>(tableNames.length);
        for (TableName tableName : tableNames) {
            tables.add(tableName.getNameAsString());
        }
        return tables;
    } catch (Exception e) {
        log.error("获取表信息错误:", e);
        return new ArrayList<>();
    }
}
/**
 * Writes several columns of one row; {@code columns[i]} receives {@code values[i]}.
 * <p>
 * Nothing is written, and the problem is logged, when either array is {@code null}, when
 * the arrays differ in length, or when any column name or value is {@code null}.
 * Failures of the write itself are logged and swallowed.
 *
 * @param tableName    table name
 * @param rowKey       row key
 * @param columnFamily column family name
 * @param columns      column (qualifier) names
 * @param values       column values, parallel to {@code columns}
 */
@Override
public void insertOrUpdate(String tableName, String rowKey, String columnFamily, String[] columns, String[] values) {
    if (columns == null || values == null || columns.length != values.length) {
        // Previously this case was dropped without any trace.
        log.warn("列名数组和列值数组不能为空且长度必须一致,tableName:{},rowKey:{}", tableName, rowKey);
        return;
    }
    for (int i = 0; i < columns.length; i++) {
        if (columns[i] == null || values[i] == null) {
            // Validate up front instead of throwing an exception only to catch it below.
            log.error("列名和列数据都不能为空,column:{},value:{},tableName:{},rowKey:{}",
                    columns[i], values[i], tableName, rowKey);
            return;
        }
    }
    try (Table table = connection.getTable(TableName.valueOf(tableName))) {
        byte[] family = Bytes.toBytes(columnFamily);
        Put put = new Put(Bytes.toBytes(rowKey));
        for (int i = 0; i < columns.length; i++) {
            put.addColumn(family, Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
        }
        table.put(put);
        log.debug("putData add or update data Success,rowKey:{}", rowKey);
    } catch (Exception e) {
        log.error("为表添加 or 更新数据失败,tableName:{},rowKey:{},familyName:{}",
                tableName, rowKey, columnFamily, e);
    }
}
/**
 * Writes several columns of one row from a column-name-to-value map.
 * <p>
 * Nothing is written when the map is {@code null} or empty. Failures are logged and
 * swallowed.
 *
 * @param tableName    table name
 * @param rowKey       row key
 * @param columnFamily column family name
 * @param map          column (qualifier) name mapped to its value
 */
@Override
public void insertOrUpdate(String tableName, String rowKey, String columnFamily, LinkedHashMap<String, String> map) {
    if (CollectionUtils.isEmpty(map)) {
        return;
    }
    try (Table table = connection.getTable(TableName.valueOf(tableName))) {
        byte[] family = Bytes.toBytes(columnFamily);
        Put put = new Put(Bytes.toBytes(rowKey));
        for (Map.Entry<String, String> cell : map.entrySet()) {
            put.addColumn(family, Bytes.toBytes(cell.getKey()), Bytes.toBytes(cell.getValue()));
        }
        table.put(put);
    } catch (Exception e) {
        // Report through the class logger, like the array-based overload, not on stderr.
        log.error("为表添加 or 更新数据失败,tableName:{},rowKey:{},familyName:{}",
                tableName, rowKey, columnFamily, e);
    }
}
/**
 * Reads every column of one column family for a row.
 *
 * @param tableName    table name
 * @param rowKey       row key
 * @param columnFamily column family name
 * @return column (qualifier) name mapped to its value; empty when the row has no data
 * @throws RuntimeException if the HBase read fails
 */
@Override
public LinkedHashMap<String, String> getRowData(String tableName, String rowKey, String columnFamily) {
    return getRowData(tableName, rowKey, columnFamily, null);
}

/**
 * Reads one row, optionally restricted to the given columns of a column family.
 *
 * @param tableName    table name
 * @param rowKey       row key
 * @param columnFamily column family name
 * @param columns      qualifiers to return; {@code null} or empty selects the whole family
 * @return column (qualifier) name mapped to its value; empty when the row has no data
 * @throws RuntimeException if the HBase read fails; the I/O failure is attached as cause
 */
@Override
public LinkedHashMap<String, String> getRowData(String tableName, String rowKey, String columnFamily, List<byte[]> columns) {
    log.debug("表:{} rowKey:{} 查询列个数:{}", tableName, rowKey, columns == null ? 0 : columns.size());
    LinkedHashMap<String, String> result = new LinkedHashMap<>();
    // Encode with Bytes.toBytes (UTF-8) like the write path; String.getBytes() would use
    // the platform charset and miss non-ASCII family names on non-UTF-8 systems.
    byte[] family = Bytes.toBytes(columnFamily);
    Get get = new Get(Bytes.toBytes(rowKey));
    if (columns == null || columns.isEmpty()) {
        get.addFamily(family);
    } else {
        for (byte[] qualifier : columns) {
            get.addColumn(family, qualifier);
        }
    }
    try (Table table = connection.getTable(TableName.valueOf(tableName))) {
        Result row = table.get(get);
        if (row != null && !row.isEmpty()) {
            for (Cell cell : row.listCells()) {
                result.put(
                        Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
                        Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
            }
        }
    } catch (IOException e) {
        // Keep the original failure as the cause so it is not lost to the caller.
        throw new RuntimeException("hbase查询异常", e);
    }
    log.debug("当行记录:{}", result);
    return result;
}
/**
 * Deletes a row when the table exists.
 *
 * @param tableName table name
 * @param rowKey    row key
 * @return {@code false} when an I/O error occurred (it is logged); {@code true} otherwise,
 *         including the case where the table does not exist and nothing was deleted
 */
@Override
public boolean deleteRow(String tableName, String rowKey) {
    TableName name = TableName.valueOf(tableName);
    try (Admin admin = connection.getAdmin(); Table table = connection.getTable(name)) {
        if (admin.tableExists(name)) {
            table.delete(new Delete(Bytes.toBytes(rowKey)));
        }
        return true;
    } catch (IOException e) {
        // Report through the class logger instead of printing to stderr.
        log.error("删除行失败,tableName:{},rowKey:{}", tableName, rowKey, e);
        return false;
    }
}
/**
 * Writes a single column value of one row. I/O failures are logged and swallowed.
 *
 * @param tableName  table name
 * @param rowKey     row key
 * @param familyName column family name
 * @param column     column (qualifier) name
 * @param value      value to store
 */
@Override
public void setColumnValue(String tableName, String rowKey, String familyName, String column, String value) {
    try (Table table = connection.getTable(TableName.valueOf(tableName))) {
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(column), Bytes.toBytes(value));
        table.put(put);
    } catch (IOException e) {
        // Report through the class logger instead of printing to stderr.
        log.error("设置列值失败,tableName:{},rowKey:{},familyName:{},column:{}",
                tableName, rowKey, familyName, column, e);
    }
}
/**
 * Deletes all columns of one column family from a row. I/O failures are logged and
 * swallowed.
 *
 * @param tableName    table name
 * @param rowKey       row key
 * @param columnFamily column family name
 */
@Override
public void deleteColumnFamily(String tableName, String rowKey, String columnFamily) {
    try (Table table = connection.getTable(TableName.valueOf(tableName))) {
        // Encode the row key with Bytes.toBytes (UTF-8) like the write path; the
        // platform-charset String.getBytes() can miss non-ASCII row keys.
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addFamily(Bytes.toBytes(columnFamily));
        table.delete(delete);
    } catch (IOException e) {
        log.error("删除列族失败,tableName:{},rowKey:{},familyName:{}",
                tableName, rowKey, columnFamily, e);
    }
}
/**
 * Deletes all versions of one column from a row when the table exists.
 *
 * @param tableName    table name
 * @param rowKey       row key
 * @param columnFamily column family name
 * @param columnName   column (qualifier) name
 * @return {@code true} when the delete was sent without error; {@code false} when the
 *         table does not exist or an I/O error occurred (the error is logged)
 */
@Override
public boolean deleteColumn(String tableName, String rowKey, String columnFamily, String columnName) {
    TableName name = TableName.valueOf(tableName);
    try (Admin admin = connection.getAdmin(); Table table = connection.getTable(name)) {
        if (!admin.tableExists(name)) {
            return false;
        }
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addColumns(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));
        table.delete(delete);
        // The method used to return false on every path, success included.
        return true;
    } catch (IOException e) {
        log.error("删除列失败,tableName:{},rowKey:{},familyName:{},column:{}",
                tableName, rowKey, columnFamily, columnName, e);
        return false;
    }
}
/**
 * Runs a scan and collects the cells row by row.
 * <p>
 * An I/O failure is logged and the rows collected so far are returned.
 *
 * @param tableName table name
 * @param scan      scan to execute
 * @return a two-level map in scan order: the outer key is the row key, the inner key is
 *         {@code columnFamily:columnName} and the inner value is the cell value
 */
private LinkedHashMap<String, Map<String, String>> queryData(String tableName, Scan scan) {
    LinkedHashMap<String, Map<String, String>> result = new LinkedHashMap<>();
    try (Table table = connection.getTable(TableName.valueOf(tableName));
         ResultScanner rs = table.getScanner(scan)) {
        for (Result r : rs) {
            List<Cell> cells = r.listCells();
            if (cells == null) {
                // Result without cells: nothing to record for it.
                continue;
            }
            // Resolve the row key once per result and reuse the row's map if it already
            // exists, so cells of the same row are merged instead of replaced.
            String rowKey = Bytes.toString(r.getRow());
            Map<String, String> rowMap = result.computeIfAbsent(rowKey, k -> new LinkedHashMap<>());
            for (Cell cell : cells) {
                String columnFamily = Bytes.toString(
                        cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
                String columnName = Bytes.toString(
                        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                String columnValue = Bytes.toString(
                        cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                rowMap.put(columnFamily + ":" + columnName, columnValue);
            }
        }
    } catch (IOException e) {
        log.error("遍历查询指定表中的所有数据失败,tableName:{}", tableName, e);
    }
    return result;
}
/**
 * Reads every row of a table by running a scan without any restriction.
 *
 * @param tableName table name
 * @return a two-level map: the outer key is the row key, the inner key is
 *         {@code columnFamily:columnName} and the inner value is the cell value
 */
@Override
public Map<String, Map<String, String>> findAll(String tableName) {
    return queryData(tableName, new Scan());
}
/**
 * Reads the rows of a row-key range.
 *
 * @param tableName   table name
 * @param startRowKey start row key; blank leaves the start of the scan open
 * @param stopRowKey  stop row key; blank leaves the end of the scan open
 * @return a two-level map: the outer key is the row key, the inner key is
 *         {@code columnFamily:columnName} and the inner value is the cell value
 */
@Override
public Map<String, Map<String, String>> findList(String tableName, String startRowKey, String stopRowKey) {
    Scan scan = new Scan();
    applyRowRange(scan, startRowKey, stopRowKey);
    return this.queryData(tableName, scan);
}

/**
 * Reads the rows of a row-key range, restricted to one column family.
 *
 * @param tableName    table name
 * @param columnFamily column family name
 * @param startRowKey  start row key; blank leaves the start of the scan open
 * @param stopRowKey   stop row key; blank leaves the end of the scan open
 * @return a two-level map: the outer key is the row key, the inner key is
 *         {@code columnFamily:columnName} and the inner value is the cell value
 */
@Override
public Map<String, Map<String, String>> findList(String tableName, String columnFamily, String startRowKey, String stopRowKey) {
    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes(columnFamily));
    applyRowRange(scan, startRowKey, stopRowKey);
    return this.queryData(tableName, scan);
}

/**
 * Applies each non-blank bound to the scan independently, so a caller that supplies only
 * one bound still gets a bounded scan instead of a silent full-table scan.
 * NOTE(review): per the HBase Scan documentation withStopRow(byte[]) is exclusive — confirm
 * that callers expect the stop row itself to be left out.
 */
private static void applyRowRange(Scan scan, String startRowKey, String stopRowKey) {
    if (StringUtils.isNotBlank(startRowKey)) {
        scan.withStartRow(Bytes.toBytes(startRowKey));
    }
    if (StringUtils.isNotBlank(stopRowKey)) {
        scan.withStopRow(Bytes.toBytes(stopRowKey));
    }
}
/**
 * Demo tests that drive {@link IHbaseService} through the Spring context.
 * <p>
 * The tests contain no assertions; they print results for manual inspection.
 * NOTE(review): they presumably need a reachable HBase cluster and a fixed execution
 * order (create table before writes) — confirm before running them in CI.
 */
@SpringBootTest
class DemoBootHbase2ApplicationTests {

    @Resource
    IHbaseService hbaseService;

    /** Creates table {@code test_base} with the column families {@code a} and {@code back}. */
    @Test
    public void tableCreate() {
        hbaseService.createTable("test_base", Arrays.asList("a", "back"));
    }

    /** Prints all table names as a comma-separated line. */
    @Test
    public void listTable() {
        List<String> strings = hbaseService.listTables();
        System.out.println(String.join(",", strings));
    }

    /** Deletes table {@code test_base}. */
    @Test
    public void delTable() {
        hbaseService.deleteTable("test_base");
    }

    /** Writes three rows through the array-based overload, then scans the table. */
    @Test
    public void testPutData() {
        hbaseService.insertOrUpdate("test_base", "000001", "a", new String[]{
                "project_id", "varName", "coefs", "pvalues", "tvalues",
                "create_time"}, new String[]{"40866", "mob_3", "0.9416",
                "0.0000", "12.2293", "null"});
        hbaseService.insertOrUpdate("test_base", "000002", "a", new String[]{
                "project_id", "varName", "coefs", "pvalues", "tvalues",
                "create_time"}, new String[]{"40866", "idno_prov", "0.9317",
                "0.0000", "9.8679", "null"});
        hbaseService.insertOrUpdate("test_base", "000003", "a", new String[]{
                "project_id", "varName", "coefs", "pvalues", "tvalues",
                "create_time"}, new String[]{"40866", "education", "0.8984",
                "0.0000", "25.5649", "null"});
        Map<String, Map<String, String>> test_base = hbaseService.findAll("test_base");
        System.out.println(0);
    }

    /** Writes 19 rows through the map-based overload, with row keys 10000_20211201 onwards. */
    @Test
    public void putMapData() {
        LinkedHashMap<String, String> dataMap = new LinkedHashMap<>();
        int tmp = 20211201;
        for (int i = 1; i < 20; i++) {
            dataMap.put("userName", "李" + i);
            dataMap.put("userAge", String.valueOf(20 + i));
            hbaseService.insertOrUpdate("test_base", "10000_" + tmp, "a", dataMap);
            dataMap.clear();
            tmp++;
        }
    }

    /**
     * Scans a row-key range and prints the result.
     * NOTE(review): start and stop key are identical; if the stop row is exclusive, as the
     * HBase Scan documentation states for withStopRow(byte[]), this prints an empty map —
     * confirm the intended range.
     */
    @Test
    public void findList() {
        Map<String, Map<String, String>> test_base = hbaseService.findList("test_base", "10000_20211201", "10000_20211201");
        System.out.println(test_base);
    }

    /**
     * Writes a single column twice and prints row reads in between.
     * NOTE(review): the writes target row "000001" in family "a" while the reads use row
     * "000" (and family "back" the second time) — looks like a typo; confirm.
     */
    @Test
    public void setCol() {
        hbaseService.setColumnValue("test_base", "000001", "a", "姓名", "aa");
        LinkedHashMap<String, String> rowData = hbaseService.getRowData("test_base", "000", "a");
        System.out.println(rowData);
        hbaseService.setColumnValue("test_base", "000001", "a", "姓名", "abb");
        // Encode the qualifier as UTF-8 explicitly; the platform default charset may differ
        // from the UTF-8 encoding the service uses when writing column names.
        rowData = hbaseService.getRowData("test_base", "000", "back",
                Arrays.asList("姓名".getBytes(java.nio.charset.StandardCharsets.UTF_8)));
        System.out.println(rowData);
    }

    /** Prints row "000001" as JSON, once per column family. */
    @Test
    public void getRowData() {
        LinkedHashMap<String, String> rowData = hbaseService.getRowData("test_base", "000001", "a");
        System.out.println(JSON.toJSONString(rowData));
        rowData = hbaseService.getRowData("test_base", "000001", "back");
        System.out.println(JSON.toJSONString(rowData));
    }

    /** Prints the whole table as JSON. */
    @Test
    public void queryAll() {
        Map<String, Map<String, String>> test_base = hbaseService.findAll("test_base");
        System.out.println(JSON.toJSONString(test_base));
    }
}
http://blog.xqlee.com/article/1045.html