加载中…
个人资料
小亮
小亮
  • 博客等级:
  • 博客积分:0
  • 博客访问:2,125
  • 关注人气:2
  • 获赠金笔:0支
  • 赠出金笔:0支
  • 荣誉徽章:
正文 字体大小:

HiveMetaStore模块代码分析

(2014-03-21 09:43:58)
标签:

it

分类: HIVE

http://s10/mw690/002jZ1Unzy6HYetJEw189&690

从package结构来看,主要的5个package,让我们来看看这几个package的内容

(1)metastore:是metastore模块的入口,也是整个metastore模块的核心所在,里面包含了HiveMetaStore类作为整个模块的核心,接收来自hive的请求,返回需要的信息。

(2)metastore.api:包含了调用和访问metastore模块的接口以及接口参数和返回值类型,metastore模块的用户可以通过api对metastore模块进行访问。

(3)metastore.events:用于metastore模块内部的观察者模式。因为metastore模块是支持notification通知机制和一些其他的后续处理的。通过观察者模式,当metastore对元信息进行一些操作以后,会同时产生一些event,这些event会被它们的listener捕获,并作出一些相应的处理,如发出一些通知等。

(4)metastore.model:与数据持久化相关,metastore模块通过datanucleus库将model持久化到数据库,这里的model与数据库中的表是对应的。

(5)metastore.tools:是供后台的元数据管理员对元信息进行查看和修改的工具。

原理结构:

http://s13/mw690/002jZ1Unzy6HYf4iz5W4c&690

http://s12/mw690/002jZ1Unzy6Htx6rsBtab&690

Hive

     hive内所有的对于元数据的操作都通过生成一个DDL Task,在task中完成。Hive 是对Hive元数据的最顶层的抽象。DDL Task通过操作Hive类,完成各种DDL操作,例如createTasb,dropTable等。Hive中提供的接口几乎与DDL的操作一一对应。

HiveMetaStoreClient

     是Hive连接MetaStore的客户端。HiveMetaStoreClient首先会检查hive.metastore.local是否为true,就是说metastore是否是在本地。如果在本地,则生成一个服务端,并通过函数调用的方式连接。如果为false,metastore不再本地,则生成一个Thrift的客户端,并连接Thrift的服务端(服务端必须首先在远程被启动,通过hive --server metastore命令)。HiveMetaStoreClient继承自IMetaStoreClient接口,HiveMetaStoreClient可以通过本地和远程两种方式访问和调用HiveMetaStore的Server。

     如果是远程连接,用户可以指定多个uri,用户连接thrift server。HiveMetaStoreClient会依次尝试连接,直到找到一个可用连接。

     无论是本地还是远程,接口都是统一的。

HiveMetaHook

     MetaStoreHook提供了从HiveMetaStoreClient向StorageHandler通知的机制,每当HiveMetaStoreClient对MetaStore进行操作时,会首先调用Hook中的接口,通知StorageHandler采取相应的准备操作。这样当数据存储方式不是文件时,可以保持数据结构和元数据信息是一致的。

HiveMetaStore

     HiveMetaStore是HiveMetaStoreClient的服务端,可能与HiveMetaStoreClient在同一台服务器上,也可以不再统一服务器上。但是HiveMetaStore一定与实际存储Meta信息的数据库(或其他存储介质,这里以数据库为例)在一台服务器上。

     核心部分是HiveMetaStore的内部类HMSHandler,它继承自IHMSHandler接口,IHMSHandler又继承自ThrifHiveMetastore.Iface接口,提供通过Thrift方式进行远程的调用。在HiveMetaStore.HMSHandler内实现了接口的所有metastore模块对外的方法。

     HiveMetaStore将Hive的业务逻辑转换为实际的数据操作。并调用RawStore的接口进行存储和读写操作。

     因为HiveMetaStore与数据在同一端,所以可以进行错误检测以及重试操作。例如要创建的表是否已经存在,等等。HiveMetaStore也会利用RawStore提供的transaction接口,在操作前都要open一个transaction。

