HBase: filtering a query by multiple rowkey values (Scala environment)
(2018-04-04 14:17:34) | Tags: hbase, scan, filter, multiple values, rowfilter | Category: big data processing
Our goal: run the SQL-style query where in (xxx, yyy) against HBase, i.e. filter a scan by several rowkey values at once.

Our approach:

1. HBase ships with a variety of filters, and several filters can be applied to one scan.

2. Put the target values in a list, build one filter per value, and combine the filters into a filter list.

3. A single row key can never equal more than one value at a time, so the combined filter must use Operator.MUST_PASS_ONE: a row passes as soon as it matches any one member filter (MUST_PASS_ALL would match nothing). A minimal sketch of steps 2 and 3 follows this list; the full Spark job comes after it.
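For concreteness, here is a minimal, self-contained sketch of steps 2 and 3. The object name FilterListSketch and the rowkey values are made up for illustration; the API calls match the HBase 1.x client used in the rest of this post:

import java.util

import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.{BinaryComparator, CompareFilter, Filter, FilterList, RowFilter}
import org.apache.hadoop.hbase.filter.FilterList.Operator
import org.apache.hadoop.hbase.util.Bytes

object FilterListSketch {

  // build an OR-style filter list: one exact-match RowFilter per rowkey value
  def buildRowkeyFilter(rowkeys: Seq[String]): FilterList = {
    val filters = new util.ArrayList[Filter]()
    for (rowkey <- rowkeys) {
      filters.add(new RowFilter(CompareFilter.CompareOp.EQUAL,
        new BinaryComparator(Bytes.toBytes(rowkey))))
    }
    // MUST_PASS_ONE = logical OR across the member filters
    new FilterList(Operator.MUST_PASS_ONE, filters)
  }

  def main(args: Array[String]): Unit = {
    val scan = new Scan()
    scan.setFilter(buildRowkeyFilter(Seq("123456", "234567")))
    println(scan.getFilter) // prints the composed filter list
  }
}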
I. Using the Scala interface

Our code:

import java.util

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Result, Scan}
import org.apache.hadoop.hbase.filter._
import org.apache.hadoop.hbase.filter.FilterList.Operator
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.{SparkConf, SparkContext}

object HbaseReadDirectTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("demo")
    val sc = new SparkContext(sparkConf)
    val tablename = "USER_INTEREST"
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop4,hadoop5,hadoop6")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tablename)

    val scan = new Scan()
    // one RowFilter per rowkey value: exact binary equality on the row key
    val cookieids = List("123456", "234567")
    val filters = new util.ArrayList[Filter]()
    for (cookieid <- cookieids) {
      val filter = new RowFilter(CompareFilter.CompareOp.EQUAL,
        new BinaryComparator(Bytes.toBytes(cookieid)))
      filters.add(filter)
    }
    // MUST_PASS_ONE: keep a row if it satisfies any one of the filters
    val filterList = new FilterList(Operator.MUST_PASS_ONE, filters)
    scan.setFilter(filterList)
    hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))

    val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    val count = hBaseRDD.count()
    println(count)
    hBaseRDD.foreach { case (_, result) =>
      // read the row key and the INFO:FUNNY cell
      val cookieId = Bytes.toString(result.getRow)
      val funnyWeight = Bytes.toString(result.getValue("INFO".getBytes, "FUNNY".getBytes))
      println("Row key:" + cookieId + " funny-weight:" + funnyWeight)
    }
    sc.stop()
  }

  // serialize the Scan (including its filter list) so that TableInputFormat
  // can pick it up from the job configuration
  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }
}
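As a side note, newer HBase releases ship an equivalent helper, TableMapReduceUtil.convertScanToString, in the org.apache.hadoop.hbase.mapreduce package. Assuming a version where that method is public (it is in the 1.x line), the hand-rolled helper above reduces to a one-liner:

import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil

// delegates Scan serialization to the library instead of calling
// ProtobufUtil and Base64 by hand
def convertScanToString(scan: Scan): String =
  TableMapReduceUtil.convertScanToString(scan)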
II. Using the Java interface

Alternatively, skip the scan entirely: with the native Java client API, a batch of Gets returns exactly the requested rows. The method below can live in the same object as the code above:

def queryHbaseByRowkeyJava(sc: SparkContext, tableName: String,
                           rowkeyValueList: List[String]): Array[Result] = {
  val hbaseConf = HBaseConfiguration.create()
  // ZOOKEEPER_QUORUM, ZOOKEEPER_PORT and ZOOKEEPER_ZNODE_PARENT are
  // cluster-specific constants defined elsewhere in the project
  hbaseConf.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM)
  hbaseConf.set("hbase.zookeeper.property.clientPort", ZOOKEEPER_PORT)
  hbaseConf.set("zookeeper.znode.parent", ZOOKEEPER_ZNODE_PARENT)
  hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)
  val connection = ConnectionFactory.createConnection(hbaseConf)
  val table = connection.getTable(TableName.valueOf(tableName))
  // one Get per rowkey value
  val getList = new util.ArrayList[Get]()
  for (rowkey <- rowkeyValueList) {
    val get = new Get(Bytes.toBytes(rowkey))
    getList.add(get)
  }
  val result = table.get(getList)
  connection.close()
  result
}
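A hypothetical call site (the table name, rowkey values, and the INFO:FUNNY column simply mirror the Scala section; Result.isEmpty skips rowkeys that do not exist in the table):

val results = queryHbaseByRowkeyJava(sc, "USER_INTEREST", List("123456", "234567"))
for (result <- results if !result.isEmpty) {
  // decode the row key and the INFO:FUNNY cell, as in the scan-based version
  val cookieId = Bytes.toString(result.getRow)
  val funnyWeight = Bytes.toString(result.getValue("INFO".getBytes, "FUNNY".getBytes))
  println("Row key:" + cookieId + " funny-weight:" + funnyWeight)
}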