HBase基本使用以及和hadoop的交互
 (2017-11-28 15:26:11)
	
			
					(2017-11-28 15:26:11)		| 标签: hbasehadoop列族高并发 | 分类: 大数据处理 | 
			一、为什么需要HBase   
      
						
		
		
		
		
		
		
							
		
				
		
				
	HBase是NoSQL数据库,是Not only
SQL的缩写,泛指非关系型的数据库。其数据存储可以不需要固定的表模式,也通常会避免使用SQL的JOIN操作,一般又都具备水平可扩展的特性。
  大数据之所以重要,是因为其具备解决显示问题的三个关键方面。 
  NoSQL在大数据中扮演的角色: 
 
HBase作为一个典型的NoSQL数据库,可以通过行键(RowKey)检索数据,仅支持单行事务,主要用于存储非结构化的松散数据。HBase设计目标主要依靠横向扩展,通过不断增加廉价的商用服务器来增加计算和存储能力。 
  Zookeeper
Quorum(队列)负责管理HBase中多HMaster的选举、服务器之间状态同步等。主节点HMaster主要负责Table和Region的管理工作。Region节点HRegionServer主要负责响应用户I/O请求,向HDFS文件系统中读写数据。 
 
HBase的表由行(Row)和列(Column)共同构成,与关系型数据库不同的是HBase有一个列族(Column
Family)的概念,它将一列或者多列组织在一期,HBase的列必须属于某一个列族。HBase没有数据类型,任何列值都被转换成字节数组进行存储。HBase表中的行是通过行键(RowKey)进行区分的,行键也是用来唯一确定一行的标识。两类数据模型,表是HBase表达数据的逻辑组织方式,而基于列的存储则是数据在底层的组织方式。 
 
HBase中的一个表有若干行,每行有很多列,列中的值有多个版本,每个版本的值称为一个单元格,每个单元存储的是不同时刻该列的值。列需要用“列族前缀+修饰符”的方式。HBase同一列可以存储不同时刻的值,同时多个列可以组成一个列族(Column
Family)。 
 
HBase的列是按列族分组的,HFile是面向列的,存放行的不同列的物理文件,一个列族的数据存放在多个HFile中,最重要的是一个列族的数据会被同一个Region管理,物理上存放在一起。 
 
从物理结构上看,表存储在不同的分区,即不同的Region。每个Region只在一个RegionServer中提供服务,而Region直接向客户端提供存储和读取服务。 
 
一个列族的成员在文件系统上都是存储在一起的。因为存储优化都是针对列族级别的,这就意味着,一个列族的所有成员是通过相同的方式访问的。 
 
 HBase中的单元格由行键、列族、列、时间戳唯一确定的。  
 
HBase中一个表的数据会被划分成很多的Region,Region可以动态扩展并且HBase保证Region的负载均衡。Region实际上是行键排序后的按规则分割的连续的存储空间。如果Region太大,会被动态拆分,相反多个Region会合并成一个较大的Region,以减少HDFS上存储文件的数量。这两个过程就是HBase的split和compaction。 
 
按照现在主流硬件的配置,每个RegionServer可以管理大约100~1000个Region,每个Region的大小可以是1~20G。 
 
HBase对数据模型的4个主要操作包括Get、Put、Scan和Delete。其中Scan是迭代获取部分或者全部数据(一般是多行),而Get是获取特定条件的数据。 
 
HBase的Delete操作并不是真正地从磁盘删除数据,而是通过创建墓碑(tombstones)标志进行处理。这些墓碑标记的值和小于该时间版本的单元格在大合并时被清除。 
 
  public static final String NAME = "Example
Test1";  
 
  public static final String TEMP_INDEX_PATH =
"test/example"; 
//本地文件系统,并非HDFS中的   
 
  public static String inputTable =
"member";  
 
  public static void main(String[] args) throws
Exception{  
 
     
Configuration conf = HBaseConfiguration.create();    
 
      Scan scan
= new Scan();    
 
     
scan.setBatch(0);    
 
     
scan.setCaching(10000);    
 
     
scan.setMaxVersions();    
 
     
scan.addColumn(Bytes.toBytes("info"),
Bytes.toBytes("birthday"));    
 
     
conf.set("hbase.zookeeper.quorum", "hbasestudy");    
 
     
conf.setBoolean("mapred.map.tasks.speculative.execution",
false);    
 
     
conf.setBoolean("mapred.reduce.tasks.speculative.execution",
false);    
 
      Path
tempIndexPath = new Path(TEMP_INDEX_PATH);    
 
      FileSystem
fs = FileSystem.get(conf);    
 
     
if(fs.exists(tempIndexPath)){    
 
     
    fs.delete(tempIndexPath,
true);      
 
     
}    
 
      // hadoop
job    
 
      Job job =
new Job(conf, NAME);    
 
     
job.setJarByClass(HBaseHadoopTest.class);    
 
     
TableMapReduceUtil.initTableMapperJob(inputTable, scan,
ExampleMapper.class, Text.class, Text.class, job);    
 
     
job.setNumReduceTasks(0);    
 
     
job.setOutputFormatClass(TextOutputFormat.class);    
 
     
FileOutputFormat.setOutputPath(job,
tempIndexPath);    
 
      int
success = job.waitForCompletion(true) ? 0 : 1;    
 
     
System.exit(success);    
 
  }  
 
  private Text k = new Text();  
 
  private Text v = new Text();  
 
  public static final String