RawStore

     RawStore定义了一套接口,定义了对元数据操作的一套机制。每一个接口可以看作是一个原子操作。RawStore提供了transaction机制。确保数据的一致性的同时,保证了性能。

     RawStore可以有多种实现,Hive目前实现的是ObjectStore类,即对象存储。通过JDO技术,把每一个要存储数据通过Model定义成一个对象。JDO负责实际存储一个对象。JDO底层可以连接多种存储方式,存储方式可以通过配置参数的javax.jdo.option直接传递给JDO。目前常用的是derby和mysql。

     JDO会将一个对象当作表进行存储。ObjectStore中的transaction机制也是通过JDO提供的transaction实现的。当commit失败时,将rollback所有操作。

ObjectStore

     继承自RowStore接口,是用于对数据进行持久化的部分,Object可以从数据库中获取数据并映射到model的对象中,或者将model中的对象存入数据库。

HiveAlterHandler

     继承自AlterHandler接口,从HMSHandler分享出来专门进行Alter一类操作。

Warehouse

     主要作用是对HDFS上的文件进行操作。因为在修改元信息的同时可能会涉及到HDFS的一些文件操作,如mkdir,delteDir等操作。

MetaStorePreEventListener,MetaStoreEventListener,MetaStoreEndFunctionListener

     通过观察者模式对产生的event进行相应的处理的观察者。这三个类都是抽象类,由其他一些具体的类来继承和实现。

MetaStore模块与其他模块间的耦合

     MetaStore是一个独立的模块,但由于Hive的逻辑,在进行DDL操作时,不仅会对元数据操作,同时也需要存储数据的系统进行配合。

     问题在于,Hive目前默认数据都是存储在文件系统中(虽然没有定义具体的文件格式),也利用了文件系统的中的一些特性。例如,元数据中的DataBase,Table,Partition都是通过目录实现的。任何一个对DataBase,Table,Partition的操作,除了需要对元数据进行修改意外,可能还需要对文件系统进行操作。例如创建一个partition,同时需要在文件系统中创建一个目录。这是MetaStore与其他模块耦合的地方。

     目前,HiveMetaHook实现了对部分耦合的剥离,允许通过hook,减少MetaStore与数据存储方式的依赖。但是目前并没有实现完全剥离。在Hive.java,HiveMetaStore.java中依然会调用FileSystem和WareHouse的接口(WareHouse是数据存储的抽象,假设数据全部存储在文件中,WareHouse也会调用FileSystem接口)。

     如果要支持其他数据存储方式,需要将这些耦合全部剥离,或者提供一个临时文件目录,模拟数据存储的文件系统。

 

 

 

其中PersistenceManager负责控制一组持久化对象包括创建持久化对象和查询对象,它是ObjectStore的一个实例变量,每个ObjectStore拥有一个pm,RawStore是metastore逻辑层和物理底层元数据库(比如derby)交互的接口类,ObjectStore是RawStore的默认实现类。Hive Metastore Server启动的时候会指定一个TProcessor,包装了一个HMSHandler,内部有一个ThreadLocal threadLocalMS实例变量,每个thread维护一个RawStore

 

    private final ThreadLocal threadLocalMS =
        new ThreadLocal() {
          @Override
          protected synchronized RawStore initialValue() {
            return null;
          }
        };


每一个从hive metastore client过来的请求都会从线程池中分配一个WorkerProcess来处理,在HMSHandler中每一个方法都会通过getMS()获取rawstore instance来做具体操作

 

    public RawStore getMS() throws MetaException {
      RawStore ms = threadLocalMS.get();
      if (ms == null) {
        ms = newRawStore();
        ms.verifySchema();
        threadLocalMS.set(ms);
        ms = threadLocalMS.get();
      }
      return ms;
     }

看得出来RawStore是延迟加载,初始化后绑定到threadlocal变量中可以为以后复用


 

    private RawStore newRawStore() throws MetaException {
       LOG.info(addPrefix("Opening raw store with implemenation class:"
          + rawStoreClassName));
      Configuration conf = getConf();
      return RetryingRawStore.getProxy(hiveConf, conf, rawStoreClassName, threadLocalId.get());
    }


