加载中…
个人资料
  • 博客等级:
  • 博客积分:
  • 博客访问:
  • 关注人气:
  • 获赠金笔:0支
  • 赠出金笔:0支
  • 荣誉徽章:
正文 字体大小:

HBase基本使用以及和hadoop的交互

(2017-11-28 15:26:11)
标签:

hbase

hadoop

列族

高并发

分类: 大数据处理
一、为什么需要HBase       
HBase是NoSQL数据库,是Not only SQL的缩写,泛指非关系型的数据库。其数据存储可以不需要固定的表模式,也通常会避免使用SQL的JOIN操作,一般又都具备水平可扩展的特性。
  大数据之所以重要,是因为其具备解决显示问题的三个关键方面。
1、分析各种不同来源的结构化、半结构化和非结构化数据的理想选择
2、当需要分析所有或大部分数据,或者对一个数据抽样分析效果不明显时,大数据解决方案是理想的选择
3、未预先确定数据的业务度量指标时,是进行迭代式和探索式分析的理想选择
  NoSQL在大数据中扮演的角色:
1、高并发读写
传统关系型数据库每秒应付上万次SQL查询还勉强顶得住,但是应付上万次SQL写数据请求,硬盘I/O却无法承受。NoSQL数据库具有非常良好的读写性能。因为:一般MySQL使用query cache,每当表发生更新操作时,Cache就会失效,这是一种大粒度的Cache,这种Cache性能并不高。而NoSQL的Cache是记录级的,是一种细粒度的Cache,所以NoSQL在这个层面上来说性能要高很多。
2、可扩展性
传统数据库系统升级和扩展是非常痛苦的,往往需要停机维护和数据迁移,而不能通过横向添加节点的方式实现无缝扩展。NoSQL无须事先为要存储的数据建立字段,随时可以存储自定义的数据格式。NoSQL允许使用者随时随地添加字段,并且字段类型可以是任意格式的。HBase的列式存储的特性支撑它实时随机读取、基于KEY的特殊访问需求。
  HBase作为一个典型的NoSQL数据库,可以通过行键(RowKey)检索数据,仅支持单行事务,主要用于存储非结构化的松散数据。HBase设计目标主要依靠横向扩展,通过不断增加廉价的商用服务器来增加计算和存储能力。

二、HBase核心组件
1、文件存储:HBase严重依赖Hadoop的HDFS组件,HBase使用HDFS作为底层存储系统。
2、协调服务组件:
  Zookeeper Quorum(队列)负责管理HBase中多HMaster的选举、服务器之间状态同步等。主节点HMaster主要负责Table和Region的管理工作。Region节点HRegionServer主要负责响应用户I/O请求,向HDFS文件系统中读写数据。

三、数据模型
  HBase的表由行(Row)和列(Column)共同构成,与关系型数据库不同的是HBase有一个列族(Column Family)的概念,它将一列或者多列组织在一期,HBase的列必须属于某一个列族。HBase没有数据类型,任何列值都被转换成字节数组进行存储。HBase表中的行是通过行键(RowKey)进行区分的,行键也是用来唯一确定一行的标识。两类数据模型,表是HBase表达数据的逻辑组织方式,而基于列的存储则是数据在底层的组织方式。
(1)逻辑模型
  HBase中的一个表有若干行,每行有很多列,列中的值有多个版本,每个版本的值称为一个单元格,每个单元存储的是不同时刻该列的值。列需要用“列族前缀+修饰符”的方式。HBase同一列可以存储不同时刻的值,同时多个列可以组成一个列族(Column Family)。
(2)物理模型
  HBase的列是按列族分组的,HFile是面向列的,存放行的不同列的物理文件,一个列族的数据存放在多个HFile中,最重要的是一个列族的数据会被同一个Region管理,物理上存放在一起。
举例说明:
我们创建一个表叫member:create 'member','member_id','address','info',并往里面插入数据。此时我们查看member的物理存储,发现有3个子目录分别为member_id,address,info。见下图所示:
HBase基本使用以及和hadoop的交互
 
  从物理结构上看,表存储在不同的分区,即不同的Region。每个Region只在一个RegionServer中提供服务,而Region直接向客户端提供存储和读取服务。
(3)列族和单元格
  一个列族的成员在文件系统上都是存储在一起的。因为存储优化都是针对列族级别的,这就意味着,一个列族的所有成员是通过相同的方式访问的。
   HBase中的单元格由行键、列族、列、时间戳唯一确定的。
(4)自动分区
  HBase中一个表的数据会被划分成很多的Region,Region可以动态扩展并且HBase保证Region的负载均衡。Region实际上是行键排序后的按规则分割的连续的存储空间。如果Region太大,会被动态拆分,相反多个Region会合并成一个较大的Region,以减少HDFS上存储文件的数量。这两个过程就是HBase的split和compaction。
  按照现在主流硬件的配置,每个RegionServer可以管理大约100~1000个Region,每个Region的大小可以是1~20G。

四、数据模型的操作
  HBase对数据模型的4个主要操作包括Get、Put、Scan和Delete。其中Scan是迭代获取部分或者全部数据(一般是多行),而Get是获取特定条件的数据。
  HBase的Delete操作并不是真正地从磁盘删除数据,而是通过创建墓碑(tombstones)标志进行处理。这些墓碑标记的值和小于该时间版本的单元格在大合并时被清除。

