加载中…
个人资料
  • 博客等级:
  • 博客积分:
  • 博客访问:
  • 关注人气:
  • 获赠金笔:0支
  • 赠出金笔:0支
  • 荣誉徽章:
正文 字体大小:

hbase根据rowkey多个值过滤查询(scala环境)

(2018-04-04 14:17:34)
标签:

hbase

scan

filter

多个值

rowfilter

分类: 大数据处理
我们的目标是:在hbase中能够实现where in(xxx, yyy)多个值过滤的sql查询。
我们的思路:
一、使用scala接口
    1、hbase提供了多种过滤器,也可以同时处理多个过滤
    2、把多个值放在列表中,每个值都是一个过滤器,最后组合成过滤器数组
    3、因为多个值不可能同时满足,所以过滤器的模式是Operator.MUST_PASS_ONE,即只要满足一个即可


我们的代码:
import java.util

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.client.{HTable, Scan}
import org.apache.hadoop.hbase.filter.FilterList.Operator
import org.apache.hadoop.hbase.filter._
import spire.std.byte

object HbaseReadDirectTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("demo")
    val sc = new SparkContext(sparkConf)

    val tablename = "USER_INTEREST"
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop4,hadoop5,hadoop6")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tablename)

    val scan = new Scan()
    val cookieids = List("123456", "234567")
    val filters = new util.ArrayList[Filter]()
    for(cookieid <- cookieids) {
      val filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(cookieid)))
      filters.add(filter)
    }
    val filterList = new FilterList(Operator.MUST_PASS_ONE,filters)
    scan.setFilter(filterList)

    hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))
    val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    val count = hBaseRDD.count()
    println(count)

    hBaseRDD.foreach{case (_, result) => {
      // 获取行键
      val cookieId = Bytes.toString(result.getRow)
      val funnyWeight = Bytes.toString(result.getValue("INFO".getBytes, "FUNNY".getBytes))
      println("Row key:" + cookieId + " funny-weight:" + funnyWeight)
    }}

    sc.stop()

  }

  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

}

二、使用java接口

def queryHbaseByRowkeyJava(sc: SparkContext, tableName: String, rowkeyValueList: List[String]): Array[Result] = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM)
    hbaseConf.set("hbase.zookeeper.property.clientPort", ZOOKEEPER_PORT)
    hbaseConf.set("zookeeper.znode.parent", ZOOKEEPER_ZNODE_PARENT)
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)

    val connection = ConnectionFactory.createConnection(hbaseConf)
    val table = connection.getTable(TableName.valueOf(tableName))

    val getList = new util.ArrayList[Get]()
    for(rowkey <- rowkeyValueList){
      val get = new Get(Bytes.toBytes(rowkey))
      getList.add(get)
    }

    val result = table.get(getList)
    connection.close()
    result
  }

0

阅读 收藏 喜欢 打印举报/Report
  

新浪BLOG意见反馈留言板 欢迎批评指正

新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 产品答疑

新浪公司 版权所有