RawStore使用了动态代理模式(继承InvocationHandler接口),内部实现了invoke函数,通过method.invoke()执行真正的逻辑,这样的好处是可以在method.invoke()上下文中添加自己其他的逻辑,RetryingRawStore就是在通过捕捉invoke函数抛出的异常,来达到重试的效果。由于使用reflection机制,异常是wrap在InvocationTargetException中的,不过在hive 0.9中竟然在捕捉到此异常后直接throw出来了,而不是retry,明显不对啊。我对它修改了下,拿出wrap的target exception,判断是不是instance of jdoexception的,再做相应的处理

 

  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    Object ret = null;

    boolean gotNewConnectUrl = false;
    boolean reloadConf = HiveConf.getBoolVar(hiveConf,
        HiveConf.ConfVars.METASTOREFORCERELOADCONF);
    boolean reloadConfOnJdoException = false;

    if (reloadConf) {
      updateConnectionURL(getConf(), null);
    }

    int retryCount = 0;
    Exception caughtException = null;
    while (true) {
      try {
        if (reloadConf || gotNewConnectUrl || reloadConfOnJdoException) {
          initMS();
        }
        ret = method.invoke(base, args);
        break;
      } catch (javax.jdo.JDOException e) {
        caughtException = (javax.jdo.JDOException) e.getCause();
      } catch (UndeclaredThrowableException e) {
        throw e.getCause();
      } catch (InvocationTargetException e) {
        Throwable t = e.getTargetException();
        if (t instanceof JDOException){
          caughtException = (JDOException) e.getTargetException();
          reloadConfOnJdoException = true;
          LOG.error("rawstore jdoexception:" + caughtException.toString());
        }else {
            throw e.getCause();
        }
      }

      if (retryCount >= retryLimit) {
        throw caughtException;
      }

      assert (retryInterval >= 0);
      retryCount++;
      LOG.error(
          String.format(
              "JDO datastore error. Retrying metastore command " +
                  "after %d ms (attempt %d of %d)", retryInterval, retryCount, retryLimit));
      Thread.sleep(retryInterval);
      // If we have a connection error, the JDO connection URL hook might
      // provide us with a new URL to access the datastore.
      String lastUrl = getConnectionURL(getConf());
      gotNewConnectUrl = updateConnectionURL(getConf(), lastUrl);
    }
    return ret;
  }


初始化RawStore有两种方式,一种是在RetryingRawStore的构造函数中调用"this.base = (RawStore) ReflectionUtils.newInstance(rawStoreClass, conf);" 因为ObjectStore实现了Configurable,在newInstance方法中主动调用里面的setConf(conf)方法初始化RawStore,还有一种情况是在捕捉到异常后retry,也会调用base.setConf(getConf());

 

private void initMS() {
    base.setConf(getConf());
  }


ObjectStore的setConf方法中,先将PersistenceManagerFactory锁住,pm close掉,设置成NULL,再初始化pm

 

public void setConf(Configuration conf) {
    // Although an instance of ObjectStore is accessed by one thread, there may
    // be many threads with ObjectStore instances. So the static variables
    // pmf and prop need to be protected with locks.
    pmfPropLock.lock();
    try {
      isInitialized = false;
      hiveConf = conf;
      Properties propsFromConf = getDataSourceProps(conf);
      boolean propsChanged = !propsFromConf.equals(prop);

      if (propsChanged) {
        pmf = null;
        prop = null;
      }

      assert(!isActiveTransaction());
      shutdown();
      // Always want to re-create pm as we don't know if it were created by the
      // most recent instance of the pmf
      pm = null;
      openTrasactionCalls = 0;
      currentTransaction = null;
      transactionStatus = TXN_STATUS.NO_STATE;

      initialize(propsFromConf);

      if (!isInitialized) {
        throw new RuntimeException(
        "Unable to create persistence manager. Check dss.log for details");
      } else {
        LOG.info("Initialized ObjectStore");
      }
    } finally {
      pmfPropLock.unlock();
    }
  }
