package test;
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * Layout of an HBase table:
 * <pre>
 * +-------------+---------------------------------------+-------------------+
 * | TableName   | ColumnFamily1                         | ColumnFamily2 ... |
 * |             +-------------------+-------------------+-------------------+
 * |             | ColumnQualifier1  | ColumnQualifier2  | ...               |
 * +-------------+-------------------+-------------------+-------------------+
 * | RowKey1     | Cell              | Cell ...          |                   |
 * |             |   t1: a           |                   |                   |
 * |             |   t2: b           |                   |                   |
 * +-------------+-------------------+-------------------+-------------------+
 * | RowKey2 ... |
 * +-------------+
 * </pre>
 *
 * Writing a value into a cell does not overwrite the existing value (t1: a); instead a new
 * version (t2: b) is added at the same cell position. t1 is version 1 and t2 is version 2, the
 * newest one, so reading the cell returns the value of t2. Versions are identified by timestamp.
 * How many versions a cell keeps is limited by its column family's VERSIONS setting
 * (HColumnDescriptor#setMaxVersions), and a Get returns only the newest version unless more
 * versions are explicitly requested.
 *
 * Example: table = exam scores, row key = student ID, column families = natural sciences,
 * humanities, ..., column qualifiers = math, chemistry, ..., cell versions = first exam,
 * second exam, ...
 *
 * HBase stores everything as byte arrays, so every key and value has to be converted with
 * Bytes.toBytes() (and back with Bytes.toString() / Bytes.toStringBinary()).
 *
 * Code reference: HBase: The Definitive Guide, 2nd Edition (2015-07-07), which covers the
 * HBase 1.0.0 release.
 * Book download: http://www.it-ebooks.org/book/oreilly/hbase_the_definitive_guide_2nd_edition_early_release
 */
public class hbase_test {
    /** Client configuration used to open {@link #con}. */
    private static final Configuration conf;

    /**
     * Connection shared by every operation in this class. It is created once when the class is
     * loaded and closed at the end of {@link #main(String[])}.
     */
    private static final Connection con;

    static {
        // HBaseConfiguration.create() loads hbase-default.xml and hbase-site.xml from the classpath.
        conf = HBaseConfiguration.create();
        // Override the ZooKeeper quorum address so the client connects to a local cluster.
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        // Create the Connection once per application and share it for the whole run.
        // https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/client/Connection.html
        // https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/client/ConnectionFactory.html
        try {
            con = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            // Fail fast with the real cause: every method below needs the connection, and a null
            // field would only surface later as an uninformative NullPointerException.
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * Prints one cell's row key, timestamp, family, qualifier and value, followed by a separator.
     *
     * @param cell the cell to print
     */
    private static void printCell(Cell cell) {
        System.out.println("Row Key : " + Bytes.toStringBinary(CellUtil.cloneRow(cell)));
        System.out.println("Timestamp : " + cell.getTimestamp());
        System.out.println("Column Family : " + Bytes.toStringBinary(CellUtil.cloneFamily(cell)));
        System.out.println("Column Qualifier : " + Bytes.toStringBinary(CellUtil.cloneQualifier(cell)));
        System.out.println("Value : " + Bytes.toStringBinary(CellUtil.cloneValue(cell)));
        System.out.println("----------------------------------------");
        // https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/CellUtil.html
    }

    /**
     * Prints every cell of every row produced by a scanner, each preceded by {@code header}.
     *
     * @param rs     open scanner to drain; the caller remains responsible for closing it
     * @param header line printed before each cell
     * @throws IOException if advancing over a row's cells fails
     */
    private static void printScanResults(ResultScanner rs, String header) throws IOException {
        for (Result r : rs) {
            CellScanner scanner = r.cellScanner();
            while (scanner.advance()) {
                System.out.println(header);
                printCell(scanner.current());
            }
        }
    }

    /**
     * Fetches one row by key and prints the raw {@link Result}, then each of its cells.
     *
     * @param name   table name
     * @param rowKey row key (converted with {@link Bytes#toBytes(String)})
     * @throws IOException if the table cannot be opened or the read fails
     */
    public void getOneDataByRowKey(String name, String rowKey) throws IOException {
        TableName tableName = TableName.valueOf(name);
        // Table replaces the deprecated HTable; it is not thread-safe, so open one per call and
        // let try-with-resources close it even when the read fails.
        try (Table table = con.getTable(tableName)) {
            // Get describes which row (and optionally which columns) to fetch.
            Get get = new Get(Bytes.toBytes(rowKey));
            // Result holds a single row returned by a Get or Scan.
            Result result = table.get(get);
            System.out.println("----------Result---------------");
            System.out.println(result); // <row-key>/<family>:<qualifier>/<version>/<type>/<value-length>/<sequence-id>
            // CellScanner iterates over the cells of the row.
            // https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/CellScanner.html
            CellScanner scanner = result.cellScanner();
            System.out.println("----------CellScanner----------");
            while (scanner.advance()) {
                printCell(scanner.current());
            }
        }
    }

    /**
     * Writes a single value to {@code columnFamily:columnQual} of the given row. Writing an
     * existing cell adds a new version rather than overwriting it in place.
     *
     * @param tblname      table name
     * @param rowKey       row key
     * @param columnFamily existing column family of the table
     * @param columnQual   column qualifier (created on first write)
     * @param value        value to store
     * @throws IOException if the table cannot be opened or the write fails
     */
    public void insertRow(String tblname, String rowKey, String columnFamily, String columnQual, String value) throws IOException {
        TableName tableName = TableName.valueOf(tblname);
        try (Table table = con.getTable(tableName)) {
            Put put = new Put(Bytes.toBytes(rowKey));
            // addColumn adds "columnFamily:columnQual = value" to this Put.
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQual), Bytes.toBytes(value));
            System.out.printf("----Insert row into %s with rowKey : %s, colFamily : %s, colQual : %s, value : %s ----%n",
                    tblname, rowKey, columnFamily, columnQual, value);
            table.put(put);
        }
    }

    /**
     * Creates a table with one or more column families, e.g.
     * {@code t.createTable("hbaseTest", "colFam1", "colFam2", "colFam3")}.
     *
     * @param tblname    name of the new table
     * @param familyName column family names (varargs, any number)
     * @throws IOException if the table cannot be created, for example because it already exists
     */
    public void createTable(String tblname, String... familyName) throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
        // Admin is the administrative API (create/drop/list/enable/disable tables, add/drop
        // families); it is obtained from the connection and must be closed after use.
        try (Admin admin = con.getAdmin()) {
            TableName tableName = TableName.valueOf(tblname);
            HTableDescriptor tableDesc = new HTableDescriptor(tableName);
            System.out.print("--------createTable with ColumnFamily :");
            for (String name : familyName) {
                HColumnDescriptor colDesc = new HColumnDescriptor(name);
                // To keep older versions readable via getVersionData, configure the family here,
                // e.g. colDesc.setMaxVersions(3);
                tableDesc.addFamily(colDesc);
                System.out.print(" " + name);
            }
            System.out.println("--------");
            admin.createTable(tableDesc);
        }
    }

    /**
     * Prints one stored version of a single cell. Version 0 is the newest value, 1 the one before
     * it, and so on. Older versions exist only if the column family is configured to retain them.
     *
     * @param tblname      table name
     * @param rowKey       row key
     * @param columnFamily column family
     * @param columnQual   column qualifier
     * @param version      zero-based version index, 0 = newest
     * @throws IndexOutOfBoundsException if the cell does not have that many stored versions
     * @throws IOException               if the table cannot be opened or the read fails
     */
    public void getVersionData(String tblname, String rowKey, String columnFamily, String columnQual, int version) throws IOException {
        byte[] family = Bytes.toBytes(columnFamily);
        byte[] qualifier = Bytes.toBytes(columnQual);
        TableName tableName = TableName.valueOf(tblname);
        try (Table table = con.getTable(tableName)) {
            Get get = new Get(Bytes.toBytes(rowKey));
            // Only this one column is needed.
            get.addColumn(family, qualifier);
            // A Get returns just the newest version by default, which made every version > 0
            // fail; request all stored versions instead.
            get.setMaxVersions();
            Result result = table.get(get);
            // getColumnCells returns the versions newest first, so index 0 is the latest value.
            List<Cell> cells = result.getColumnCells(family, qualifier);
            if (version < 0 || version >= cells.size()) {
                throw new IndexOutOfBoundsException("version " + version + " requested, but "
                        + tblname + "/" + rowKey + "/" + columnFamily + ":" + columnQual
                        + " has " + cells.size() + " stored version(s)");
            }
            Cell cell = cells.get(version);
            System.out.println("Versioned data : " + Bytes.toStringBinary(CellUtil.cloneValue(cell)));
            System.out.println("timestamp : " + cell.getTimestamp());
        }
    }

    /**
     * Deletes the latest version of {@code columnFamily:columnQual} in the given row.
     *
     * @param tblname      table name
     * @param rowKey       row key
     * @param columnFamily column family
     * @param columnQual   column qualifier
     * @throws IOException if the table cannot be opened or the delete fails
     */
    public void deleteData(String tblname, String rowKey, String columnFamily, String columnQual) throws IOException {
        TableName tableName = TableName.valueOf(tblname);
        try (Table table = con.getTable(tableName)) {
            Delete del = new Delete(Bytes.toBytes(rowKey));
            // addColumn(family, qualifier): delete only the latest version of that column.
            // https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Delete.html#addColumn(byte[],%20byte[])
            del.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQual));
            table.delete(del);
        }
    }

    /**
     * Scans every row of the table and prints each cell.
     *
     * @param tblname table name
     * @throws IOException if the table cannot be opened or the scan fails
     */
    public void scanTable(String tblname) throws IOException {
        TableName tableName = TableName.valueOf(tblname);
        // Scan() covers all rows; Scan(byte[] startRow, byte[] stopRow) would limit the range.
        Scan scan = new Scan();
        // The scanner must be closed too, otherwise its server-side resources stay open.
        try (Table table = con.getTable(tableName);
             ResultScanner rs = table.getScanner(scan)) {
            printScanResults(rs, "--------ScanTable--------");
        }
    }

    /**
     * Scans the whole table, keeping only cells whose value matches the regular expression
     * {@code find} (e.g. {@code ".*2.*"}), and prints them.
     *
     * @param tblname table name
     * @param find    regular expression applied to cell values
     * @throws IOException if the table cannot be opened or the scan fails
     */
    public void filterScan(String tblname, String find) throws IOException {
        TableName tableName = TableName.valueOf(tblname);
        Scan scan = new Scan();
        Filter f = new ValueFilter(CompareOp.EQUAL, new RegexStringComparator(find));
        scan.setFilter(f);
        try (Table table = con.getTable(tableName);
             ResultScanner rs = table.getScanner(scan)) {
            printScanResults(rs, "--------ScanFilterTable--------");
        }
    }

    /**
     * Demo: creates "hbaseTest", writes three cells into "rowKey1", then reads them back by row
     * key, with a full scan and with a value-regex scan. The shared connection is closed even if
     * a step fails.
     */
    public static void main(String[] args) throws Exception {
        hbase_test t = new hbase_test();
        try {
            // Fails on a second run because a table named "hbaseTest" already exists.
            t.createTable("hbaseTest", "colFam1", "colFam2");
            t.insertRow("hbaseTest", "rowKey1", "colFam1", "colQual1", "value1");
            t.insertRow("hbaseTest", "rowKey1", "colFam2", "colQual1", "value2");
            t.insertRow("hbaseTest", "rowKey1", "colFam1", "colQual2", "value3");
            t.getOneDataByRowKey("hbaseTest", "rowKey1");
            t.scanTable("hbaseTest");
            t.filterScan("hbaseTest", ".*2.*");
        } finally {
            con.close();
        }

        // Earlier experiments, kept for reference (not compiled):
        //
        // Table table = con.getTable(TableName.valueOf("test2")); // one Table per thread; not thread-safe
        // Put put = new Put(Bytes.toBytes("testrow"));             // Put(byte[] row)
        // // Put addColumn(byte[] family, byte[] qualifier, byte[] value)
        // put.addColumn(Bytes.toBytes("colfam-1"), Bytes.toBytes("qual-1"), Bytes.toBytes("val-11"));
        // put.addColumn(Bytes.toBytes("colfam-1"), Bytes.toBytes("qual-2"), Bytes.toBytes("val-22"));
        // put.addColumn(Bytes.toBytes("colfam-2"), Bytes.toBytes("qual-3"), Bytes.toBytes("val-33"));
        // table.put(put);
        //
        // // Inserting data into HBase using a list
        // List<Put> puts = new ArrayList<Put>();
        // Put put1 = new Put(Bytes.toBytes("row1"));
        // put1.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
        // puts.add(put1);
        // Put put2 = new Put(Bytes.toBytes("row2"));
        // put2.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val2"));
        // puts.add(put2);
        // Put put3 = new Put(Bytes.toBytes("row2"));
        // put3.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val3"));
        // puts.add(put3);
        // table.put(puts);
        // System.out.println("getOneDataByRowKey :");
        // t.getOneDataByRowKey("test2", "testrow");
        // table.close(); // close the instance to free resources
    }

    /*
     * Unused sketch kept for reference: read one value by row key, family and qualifier.
     * Restrict the Get either to the whole family (option 1) or to the exact column (option 2).
     *
     * public void getOneDataByAllInfo(TableName tableName, String rowKey, String colFam, String colQual) throws IOException {
     *     Table table = con.getTable(tableName);
     *     Get get = new Get(Bytes.toBytes(rowKey));
     *     get.addFamily(Bytes.toBytes(colFam));                             // option 1
     *     // get.addColumn(Bytes.toBytes(colFam), Bytes.toBytes(colQual)); // option 2
     *     Result result = table.get(get);
     *     byte[] val = result.getValue(Bytes.toBytes(colFam), Bytes.toBytes(colQual)); // value of the given column
     *     System.out.println("Value: " + Bytes.toString(val));
     *     table.close();
     * }
     */
}