w22116972
12/9/2015 - 4:55 AM

hbase_test.java

package test;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;

/**	_________________________________________________________________________________________________________________________
 * 	|	TableName	|			ColumnFamily1				|	ColumnFamily2 ...	|
 * 	|			|_______________________________________________________________|_______________________________|
 * 	|			|	ColumnQualifier1	|	ColumnQualifier2...	|
 * 	|_______________________|_______________________________|_______________________________|
 * 	|	RowKey1		|	Cell			|	Cell ...		|
 * 	|			|	t1: a			|				|
 * 	|			|	t2: b			|				|
 * 	|_______________________|_______________________________|_______________________________|
 * 	|	RowKey2 ...	|	
 * 	|_______________________|
 * 
 * 	每當你在該cell insert值,並不會覆蓋原有的值(t1:a),而是在該cell位置上再增加一個t2:b
 * 	t1就是版本1,t2是版本2同時也是最新的版本,所以回傳該cell時,會回傳t2的值
 * 	這些t版本是以timestamp的方式來呈現
 * 
 * 	舉例:  tableName : 考試成績, RowKey : 學生學號, ColumnFamily : 自然學科..人文學科.., ColumnQualifier : 數學科..化學科.., Cell : 第一次考試..第二次考試..
 * 
 * 	hbase裡面是以bytes方式儲存,所以要一直很麻煩的Bytes.tobytes()
 * 
 * 	程式碼參考書 : HBase - The Definitive Guide - 2nd Edition (2015-07-07) This book covers the 1.0.0 release of HBase
 * 	參考書下載   : http://www.it-ebooks.org/book/oreilly/hbase_the_definitive_guide_2nd_edition_early_release
 */


public class hbase_test {
	
	private static Configuration conf;
	private static Connection con;
	//private static TableName tableName = TableName.valueOf("test2");
	
	static {
		/** Hadoop class, load and provide conf to client AP.
		 *                load from hbase-site.xml, hbase-default.xml 
		 *  使用HBaseConfiguration來建立configuration */
		conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", "localhost"); // override ZooKeeper quorum address to point to a different cluster
		conf.set("hbase.zookeeper.property.clientPort", "2181");
		
		/** Connection        : create instance only once per Ap, share it during runtime
		 *  ConnectionFactory : retrieve Connection instance, configured as per given config  
		 *  https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/client/Connection.html 
		 *  https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/client/ConnectionFactory.html
		 */
		 // 設定完成後需要連線去連接資料庫,我們使用ConnectionFactory來建立連線
		try {
			con = ConnectionFactory.createConnection(conf);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	public void getOneDataByRowKey(String name, String rowKey)throws IOException{
		/* 有了字串型態的表格名稱,就可以轉成TableName的表格名稱 */
		TableName tableName = TableName.valueOf(name);
		/* 利用連線(con)及表格名稱來找到該表格 
		 * 因為HTable已經被deprecated所以用Table來代替 
		 * Table : Used to communicate with a single HBase table. 
		 *         can be used to get, put, delete or scan data from a table. */
		Table table = con.getTable(tableName); // HTable is no longer a client API. Use Table instead.
		/* Constructor -> Get(byte[] row) : Create a Get operation for the specified row. 
		 * Get : The object that specifies what data to fetch and from which row */
		Get get = new Get(Bytes.toBytes(rowKey));
		/* Result : Single row result of a Get or Scan query 
		 * table.get() : Extracts certain cells from a given row */
		Result result = table.get(get);
		System.out.println("----------Result---------------"); 
		System.out.println(result); //<row-key>/<family>:<qualifier>/<version>/<type>/<value-length>/<sequence-id>
		/* 有了字串型態的表格名稱,就可以轉成TableName的表格名稱 
		 * CellScanner : An interface for iterating through a sequence of cells */
		CellScanner scanner = result.cellScanner(); // https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/CellScanner.html
		System.out.println("----------CellScanner----------");
		while (scanner.advance()) { // scanner.advance() : Advance the scanner 1 cell
			// scanner.current() : the current Cell
			Cell cell = scanner.current(); //https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/Cell.html
			System.out.println("Row Key          : " + Bytes.toStringBinary(CellUtil.cloneRow(cell)));
			System.out.println("Timestamp        : " + cell.getTimestamp());
			System.out.println("Column Family    : " + Bytes.toStringBinary(CellUtil.cloneFamily(cell)));
			System.out.println("Column Qualifier : " + Bytes.toStringBinary(CellUtil.cloneQualifier(cell)));
			System.out.println("Value            : " + Bytes.toStringBinary(CellUtil.cloneValue(cell)));
			System.out.println("----------------------------------------");
			// https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/CellUtil.html
		}
		table.close();
	}
	
	public void insertRow(String tblname, String rowKey,  String columnFamily, String columnQual, String value) throws Exception {  
		/** TableName : just a table name with default namespace
		 *  Table     : data table within client, create one per thread, not thread-safe  */
		TableName tableName = TableName.valueOf(tblname);
		Table table = con.getTable(tableName); 
		/** Put : create put with specific row  */
        	Put put = new Put(Bytes.toBytes(rowKey));  
        	/** addColumn : add "columnFamily:qualifier = value" to put  */
        	put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQual), Bytes.toBytes(value));
        	/** put : store row with col into HBase table  */
        	System.out.printf("----Insert row into %s with rowKey : %s, colFamily : %s, colQual : %s, value : %s ----\n", tblname, rowKey, columnFamily, columnQual, value);
        	table.put(put);  
        	table.close();
    	}
	
