加载中…
个人资料
一直在沉睡
一直在沉睡
  • 博客等级:
  • 博客积分:0
  • 博客访问:33,407
  • 关注人气:6
  • 获赠金笔:0支
  • 赠出金笔:0支
  • 荣誉徽章:
相关博文
推荐博文
谁看过这篇博文
加载中…
正文 字体大小:

alisoft-xplatform-asf-cache使用2

(2011-03-03 15:03:33)
标签:

杂谈

分类: 缓存
                 memcache数据缓存系统
一 目的:
 在服务器集群环境下,设置一个共享数据的另一机制。
 减少对数据库的直接操作,减轻数据库负担。
 提高系统的响应能力。
二 系统整体结构:
三 系统设计说明
1.Memcache服务器集群:
     Memcache服务器是逻辑上的划分,在分析业务数据的基础上,将数据安排存放在不同的Memcache服务器上,有必要的话可以将不同硬件服务器上的多个Memcache服务器再做成一个数据互相备份的组,避免数据的单点丢失的风险。
Memcache服务器安装在不同的硬件服务器上,比如可以在一台硬件服务器上安装3个Memcache服务器,在另外一台硬件服务器上安装4个Memcache服务器。
2.缓存数据的配置信息:
   在数据库中建一张表来说明Memcache服务器集群中缓存数据的存放逻辑,提供给memCache管理程序使用,以便实现从缓存到数据库,数据库到缓存双向的数据存取。
表结构设计如下:
cache_data_propertits_1(读数据库写缓存)
列 列名 类型 说明
mc_server MC服务器 CHAR(10) 决定数据存放在哪个MC服务器,和MC的客户端配置文件要求一致。
Object_ key 缓存对象的主键 Char(30) 存放在MC对象的主键。
Object_key _suffix 缓存对象的主键后缀 Char(20) 存放在MC对象的主键后缀(给使用者作为参考,该值一般对应系统中对象的具体id,比如customer_no,emp_no等)。
Proc_name 过程名 Char(80) Memcache管理程序定期执行生成缓存数据。
Proc_control_type 过程控制类型 Char(10) “定时”,“间隔”。
Proc_control_parm 过程控制参数 Char(50) 若“定时”:1,17等,若“间隔”:
6,12等。
Proc_flag 是否立即执行 Char(2) 是/否
Status 状态 Char(4) 有效/无效
Note 说明 Char(100) 说明用途
Proc_status 是否在运行状态 Char(2) 如果这个任务正在运行就标志出来。
Last_proc_time 最近执行完成时间 Datetime 这个任务最近执行成功时间。

cache_data_propertits_2(读缓存写数据库)
列 列名 类型 说明
mc_server MC服务器 CHAR(10) 决定数据存放在哪个MC服务器,和MC的客户端配置文件要求一致。
Object_list 存放缓存对象的主键的列表 Char(30) 例如 ep_log_list(存放日志数据,这样可以将日志文件先写到缓存,到空闲的时候再复制到数据库)
Object_ key 缓存对象的主键 Char(30) 例如:ep_log
Object_key _suffix 缓存对象的主键后缀 Char(20) 例如:唯一码。
Proc_name 过程名 Char(80) 例如:proc_ep_log_mc ? ??
Proc_parm 过程参数 Char(200) 例如: start_time, end_time, proc_time,
sql
Proc_control_type 过程控制类型 Char(10) “定时”,“间隔”。
Proc_control_parm 过程控制参数 Char(50) 若“定时”:1,17等,若“间隔”:
6,12等。
Proc_flag 是否立即执行 Char(2) 是/否
Status 状态 Char(4) 有效/无效
Note 说明 Char(100) 说明用途
Proc_status 是否在运行状态 Char(2) 如果这个任务正在运行就标志出来。
Last_proc_time 最近执行完成时间 Datetime 这个任务最近执行成功时间。
3.memCache管理程序
     memCache管理程序最为独立的运行的应用程序,它主要负责读取配置表中的配置信息,根据配置文件的设置,交换数据库和缓存中的数据。
memCache管理程序可以利用目前比较完善的memCache客户端程序,以便加强功能。
4.业务系统
 业务系统开发人员参考配置文件,决定如何从缓存取得数据,并采用合适的策略。(比如:是利用本地缓存,还是直接利用memCache缓存)
 开发代理访问缓存的程序。
5.存放在缓冲中的数据格式
采用 ArrayList <Map m>的形式,目的是为了通用。
但是这个结构额外的内存开销会比较大,而且序列化对象也会增加系统的响应负担。

