diff --git a/README.md b/README.md index 078484b..7f0032d 100644 --- a/README.md +++ b/README.md @@ -98,14 +98,16 @@ TODO ## 九、HBase -1. [HBase基本环境搭建(Standalone /pseudo-distributed mode)](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Hbase%E5%9F%BA%E6%9C%AC%E7%8E%AF%E5%A2%83%E6%90%AD%E5%BB%BA.md) +1. Hbase 简介 2. [HBase系统架构及数据结构](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Hbase%E7%B3%BB%E7%BB%9F%E6%9E%B6%E6%9E%84%E5%8F%8A%E6%95%B0%E6%8D%AE%E7%BB%93%E6%9E%84.md) -3. [HBase常用Shell命令](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Hbase%20Shell.md) -4. HBase Java API -5. [HBase 协处理器](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Hbase协处理器.md) -6. [HBase 容灾与备份](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Hbase%E5%AE%B9%E7%81%BE%E4%B8%8E%E5%A4%87%E4%BB%BD.md) -7. [HBase的SQL中间层——Phoenix](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Hbase%E7%9A%84SQL%E5%B1%82%E2%80%94%E2%80%94Phoenix.md) -8. [Spring/Spring Boot 整合 Mybatis + Phoenix](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spring%2BMybtais%2BPhoenix%E6%95%B4%E5%90%88.md) +3. [HBase基本环境搭建(Standalone /pseudo-distributed mode)](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Hbase%E5%9F%BA%E6%9C%AC%E7%8E%AF%E5%A2%83%E6%90%AD%E5%BB%BA.md) +4. [HBase常用Shell命令](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Hbase%20Shell.md) +5. HBase Java API +6. Hbase 过滤器 +7. [HBase 协处理器](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Hbase协处理器.md) +8. [HBase 容灾与备份](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Hbase%E5%AE%B9%E7%81%BE%E4%B8%8E%E5%A4%87%E4%BB%BD.md) +9. [HBase的SQL中间层——Phoenix](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Hbase%E7%9A%84SQL%E5%B1%82%E2%80%94%E2%80%94Phoenix.md) +10. [Spring/Spring Boot 整合 Mybatis + Phoenix](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spring%2BMybtais%2BPhoenix%E6%95%B4%E5%90%88.md) ## 十、Kafka 1. Kafka 简介及消息处理过程分析 diff --git a/code/Hbase/hbase-java-api-1.x/pom.xml b/code/Hbase/hbase-java-api-1.x/pom.xml new file mode 100644 index 0000000..915329b --- /dev/null +++ b/code/Hbase/hbase-java-api-1.x/pom.xml @@ -0,0 +1,38 @@ + + + 4.0.0 + + com.heibaiying + hbase-java-api-1.x + 1.0-SNAPSHOT + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + + + + + + org.apache.hbase + hbase-client + 1.2.0 + + + junit + junit + 4.12 + test + + + + \ No newline at end of file diff --git a/code/Hbase/hbase-java-api-1.x/src/main/java/com/heibaiying/HBaseUtils.java b/code/Hbase/hbase-java-api-1.x/src/main/java/com/heibaiying/HBaseUtils.java new file mode 100644 index 0000000..cc76890 --- /dev/null +++ b/code/Hbase/hbase-java-api-1.x/src/main/java/com/heibaiying/HBaseUtils.java @@ -0,0 +1,267 @@ +package com.heibaiying; + +import javafx.util.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.util.Bytes; + +import java.io.IOException; +import java.util.List; + +public class HBaseUtils { + + private static Connection connection; + + static { + Configuration configuration = HBaseConfiguration.create(); + configuration.set("hbase.zookeeper.property.clientPort", "2181"); + // 如果是集群 则主机名用逗号分隔 + configuration.set("hbase.zookeeper.quorum", "hadoop001"); + try { + connection = ConnectionFactory.createConnection(configuration); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * 创建HBase表 + * + * @param tableName 表名 + * @param columnFamilies 列族的数组 + */ + public static boolean createTable(String tableName, List columnFamilies) { + try { + HBaseAdmin admin = (HBaseAdmin) connection.getAdmin(); + if (admin.tableExists(tableName)) { + return false; + } + HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName)); + columnFamilies.forEach(columnFamily -> { + HColumnDescriptor columnDescriptor = new HColumnDescriptor(columnFamily); + columnDescriptor.setMaxVersions(1); + tableDescriptor.addFamily(columnDescriptor); + }); + admin.createTable(tableDescriptor); + } catch (IOException e) { + e.printStackTrace(); + } + return true; + } + + + /** + * 删除hBase表 + * + * @param tableName 表名 + */ + public static boolean deleteTable(String tableName) { + try { + HBaseAdmin admin = (HBaseAdmin) connection.getAdmin(); + // 删除表前需要先禁用表 + admin.disableTable(tableName); + admin.deleteTable(tableName); + } catch (Exception e) { + e.printStackTrace(); + } + return true; + } + + /** + * 插入数据 + * + * @param tableName 表名 + * @param rowKey 唯一标识 + * @param columnFamilyName 列族名 + * @param qualifier 列标识 + * @param value 数据 + */ + public static boolean putRow(String tableName, String rowKey, String columnFamilyName, String qualifier, + String value) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Put put = new Put(Bytes.toBytes(rowKey)); + put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(qualifier), Bytes.toBytes(value)); + table.put(put); + table.close(); + } catch (IOException e) { + e.printStackTrace(); + } + return true; + } + + + /** + * 插入数据 + * + * @param tableName 表名 + * @param rowKey 唯一标识 + * @param columnFamilyName 列族名 + * @param pairList 列标识和值的集合 + */ + public static boolean putRow(String tableName, String rowKey, String columnFamilyName, List> pairList) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Put put = new Put(Bytes.toBytes(rowKey)); + pairList.forEach(pair -> put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(pair.getKey()), Bytes.toBytes(pair.getValue()))); + table.put(put); + table.close(); + } catch (IOException e) { + e.printStackTrace(); + } + return true; + } + + + /** + * 根据rowKey获取指定行的数据 + * + * @param tableName 表名 + * @param rowKey 唯一标识 + */ + public static Result getRow(String tableName, String rowKey) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Get get = new Get(Bytes.toBytes(rowKey)); + return table.get(get); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + + /** + * 获取指定行指定列(cell)的最新版本的数据 + * + * @param tableName 表名 + * @param rowKey 唯一标识 + * @param columnFamily 列族 + * @param qualifier 列标识 + */ + public static String getCell(String tableName, String rowKey, String columnFamily, String qualifier) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Get get = new Get(Bytes.toBytes(rowKey)); + if (!get.isCheckExistenceOnly()) { + get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)); + Result result = table.get(get); + byte[] resultValue = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)); + return Bytes.toString(resultValue); + } else { + return null; + } + + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + + /** + * 检索全表 + * + * @param tableName 表名 + */ + public static ResultScanner getScanner(String tableName) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Scan scan = new Scan(); + return table.getScanner(scan); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + + /** + * 检索表中指定数据 + * + * @param tableName 表名 + * @param filterList 过滤器 + */ + + public static ResultScanner getScanner(String tableName, FilterList filterList) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Scan scan = new Scan(); + scan.setFilter(filterList); + return table.getScanner(scan); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + /** + * 检索表中指定数据 + * + * @param tableName 表名 + * @param startRowKey 起始RowKey + * @param endRowKey 终止RowKey + * @param filterList 过滤器 + */ + + public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey, + FilterList filterList) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Scan scan = new Scan(); + scan.setStartRow(Bytes.toBytes(startRowKey)); + scan.setStopRow(Bytes.toBytes(endRowKey)); + scan.setFilter(filterList); + return table.getScanner(scan); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + /** + * 删除指定行记录 + * + * @param tableName 表名 + * @param rowKey 唯一标识 + */ + public static boolean deleteRow(String tableName, String rowKey) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Delete delete = new Delete(Bytes.toBytes(rowKey)); + table.delete(delete); + } catch (IOException e) { + e.printStackTrace(); + } + return true; + } + + + /** + * 删除指定行的指定列 + * + * @param tableName 表名 + * @param rowKey 唯一标识 + * @param familyName 列族 + * @param qualifier 列标识 + */ + public static boolean deleteColumn(String tableName, String rowKey, String familyName, + String qualifier) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Delete delete = new Delete(Bytes.toBytes(rowKey)); + delete.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(qualifier)); + table.delete(delete); + table.close(); + } catch (IOException e) { + e.printStackTrace(); + } + return true; + } + +} diff --git a/code/Hbase/hbase-java-api-1.x/src/test/java/com/heibaiying/HbaseUtilsTest.java b/code/Hbase/hbase-java-api-1.x/src/test/java/com/heibaiying/HbaseUtilsTest.java new file mode 100644 index 0000000..159d883 --- /dev/null +++ b/code/Hbase/hbase-java-api-1.x/src/test/java/com/heibaiying/HbaseUtilsTest.java @@ -0,0 +1,107 @@ +package com.heibaiying; + +import javafx.util.Pair; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +public class HBaseUtilsTest { + + private static final String TABLE_NAME = "class"; + private static final String TEACHER = "teacher"; + private static final String STUDENT = "student"; + + @Test + public void createTable() { + // 新建表 + List columnFamilies = Arrays.asList(TEACHER, STUDENT); + boolean table = HBaseUtils.createTable(TABLE_NAME, columnFamilies); + System.out.println("表创建结果:" + table); + } + + @Test + public void insertData() { + List> pairs1 = Arrays.asList(new Pair<>("name", "Tom"), + new Pair<>("age", "22"), + new Pair<>("gender", "1")); + HBaseUtils.putRow(TABLE_NAME, "rowKey1", STUDENT, pairs1); + + List> pairs2 = Arrays.asList(new Pair<>("name", "Jack"), + new Pair<>("age", "33"), + new Pair<>("gender", "2")); + HBaseUtils.putRow(TABLE_NAME, "rowKey2", STUDENT, pairs2); + + List> pairs3 = Arrays.asList(new Pair<>("name", "Mike"), + new Pair<>("age", "44"), + new Pair<>("gender", "1")); + HBaseUtils.putRow(TABLE_NAME, "rowKey3", STUDENT, pairs3); + } + + + @Test + public void getRow() { + Result result = HBaseUtils.getRow(TABLE_NAME, "rowKey1"); + if (result != null) { + System.out.println(Bytes + .toString(result.getValue(Bytes.toBytes(STUDENT), Bytes.toBytes("name")))); + } + + } + + @Test + public void getCell() { + String cell = HBaseUtils.getCell(TABLE_NAME, "rowKey2", STUDENT, "age"); + System.out.println("cell age :" + cell); + + } + + @Test + public void getScanner() { + ResultScanner scanner = HBaseUtils.getScanner(TABLE_NAME); + if (scanner != null) { + scanner.forEach(result -> System.out.println(Bytes.toString(result.getRow()) + "->" + Bytes + .toString(result.getValue(Bytes.toBytes(STUDENT), Bytes.toBytes("name"))))); + scanner.close(); + } + } + + + @Test + public void getScannerWithFilter() { + FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); + SingleColumnValueFilter nameFilter = new SingleColumnValueFilter(Bytes.toBytes(STUDENT), + Bytes.toBytes("name"), CompareFilter.CompareOp.EQUAL, Bytes.toBytes("Jack")); + filterList.addFilter(nameFilter); + ResultScanner scanner = HBaseUtils.getScanner(TABLE_NAME, filterList); + if (scanner != null) { + scanner.forEach(result -> System.out.println(Bytes.toString(result.getRow()) + "->" + Bytes + .toString(result.getValue(Bytes.toBytes(STUDENT), Bytes.toBytes("name"))))); + scanner.close(); + } + } + + @Test + public void deleteColumn() { + boolean b = HBaseUtils.deleteColumn(TABLE_NAME, "rowKey2", STUDENT, "age"); + System.out.println("删除结果: " + b); + } + + @Test + public void deleteRow() { + boolean b = HBaseUtils.deleteRow(TABLE_NAME, "rowKey2"); + System.out.println("删除结果: " + b); + } + + @Test + public void deleteTable() { + boolean b = HBaseUtils.deleteTable(TABLE_NAME); + System.out.println("删除结果: " + b); + } +} \ No newline at end of file diff --git a/code/Hbase/hbase-java-api-2.x/pom.xml b/code/Hbase/hbase-java-api-2.x/pom.xml new file mode 100644 index 0000000..a4a449b --- /dev/null +++ b/code/Hbase/hbase-java-api-2.x/pom.xml @@ -0,0 +1,37 @@ + + + 4.0.0 + + com.heibaiying + hbase-java-api-2.x + 1.0-SNAPSHOT + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + + + + + org.apache.hbase + hbase-client + 2.1.4 + + + junit + junit + 4.12 + test + + + + \ No newline at end of file diff --git a/code/Hbase/hbase-java-api-2.x/src/main/java/com/heibaiying/HBaseUtils.java b/code/Hbase/hbase-java-api-2.x/src/main/java/com/heibaiying/HBaseUtils.java new file mode 100644 index 0000000..8f1489f --- /dev/null +++ b/code/Hbase/hbase-java-api-2.x/src/main/java/com/heibaiying/HBaseUtils.java @@ -0,0 +1,266 @@ +package com.heibaiying; + +import javafx.util.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.util.Bytes; + +import java.io.IOException; +import java.util.List; + +public class HBaseUtils { + + private static Connection connection; + + static { + Configuration configuration = HBaseConfiguration.create(); + configuration.set("hbase.zookeeper.property.clientPort", "2181"); + // 如果是集群 则主机名用逗号分隔 + configuration.set("hbase.zookeeper.quorum", "hadoop001"); + try { + connection = ConnectionFactory.createConnection(configuration); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * 创建HBase表 + * + * @param tableName 表名 + * @param columnFamilies 列族的数组 + */ + public static boolean createTable(String tableName, List columnFamilies) { + try { + HBaseAdmin admin = (HBaseAdmin) connection.getAdmin(); + if (admin.tableExists(TableName.valueOf(tableName))) { + return false; + } + TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)); + columnFamilies.forEach(columnFamily -> { + ColumnFamilyDescriptorBuilder cfDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily)); + cfDescriptorBuilder.setMaxVersions(1); + ColumnFamilyDescriptor familyDescriptor = cfDescriptorBuilder.build(); + tableDescriptor.setColumnFamily(familyDescriptor); + }); + admin.createTable(tableDescriptor.build()); + } catch (IOException e) { + e.printStackTrace(); + } + return true; + } + + + /** + * 删除hBase表 + * + * @param tableName 表名 + */ + public static boolean deleteTable(String tableName) { + try { + HBaseAdmin admin = (HBaseAdmin) connection.getAdmin(); + // 删除表前需要先禁用表 + admin.disableTable(TableName.valueOf(tableName)); + admin.deleteTable(TableName.valueOf(tableName)); + } catch (Exception e) { + e.printStackTrace(); + } + return true; + } + + /** + * 插入数据 + * + * @param tableName 表名 + * @param rowKey 唯一标识 + * @param columnFamilyName 列族名 + * @param qualifier 列标识 + * @param value 数据 + */ + public static boolean putRow(String tableName, String rowKey, String columnFamilyName, String qualifier, + String value) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Put put = new Put(Bytes.toBytes(rowKey)); + put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(qualifier), Bytes.toBytes(value)); + table.put(put); + table.close(); + } catch (IOException e) { + e.printStackTrace(); + } + return true; + } + + + /** + * 插入数据 + * + * @param tableName 表名 + * @param rowKey 唯一标识 + * @param columnFamilyName 列族名 + * @param pairList 列标识和值的集合 + */ + public static boolean putRow(String tableName, String rowKey, String columnFamilyName, List> pairList) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Put put = new Put(Bytes.toBytes(rowKey)); + pairList.forEach(pair -> put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(pair.getKey()), Bytes.toBytes(pair.getValue()))); + table.put(put); + table.close(); + } catch (IOException e) { + e.printStackTrace(); + } + return true; + } + + + /** + * 根据rowKey获取指定行的数据 + * + * @param tableName 表名 + * @param rowKey 唯一标识 + */ + public static Result getRow(String tableName, String rowKey) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Get get = new Get(Bytes.toBytes(rowKey)); + return table.get(get); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + + /** + * 获取指定行指定列(cell)的最新版本的数据 + * + * @param tableName 表名 + * @param rowKey 唯一标识 + * @param columnFamily 列族 + * @param qualifier 列标识 + */ + public static String getCell(String tableName, String rowKey, String columnFamily, String qualifier) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Get get = new Get(Bytes.toBytes(rowKey)); + if (!get.isCheckExistenceOnly()) { + get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)); + Result result = table.get(get); + byte[] resultValue = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)); + return Bytes.toString(resultValue); + } else { + return null; + } + + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + + /** + * 检索全表 + * + * @param tableName 表名 + */ + public static ResultScanner getScanner(String tableName) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Scan scan = new Scan(); + return table.getScanner(scan); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + + /** + * 检索表中指定数据 + * + * @param tableName 表名 + * @param filterList 过滤器 + */ + + public static ResultScanner getScanner(String tableName, FilterList filterList) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Scan scan = new Scan(); + scan.setFilter(filterList); + return table.getScanner(scan); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + /** + * 检索表中指定数据 + * + * @param tableName 表名 + * @param startRowKey 起始RowKey + * @param endRowKey 终止RowKey + * @param filterList 过滤器 + */ + + public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey, + FilterList filterList) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Scan scan = new Scan(); + scan.withStartRow(Bytes.toBytes(startRowKey)); + scan.withStopRow(Bytes.toBytes(endRowKey)); + scan.setFilter(filterList); + return table.getScanner(scan); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + /** + * 删除指定行记录 + * + * @param tableName 表名 + * @param rowKey 唯一标识 + */ + public static boolean deleteRow(String tableName, String rowKey) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Delete delete = new Delete(Bytes.toBytes(rowKey)); + table.delete(delete); + } catch (IOException e) { + e.printStackTrace(); + } + return true; + } + + + /** + * 删除指定行指定列 + * + * @param tableName 表名 + * @param rowKey 唯一标识 + * @param familyName 列族 + * @param qualifier 列标识 + */ + public static boolean deleteColumn(String tableName, String rowKey, String familyName, + String qualifier) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Delete delete = new Delete(Bytes.toBytes(rowKey)); + delete.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(qualifier)); + table.delete(delete); + table.close(); + } catch (IOException e) { + e.printStackTrace(); + } + return true; + } + +} diff --git a/code/Hbase/hbase-java-api-2.x/src/test/java/heibaiying/HBaseUtilsTest.java b/code/Hbase/hbase-java-api-2.x/src/test/java/heibaiying/HBaseUtilsTest.java new file mode 100644 index 0000000..a11e6da --- /dev/null +++ b/code/Hbase/hbase-java-api-2.x/src/test/java/heibaiying/HBaseUtilsTest.java @@ -0,0 +1,109 @@ +package heibaiying; + +import com.heibaiying.HBaseUtils; +import javafx.util.Pair; +import org.apache.hadoop.hbase.CompareOperator; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +public class HBaseUtilsTest { + + private static final String TABLE_NAME = "class"; + private static final String TEACHER = "teacher"; + private static final String STUDENT = "student"; + + @Test + public void createTable() { + // 新建表 + List columnFamilies = Arrays.asList(TEACHER, STUDENT); + boolean table = HBaseUtils.createTable(TABLE_NAME, columnFamilies); + System.out.println("表创建结果:" + table); + } + + @Test + public void insertData() { + List> pairs1 = Arrays.asList(new Pair<>("name", "Tom"), + new Pair<>("age", "22"), + new Pair<>("gender", "1")); + HBaseUtils.putRow(TABLE_NAME, "rowKey1", STUDENT, pairs1); + + List> pairs2 = Arrays.asList(new Pair<>("name", "Jack"), + new Pair<>("age", "33"), + new Pair<>("gender", "2")); + HBaseUtils.putRow(TABLE_NAME, "rowKey2", STUDENT, pairs2); + + List> pairs3 = Arrays.asList(new Pair<>("name", "Mike"), + new Pair<>("age", "44"), + new Pair<>("gender", "1")); + HBaseUtils.putRow(TABLE_NAME, "rowKey3", STUDENT, pairs3); + } + + + @Test + public void getRow() { + Result result = HBaseUtils.getRow(TABLE_NAME, "rowKey1"); + if (result != null) { + System.out.println(Bytes + .toString(result.getValue(Bytes.toBytes(STUDENT), Bytes.toBytes("name")))); + } + + } + + @Test + public void getCell() { + String cell = HBaseUtils.getCell(TABLE_NAME, "rowKey2", STUDENT, "age"); + System.out.println("cell age :" + cell); + + } + + @Test + public void getScanner() { + ResultScanner scanner = HBaseUtils.getScanner(TABLE_NAME); + if (scanner != null) { + scanner.forEach(result -> System.out.println(Bytes.toString(result.getRow()) + "->" + Bytes + .toString(result.getValue(Bytes.toBytes(STUDENT), Bytes.toBytes("name"))))); + scanner.close(); + } + } + + + @Test + public void getScannerWithFilter() { + FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); + SingleColumnValueFilter nameFilter = new SingleColumnValueFilter(Bytes.toBytes(STUDENT), + Bytes.toBytes("name"), CompareOperator.EQUAL, Bytes.toBytes("Jack")); + filterList.addFilter(nameFilter); + ResultScanner scanner = HBaseUtils.getScanner(TABLE_NAME, filterList); + if (scanner != null) { + scanner.forEach(result -> System.out.println(Bytes.toString(result.getRow()) + "->" + Bytes + .toString(result.getValue(Bytes.toBytes(STUDENT), Bytes.toBytes("name"))))); + scanner.close(); + } + } + + @Test + public void deleteColumn() { + boolean b = HBaseUtils.deleteColumn(TABLE_NAME, "rowKey2", STUDENT, "age"); + System.out.println("删除结果: " + b); + } + + @Test + public void deleteRow() { + boolean b = HBaseUtils.deleteRow(TABLE_NAME, "rowKey2"); + System.out.println("删除结果: " + b); + } + + @Test + public void deleteTable() { + boolean b = HBaseUtils.deleteTable(TABLE_NAME); + System.out.println("删除结果: " + b); + } +} \ No newline at end of file diff --git a/notes/Hbase Java API.md b/notes/Hbase Java API.md new file mode 100644 index 0000000..6f858f8 --- /dev/null +++ b/notes/Hbase Java API.md @@ -0,0 +1,758 @@ +# Hbase Java API 的基本使用 + + + + + +## 一、简述 + +截至到目前(2019年4月),Hbase 主要有1.x 和 2.x 两个主要的版本,两个版本的Java API的接口和方法都是有所不同的,1.x 中某些方法在2.x中被标识为`@deprecated `过时,所以下面关于API的样例,我会分别给出1.x和2.x两个版本。完整的代码见本仓库: + +>+ [Java API 1.x Examples](https://github.com/heibaiying/BigData-Notes/tree/master/code/Hbase/hbase-java-api-1.x) +> +>+ [Java API 2.x Examples](https://github.com/heibaiying/BigData-Notes/tree/master/code/Hbase/hbase-java-api-2.x) + +同时在实际使用中,客户端的版本必须与服务端保持一致,如果用2.x版本的客户端代码去连接1.x版本的服务端,是会抛出`NoSuchColumnFamilyException`等异常的。 + +## 二、Java API 1.x 基本使用 + +#### 2.1 新建Maven工程,导入项目依赖 + +要使用Java API 操作HBase,仅需要引入`hbase-client`。这里我服务端的Hbase版本为`hbase-1.2.0-cdh5.15.2`,对应的`Hbase client` 选取 1.2.0 版本 + +```xml + + org.apache.hbase + hbase-client + 1.2.0 + +``` + +#### 2.2 API 基本使用 + +这里列举了常用的增删改查操作 + +```java +public class HBaseUtils { + + private static Connection connection; + + static { + Configuration configuration = HBaseConfiguration.create(); + configuration.set("hbase.zookeeper.property.clientPort", "2181"); + // 如果是集群 则主机名用逗号分隔 + configuration.set("hbase.zookeeper.quorum", "hadoop001"); + try { + connection = ConnectionFactory.createConnection(configuration); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * 创建HBase表 + * + * @param tableName 表名 + * @param columnFamilies 列族的数组 + */ + public static boolean createTable(String tableName, List columnFamilies) { + try { + HBaseAdmin admin = (HBaseAdmin) connection.getAdmin(); + if (admin.tableExists(tableName)) { + return false; + } + HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName)); + columnFamilies.forEach(columnFamily -> { + HColumnDescriptor columnDescriptor = new HColumnDescriptor(columnFamily); + columnDescriptor.setMaxVersions(1); + tableDescriptor.addFamily(columnDescriptor); + }); + admin.createTable(tableDescriptor); + } catch (IOException e) { + e.printStackTrace(); + } + return true; + } + + + /** + * 删除hBase表 + * + * @param tableName 表名 + */ + public static boolean deleteTable(String tableName) { + try { + HBaseAdmin admin = (HBaseAdmin) connection.getAdmin(); + // 删除表前需要先禁用表 + admin.disableTable(tableName); + admin.deleteTable(tableName); + } catch (Exception e) { + e.printStackTrace(); + } + return true; + } + + /** + * 插入数据 + * + * @param tableName 表名 + * @param rowKey 唯一标识 + * @param columnFamilyName 列族名 + * @param qualifier 列标识 + * @param value 数据 + */ + public static boolean putRow(String tableName, String rowKey, String columnFamilyName, String qualifier, + String value) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Put put = new Put(Bytes.toBytes(rowKey)); + put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(qualifier), Bytes.toBytes(value)); + table.put(put); + table.close(); + } catch (IOException e) { + e.printStackTrace(); + } + return true; + } + + + /** + * 插入数据 + * + * @param tableName 表名 + * @param rowKey 唯一标识 + * @param columnFamilyName 列族名 + * @param pairList 列标识和值的集合 + */ + public static boolean putRow(String tableName, String rowKey, String columnFamilyName, List> pairList) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Put put = new Put(Bytes.toBytes(rowKey)); + pairList.forEach(pair -> put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(pair.getKey()), Bytes.toBytes(pair.getValue()))); + table.put(put); + table.close(); + } catch (IOException e) { + e.printStackTrace(); + } + return true; + } + + + /** + * 根据rowKey获取指定行的数据 + * + * @param tableName 表名 + * @param rowKey 唯一标识 + */ + public static Result getRow(String tableName, String rowKey) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Get get = new Get(Bytes.toBytes(rowKey)); + return table.get(get); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + + /** + * 获取指定行指定列(cell)的最新版本的数据 + * + * @param tableName 表名 + * @param rowKey 唯一标识 + * @param columnFamily 列族 + * @param qualifier 列标识 + */ + public static String getCell(String tableName, String rowKey, String columnFamily, String qualifier) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Get get = new Get(Bytes.toBytes(rowKey)); + if (!get.isCheckExistenceOnly()) { + get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)); + Result result = table.get(get); + byte[] resultValue = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)); + return Bytes.toString(resultValue); + } else { + return null; + } + + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + + /** + * 检索全表 + * + * @param tableName 表名 + */ + public static ResultScanner getScanner(String tableName) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Scan scan = new Scan(); + return table.getScanner(scan); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + + /** + * 检索表中指定数据 + * + * @param tableName 表名 + * @param filterList 过滤器 + */ + + public static ResultScanner getScanner(String tableName, FilterList filterList) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Scan scan = new Scan(); + scan.setFilter(filterList); + return table.getScanner(scan); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + /** + * 检索表中指定数据 + * + * @param tableName 表名 + * @param startRowKey 起始RowKey + * @param endRowKey 终止RowKey + * @param filterList 过滤器 + */ + + public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey, + FilterList filterList) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Scan scan = new Scan(); + scan.setStartRow(Bytes.toBytes(startRowKey)); + scan.setStopRow(Bytes.toBytes(endRowKey)); + scan.setFilter(filterList); + return table.getScanner(scan); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + /** + * 删除指定行记录 + * + * @param tableName 表名 + * @param rowKey 唯一标识 + */ + public static boolean deleteRow(String tableName, String rowKey) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Delete delete = new Delete(Bytes.toBytes(rowKey)); + table.delete(delete); + } catch (IOException e) { + e.printStackTrace(); + } + return true; + } + + + /** + * 删除指定行的指定列 + * + * @param tableName 表名 + * @param rowKey 唯一标识 + * @param familyName 列族 + * @param qualifier 列标识 + */ + public static boolean deleteColumn(String tableName, String rowKey, String familyName, + String qualifier) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Delete delete = new Delete(Bytes.toBytes(rowKey)); + delete.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(qualifier)); + table.delete(delete); + table.close(); + } catch (IOException e) { + e.printStackTrace(); + } + return true; + } + +} +``` + +### 2.3 单元测试 + +以单元测试的方式对封装的API进行测试 + +```java +public class HBaseUtilsTest { + + private static final String TABLE_NAME = "class"; + private static final String TEACHER = "teacher"; + private static final String STUDENT = "student"; + + @Test + public void createTable() { + // 新建表 + List columnFamilies = Arrays.asList(TEACHER, STUDENT); + boolean table = HBaseUtils.createTable(TABLE_NAME, columnFamilies); + System.out.println("表创建结果:" + table); + } + + @Test + public void insertData() { + List> pairs1 = Arrays.asList(new Pair<>("name", "Tom"), + new Pair<>("age", "22"), + new Pair<>("gender", "1")); + HBaseUtils.putRow(TABLE_NAME, "rowKey1", STUDENT, pairs1); + + List> pairs2 = Arrays.asList(new Pair<>("name", "Jack"), + new Pair<>("age", "33"), + new Pair<>("gender", "2")); + HBaseUtils.putRow(TABLE_NAME, "rowKey2", STUDENT, pairs2); + + List> pairs3 = Arrays.asList(new Pair<>("name", "Mike"), + new Pair<>("age", "44"), + new Pair<>("gender", "1")); + HBaseUtils.putRow(TABLE_NAME, "rowKey3", STUDENT, pairs3); + } + + + @Test + public void getRow() { + Result result = HBaseUtils.getRow(TABLE_NAME, "rowKey1"); + if (result != null) { + System.out.println(Bytes + .toString(result.getValue(Bytes.toBytes(STUDENT), Bytes.toBytes("name")))); + } + + } + + @Test + public void getCell() { + String cell = HBaseUtils.getCell(TABLE_NAME, "rowKey2", STUDENT, "age"); + System.out.println("cell age :" + cell); + + } + + @Test + public void getScanner() { + ResultScanner scanner = HBaseUtils.getScanner(TABLE_NAME); + if (scanner != null) { + scanner.forEach(result -> System.out.println(Bytes.toString(result.getRow()) + "->" + Bytes + .toString(result.getValue(Bytes.toBytes(STUDENT), Bytes.toBytes("name"))))); + scanner.close(); + } + } + + + @Test + public void getScannerWithFilter() { + FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); + SingleColumnValueFilter nameFilter = new SingleColumnValueFilter(Bytes.toBytes(STUDENT), + Bytes.toBytes("name"), CompareOperator.EQUAL, Bytes.toBytes("Jack")); + filterList.addFilter(nameFilter); + ResultScanner scanner = HBaseUtils.getScanner(TABLE_NAME, filterList); + if (scanner != null) { + scanner.forEach(result -> System.out.println(Bytes.toString(result.getRow()) + "->" + Bytes + .toString(result.getValue(Bytes.toBytes(STUDENT), Bytes.toBytes("name"))))); + scanner.close(); + } + } + + @Test + public void deleteColumn() { + boolean b = HBaseUtils.deleteColumn(TABLE_NAME, "rowKey2", STUDENT, "age"); + System.out.println("删除结果: " + b); + } + + @Test + public void deleteRow() { + boolean b = HBaseUtils.deleteRow(TABLE_NAME, "rowKey2"); + System.out.println("删除结果: " + b); + } + + @Test + public void deleteTable() { + boolean b = HBaseUtils.deleteTable(TABLE_NAME); + System.out.println("删除结果: " + b); + } +} +``` + + + +## 三、Java API 2.x 基本使用 + +#### 3.1 新建Maven工程,导入项目依赖 + +这里选取的`HBase Client`的版本为最新的`2.1.4` + +```xml + + org.apache.hbase + hbase-client + 2.1.4 + +``` + +#### 3.2 API 的基本使用 + +2.x 版本相比于1.x 废弃了一部分方法,关于废弃的方法在源码中都会指明新的替代方法,比如,在2.x中创建表时:`HTableDescriptor`和`HColumnDescriptor`等类都标识为废弃,且会在3.0.0版本移除,取而代之的是使用`TableDescriptorBuilder`和`ColumnFamilyDescriptorBuilder`来定义表和列族。在升级版本时,可以用源码中指明的新的替代方法来代替过期的方法。 + +
+ + + +以下为HBase 2.x 版本Java API使用的完整示例: + +```java +public class HBaseUtils { + + private static Connection connection; + + static { + Configuration configuration = HBaseConfiguration.create(); + configuration.set("hbase.zookeeper.property.clientPort", "2181"); + // 如果是集群 则主机名用逗号分隔 + configuration.set("hbase.zookeeper.quorum", "hadoop001"); + try { + connection = ConnectionFactory.createConnection(configuration); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * 创建HBase表 + * + * @param tableName 表名 + * @param columnFamilies 列族的数组 + */ + public static boolean createTable(String tableName, List columnFamilies) { + try { + HBaseAdmin admin = (HBaseAdmin) connection.getAdmin(); + if (admin.tableExists(TableName.valueOf(tableName))) { + return false; + } + TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)); + columnFamilies.forEach(columnFamily -> { + ColumnFamilyDescriptorBuilder cfDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily)); + cfDescriptorBuilder.setMaxVersions(1); + ColumnFamilyDescriptor familyDescriptor = cfDescriptorBuilder.build(); + tableDescriptor.setColumnFamily(familyDescriptor); + }); + admin.createTable(tableDescriptor.build()); + } catch (IOException e) { + e.printStackTrace(); + } + return true; + } + + + /** + * 删除hBase表 + * + * @param tableName 表名 + */ + public static boolean deleteTable(String tableName) { + try { + HBaseAdmin admin = (HBaseAdmin) connection.getAdmin(); + // 删除表前需要先禁用表 + admin.disableTable(TableName.valueOf(tableName)); + admin.deleteTable(TableName.valueOf(tableName)); + } catch (Exception e) { + e.printStackTrace(); + } + return true; + } + + /** + * 插入数据 + * + * @param tableName 表名 + * @param rowKey 唯一标识 + * @param columnFamilyName 列族名 + * @param qualifier 列标识 + * @param value 数据 + */ + public static boolean putRow(String tableName, String rowKey, String columnFamilyName, String qualifier, + String value) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Put put = new Put(Bytes.toBytes(rowKey)); + put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(qualifier), Bytes.toBytes(value)); + table.put(put); + table.close(); + } catch (IOException e) { + e.printStackTrace(); + } + return true; + } + + + /** + * 插入数据 + * + * @param tableName 表名 + * @param rowKey 唯一标识 + * @param columnFamilyName 列族名 + * @param pairList 列标识和值的集合 + */ + public static boolean putRow(String tableName, String rowKey, String columnFamilyName, List> pairList) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Put put = new Put(Bytes.toBytes(rowKey)); + pairList.forEach(pair -> put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(pair.getKey()), Bytes.toBytes(pair.getValue()))); + table.put(put); + table.close(); + } catch (IOException e) { + e.printStackTrace(); + } + return true; + } + + + /** + * 根据rowKey获取指定行的数据 + * + * @param tableName 表名 + * @param rowKey 唯一标识 + */ + public static Result getRow(String tableName, String rowKey) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Get get = new Get(Bytes.toBytes(rowKey)); + return table.get(get); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + + /** + * 获取指定行指定列(cell)的最新版本的数据 + * + * @param tableName 表名 + * @param rowKey 唯一标识 + * @param columnFamily 列族 + * @param qualifier 列标识 + */ + public static String getCell(String tableName, String rowKey, String columnFamily, String qualifier) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Get get = new Get(Bytes.toBytes(rowKey)); + if (!get.isCheckExistenceOnly()) { + get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)); + Result result = table.get(get); + byte[] resultValue = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)); + return Bytes.toString(resultValue); + } else { + return null; + } + + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + + /** + * 检索全表 + * + * @param tableName 表名 + */ + public static ResultScanner getScanner(String tableName) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Scan scan = new Scan(); + return table.getScanner(scan); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + + /** + * 检索表中指定数据 + * + * @param tableName 表名 + * @param filterList 过滤器 + */ + + public static ResultScanner getScanner(String tableName, FilterList filterList) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Scan scan = new Scan(); + scan.setFilter(filterList); + return table.getScanner(scan); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + /** + * 检索表中指定数据 + * + * @param tableName 表名 + * @param startRowKey 起始RowKey + * @param endRowKey 终止RowKey + * @param filterList 过滤器 + */ + + public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey, + FilterList filterList) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Scan scan = new Scan(); + scan.withStartRow(Bytes.toBytes(startRowKey)); + scan.withStopRow(Bytes.toBytes(endRowKey)); + scan.setFilter(filterList); + return table.getScanner(scan); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + /** + * 删除指定行记录 + * + * @param tableName 表名 + * @param rowKey 唯一标识 + */ + public static boolean deleteRow(String tableName, String rowKey) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Delete delete = new Delete(Bytes.toBytes(rowKey)); + table.delete(delete); + } catch (IOException e) { + e.printStackTrace(); + } + return true; + } + + + /** + * 删除指定行指定列 + * + * @param tableName 表名 + * @param rowKey 唯一标识 + * @param familyName 列族 + * @param qualifier 列标识 + */ + public static boolean deleteColumn(String tableName, String rowKey, String familyName, + String qualifier) { + try { + Table table = connection.getTable(TableName.valueOf(tableName)); + Delete delete = new Delete(Bytes.toBytes(rowKey)); + delete.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(qualifier)); + table.delete(delete); + table.close(); + } catch (IOException e) { + e.printStackTrace(); + } + return true; + } + +} +``` + + + +## 四、正确连接Hbase + +在上面的代码中在类加载时候就初始化了Connection连接,并且之后的方法都是复用这个Connection,这时我们可能会考虑是否可以使用义连接池来获取更好的性能表现?实际上这是没有必要的。 + +首先官方对于`Connection Pooling`做了如下表述: + +```properties +Connection Pooling +For applications which require high-end multithreaded access (e.g., web-servers or application servers that may serve many application threads in a single JVM), you can pre-create a Connection, as shown in the following example: + +#对于高并发多线程访问的应用程序(例如,在单个JVM中存在的为多个线程服务的Web服务器或应用程序服务器),您只需要预先创建一个Connection。例子如下: +``` + +```java +// Create a connection to the cluster. +Configuration conf = HBaseConfiguration.create(); +try (Connection connection = ConnectionFactory.createConnection(conf); + Table table = connection.getTable(TableName.valueOf(tablename))) { + // use table as needed, the table returned is lightweight +} +``` + +之所以能这样使用,这是因为Connection并不是一个简单的socket连接,[接口文档](https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Connection.html)中对Connection的表述是: + +```properties +A cluster connection encapsulating lower level individual connections to actual servers and a connection to zookeeper. Connections are instantiated through the ConnectionFactory class. The lifecycle of the connection is managed by the caller, who has to close() the connection to release the resources. + +# Connection是一个集群连接,封装了与多台服务器(Matser/Region Server)的底层连接以及与zookeeper的连接。连接通过ConnectionFactory类实例化。连接的生命周期由调用者管理,调用者必须使用close()关闭连接以释放资源。 +``` + +之所以封装这些连接,是因为HBase客户端需要连接三个不同的服务角色: + ++ Zookeeper:主要用于获得meta-region位置,集群Id、master等信息。 ++ HBase Master:主要用于执行HBaseAdmin接口的一些操作,例如建表等。 ++ HBase RegionServer:用于读、写数据。 + +
+ +Connection对象和实际的socket连接之间的对应关系如下图: + +
+ +> 上面两张图片引用自博客:[连接HBase的正确姿势](https://yq.aliyun.com/articles/581702?spm=a2c4e.11157919.spm-cont-list.1.146c27aeFxoMsN%20%E8%BF%9E%E6%8E%A5HBase%E7%9A%84%E6%AD%A3%E7%A1%AE%E5%A7%BF%E5%8A%BF) + +在HBase客户端代码中,真正对应socket连接的是RpcConnection对象。HBase使用PoolMap这种数据结构来存储客户端到HBase服务器之间的连接。PoolMap封装了ConcurrentHashMap>的结构,key是ConnectionId(封装了服务器地址和用户ticket),value是一个RpcConnection对象的资源池。当HBase需要连接一个服务器时,首先会根据ConnectionId找到对应的连接池,然后从连接池中取出一个连接对象。 + +```java +@InterfaceAudience.Private +public class PoolMap implements Map { + private PoolType poolType; + + private int poolMaxSize; + + private Map> pools = new ConcurrentHashMap<>(); + + public PoolMap(PoolType poolType) { + this.poolType = poolType; + } + ..... +``` + +HBase中提供了三种资源池的实现,分别是Reusable,RoundRobin和ThreadLocal。具体实现可以通hbase.client.ipc.pool.type配置项指定,默认为Reusable。连接池的大小也可以通过hbase.client.ipc.pool.size配置项指定,默认为1,即每个Server 1个连接。也可以通过修改配置实现: + +```java +config.set("hbase.client.ipc.pool.type",...); +config.set("hbase.client.ipc.pool.size",...); +connection = ConnectionFactory.createConnection(config); +``` + +从以上的表述中,可以看出HBase中Connection类已经实现了对连接的管理功能,所以我们不需要自己在Connection之上再做额外的管理。 + +另外,Connection是线程安全的,而Table和Admin则不是线程安全的,因此正确的做法是一个进程共用一个Connection对象,而在不同的线程中使用单独的Table和Admin对象,且Table和Admin的获取`getTable()`和`getAdmin()`都是轻量级的操作,所以不必担心性能的消耗,在使用完成后建议显示的调用`close()`方法关闭它们。 + + + +## 参考资料 + +1. [连接HBase的正确姿势](https://yq.aliyun.com/articles/581702?spm=a2c4e.11157919.spm-cont-list.1.146c27aeFxoMsN%20%E8%BF%9E%E6%8E%A5HBase%E7%9A%84%E6%AD%A3%E7%A1%AE%E5%A7%BF%E5%8A%BF) +2. [Apache HBase ™ Reference Guide](http://hbase.apache.org/book.htm) + diff --git a/pictures/deprecated.png b/pictures/deprecated.png new file mode 100644 index 0000000..7137864 Binary files /dev/null and b/pictures/deprecated.png differ diff --git a/pictures/hbase-arc.png b/pictures/hbase-arc.png new file mode 100644 index 0000000..0ce2be9 Binary files /dev/null and b/pictures/hbase-arc.png differ diff --git a/pictures/hbase-connection.png b/pictures/hbase-connection.png new file mode 100644 index 0000000..5e722ef Binary files /dev/null and b/pictures/hbase-connection.png differ