	/** 把老師版本的第二個參數做修改,變成能夠一次接受多個字串當做columnFamily
	*   String... 是接收"多個"字串型態的參數當成陣列,所以後面要多少個都行 eg. t.createTable("hbaseTest", "colFam1", "colFam2", "colFam3");
	*   再把familyName這個陣列用foreach的方式全都取出來再addFamily
	*/
	public void createTable(String tblname, String... familyName) throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
		/*  Admin : The administrative API for HBase, btain an instance from an Connection.getAdmin() 
		 *  Admin can be used to create, drop, list, enable and disable tables, add and drop table column families */
		Admin admin = con.getAdmin();
		TableName tableName = TableName.valueOf(tblname);
		HTableDescriptor tableDesc = new HTableDescriptor(tableName);
		System.out.print("--------createTable with ColumnFamily :");
		for(String name : familyName) {
			HColumnDescriptor colDesc = new HColumnDescriptor(name);
			//colDesc.setMaxVersions(1);
			tableDesc.addFamily(colDesc);
			System.out.print(" " + name);
		}
		System.out.println("--------");
		admin.createTable(tableDesc);
	}
	
	
	public void getVersionData(String tblname, String rowKey, String columnFamily, String columnQual, int version)throws IOException {
		TableName tableName = TableName.valueOf(tblname);
		Table table = con.getTable(tableName);
		Get get = new Get(Bytes.toBytes(rowKey));
		Result result = table.get(get);
		List<Cell> pwd = result.getColumnCells(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQual));
		Cell cell = pwd.get(version);
		System.out.println("Versioned data : " + Bytes.toStringBinary(CellUtil.cloneValue(cell)));
		System.out.println("timestamp      : " + cell.getTimestamp());
		table.close();
	}
	
	public void deleteData(String tblname, String rowKey,  String columnFamily, String columnQual)throws IOException {
		TableName tableName = TableName.valueOf(tblname);
		Table table = con.getTable(tableName);
		Delete del = new Delete(Bytes.toBytes(rowKey));
		/* addColumn() : Delete the latest version of the specified column */
		del.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQual)); 
		// https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Delete.html#addColumn(byte[],%20byte[])
		table.delete(del);
	}
	
	public void scanTable(String tblname) throws IOException {
		TableName tableName = TableName.valueOf(tblname);
		Table table = con.getTable(tableName);
		Scan scan = new Scan(); // Scan() : Create a Scan operation across all rows
		ResultScanner rs = table.getScanner(scan);
		for (Result r : rs) {
			CellScanner scanner = r.cellScanner();
			while (scanner.advance()) {
				Cell cell = scanner.current();
				System.out.println("--------ScanTable--------");
				System.out.println("Row Key          : " + Bytes.toStringBinary(CellUtil.cloneRow(cell)));
				System.out.println("Timestamp        : " + cell.getTimestamp());
				System.out.println("Column Family    : " + Bytes.toStringBinary(CellUtil.cloneFamily(cell)));
				System.out.println("Column Qualifier : " + Bytes.toStringBinary(CellUtil.cloneQualifier(cell)));
				System.out.println("Value            : " + Bytes.toStringBinary(CellUtil.cloneValue(cell)));
				System.out.println("----------------------------------------");
			}
		}
		// Scan(byte[] startRow, byte[] stopRow) : Create a Scan operation for the range of rows specified.
	}
	
	public void filterScan(String tblname, String find) throws IOException {
		TableName tableName = TableName.valueOf(tblname);
		Table table = con.getTable(tableName);
		Scan scan = new Scan(); // Scan() : Create a Scan operation across all rows
		Filter f = new ValueFilter(CompareOp.EQUAL, new RegexStringComparator(find)); // eg. ".*2.*"
		scan.setFilter(f);
		ResultScanner rs = table.getScanner(scan);
		for (Result r : rs) {
			CellScanner scanner = r.cellScanner();
			while (scanner.advance()) {
				Cell cell = scanner.current();
				System.out.println("--------ScanFilterTable--------");
				System.out.println("Row Key          : " + Bytes.toStringBinary(CellUtil.cloneRow(cell)));
				System.out.println("Timestamp        : " + cell.getTimestamp());
				System.out.println("Column Family    : " + Bytes.toStringBinary(CellUtil.cloneFamily(cell)));
				System.out.println("Column Qualifier : " + Bytes.toStringBinary(CellUtil.cloneQualifier(cell)));
				System.out.println("Value            : " + Bytes.toStringBinary(CellUtil.cloneValue(cell)));
				System.out.println("----------------------------------------");
			}
		}
	}


	public static void main(String[] args) throws Exception {
		hbase_test t = new hbase_test();
		t.createTable("hbaseTest", "colFam1", "colFam2"); // 這段程式碼執行第二次會失敗,因為已經存在同名的tableName了
		t.insertRow("hbaseTest", "rowKey1" , "colFam1", "colQual1", "value1");
		t.insertRow("hbaseTest", "rowKey1" , "colFam2", "colQual1", "value2");
		t.insertRow("hbaseTest", "rowKey1" , "colFam1", "colQual2", "value3");
		t.getOneDataByRowKey("hbaseTest", "rowKey1");
		t.scanTable("hbaseTest");
		t.filterScan("hbaseTest", ".*2.*");
		con.close();
		
		
		/////////////////////////////////////////////////////////
		
		/** TableName : just a table name with default namespace
		 *  Table     : data table within client, create one per thread, not thread-safe  */
		//Table table = con.getTable(TableName.valueOf("test2")); // Instantiate a new client
		
		/** Put : create put with specific row  */
		//Put put = new Put(Bytes.toBytes("testrow")); // byte[] rowkey = Bytes.toBytes("testrow"); Put(byte[] rowkey);
		
		/** addColumn : add col name "colfam-1:qual-1" to put  */
		/*
		put.addColumn(Bytes.toBytes("colfam-1"), Bytes.toBytes("qual-1"), Bytes.toBytes("val-11")); // Put addColumn(byte[] family, byte[] qualifier, byte[] value)
		put.addColumn(Bytes.toBytes("colfam-1"), Bytes.toBytes("qual-2"), Bytes.toBytes("val-22"));
		put.addColumn(Bytes.toBytes("colfam-2"), Bytes.toBytes("qual-3"), Bytes.toBytes("val-33"));
		/** put : store row with col into HBase table  */
		//table.put(put);
		
		/** Inserting data into HBase using a list  */
		/*
		List<Put> puts = new ArrayList<Put>();
		Put put1 = new Put(Bytes.toBytes("row1"));
		put1.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
		Bytes.toBytes("val1"));
		puts.add(put1);
		Put put2 = new Put(Bytes.toBytes("row2"));
		put2.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
		Bytes.toBytes("val2"));
		puts.add(put2);
		Put put3 = new Put(Bytes.toBytes("row2"));
		put3.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"),
		Bytes.toBytes("val3"));
		puts.add(put3);
		table.put(puts);
		
		
		System.out.println("getOneDataByRowKey :");
		getOneDataByRowKey(TableName.valueOf("test2"), "testrow");
		
		/** close : close instance to free resources  */
		//table.close();
		
	}
	/*
	public void getOneDataByAllInfo(TableName tableName, String rowKey, String colFam, String colQual) throws IOException {
		Table table = con.getTable(tableName);
		Get get = new Get(Bytes.toBytes(rowKey));
		// 1
		get.addFamily(Bytes.toBytes(colFam));
		Result result = table.get(get);
		byte[] val = result.getValue(Bytes.toBytes(colFam), Bytes.toBytes(colQual));
		
		// 2
		
		get.addColumn(Bytes.toBytes(colFam), Bytes.toBytes(colQual));
		Result result = table.get(get);
		byte[] val = result.getValue(Bytes.toBytes(colFam), Bytes.toBytes(colQual)); // Get a specific value for the given column
		
		
		System.out.println("Value: " + Bytes.toString(val));
		table.close();
	}
	*/
}