private void initialize(Properties dsProps) {
    LOG.info("ObjectStore, initialize called");
    prop = dsProps;
    pm = getPersistenceManager();
    isInitialized = pm != null;
    return;
  }


回到一开始报错的那段信息,怎么会Persistence Manager会被关闭呢,仔细排查后才发现是由于HCatalog使用HiveMetastoreClient用完后主动调用了close方法,而一般Hive里面内部不会调这个方法.

HiveMetaStoreClient.java

 

public void close() {
    isConnected = false;
    try {
      if (null != client) {
        client.shutdown();
      }
    } catch (TException e) {
      LOG.error("Unable to shutdown local metastore client", e);
    }
    // Transport would have got closed via client.shutdown(), so we dont need this, but
    // just in case, we make this call.
    if ((transport != null) && transport.isOpen()) {
      transport.close();
    }
  }


对应server端HMSHandler中的shutdown方法

@Override
    public void shutdown() {
      logInfo("Shutting down the object store...");
      RawStore ms = threadLocalMS.get();
      if (ms != null) {
        ms.shutdown();
        ms = null;
      }
      logInfo("Metastore shutdown complete.");
    }


ObjectStore的shutdown方法

 

public void shutdown() {
    if (pm != null) {
      pm.close();
    }
  }

 

我们看到shutdown方法里面只是把当前thread的ObjectStore拿出来后,做了一个ObjectStore shutdown方法,把pm关闭了。但是并没有把ObjectStore销毁掉,它还是存在于threadLocalMS中,下次还是会被拿出来,下一次这个thread服务于另外一个请求的时候又会被get出ObjectStore来,但是由于里面的pm已经close掉了所以肯定抛异常。正确的做法是应该加上threadLocalMS.remove()或者threadLocalMS.set(null),主动将其从ThreadLocalMap中删除。

修改后的shutdown方法

 

public void shutdown() {
      logInfo("Shutting down the object store...");
      RawStore ms = threadLocalMS.get();
      if (ms != null) {
        ms.shutdown();
        ms = null;
        threadLocalMS.remove();
      }
      logInfo("Metastore shutdown complete.");
    }
 
一、             简述原理 
    metastore服务端配置metastore相关参数,并启动metastore服务进程.hive客户端配置连接服务端metastore参数。客户端读取hive元数据,请求(HiveMetaStoreClient)发送到metastore服务端(HiveMetaStore),服务端查询(JDO-mysql)并返回元数据信息给客户端。好处是:hive客户端配置文件中无mysql地址及口令信息,提高元数据安全性。
如果第一个MetaSoreServer请求失败,收到回执信息,才会请求下一个MetaSoreServer服务,为非广播形式发送读取请求。
已解决,查看修改后客户端源码。 
二、             测试部署
 
Hive:0.8.1  Hadoop: hadoop-0.20.2-cdh3u3-x   
a)         Mysql元数据IP为:172.XX.XX.137  Hive metastore service: 172.XX.XX.137,172.XX.XX.136     Hive client: 172.XX.XX.134
b)         在172.XX.XX.137(部署方式和部署Hive客户端一致)
监听端口(默认是 9083)
通过环境变量METASTORE_PORT指定或者通过-p指定
c)         启动metastoreServer :nohup hive –service metastore & (注意service前是2个-)
d)         修改Hive客户端134配置文件,指向metastore Server提供服务的地址
相关配置参数
  
  hive.metastore.warehouse.dir  
  hdfs://BJ-YW-test-HA-6126.jd.com:54310/user/user/warehouse  
  
  
 hive.metastore.local  
  false  
  
  
  
  hive.metastore.uris  
    thrift://172.XX.XX.XX:9083, thrift://172.17.6.136:9083  
  
 
测试内容
   
     客户端134正常连接hive,读取表元数据信息
     断开137 metastore服务,客户端134正常读取元数据信息
     断开 136 metastore服务,客户端134正常读取元数据信息
 
 