FIELD_COMMON_SEPARATOR = "\u0001";  
 
  @Override  
 
  protected void setup(Context context) throws
IOException, InterruptedException{  
 
  }  
 
  public void map(ImmutableBytesWritable row,
Result columns, Context context) throws
IOException{  // first parameter is key,same key
will combine   
 
      String
value = null;    
 
      String
rowkey = new String(row.get());    
 
     
System.out.println("rowkey:" + rowkey);    
 
      byte[]
columnFamily = null;    
 
      byte[]
columnQualifier = null;    
 
      long ts =
0L;    
 
     
try{    
 
     
    for(KeyValue kv :
columns.list()){      
 
     
     
  value =
Bytes.toStringBinary(kv.getValue());        
 
     
     
  System.out.println("value:" +
value);        
 
     
     
  columnFamily = kv.getFamily();        
 
     
     
  columnQualifier =
kv.getQualifier();        
 
     
     
  ts = kv.getTimestamp();        
 
     
     
  k.set(rowkey);        
 
     
     
  v.set(Bytes.toString(columnFamily) +
FIELD_COMMON_SEPARATOR        
 
     
     
  +
Bytes.toString(columnQualifier)        
 
     
     
  + FIELD_COMMON_SEPARATOR +
value        
 
     
     
  + FIELD_COMMON_SEPARATOR + ts);        
 
     
     
  context.write(k, v);        
 
     
     
  break;        
 
     
    }      
 
     
}catch(Exception e){    
 
     
   
e.printStackTrace();      
 
     
    System.err.println("Error: "
+ e.getMessage() + ", Row: " + Bytes.toString(row.get()) + ",value:
" + value);      
 
     
}    
 
  }  
 
  public static class Map extends Mapper
{  
 
      private
IntWritable i = new IntWritable(1);    
 
      public
void map(LongWritable key, Text value, Context context) throws
IOException, InterruptedException{    
 
     
    String s[] =
value.toString().trim().split(" ");      
 
     
    for(String m :
s){      
 
     
     
  context.write(new Text(m), i);        
 
     
    }      
 
     
}    
 
  }  
 
  public static class Reduce extends TableReducer
{  
 
      public
void reduce(Text key, Iterable values, Context context) throws
IOException, InterruptedException{    
 
     
    int sum = 0;      
 
     
    for(IntWritable i :
values){      
 
     
     
  sum += i.get();        
 
     
    }      
 
     
    Put put = new
Put(Bytes.toBytes(key.toString()));      
 
     
   
put.add(Bytes.toBytes("content"), Bytes.toBytes("count"),
Bytes.toBytes(String.valueOf(sum)));      
 
     
   
context.write(NullWritable.get(), put);      
 
     
}    
 
  }  
 
  public static void createHBaseTable(String
tableName) throws IOException{  
 
     
HTableDescriptor htd = new
HTableDescriptor(tableName);    
 
     
HColumnDescriptor col = new
HColumnDescriptor("content");    
 
     
htd.addFamily(col);    
 
     
HBaseConfiguration config = new
HBaseConfiguration();    
 
      HBaseAdmin
admin = new HBaseAdmin(config);    
 
     
if(admin.tableExists(tableName)){    
 
     
    System.out.println("table
exists, trying recreate table!");      
 
     
   
admin.disableTable(tableName);      
 
     
   
admin.deleteTable(tableName);      
 
     
}    
 
     
System.out.println("create new table:" +
tableName);    
 
     
admin.createTable(htd);    
 
  }  
 
  public static void main(String[] args) throws
Exception{  
 
      String
tableName = "wordcountH";    
 
     
Configuration conf = new Configuration();    
 
     
conf.set(TableOutputFormat.OUTPUT_TABLE,
tableName);    
 
     
createHBaseTable(tableName);    
 
      Job job =
new Job(conf, "WordCountHBase");    
 
     
job.setJarByClass(WordCountHBase.class);    
 
     
job.setNumReduceTasks(3);    
 
     
job.setMapperClass(Map.class);    
 
     
job.setReducerClass(Reduce.class);    
 
     
job.setMapOutputKeyClass(Text.class);    
 
     
job.setMapOutputValueClass(IntWritable.class);    
 
     
job.setInputFormatClass(TextInputFormat.class);    
 
     
job.setOutputFormatClass(TableOutputFormat.class);    
 
     
FileInputFormat.addInputPath(job, new Path("hbasetest")); // add a
source for map input    
 
     
System.exit(job.waitForCompletion(true) ? 0 : 1);    
 
  }  
							
		1、分析各种不同来源的结构化、半结构化和非结构化数据的理想选择
2、当需要分析所有或大部分数据,或者对一个数据抽样分析效果不明显时,大数据解决方案是理想的选择
3、未预先确定数据的业务度量指标时,是进行迭代式和探索式分析的理想选择
1、高并发读写
传统关系型数据库每秒应付上万次SQL查询还勉强顶得住,但是应付上万次SQL写数据请求,硬盘I/O却无法承受。NoSQL数据库具有非常良好的读写性能。因为:一般MySQL使用query
cache,每当表发生更新操作时,Cache就会失效,这是一种大粒度的Cache,这种Cache性能并不高。而NoSQL的Cache是记录级的,是一种细粒度的Cache,所以NoSQL在这个层面上来说性能要高很多。
2、可扩展性
传统数据库系统升级和扩展是非常痛苦的,往往需要停机维护和数据迁移,而不能通过横向添加节点的方式实现无缝扩展。NoSQL无须事先为要存储的数据建立字段,随时可以存储自定义的数据格式。NoSQL允许使用者随时随地添加字段,并且字段类型可以是任意格式的。HBase的列式存储的特性支撑它实时随机读取、基于KEY的特殊访问需求。
二、HBase核心组件
1、文件存储:HBase严重依赖Hadoop的HDFS组件,HBase使用HDFS作为底层存储系统。
2、协调服务组件:
三、数据模型
(1)逻辑模型
(2)物理模型
举例说明:
我们创建一个表叫member:create
'member','member_id','address','info',并往里面插入数据。此时我们查看member的物理存储,发现有3个子目录分别为member_id,address,info。见下图所示:
(3)列族和单元格
(4)自动分区
四、数据模型的操作
五、HBase表结构设计
1、HBase中没有join的概念,所以不支持join操作。
2、RowKey设计:应该基于预期的访问模式来为RowKey建模。关系型数据库可以在多列上建立索引,但是HBase只能在Rowkey上建立索引。设计原则:
(1)RowKey是以字典顺序从大到小排序
(2)RowKey尽量散列RowKey设计
(3)RowKey的长度尽量短
3、布隆过滤器
4、数据压缩:数据只在硬盘上压缩,在内存中(Memstore或BlockCache)或者网络传输时是没有压缩的。
5、单元时间版本:在默认情况下,HBase的每个单元格只维护三个时间版本。
6、生存时间(TTL):用于设置单元格的生存周期。
六、实战一从HBase的表中读取数据写到Hadoop中
(1)HBaseHadoopTest.java代码
import
org.apache.hadoop.conf.Configuration;
import
org.apache.hadoop.fs.FileSystem;
import
org.apache.hadoop.fs.Path;
import
org.apache.hadoop.hbase.HBaseConfiguration;
import
org.apache.hadoop.hbase.client.Scan;
import
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import
org.apache.hadoop.hbase.util.Bytes;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Job;
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import
org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class HBaseHadoopTest
{
}
(2)ExampleMapper.java文件
import
org.apache.hadoop.hbase.KeyValue;
import
org.apache.hadoop.hbase.client.Result;
import
org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import
org.apache.hadoop.hbase.mapreduce.TableMapper;
import
org.apache.hadoop.hbase.util.Bytes;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.io.Writable;
import
java.io.IOException;
public class ExampleMapper
extends TableMapper {
}
上述代码中需要注意,写入的hadoop地址是在本地的,通过hadoop fs
xx是查询不到的。当时定位了好久,最后是通过debug hbase源码找到最终存储位置的。参考如下:
七、实战二从Hadoop读取文件写入HBase中
import
org.apache.hadoop.conf.Configuration;
import
org.apache.hadoop.fs.Path;
import
org.apache.hadoop.hbase.HBaseConfiguration;
import
org.apache.hadoop.hbase.HColumnDescriptor;
import
org.apache.hadoop.hbase.HTableDescriptor;
import
org.apache.hadoop.hbase.client.HBaseAdmin;
import
org.apache.hadoop.hbase.client.Put;
import
org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import
org.apache.hadoop.hbase.mapreduce.TableReducer;
import
org.apache.hadoop.hbase.util.Bytes;
import
org.apache.hadoop.io.IntWritable;
import
org.apache.hadoop.io.LongWritable;
import
org.apache.hadoop.io.NullWritable;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Job;
import
org.apache.hadoop.mapreduce.Mapper;
import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import
org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import
java.io.IOException;
public class WordCountHBase
{
}

 加载中…
加载中…