HiveMetaStore模块代码分析

标签:
it |
分类: HIVE |
http://s10/mw690/002jZ1Unzy6HYetJEw189&690
从package结构来看,主要的5个package,让我们来看看这几个package的内容
(1)metastore:是metastore模块的入口,也是整个metastore模块的核心所在,里面包含了HiveMetaStore类作为整个模块的核心,接收来自hive的请求,返回需要的信息。
(2)metastore.api:包含了调用和访问metastore模块的接口以及接口参数和返回值类型,metastore模块的用户可以通过api对metastore模块进行访问。
(3)metastore.events:用于metastore模块内部的观察者模式。因为metastore模块是支持notification通知机制和一些其他的后续处理的。通过观察者模式,当metastore对元信息进行一些操作以后,会同时产生一些event,这些event会被它们的listener捕获,并作出一些相应的处理,如发出一些通知等。
(4)metastore.model:与数据持久化相关,metastore模块通过datanucleus库将model持久化到数据库,这里的model与数据库中的表是对应的。
(5)metastore.tools:是供后台的元数据管理员对元信息进行查看和修改的工具。
原理结构:
http://s13/mw690/002jZ1Unzy6HYf4iz5W4c&690
http://s12/mw690/002jZ1Unzy6Htx6rsBtab&690
Hive
HiveMetaStoreClient
HiveMetaHook
HiveMetaStore
RawStore
ObjectStore
HiveAlterHandler
Warehouse
MetaStorePreEventListene
MetaStore模块与其他模块间的耦合
其中PersistenceManager负责控制一组持久化对象包括创建持久化对象和查询对象,它是ObjectStore的一个实例变量,每个ObjectStore拥有一个pm,RawStore是metastore逻辑层和物理底层元数据库(比如derby)交互的接口类,ObjectStore是RawStore的默认实现类。Hive Metastore Server启动的时候会指定一个TProcessor,包装了一个HMSHandler,内部有一个ThreadLocal threadLocalMS实例变量,每个thread维护一个RawStore
private final ThreadLocal threadLocalMS =
new ThreadLocal() {
@Override
protected synchronized RawStore initialValue() {
return null;
}
};
每一个从hive metastore
client过来的请求都会从线程池中分配一个WorkerProcess来处理,在HMSHandler中每一个方法都会通过getMS()获取rawstore
instance来做具体操作
public RawStore getMS() throws MetaException {
RawStore ms = threadLocalMS.get();
if (ms == null) {
ms = newRawStore();
ms.verifySchema();
threadLocalMS.set(ms);
ms = threadLocalMS.get();
}
return ms;
}
看得出来RawStore是延迟加载,初始化后绑定到threadlocal变量中可以为以后复用
private RawStore newRawStore() throws MetaException {LOG.info(addPrefix("Opening raw store with implemenation class:"
+ rawStoreClassName));
Configuration conf = getConf();
return RetryingRawStore.getProxy(hiveConf, conf, rawStoreClassName, threadLocalId.get());
}
RawStore使用了动态代理模式(继承InvocationHandler接口),内部实现了invoke函数,通过method.invoke()执行真正的逻辑,这样的好处是可以在method.invoke()上下文中添加自己其他的逻辑,RetryingRawStore就是在通过捕捉invoke函数抛出的异常,来达到重试的效果。由于使用reflection机制,异常是wrap在InvocationTargetExceptio
@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Object ret = null; boolean gotNewConnectUrl = false; boolean reloadConf = HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.METASTOREFORCERELOADCONF); boolean reloadConfOnJdoException = false; if (reloadConf) { updateConnectionURL(getConf(), null); } int retryCount = 0; Exception caughtException = null; while (true) { try { if (reloadConf || gotNewConnectUrl || reloadConfOnJdoException ) { initMS(); } ret = method.invoke(base, args); break; } catch (javax.jdo.JDOException e) { caughtException = (javax.jdo.JDOException) e.getCause(); } catch (UndeclaredThrowableExcep tion e) { throw e.getCause(); } catch (InvocationTargetExceptio n e) { Throwable t = e.getTargetException(); if (t instanceof JDOException){ caughtException = (JDOException) e.getTargetException(); reloadConfOnJdoException = true; LOG.error("rawstore jdoexception:" + caughtException.toString()); }else { throw e.getCause(); } } if (retryCount >= retryLimit) { throw caughtException; } assert (retryInterval >= 0); retryCount++; LOG.error( String.format( "JDO datastore error. Retrying metastore command " + "after %d ms (attempt %d of %d)", retryInterval, retryCount, retryLimit)); Thread.sleep(retryInterval); // If we have a connection error, the JDO connection URL hook might // provide us with a new URL to access the datastore. String lastUrl = getConnectionURL(getConf()); gotNewConnectUrl = updateConnectionURL(getConf(), lastUrl); } return ret; }
初始化RawStore有两种方式,一种是在RetryingRawStore的构造函数中调用"this.base
= (RawStore) ReflectionUtils.newInstance(rawStoreClass,
conf);"
因为ObjectStore实现了Configurable,在newInstance方法中主动调用里面的setConf(conf)方法初始化RawStore,还有一种情况是在捕捉到异常后retry,也会调用base.setConf(getConf());
private void initMS() { base.setConf(getConf()); }
ObjectStore的setConf方法中,先将PersistenceManagerFactor
public void setConf(Configuration conf) { // Although an instance of ObjectStore is accessed by one thread, there may // be many threads with ObjectStore instances. So the static variables // pmf and prop need to be protected with locks. pmfPropLock.lock(); try { isInitialized = false; hiveConf = conf; Properties propsFromConf = getDataSourceProps(conf); boolean propsChanged = !propsFromConf.equals(prop); if (propsChanged) { pmf = null; prop = null; } assert(!isActiveTransaction()); shutdown(); // Always want to re-create pm as we don't know if it were created by the // most recent instance of the pmf pm = null; openTrasactionCalls = 0; currentTransaction = null; transactionStatus = TXN_STATUS.NO_STATE; initialize(propsFromConf); if (!isInitialized) { throw new RuntimeException( "Unable to create persistence manager. Check dss.log for details"); } else { LOG.info("Initialized ObjectStore"); } } finally { pmfPropLock.unlock(); } }
private void initialize(Properties dsProps) { LOG.info("ObjectStore, initialize called"); prop = dsProps; pm = getPersistenceManager(); isInitialized = pm != null; return; }
回到一开始报错的那段信息,怎么会Persistence
Manager会被关闭呢,仔细排查后才发现是由于HCatalog使用HiveMetastoreClient用完后主动调用了close方法,而一般Hive里面内部不会调这个方法.
HiveMetaStoreClient.java
public void close() { isConnected = false; try { if (null != client) { client.shutdown(); } } catch (TException e) { LOG.error("Unable to shutdown local metastore client", e); } // Transport would have got closed via client.shutdown(), so we dont need this, but // just in case, we make this call. if ((transport != null) && transport.isOpen()) { transport.close(); } }
对应server端HMSHandler中的shutdown方法
@Override public void shutdown() { logInfo("Shutting down the object store..."); RawStore ms = threadLocalMS.get(); if (ms != null) { ms.shutdown(); ms = null; } logInfo("Metastore shutdown complete."); }
ObjectStore的shutdown方法
public void shutdown() { if (pm != null) { pm.close(); } }
我们看到shutdown方法里面只是把当前thread的ObjectStore拿出来后,做了一个ObjectStore shutdown方法,把pm关闭了。但是并没有把ObjectStore销毁掉,它还是存在于threadLocalMS中,下次还是会被拿出来,下一次这个thread服务于另外一个请求的时候又会被get出ObjectStore来,但是由于里面的pm已经close掉了所以肯定抛异常。正确的做法是应该加上threadLocalMS.remove()或者threadLocalMS.set(null),主动将其从ThreadLocalMap中删除。
修改后的shutdown方法
public void shutdown() { logInfo("Shutting down the object store..."); RawStore ms = threadLocalMS.get(); if (ms != null) { ms.shutdown(); ms = null; threadLocalMS.remove(); } logInfo("Metastore shutdown complete."); }
一、简述原理
metastore服务端配置metastore相关参数,并启动metastore服务进程.hive客户端配置连接服务端metastore参数。客户端读取hive元数据,请求(HiveMetaStoreClient)发送到metastore服务端(HiveMetaStore),服务端查询(JDO-mysql)并返回元数据信息给客户端。好处是:hive客户端配置文件中无mysql地址及口令信息,提高元数据安全性。
如果第一个MetaSoreServer请求失败,收到回执信息,才会请求下一个MetaSoreServer服务,为非广播形式发送读取请求。
已解决,查看修改后客户端源码。
二、测试部署
Hive:0.8.1Hadoop: hadoop-0.20.2-cdh3u3-x
a)Mysql元数据IP为:172.XX.XX.137 Hive metastore service: 172.XX.XX.137,172.XX.XX.136 Hive client: 172.XX.XX.134
b)在172.XX.XX.137(部署方式和部署Hive客户端一致)
监听端口(默认是 9083)
通过环境变量METASTORE_PORT指定或者通过-p指定
c)启动metastoreServer :nohup hive –service metastore & (注意service前是2个-)
d)修改Hive客户端134配置文件,指向metastore Server提供服务的地址
相关配置参数
hive.metastore.warehouse.dir
hdfs://BJ-YW-test-HA-6126.jd.com:54310/user/user/warehouse
hive.metastore.local
false
hive.metastore.uris
thrift://172.XX.XX.XX:9083, thrift://172.17.6.136:9083
测试内容
客户端134正常连接hive,读取表元数据信息
断开137 metastore服务,客户端134正常读取元数据信息
断开 136 metastore服务,客户端134正常读取元数据信息
Hive:0.12.0Hadoop: hadoop-2.0.0-cdh4.3.0
Mysql元数据IP为:172.XX.XX.116Hive metastore service: 172.XX.XX.116 ,172.XX.XX.84 Hive client: 172.XX.XX.116
e)在172.XX.XX.86(部署方式和部署Hive客户端一致)
监听端口(默认是 9083)
通过环境变量METASTORE_PORT指定或者通过-p指定
f)启动metastoreServer :nohup hive –service metastore & (注意service前是2个-)
g)修改Hive客户端134配置文件,指向metastore Server提供服务的地址
相关配置参数
hive.metastore.warehouse.dir
hdfs://ns1/user/hadp/yw
hive.metastore.client.socket.timeout
3600
hive.metastore.uris
thrift://172.XX.XX.116:9083, thrift://172.XX.XX.84:9083
测试内容
客户端86正常连接hive,读取表元数据信息
断开116 metastore服务,客户端86正常读取元数据信息
断开 84 metastore服务,客户端86正常读取元数据信息
断开所有metastore服务,客户端86无法读取元数据信息
恢复其中一台metastore服务,客户端正常读取元数据信息
客户端调用metastore服务新建表、删表成功
metastore相关参数(红色字段为本次使用属性):
服务端
hive.metastore.event.listeners:metastore的事件监听器列表,逗号隔开,默认是空;
hive.metastore.authorization.storage.checks:在做类似drop partition操作时,metastore是否要认证权限,默认是false;
hive.metastore.event.expiry.duration:事件表中事件的过期时间,默认是0;
hive.metastore.event.clean.freq:metastore中清理过期事件的定时器的运行周期,默认是0;
javax.jdo.option.Multithreaded:是否支持并发访问metastore,默认是true;
hive.metastore.server.min.threads:在thrift服务池中最小的工作线程数,默认是200;
hive.metastore.server.max.threads:最大线程数,默认是100000;
hive.metastore.server.tcp.keepalive:metastore的server是否开启长连接,长连可以预防半连接的积累,默认是true;
hive.metastore.sasl.enabled:metastore thrift接口的安全策略,开启则用SASL加密接口,客户端必须要用Kerberos机制鉴权,默认是不开启false;
hive.metastore.kerberos.keytab.file:在开启sasl后kerberos的keytab文件存放路径,默认是空;
hive.metastore.ds.connection.url.hook:查找JDO连接url时hook的名字,默认是javax.jdo.option.ConnectionURL;
hive.metastore.kerberos.principal:kerberos的principal,_HOST部分会动态替换,默认是hive-metastore/_HOST@EXAMPLE.COM;
hive.metastore.batch.retrieve.max:在一个批处理获取中,能从metastore里取出的最大记录数,默认是300
hive.metastore.cache.pinobjtypes:在cache中支持的metastore的对象类型,由逗号分隔,默认是Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order;
hive.metastore.schema.verification:强制metastore的schema一致性,开启的话会校验在metastore中存储的信息的版本和hive的jar包中的版本一致性,并且关闭自动schema迁移,用户必须手动的升级hive并且迁移schema,关闭的话只会在版本不一致时给出警告,默认是false不开启;
hive.metastore.archive.intermediate.original :用于归档压缩的原始中间目录的后缀,这些目录是什么并不重要,只要能够避免冲突即可。
hive.metastore.archive.intermediate.archived :用于归档压缩的压缩后的中间目录的后缀,这些目录是什么并不重要,只要能够避免冲突即可。
hive.metastore.archive.intermediate.extracted :用于归档压缩的解压后的中间目录的后缀,这些目录是什么并不重要,只要能够避免冲突即可。
hive.metastore.partition.inherit.table.properties:当新建分区时自动继承的key列表,默认是空;
hive.metastore.end.function.listeners:metastore函数执行结束时的监听器列表,默认是空;
hive.metastore.execute.setugi:非安全模式,设置为true会令metastore以客户端的用户和组权限执行DFS操作,默认是false,这个属性需要服务端和客户端同时设置;
hive.metastore.rawstore.impl:原始metastore的存储实现类,默认是org.apache.hadoop.hive.metastore.ObjectStore;
客户端
hive.metastore.uris: 客户端连接远程metastore服务地址端口
hive.metastore.local:控制hive是否连接一个远程metastore服务器还是开启一个本地客户端jvm,默认是true,Hive0.10已经取消了该配置项;
hive.metastore.warehouse.dir:指定Hive的存储目录
hive.metastore.client.socket.timeout:客户端socket超时时间,默认20秒;
hive.metastore.ds.retry.attempts:当出现连接错误时重试连接的次数,默认是1次;
hive.metastore.ds.retry.interval:metastore重试连接的间隔时间,默认1000毫秒
datanucleus.connectionPoolingType:使用连接池来访问JDBC metastore,默认是DBCP
hive.metastore.execute.setugi:非安全模式,设置为true会令metastore以客户端的用户和组权限执行DFS操作,默认是false,这个属性需要服务端和客户端同时设置;
hive.metastore.client.connect.retry.delay:客户端在连续的重试连接等待的时间,默认1;
hive.metastore.connect.retries:创建metastore连接时的重试次数,默认是5;
可能风险:
服务端有可能出现宕机,原因当获取hive表相关partition太多,返回partition信息太大,会导致JVM heap space ,这里的partition为5W。实际生产中按天做partition,partition不会太多。
压力测试:详见(org.apache.hadoop.hive.metastore.test.MetaStoreTest)
前置:
1个Metastore service(116)
测试功能:多线程访问service,每个线程里都新建client,调用getDatabase(mydb)
返回Database对象,判断对象getName().equals("mydb"),如果是,调用同步方法,
让外界变量+1,最后得出结果:所有线程执行总时间+外界变量数值(成功返回个数)
结果:
单台机器116(本地)
service5000线程 time:176368毫秒 OK:5000
单台机器 84(远程)
13:57:09-13:57:52 time:280382 OK:5000
双台机器86\84访问 service 10000*2 共4万getDatabase请求
Client 84:
12:51:07-12:52:09 time:277613 OK:10000
12:47:53-12:49:32 time:114669 OK:10000
Client 86:
12:50:36-12:51:36 time:251133 OK:10000
12:47:57-12:49:06 time:83487OK:10000
经日志验证 116 服务端 84:20000 86:20000 全部正常
双台机器86\136\137访问 service 10000*2 共6万getDatabase请求
Client 86:
15:08:43-15:09:59 time: 90461OK:10000
15:09:04-15:10:12 time: 88821OK:10000
Client 136:
15:09:01-15:10:27 time: 117757OK:10000
15:09:15-15:10:46 time: 120695OK:10000
Client 137:
15:14:26-15:15:46 time: 435969OK:10000
15:16:33-15:17:48 time: 544234OK:10000
经日志验证 116 服务端 86:20000 136:20000 137:20000 全部正常
测试功能:多线程访问service,每个线程里都新建client,调用listPartitions (“gdm”,” gdm_online_log”),每个线程listPartitions会生成10 条mysql select 语句
返回List对象,判断对象个数(60)>0,如果是,调用同步方法,
让外界变量+1,最后得出结果:所有线程执行总时间+外界变量数值(成功返回个数)
一台机器136访问 service 1500 listPartitions请求
Client 136:
14:09:24-14:15:44 time :379958OK:1500
三台机器86\136\137访问 service 1000*2 共6000 listPartitions请求
Client 86:当时该机器正在执行MapReduce,所以速度慢
15:30:38-15:50:58 time: 1221170OK:1000
15:18:20-15:51:55 time: 1399362OK:1000
Client 136:
15:39:44-15:47:48 time: 483550OK:1000
15:35:30-15:48:02 time: 741888OK:1000
Client 137:
15:39:36-15:47:48 time: 494850OK:1000
15-32:30-15:48:04 time: 934510OK:1000