Hive:0.12.0  Hadoop: hadoop-2.0.0-cdh4.3.0    
Mysql元数据IP为:172.XX.XX.116  Hive metastore service: 172.XX.XX.116 ,172.XX.XX.84    Hive client: 172.XX.XX.116
e)         在172.XX.XX.86(部署方式和部署Hive客户端一致)
监听端口(默认是 9083)
通过环境变量METASTORE_PORT指定或者通过-p指定
f)          启动metastoreServer :nohup hive –service metastore & (注意service前是2个-)
g)         修改Hive客户端134配置文件,指向metastore Server提供服务的地址
相关配置参数
  
  hive.metastore.warehouse.dir  
   hdfs://ns1/user/hadp/yw  
  
  
  
   hive.metastore.client.socket.timeout   
  3600  
  
  
  hive.metastore.uris  
    thrift://172.XX.XX.116:9083, thrift://172.XX.XX.84:9083  
  
 
测试内容
   
        客户端86正常连接hive,读取表元数据信息
        断开116 metastore服务,客户端86正常读取元数据信息
        断开 84 metastore服务,客户端86正常读取元数据信息
        断开所有metastore服务,客户端86无法读取元数据信息
        恢复其中一台metastore服务,客户端正常读取元数据信息
        客户端调用metastore服务新建表、删表成功
 
metastore相关参数(红色字段为本次使用属性):
 
服务端
 
hive.metastore.event.listeners:metastore的事件监听器列表,逗号隔开,默认是空;

hive.metastore.authorization.storage.checks:在做类似drop partition操作时,metastore是否要认证权限,默认是false;

hive.metastore.event.expiry.duration:事件表中事件的过期时间,默认是0;

hive.metastore.event.clean.freq:metastore中清理过期事件的定时器的运行周期,默认是0;

javax.jdo.option.Multithreaded:是否支持并发访问metastore,默认是true;
 
hive.metastore.server.min.threads:在thrift服务池中最小的工作线程数,默认是200;

hive.metastore.server.max.threads:最大线程数,默认是100000;

hive.metastore.server.tcp.keepalive:metastore的server是否开启长连接,长连可以预防半连接的积累,默认是true;

hive.metastore.sasl.enabled:metastore thrift接口的安全策略,开启则用SASL加密接口,客户端必须要用Kerberos机制鉴权,默认是不开启false;

hive.metastore.kerberos.keytab.file:在开启sasl后kerberos的keytab文件存放路径,默认是空;

hive.metastore.ds.connection.url.hook:查找JDO连接url时hook的名字,默认是javax.jdo.option.ConnectionURL;

hive.metastore.kerberos.principal:kerberos的principal,_HOST部分会动态替换,默认是hive-metastore/_HOST@EXAMPLE.COM
hive.metastore.batch.retrieve.max:在一个批处理获取中,能从metastore里取出的最大记录数,默认是300
hive.metastore.cache.pinobjtypes:在cache中支持的metastore的对象类型,由逗号分隔,默认是Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order;
hive.metastore.schema.verification:强制metastore的schema一致性,开启的话会校验在metastore中存储的信息的版本和hive的jar包中的版本一致性,并且关闭自动schema迁移,用户必须手动的升级hive并且迁移schema,关闭的话只会在版本不一致时给出警告,默认是false不开启;
hive.metastore.archive.intermediate.original :用于归档压缩的原始中间目录的后缀,这些目录是什么并不重要,只要能够避免冲突即可。 
hive.metastore.archive.intermediate.archived :用于归档压缩的压缩后的中间目录的后缀,这些目录是什么并不重要,只要能够避免冲突即可。 
hive.metastore.archive.intermediate.extracted :用于归档压缩的解压后的中间目录的后缀,这些目录是什么并不重要,只要能够避免冲突即可。 
hive.metastore.partition.inherit.table.properties:当新建分区时自动继承的key列表,默认是空;
hive.metastore.end.function.listeners:metastore函数执行结束时的监听器列表,默认是空;
hive.metastore.execute.setugi:非安全模式,设置为true会令metastore以客户端的用户和组权限执行DFS操作,默认是false,这个属性需要服务端和客户端同时设置;

hive.metastore.rawstore.impl:原始metastore的存储实现类,默认是org.apache.hadoop.hive.metastore.ObjectStore;
 
 
客户端
hive.metastore.uris: 客户端连接远程metastore服务地址端口