四 具体实现说明
   根据业务实际需要,目前只实现了后台管理端读数据库写缓存,然后业务程序读缓存,当缓存中没有业务数据数据的时候,写入缓存。

1 缓冲服务器
    在多台服务器上安装memcached服务器,服务器的安装步骤,参与我的另一篇文章《建立集群架构的步骤》。
2 管理端程序:
用到的jar 包:
a. alisoft-xplatform-asf-cache-2.5.jar
b. commons-logging.jar
c. j2ee.jar
d. jconn3.jar
e. log4j-1.2.11.jar
f. proxool-0.9.1.jar
g. proxool-cglib.jar

类McTranslateServer.java

public class McTranslateServer {
public static String LOG_PATH = "";         //日志存放目录
    private static SimpleDateFormat dtformat;
public static void main(String[] args) throws Exception{
McTranslateServer mcts = new McTranslateServer();
mcts.go();
}
public void log(String s)throws Exception{
dtformat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(dtformat.format(new Date()) + " " + s);
}
    public void loadConfig() throws Exception{
        InputStream is = getClass().getResourceAsStream("mcts.properties");
        Properties pp = new Properties();
        pp.load(is);
        LOG_PATH = pp.getProperty("log_path","");
     }

public void go() throws Exception{
log("config file loading......");
loadConfig();
McTranslateTimerTask t = new McTranslateTimerTask(0);
t.start(1,1200000);
McTranslateTimerTask t2 = new McTranslateTimerTask(1);
t2.start(300000,1200000);

log("DataTranslateServer start......");
}
}
类McTranslateTimerTask.java

public class McTranslateTimerTask {
private DataAccess da = null;
private TransactionContext dbtxt = null;

    //for log
    private SimpleDateFormat dtFormater;   //日期时间格式化
    private PrintStream logger = null ;    //日志输出
    private String logDate = "";           //当前日志的记录日期,当前日期字符串
                                           // 不等于此值则要重建logger和logDate
    private Base base = new Base();
    private Timer timer;
    private SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd");
   
    private int step = 0;

    public McTranslateTimerTask(int i) {
    timer = new Timer();  
dbtxt = new TransactionContext("wahaha",true);
da = new DataAccess();
da._TransactionContxt = dbtxt;
step = i;

  
    }
    public void start(int delay,int period){
    timer.scheduleAtFixedRate(ttask,delay,period);    
    }
   
