HBase基本使用以及和hadoop的交互

标签:
hbasehadoop列族高并发 |
分类: 大数据处理 |
一、为什么需要HBase
HBase是NoSQL数据库,是Not only
SQL的缩写,泛指非关系型的数据库。其数据存储可以不需要固定的表模式,也通常会避免使用SQL的JOIN操作,一般又都具备水平可扩展的特性。
大数据之所以重要,是因为其具备解决显示问题的三个关键方面。
NoSQL在大数据中扮演的角色:
HBase作为一个典型的NoSQL数据库,可以通过行键(RowKey)检索数据,仅支持单行事务,主要用于存储非结构化的松散数据。HBase设计目标主要依靠横向扩展,通过不断增加廉价的商用服务器来增加计算和存储能力。
Zookeeper
Quorum(队列)负责管理HBase中多HMaster的选举、服务器之间状态同步等。主节点HMaster主要负责Table和Region的管理工作。Region节点HRegionServer主要负责响应用户I/O请求,向HDFS文件系统中读写数据。
HBase的表由行(Row)和列(Column)共同构成,与关系型数据库不同的是HBase有一个列族(Column
Family)的概念,它将一列或者多列组织在一期,HBase的列必须属于某一个列族。HBase没有数据类型,任何列值都被转换成字节数组进行存储。HBase表中的行是通过行键(RowKey)进行区分的,行键也是用来唯一确定一行的标识。两类数据模型,表是HBase表达数据的逻辑组织方式,而基于列的存储则是数据在底层的组织方式。
HBase中的一个表有若干行,每行有很多列,列中的值有多个版本,每个版本的值称为一个单元格,每个单元存储的是不同时刻该列的值。列需要用“列族前缀+修饰符”的方式。HBase同一列可以存储不同时刻的值,同时多个列可以组成一个列族(Column
Family)。
HBase的列是按列族分组的,HFile是面向列的,存放行的不同列的物理文件,一个列族的数据存放在多个HFile中,最重要的是一个列族的数据会被同一个Region管理,物理上存放在一起。
从物理结构上看,表存储在不同的分区,即不同的Region。每个Region只在一个RegionServer中提供服务,而Region直接向客户端提供存储和读取服务。
一个列族的成员在文件系统上都是存储在一起的。因为存储优化都是针对列族级别的,这就意味着,一个列族的所有成员是通过相同的方式访问的。
HBase中的单元格由行键、列族、列、时间戳唯一确定的。
HBase中一个表的数据会被划分成很多的Region,Region可以动态扩展并且HBase保证Region的负载均衡。Region实际上是行键排序后的按规则分割的连续的存储空间。如果Region太大,会被动态拆分,相反多个Region会合并成一个较大的Region,以减少HDFS上存储文件的数量。这两个过程就是HBase的split和compaction。
按照现在主流硬件的配置,每个RegionServer可以管理大约100~1000个Region,每个Region的大小可以是1~20G。
HBase对数据模型的4个主要操作包括Get、Put、Scan和Delete。其中Scan是迭代获取部分或者全部数据(一般是多行),而Get是获取特定条件的数据。
HBase的Delete操作并不是真正地从磁盘删除数据,而是通过创建墓碑(tombstones)标志进行处理。这些墓碑标记的值和小于该时间版本的单元格在大合并时被清除。
public static final String NAME = "Example
Test1";
public static final String TEMP_INDEX_PATH =
"test/example";
//本地文件系统,并非HDFS中的
public static String inputTable =
"member";
public static void main(String[] args) throws
Exception{
Configuration conf = HBaseConfiguration.create();
Scan scan
= new Scan();
scan.setBatch(0);
scan.setCaching(10000);
scan.setMaxVersions();
scan.addColumn(Bytes.toBytes("info"),
Bytes.toBytes("birthday"));
conf.set("hbase.zookeeper.quorum", "hbasestudy");
conf.setBoolean("mapred.map.tasks.speculative.execution",
false);
conf.setBoolean("mapred.reduce.tasks.speculative.execution",
false);
Path
tempIndexPath = new Path(TEMP_INDEX_PATH);
FileSystem
fs = FileSystem.get(conf);
if(fs.exists(tempIndexPath)){
fs.delete(tempIndexPath,
true);
}
// hadoop
job
Job job =
new Job(conf, NAME);
job.setJarByClass(HBaseHadoopTest.class);
TableMapReduceUtil.initTableMapperJob(inputTable, scan,
ExampleMapper.class, Text.class, Text.class, job);
job.setNumReduceTasks(0);
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job,
tempIndexPath);
int
success = job.waitForCompletion(true) ? 0 : 1;
System.exit(success);
}
private Text k = new Text();
private Text v = new Text();
public static final String
FIELD_COMMON_SEPARATOR = "\u0001";
@Override
protected void setup(Context context) throws
IOException, InterruptedException{
}
public void map(ImmutableBytesWritable row,
Result columns, Context context) throws
IOException{ // first parameter is key,same key
will combine
String
value = null;
String
rowkey = new String(row.get());
System.out.println("rowkey:" + rowkey);
byte[]
columnFamily = null;
byte[]
columnQualifier = null;
long ts =
0L;
try{
for(KeyValue kv :
columns.list()){
value =
Bytes.toStringBinary(kv.getValue());
System.out.println("value:" +
value);
columnFamily = kv.getFamily();
columnQualifier =
kv.getQualifier();
ts = kv.getTimestamp();
k.set(rowkey);
v.set(Bytes.toString(columnFamily) +
FIELD_COMMON_SEPARATOR
+
Bytes.toString(columnQualifier)
+ FIELD_COMMON_SEPARATOR +
value
+ FIELD_COMMON_SEPARATOR + ts);
context.write(k, v);
break;
}
}catch(Exception e){
e.printStackTrace();
System.err.println("Error: "
+ e.getMessage() + ", Row: " + Bytes.toString(row.get()) + ",value:
" + value);
}
}
public static class Map extends Mapper
{
private
IntWritable i = new IntWritable(1);
public
void map(LongWritable key, Text value, Context context) throws
IOException, InterruptedException{
String s[] =
value.toString().trim().split(" ");
for(String m :
s){
context.write(new Text(m), i);
}
}
}
public static class Reduce extends TableReducer
{
public
void reduce(Text key, Iterable values, Context context) throws
IOException, InterruptedException{
int sum = 0;
for(IntWritable i :
values){
sum += i.get();
}
Put put = new
Put(Bytes.toBytes(key.toString()));
put.add(Bytes.toBytes("content"), Bytes.toBytes("count"),
Bytes.toBytes(String.valueOf(sum)));
context.write(NullWritable.get(), put);
}
}
public static void createHBaseTable(String
tableName) throws IOException{
HTableDescriptor htd = new
HTableDescriptor(tableName);
HColumnDescriptor col = new
HColumnDescriptor("content");
htd.addFamily(col);
HBaseConfiguration config = new
HBaseConfiguration();
HBaseAdmin
admin = new HBaseAdmin(config);
if(admin.tableExists(tableName)){
System.out.println("table
exists, trying recreate table!");
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
System.out.println("create new table:" +
tableName);
admin.createTable(htd);
}
public static void main(String[] args) throws
Exception{
String
tableName = "wordcountH";
Configuration conf = new Configuration();
conf.set(TableOutputFormat.OUTPUT_TABLE,
tableName);
createHBaseTable(tableName);
Job job =
new Job(conf, "WordCountHBase");
job.setJarByClass(WordCountHBase.class);
job.setNumReduceTasks(3);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TableOutputFormat.class);
FileInputFormat.addInputPath(job, new Path("hbasetest")); // add a
source for map input
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
1、分析各种不同来源的结构化、半结构化和非结构化数据的理想选择
2、当需要分析所有或大部分数据,或者对一个数据抽样分析效果不明显时,大数据解决方案是理想的选择
3、未预先确定数据的业务度量指标时,是进行迭代式和探索式分析的理想选择
1、高并发读写
传统关系型数据库每秒应付上万次SQL查询还勉强顶得住,但是应付上万次SQL写数据请求,硬盘I/O却无法承受。NoSQL数据库具有非常良好的读写性能。因为:一般MySQL使用query
cache,每当表发生更新操作时,Cache就会失效,这是一种大粒度的Cache,这种Cache性能并不高。而NoSQL的Cache是记录级的,是一种细粒度的Cache,所以NoSQL在这个层面上来说性能要高很多。
2、可扩展性
传统数据库系统升级和扩展是非常痛苦的,往往需要停机维护和数据迁移,而不能通过横向添加节点的方式实现无缝扩展。NoSQL无须事先为要存储的数据建立字段,随时可以存储自定义的数据格式。NoSQL允许使用者随时随地添加字段,并且字段类型可以是任意格式的。HBase的列式存储的特性支撑它实时随机读取、基于KEY的特殊访问需求。
二、HBase核心组件
1、文件存储:HBase严重依赖Hadoop的HDFS组件,HBase使用HDFS作为底层存储系统。
2、协调服务组件:
三、数据模型
(1)逻辑模型
(2)物理模型
举例说明:
我们创建一个表叫member:create
'member','member_id','address','info',并往里面插入数据。此时我们查看member的物理存储,发现有3个子目录分别为member_id,address,info。见下图所示:
(3)列族和单元格
(4)自动分区
四、数据模型的操作
五、HBase表结构设计
1、HBase中没有join的概念,所以不支持join操作。
2、RowKey设计:应该基于预期的访问模式来为RowKey建模。关系型数据库可以在多列上建立索引,但是HBase只能在Rowkey上建立索引。设计原则:
(1)RowKey是以字典顺序从大到小排序
(2)RowKey尽量散列RowKey设计
(3)RowKey的长度尽量短
3、布隆过滤器
4、数据压缩:数据只在硬盘上压缩,在内存中(Memstore或BlockCache)或者网络传输时是没有压缩的。
5、单元时间版本:在默认情况下,HBase的每个单元格只维护三个时间版本。
6、生存时间(TTL):用于设置单元格的生存周期。
六、实战一从HBase的表中读取数据写到Hadoop中
(1)HBaseHadoopTest.java代码
import
org.apache.hadoop.conf.Configuration;
import
org.apache.hadoop.fs.FileSystem;
import
org.apache.hadoop.fs.Path;
import
org.apache.hadoop.hbase.HBaseConfiguration;
import
org.apache.hadoop.hbase.client.Scan;
import
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import
org.apache.hadoop.hbase.util.Bytes;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Job;
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import
org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class HBaseHadoopTest
{
}
(2)ExampleMapper.java文件
import
org.apache.hadoop.hbase.KeyValue;
import
org.apache.hadoop.hbase.client.Result;
import
org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import
org.apache.hadoop.hbase.mapreduce.TableMapper;
import
org.apache.hadoop.hbase.util.Bytes;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.io.Writable;
import
java.io.IOException;
public class ExampleMapper
extends TableMapper {
}
上述代码中需要注意,写入的hadoop地址是在本地的,通过hadoop fs
xx是查询不到的。当时定位了好久,最后是通过debug hbase源码找到最终存储位置的。参考如下:
七、实战二从Hadoop读取文件写入HBase中
import
org.apache.hadoop.conf.Configuration;
import
org.apache.hadoop.fs.Path;
import
org.apache.hadoop.hbase.HBaseConfiguration;
import
org.apache.hadoop.hbase.HColumnDescriptor;
import
org.apache.hadoop.hbase.HTableDescriptor;
import
org.apache.hadoop.hbase.client.HBaseAdmin;
import
org.apache.hadoop.hbase.client.Put;
import
org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import
org.apache.hadoop.hbase.mapreduce.TableReducer;
import
org.apache.hadoop.hbase.util.Bytes;
import
org.apache.hadoop.io.IntWritable;
import
org.apache.hadoop.io.LongWritable;
import
org.apache.hadoop.io.NullWritable;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Job;
import
org.apache.hadoop.mapreduce.Mapper;
import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import
org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import
java.io.IOException;
public class WordCountHBase
{
}