hive.metastore.local:控制hive是否连接一个远程metastore服务器还是开启一个本地客户端jvm,默认是true,Hive0.10已经取消了该配置项;
hive.metastore.warehouse.dir:指定Hive的存储目录

hive.metastore.client.socket.timeout:客户端socket超时时间,默认20秒;

hive.metastore.ds.retry.attempts:当出现连接错误时重试连接的次数,默认是1次;

hive.metastore.ds.retry.interval:metastore重试连接的间隔时间,默认1000毫秒

datanucleus.connectionPoolingType:使用连接池来访问JDBC metastore,默认是DBCP

hive.metastore.execute.setugi:非安全模式,设置为true会令metastore以客户端的用户和组权限执行DFS操作,默认是false,这个属性需要服务端和客户端同时设置;
hive.metastore.client.connect.retry.delay:客户端在连续的重试连接等待的时间,默认1;

hive.metastore.connect.retries:创建metastore连接时的重试次数,默认是5;
 
可能风险:
服务端有可能出现宕机,原因当获取hive表相关partition太多,返回partition信息太大,会导致JVM heap space ,这里的partition为5W。实际生产中按天做partition,partition不会太多。
 
 
压力测试:详见(org.apache.hadoop.hive.metastore.test.MetaStoreTest)

前置:
 
1个Metastore service(116)
 
测试功能:多线程访问service,每个线程里都新建client,调用getDatabase(mydb)
返回Database对象,判断对象getName().equals("mydb"),如果是,调用同步方法,
让外界变量+1,最后得出结果:所有线程执行总时间+外界变量数值(成功返回个数)
 
结果:
单台机器116(本地)
service5000线程 time:176368毫秒 OK:5000
单台机器 84(远程)
13:57:09-13:57:52 time:280382 OK:5000
 
 
双台机器86\84访问 service 10000*2 共4getDatabase请求
Client 84:
12:51:07-12:52:09 time:277613 OK:10000
12:47:53-12:49:32 time:114669 OK:10000
Client 86:
12:50:36-12:51:36 time:251133 OK:10000
12:47:57-12:49:06 time:83487  OK:10000
 
经日志验证 116 服务端 84:20000 86:20000 全部正常
 
 
双台机器86\136\137访问 service 10000*2 共6getDatabase请求
Client 86:
15:08:43-15:09:59 time: 90461    OK:10000
15:09:04-15:10:12 time: 88821    OK:10000
Client 136:
15:09:01-15:10:27 time: 117757    OK:10000
15:09:15-15:10:46 time: 120695    OK:10000
Client 137:
15:14:26-15:15:46 time: 435969    OK:10000
15:16:33-15:17:48 time: 544234    OK:10000
 
经日志验证 116 服务端 86:20000 136:20000 137:20000 全部正常
 
 
测试功能:多线程访问service,每个线程里都新建client,调用listPartitions (“gdm”,” gdm_online_log”),每个线程listPartitions会生成10 条mysql select 语句
返回List对象,判断对象个数(60)>0,如果是,调用同步方法,
让外界变量+1,最后得出结果:所有线程执行总时间+外界变量数值(成功返回个数)
 
一台机器136访问 service 1500 listPartitions请求
Client 136:
14:09:24-14:15:44 time :379958    OK:1500
三台机器86\136\137访问 service 1000*2 共6000 listPartitions请求
Client 86:当时该机器正在执行MapReduce,所以速度慢
15:30:38-15:50:58 time: 1221170    OK:1000
15:18:20-15:51:55 time: 1399362    OK:1000
Client 136:
15:39:44-15:47:48 time: 483550     OK:1000
15:35:30-15:48:02 time: 741888     OK:1000
Client 137:
15:39:36-15:47:48 time: 494850     OK:1000
15-32:30-15:48:04 time: 934510     OK:1000 

0

阅读 收藏 喜欢 打印举报/Report
  

新浪BLOG意见反馈留言板 欢迎批评指正

新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 产品答疑

新浪公司 版权所有