    private TimerTask ttask = new TimerTask(){
    public void run(){
    //因为是两个db,conn单个控制free
   
            try{
            //取配置表数据
            ArrayList properties = null;
            String sql = "select * from cache_data_propertits_1 where status = '1' and proc_status='1' and convert(int,id) % 2 ="+step;
            ResultSet rs = null;
            try{
            rs = da.doQuery(sql);
            properties= da.getArrByRs4(rs);
            //log("info:...读取配置文件");
            if (properties.size()>0){
                    sql = " update cache_data_propertits_1 set proc_status='0' where status='1' and proc_status='1' and convert(int,id) % 2 ="+step;
                da.doUpdate(sql);
                    dbtxt.commit();
                //log("info:...锁定配置文件>proc_status='0'");
            }
                }catch(Exception e){
                    dbtxt.rollback();
                    log("error:...锁定配置文件出错 >" + e.toString());
            throw new Exception(e.toString());
            }finally{
            dbtxt.freeConnection();
            }
           
           
            Date now = new java.util.Date();
            for(int k=0;k<properties.size();k++){
            try{
                Map mm = (HashMap)properties.get(k);
            String id = String.valueOf(mm.get("id"));
            String mc_server = (String)mm.get("mc_server");
            String object_key = (String)mm.get("object_key");          
            String object_key_suffix = (String)mm.get("object_key_suffix");
            String proc_name = (String)mm.get("proc_name");
            String proc_parm = (String)mm.get("proc_parm");
            String proc_control_type = (String)mm.get("proc_control_type");
            String proc_control_parm = (String)mm.get("proc_control_parm");
            String proc_flag = (String)mm.get("proc_flag");
            String last_proc_time = (String)mm.get("last_proc_time");
            if (last_proc_time.equals(""))
            {last_proc_time="1900-1-1 00:00:00";}
            Date last_proc_time_d = df.parse(last_proc_time);
            boolean run_flag = false;
            if (proc_flag.equals("1")){
            run_flag = true;
            }else if (proc_control_type.equals("定时")){
            String[] proc_control_parms = proc_control_parm.split(",");
            for(int i=0;i<proc_control_parms.length;i++){
            proc_control_parms[i]=df2.format(new java.util.Date())+" "+ proc_control_parms[i];
                if (last_proc_time_d.before(df.parse(proc_control_parms[i])) && now.after(df.parse(proc_control_parms[i]))){
                run_flag = true;
                break;
                }
            }
            }else if (proc_control_type.equals("间隔")){
            if ((now.getTime() - last_proc_time_d.getTime())/(60*60*1000) >Integer.parseInt(proc_control_parm)){
            run_flag = true;
            }
           
            }
            if (run_flag){
            //log("info:...开始>id:"+ id+";object_key:"+object_key);
try {
databaseToMc(mc_server,object_key,object_key_suffix,proc_name,proc_parm);
log("info:...复制>id:"+ id+";object_key:"+object_key+";参数:"+proc_parm+"\r\n");
} catch (Throwable e) {
log("error:...出错>id:"+ id+";object_key:"+object_key+";参数:"+proc_parm +";错误信息:"+ e.toString());
throw new Exception(e.toString());
}finally{
            dbtxt.freeConnection();
            }
            }
                //更新配置表中proc_mark = '1' 和 syn_date = getdate()
                try{
                if (run_flag){
                    sql = " update cache_data_propertits_1 set proc_status = '1' ,last_proc_time='"+df.format(new java.util.Date())+"',proc_flag='0' where id= "+id;                   
                }else{
                sql = " update cache_data_propertits_1 set proc_status = '1' ,proc_flag='0' where id= "+id;                   
                }
                //sql = " update cache_data_propertits_1 set proc_status = '1' ,last_proc_time='"+df.format(new java.util.Date())+"' where id= "+id;                   
                da.doUpdate(sql);
                             dbtxt.commit();         
                             //log("info:...恢复标记>id:"+ id+";object_key:"+object_key+";参数:"+proc_parm+"\r\n");
                }catch(Exception e){               
                dbtxt.rollback();              
                log("error:...恢复标记出错>id:"+ id+";object_key:"+object_key +";参数:"+proc_parm+";错误信息:"+ e.toString());
                throw new Exception();
                }finally{
                dbtxt.freeConnection();
                }
            }catch(Exception e){
            throw new Exception(e.toString());
            }//--一个进程结尾
            }
               
            }catch(Exception e){
            try{
                    //出错时恢复标记。
                    try{
                        String sql = " update cache_data_propertits_1 set proc_status='1' "+
                              " where status = '1' and proc_status='0' and convert(int,id) % 2 ="+step;
                        da.doUpdate(sql);
                        dbtxt.commit();
                        //log("info:...恢复标记此次接口执行出错,将未执行的过程标志全重置为1\r\n");
                    }catch(Exception ex){
                        dbtxt.rollback();
                        log("error: ...恢复标记出错>错误信息:"+ e.toString());
                    }finally{
            dbtxt.freeConnection();
            }
                   
            base.sendMail("","dataTrans@wahaha.com.cn","zhoucw@wahaha.com.cn;","Data Trans Error!",e.toString());
            }catch(Exception ee){};//最后统一扔出异常
            }
            finally{
        try {
dbtxt.freeConnection();
    } catch (Exception e) {
}
            }
    }
   
    };
    private void log(String s) {
        Map obj = new HashMap();
        obj.put("logger",logger);
        obj.put("dtf",dtFormater);
        logDate = base.getLogFile(McTranslateServer.LOG_PATH,obj,logDate,"dts");
        logger = (PrintStream)obj.get("logger");
        dtFormater = (SimpleDateFormat)obj.get("dtf");
        logger.println(dtFormater.format(new java.util.Date()) + " ---" + s);
    }


