HBase: filtering a query by multiple rowkey values (Scala environment)
(2018-04-04 14:17:34)
Tags: hbase, scan, filter, multiple values, rowfilter
Category: Big Data Processing
Our goal: implement the equivalent of a SQL "where in (xxx, yyy)" multi-value filter query in HBase.

Our approach:
1. HBase provides many kinds of filters, and several filters can be applied to the same scan.
2. Put the target values in a list, turn each value into its own filter, and combine the filters into a filter list.
3. A row can never match more than one of the values at once, so the filter list uses the mode Operator.MUST_PASS_ONE: a row passes as soon as any single filter matches; a minimal sketch of this construction follows.
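The construction can be sketched on its own, without Spark. A minimal sketch, assuming the plain HBase 1.x client API and the same placeholder table USER_INTEREST and rowkeys as the full code below:

import java.util
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.filter.FilterList.Operator
import org.apache.hadoop.hbase.filter._
import org.apache.hadoop.hbase.util.Bytes

// One RowFilter per rowkey value, OR-ed together with MUST_PASS_ONE.
val filters = new util.ArrayList[Filter]()
for (rowkey <- List("123456", "234567")) {
  filters.add(new RowFilter(CompareFilter.CompareOp.EQUAL,
    new BinaryComparator(Bytes.toBytes(rowkey))))
}
val scan = new Scan()
scan.setFilter(new FilterList(Operator.MUST_PASS_ONE, filters))

// Run the filtered scan directly against the table.
val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
val table = connection.getTable(TableName.valueOf("USER_INTEREST"))
val scanner = table.getScanner(scan)
try {
  var r = scanner.next()
  while (r != null) { println(Bytes.toString(r.getRow)); r = scanner.next() }
} finally { scanner.close(); connection.close() }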
一、Using the Scala interface

Our code:

import java.util

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.FilterList.Operator
import org.apache.hadoop.hbase.filter._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.{SparkConf, SparkContext}

object HbaseReadDirectTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("demo")
    val sc = new SparkContext(sparkConf)
    val tablename = "USER_INTEREST"
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop4,hadoop5,hadoop6")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tablename)
    val scan = new Scan()
    // One RowFilter per rowkey value; a row passes if any one of them matches.
    val cookieids = List("123456", "234567")
    val filters = new util.ArrayList[Filter]()
    for (cookieid <- cookieids) {
      val filter = new RowFilter(CompareFilter.CompareOp.EQUAL,
        new BinaryComparator(Bytes.toBytes(cookieid)))
      filters.add(filter)
    }
    val filterList = new FilterList(Operator.MUST_PASS_ONE, filters)
    scan.setFilter(filterList)
    // TableInputFormat reads the scan from the configuration as a serialized string.
    hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))
    val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    val count = hBaseRDD.count()
    println(count)
    hBaseRDD.foreach { case (_, result) =>
      // Get the row key
      val cookieId = Bytes.toString(result.getRow)
      val funnyWeight = Bytes.toString(result.getValue("INFO".getBytes, "FUNNY".getBytes))
      println("Row key:" + cookieId + " funny-weight:" + funnyWeight)
    }
    sc.stop()
  }
  // Serializes the Scan into the Base64-encoded protobuf string that
  // TableInputFormat expects in its SCAN configuration property.
  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }
}
二、Using the Java interface

// Additional imports used here: org.apache.hadoop.hbase.TableName,
// org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Result}.
def queryHbaseByRowkeyJava(sc: SparkContext, tableName: String, rowkeyValueList: List[String]): Array[Result] = {
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM)
  hbaseConf.set("hbase.zookeeper.property.clientPort", ZOOKEEPER_PORT)
  hbaseConf.set("zookeeper.znode.parent", ZOOKEEPER_ZNODE_PARENT)
  hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)
  val connection = ConnectionFactory.createConnection(hbaseConf)
  val table = connection.getTable(TableName.valueOf(tableName))
  // One Get per rowkey; table.get(list) fetches them all in a single batched call.
  val getList = new util.ArrayList[Get]()
  for (rowkey <- rowkeyValueList) {
    val get = new Get(Bytes.toBytes(rowkey))
    getList.add(get)
  }
  val result = table.get(getList)
  connection.close()
  result
}
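A sketch of how the helper might be called, assuming ZOOKEEPER_QUORUM, ZOOKEEPER_PORT, and ZOOKEEPER_ZNODE_PARENT are defined as constants elsewhere, with the same placeholder table and rowkeys as above:

// table.get returns one Result per Get; rowkeys that do not exist
// come back as empty Results, hence the isEmpty check.
val results = queryHbaseByRowkeyJava(sc, "USER_INTEREST", List("123456", "234567"))
for (result <- results if !result.isEmpty) {
  println("Row key:" + Bytes.toString(result.getRow))
}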
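Which of the two to use? The Scan plus FilterList version still sweeps the whole table on the region servers and merely drops non-matching rows before returning them, which suits the Spark/TableInputFormat path; the batched-Get version does direct point lookups and is usually the cheaper choice when the exact rowkeys are few and known in advance.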