实现thriftClient与thriftServer通信,实现访问内网HBase集群和101.118.124.111 分别为公网IP,分别为内网IP
域名映射 首先我们要做的是将ThriftServer服务的通信端口9000 映射到内网中,这边映射成了公网的9000端口
thrift 下面是Thrift的百度百科
TBinaryProtocol – 一种简单的二进制格式,简单,但没有为空间效率而优化。比文本协议处理起来更快,但更难于调试。
TCompactProtocol – 更紧凑的二进制格式,处理起来通常同样高效。
TFramedTransport – 当使用一个非阻塞服务器时,要求使用这个传输协议。它按帧来发送数据,其中每一帧的开头是长度信息。
TSocket – 使用阻塞的套接字I/O来传输。
HBase ThriftServer有下面两个参数,分别用来指定是否使用TFramedTransport传输和TCompactProtocol协议,默认都是false,这边CDH中不用开启。
Use Thrift TFramedTransport on the server side. This is the recommended transport for thrift servers and requires a similar setting on the client side. Changing this to false will select the default transport, vulnerable to DoS when malformed requests are issued due to THRIFT-601.
Use Thrift TCompactProtocol binary serialization protocol.
下面参数用来配置Thrift Gateway的认证,如果你配了这个东西就必须用doAs完成认证才能完成通信
<property>
  <name>hbase.regionserver.thrift.http</name>
  <value>true</value>
</property>
<property>
  <name>hbase.thrift.support.proxyuser</name>
  <value>true</value>
</property>
想了解更多Configure the Thrift Gateway to Use the doAs Feature ,看59.6章节
Client 客户端可以用python或者是Java与ThriftServer进行通信。值得一提的是python3 在访问时会抛异常,这边初步查了一下也有解决方案,这边就先用python2.7进行测试,下面是代码示例:
"""Minimal HBase Thrift client: fetch one row from the "cars" table and print its key.

Uses a buffered socket transport with the accelerated binary protocol.
"""
from common import *
from thrift.transport import TSocket
from thrift.protocol import TBinaryProtocol
from thrift.transport import TTransport
from hbase import Hbase

# NOTE(review): the host is blank in the original snippet (presumably redacted);
# fill in the Thrift server address before running.
# The port is passed as an integer rather than the string "9090".
transport = TTransport.TBufferedTransport(TSocket.TSocket("", 9090))
protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
client = Hbase.Client(protocol)

transport.open()
try:
    rows = client.getRow("cars", "row1")
    for row in rows:
        rowKey = row.row
        print("Got row:" + rowKey)
finally:
    # Always release the connection, even when the lookup or printing fails.
    transport.close()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 package com.bim.hbase;import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;import org.apache.hadoop.hbase.thrift.generated.Hbase;import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;import org.apache.thrift.transport.*;import org.apache.thrift.protocol.*;import org.apache.thrift.protocol.TCompactProtocol;import org.apache.thrift.transport.TFramedTransport;import java.io.*;import java.util.ArrayList;import java.util.List;import java.nio.ByteBuffer;import java.nio.charset.Charset;import java.util.Properties;public class HbaseThriftTest { private static String host; private static Integer port; public static void init () { Properties properties = new Properties (); InputStream in = null ; try { in = HbaseThriftTest.class.getClassLoader().getResourceAsStream("system.properties" ); properties.load(in); host = properties.getProperty("hbase.thrift.host" ); port = Integer.parseInt(properties.getProperty("hbase.thrift.port" )); } catch (IOException e) { e.printStackTrace(); }finally { if (in != null ) { try { in.close(); } catch (IOException e) { e.printStackTrace(); } } } } public static void main (String[] args) throws Exception { init(); String Proto = "binary" ; String TableName = "t1" ; String ColFamily = "rowkey002" ; TTransport Transport; Transport = new TSocket (host, port); TCompactProtocol FProtocol = new TCompactProtocol (Transport); Hbase.Client Client = new Hbase .Client(FProtocol); if (Proto.equals("binary" )) { TProtocol Protocol = new TBinaryProtocol (Transport, true , true ); Client = new Hbase .Client(Protocol); } else if ( Proto.equals("framed" )) { Transport = new TFramedTransport (new TSocket (host, port)); TProtocol Protocol = new TBinaryProtocol (Transport, true , true 
); Client = new Hbase .Client(Protocol); } else if ( ! Proto.equals("compact" )) { System.out.println("Protocol must be compact or framed or binary" ); } Transport.open(); List<ColumnDescriptor> Columns = new ArrayList <ColumnDescriptor>(); ColumnDescriptor col = new ColumnDescriptor (); col.name = ByteBuffer.wrap(ColFamily.getBytes()); Columns.add(col); System.out.println("#~ Dumping Existing tables" ); for (ByteBuffer tn : Client.getTableNames()) { System.out.println("-- found: " + new String (tn.array(), Charset.forName("UTF-8" ))); } System.out.println("#~ Creating table: " + TableName); try { Client.createTable(ByteBuffer.wrap(TableName.getBytes()), Columns); } catch (AlreadyExists ae) { System.out.println("WARN: " + ae.message); } Transport.close(); } }
hbase.thrift.host=
hbase.thrift.port=9090