    public void databaseToMc(String mc_server,String object_key,String object_key_suffix,String proc_name,String proc_parm) throws Throwable{
    //从database收集数据
    proc_name = " {call "+proc_name+"}";
    CallableStatement cs = da.callProc(proc_name);
    if (!proc_parm.equals("")){
    String[] parms = proc_parm.split(",");
    for (int i=0;i<parms.length;i++){
    cs.setString(i+1, da.toDB(parms[i]));
    }
    }
   
    ResultSet rs = cs.executeQuery();
   
    ResultSetMetaData rsmd = rs.getMetaData();
    int numberofcolumns = rsmd.getColumnCount();
    int i;
    String[] columntype = new String[numberofcolumns+1];
    columntype[0]="";
        for(i=1; i<= numberofcolumns; i++){
        columntype[i]=rsmd.getColumnTypeName(i).toLowerCase();
        }
        String[] columnname = new String[numberofcolumns+1];
        columnname[0]="";       
        for(i=1; i<= numberofcolumns; i++){
        columnname[i]=rsmd.getColumnLabel(i);
       
        }

    //将数据放入Mc
    IMemcachedCache cache0 = McClient.getMemcachedCache(mc_server);
   
    String objectKey ="";
    String temp_objectKey ="";
    ArrayList arr= new ArrayList();
    Thread.sleep(1000);
    while(rs.next()){
          Map mm = new HashMap();
    for(i =1;i<= numberofcolumns; i++)
   
           {
             
               if ((columntype[i].equals("binary"))||(columntype[i].equals("varbinary")))
               mm.put(columnname[i],rs.getBinaryStream(i) == null ? "&nbsp;":rs.getBinaryStream(i));
               else if (columntype[i].equals("blob"))
               mm.put(columnname[i],rs.getBlob(i));
               else if ((columntype[i].equals("char"))||(columntype[i].equals("varchar"))||(columntype[i].equals("longvarchar"))||(columntype[i].equals("text")))
                  
                   String s = rs.getString(i);
                   String r = "";
                   if( s != null && !( s.trim()).equals("") )
                        r = new String((s.trim()).getBytes("ISO8859_1"),"GBK" ) ;
                   mm.put(columnname[i],r);
               }
               else if ((columntype[i].equals("date")))
               {
               mm.put(columnname[i],rs.getDate(i) == null ? "":rs.getDate(i));                   
               }else if (columntype[i].equals("datetime")||columntype[i].equals("smalldatetime")){
               mm.put(columnname[i],rs.getString(i) == null ? "":rs.getString(i));
               }
               else if ((columntype[i].equals("decimal"))||(columntype[i].equals("numeric"))||(columntype[i].equals("real")))
               {
                   BigDecimal bd = rs.getBigDecimal(i);
                   if(bd==null){
                bd=new BigDecimal(0.0);
                mm.put(columnname[i],bd);
                   }
                   else{
                   bd = bd.setScale(2,BigDecimal.ROUND_HALF_UP);
                   if(bd.floatValue()==bd.intValue()){
                   bd = bd.setScale(0,BigDecimal.ROUND_HALF_UP);
                   }
                   mm.put(columnname[i],bd);
                   }
               }
               else if ((columntype[i].equals("float"))||(columntype[i].equals("money")))
               {                    
                    Float b = new Float(rs.getFloat(i));
                    mm.put(columnname[i],b.equals(new Float(0)) ? "":b);
               }
               else if ((columntype[i].equals("integer"))||(columntype[i].equals("smallint"))||(columntype[i].equals("int"))||(columntype[i].equals("tinyint")))
               {
                    
                     Integer c = new Integer(rs.getInt(i));
                     mm.put(columnname[i],c.equals(new Integer(0)) ? "":c);
               }
               else if (columntype[i].equals("null"))
               mm.put(columnname[i],"");
               else   mm.put("error","无法识别的类型");            
              
           }
   
    objectKey = object_key + (String)mm.get(object_key_suffix);
    if (temp_objectKey.equals("")){
    temp_objectKey = objectKey;
    }
    if (!temp_objectKey.equals(objectKey)){
   
    cache0.put(temp_objectKey, arr,24*60*60);   
    for(int k = 0;k<arr.size();k++){
    Map mm2=(HashMap)arr.get(k) ;
    mm2 = null;
   
    }
    System.gc();
    arr.clear();
   
    }
    temp_objectKey = objectKey;
    arr.add(mm);
   
       
    }
    if (arr!=null){
    cache0.put(temp_objectKey, arr,24*60*60);
    for(int k = 0;k<arr.size();k++){
    Map mm2=(HashMap)arr.get(k) ;
    mm2 = null;
    }    
    arr=null;
    System.gc();
    }
   
    columntype =null;
    columnname =null;
    System.gc();
   
    }

}


0

阅读 评论 收藏 转载 喜欢 打印举报/Report
  • 评论加载中,请稍候...
发评论

    发评论

    以上网友发言只代表其个人观点,不代表新浪网的观点或立场。

      

    新浪BLOG意见反馈留言板 电话:4000520066 提示音后按1键(按当地市话标准计费) 欢迎批评指正

    新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 会员注册 | 产品答疑

    新浪公司 版权所有