五、HBase表结构设计
1、HBase中没有join的概念,所以不支持join操作。
2、RowKey设计:应该基于预期的访问模式来为RowKey建模。关系型数据库可以在多列上建立索引,但是HBase只能在Rowkey上建立索引。设计原则:
(1)RowKey是以字典顺序从大到小排序
(2)RowKey尽量散列RowKey设计
(3)RowKey的长度尽量短
3、布隆过滤器
4、数据压缩:数据只在硬盘上压缩,在内存中(Memstore或BlockCache)或者网络传输时是没有压缩的。
5、单元时间版本:在默认情况下,HBase的每个单元格只维护三个时间版本。
6、生存时间(TTL):用于设置单元格的生存周期。

六、实战一从HBase的表中读取数据写到Hadoop中
(1)HBaseHadoopTest.java代码
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HBaseHadoopTest {

    public static final String NAME = "Example Test1";
    public static final String TEMP_INDEX_PATH = "test/example";  //本地文件系统,并非HDFS中的
    public static String inputTable = "member";

    public static void main(String[] args) throws Exception{
        Configuration conf = HBaseConfiguration.create();

        Scan scan = new Scan();
        scan.setBatch(0);
        scan.setCaching(10000);
        scan.setMaxVersions();
        scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("birthday"));

        conf.set("hbase.zookeeper.quorum", "hbasestudy");

        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

        Path tempIndexPath = new Path(TEMP_INDEX_PATH);
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(tempIndexPath)){
            fs.delete(tempIndexPath, true);
        }

        // hadoop job
        Job job = new Job(conf, NAME);
        job.setJarByClass(HBaseHadoopTest.class);

        TableMapReduceUtil.initTableMapperJob(inputTable, scan, ExampleMapper.class, Text.class, Text.class, job);
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, tempIndexPath);

        int success = job.waitForCompletion(true) ? 0 : 1;
        System.exit(success);
    }
}
(2)ExampleMapper.java文件
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

import java.io.IOException;

public class ExampleMapper extends TableMapper {

    private Text k = new Text();
    private Text v = new Text();
    public static final String FIELD_COMMON_SEPARATOR = "\u0001";

    @Override
    protected void setup(Context context) throws IOException, InterruptedException{

    }

    public void map(ImmutableBytesWritable row, Result columns, Context context) throws IOException{  // first parameter is key,same key will combine
        String value = null;
        String rowkey = new String(row.get());
        System.out.println("rowkey:" + rowkey);

        byte[] columnFamily = null;
        byte[] columnQualifier = null;
        long ts = 0L;

        try{
            for(KeyValue kv : columns.list()){
                value = Bytes.toStringBinary(kv.getValue());
                System.out.println("value:" + value);

                columnFamily = kv.getFamily();
                columnQualifier = kv.getQualifier();
                ts = kv.getTimestamp();

                k.set(rowkey);
                v.set(Bytes.toString(columnFamily) + FIELD_COMMON_SEPARATOR
                + Bytes.toString(columnQualifier)
                + FIELD_COMMON_SEPARATOR + value
                + FIELD_COMMON_SEPARATOR + ts);

                context.write(k, v);
                break;
            }
        }catch(Exception e){
            e.printStackTrace();
            System.err.println("Error: " + e.getMessage() + ", Row: " + Bytes.toString(row.get()) + ",value: " + value);
        }
    }
}
上述代码中需要注意,写入的hadoop地址是在本地的,通过hadoop fs xx是查询不到的。当时定位了好久,最后是通过debug hbase源码找到最终存储位置的。参考如下:
HBase基本使用以及和hadoop的交互

七、实战二从Hadoop读取文件写入HBase中
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import java.io.IOException;

public class WordCountHBase {

    public static class Map extends Mapper {
        private IntWritable i = new IntWritable(1);
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
            String s[] = value.toString().trim().split(" ");
            for(String m : s){
                context.write(new Text(m), i);
            }
        }
    }

    public static class Reduce extends TableReducer {
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException{
            int sum = 0;
            for(IntWritable i : values){
                sum += i.get();
            }

            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));
            context.write(NullWritable.get(), put);

        }
    }

    public static void createHBaseTable(String tableName) throws IOException{
        HTableDescriptor htd = new HTableDescriptor(tableName);
        HColumnDescriptor col = new HColumnDescriptor("content");
        htd.addFamily(col);

        HBaseConfiguration config = new HBaseConfiguration();
        HBaseAdmin admin = new HBaseAdmin(config);
        if(admin.tableExists(tableName)){
            System.out.println("table exists, trying recreate table!");
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
        }
        System.out.println("create new table:" + tableName);
        admin.createTable(htd);
    }

    public static void main(String[] args) throws Exception{
        String tableName = "wordcountH";
        Configuration conf = new Configuration();
        conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
        createHBaseTable(tableName);

        Job job = new Job(conf, "WordCountHBase");
        job.setJarByClass(WordCountHBase.class);
        job.setNumReduceTasks(3);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path("hbasetest")); // add a source for map input
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}


0

阅读 收藏 喜欢 打印举报/Report
  

新浪BLOG意见反馈留言板 欢迎批评指正

新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 产品答疑

新浪公司 版权所有