加载中…
个人资料
  • 博客等级:
  • 博客积分:
  • 博客访问:
  • 关注人气:
  • 获赠金笔:0支
  • 赠出金笔:0支
  • 荣誉徽章:
正文 字体大小:

HAProxy 代码阅读(一)

(2013-04-09 21:12:37)
标签:

haproxy

代码阅读

it

分类: linux
HAproxy是一个简单强大的http/tcp负载均衡工具,其代码量只有5w多行,以下是一些阅读代码的过程的记录,方便以后查看。
 
HAProxy是一个单进程事件驱动的模型。
 
首先看一个比较重要的结构体:
struct poller {
    void   *private;                                     
    int  REGPRM2 (*is_set)(const int fd, int dir);       
    int  REGPRM2    (*set)(const int fd, int dir);       
    int  REGPRM2    (*clr)(const int fd, int dir);       
    int  REGPRM2 (*cond_s)(const int fd, int dir);       
    int  REGPRM2 (*cond_c)(const int fd, int dir);       
    void REGPRM1    (*rem)(const int fd);                
    void REGPRM1    (*clo)(const int fd);                
        void REGPRM2   (*poll)(struct poller *p, int exp);   
    int  REGPRM1   (*init)(struct poller *p);            
    void REGPRM1   (*term)(struct poller *p);            
    int  REGPRM1   (*test)(struct poller *p);            
    int  REGPRM1   (*fork)(struct poller *p);            
    const char   *name;                                  
    int    pref;                                         
};  
 
该结构体是事件循环的核心,各个字段都有比较详细的注释,就不解释了,注意一下函数指针前面的 REGPRM2和REGPRM1,这两个宏指示编译器,在函数调用时,参数传递采用寄存器传递。
 
 为 了使用不同的事件监听函数select/poll等,haproxy 和 libevent等一样,采用结构体+函数指针的方式来实现,在libevent是将结构体数组定义为全局变量,然后进行显示的赋值,在Haproxy中 同样定义了全局的结构体数组,但是却没有找到在那里进行初始化,最后在看ev_epoll.c的代码的时候,看到了以下这个函数:
 
__attribute__((constructor))
static void _do_register(void)
{
struct poller *p;
 
if (nbpollers >= MAX_POLLERS)
return;
 
epoll_fd -1;
&pollers[nbpollers++];
 
p->name "epoll";
p->pref 300;
p->private NULL;
 
p->test _do_test;
p->init _do_init;
p->term _do_term;
p->poll _do_poll;
p->fork _do_fork;
 
p->is_set  __fd_is_set;
p->cond_s p->set __fd_set;
p->cond_c p->clr __fd_clr;
p->rem __fd_rem;
p->clo __fd_clo;
}
 
该函数很明显就是给结构体中的函数指针赋值,但是却没有地方调用这个函数,原来被__attribute__((constructor))修饰的函数是在main函数之前自动调用的,见我的前一篇博客。
 
使用readelf 查看可执行文件,发现在我的系统上,haproxy调用了4个_do_register,分别是select/poll/epoll/sepoll,这几个pref值分别是100/200/300/400,可以看到sepoll的优先级是最高的。
sepoll在底层还是采用的epoll的系统调用如epoll_wait等,只是在外面加了一下优化,这个以后有机会再分析。
 
haproxy的主循环是 run_poll_loop();
 
展开该函数:
  void run_poll_loop()     
  
      int next;            
    
      tv_update_date(0,1); 
      while (1)          
                                                                                    
          signal_process_queue();                                                                                                         
    
          
          wake_expired_tasks(&next);                                                                                                      
    
                
          process_runnable_tasks(&next);                                                                                                  
    
                
          maintain_proxies(&next);                                                                                                        
    
                                                                
          if (!actconn && listeners == 0)    
              break;       
    
          
          cur_poller.poll(&cur_poller, next);                                                                                             
      }
   
 
cur_poller 是个全局变量,指向正在使用的poller结构。该函数中主要分析 process_runnable_tasks 函数。
 
process_runable_tasks主要是从运行队列中取出task,调用其process函数,process函数返回之后将task放入等待队列。
为了保证所有task都及时响应,每次调用运行队列取出的task最大为运行队列的1/4,同时个数不超过200个。
 
task结构体如下:

struct task {
 struct eb32_node wq;  
 struct eb32_node rq;  
 int state;   
 int expire;   
 unsigned int calls;  
 struct task * (*process)(struct task *t); 
 void *context;   
 int nice;   
};
 
process为函数指针,大部分情况下都是使用process_session对其赋值。

上 面介绍了HAProxy的主循环 run_poll_loop(),在该函数的最后是调用 cur_poller.poll(&cur_poller, next); ,其中cur_poller是指当前被使用的poller结构,本篇 以linux的epoll为例来看一下poller的工作方式。
 
首先看其初始化函数:
__attribute__((constructor))
static void _do_register(void)
{
struct poller *p;
 
    if (nbpollers >= MAX_POLLERS)
        return;
 
    epoll_fd -1;
&pollers[nbpollers++];
 
p->name "epoll";
    p->pref 300;
p->private NULL;
 
p->test _do_test;
    p->init _do_init;
    p->term _do_term;
p->poll _do_poll;
    p->fork _do_fork;
 
    p->is_set  __fd_is_set;
p->cond_s p->set __fd_set;
p->cond_c p->clr __fd_clr;
p->rem __fd_rem;
    p->clo __fd_clo;
}
 
其中poll 初始化为 _do_poll,在_do_poll中主做了两件事情:1、调用epoll_wait等待事件;2、调用相应fd的接收/发送函数。
调 用epoll_wait之前需要计算出最大允许事件个数,这里稍微有点奇怪的是,并没有检查epoll_wait的返回码,即当epoll_wait返回 -1时没有做处理,当然如果是因为信号中断而返回-1,run_poll_loop里面还是有处理的,但是其他错误,好像没有处理。
 
调用fd的接收/发送函数也是通过函数指针调用的,类似于下面的情形:
fdtab[fd].cb[DIR_RD].f(fd);
 
fdtab是一个结构体名称,也是一个全局变量名称,全局变量在main函数中进行初始化,fdtab结构体定义如下:
struct fdtab {
    struct {
 int (*f)(int fd);            
        struct buffer *b;            
    cb[DIR_SIZE];
    void *owner;                         
    struct                             
        unsigned char e;             
        unsigned int s1;             
    spec;
    unsigned short flags;                
    unsigned char state;                 
    unsigned char ev;                    
};
 
其中cb 有接收/发送的函数和缓冲区初始化,owner 在处理客户端连接时初始化为task,处理listen时初始化为listener。
 
fdtab初始化主要是进行了空间的分配,以及state的初始化,代码如下:
     fdtab = (struct fdtab *)calloc(1,
                        sizeof(struct fdtab) * (global.maxsock));
     for (i 0; global.maxsock; i++) {
         fdtab[i].state FD_STCLOSE;
     }
 
接下来查看cb 以及owner在哪里初始化,在client.c里面:
int event_accept(int fd) {
struct listener *l = fdtab[fd].owner;
.......
struct task *t;
......
 
task_new()
.......
 
s->si[0].owner = t;
 
fdtab[cfd].owner &s->si[0];   //owner 初始化为 task
 
fdtab[cfd].cb[DIR_RD].f l->proto->read;
fdtab[cfd].cb[DIR_RD].b s->req;
fdtab[cfd].cb[DIR_WR].f l->proto->write;
fdtab[cfd].cb[DIR_WR].b s->rep;
......
}
 
可见,初始化cb的是lintener里面的proto结构体。而该结构体是在哪里初始化呢?
 
在proto_tcp.c 中有如下代码:
void tcpv4_add_listener(struct listener *listener)                                                      
 
     if (listener->state != LI_INIT)
         return;
     listener->state = LI_ASSIGNED;                                                                      
     listener->proto = &proto_tcpv4;                                                                     
     LIST_ADDQ(&proto_tcpv4.listeners, &listener->proto_list);                                           
     proto_tcpv4.nb_listeners++;                                                                         
}
 
再看proto_tcpv4是一个static变量,在proto_tcp.c的开头初始化。
static struct protocol proto_tcpv4 {
    .name "tcpv4",     
    .sock_domain AF_INET,
    .sock_type SOCK_STREAM,
    .sock_prot IPPROTO_TCP,
    .sock_family AF_INET,
    .sock_addrlen sizeof(struct sockaddr_in),
    .l3_addrlen 32/8,  
    .read &stream_sock_read,
    .write &stream_sock_write,   
    .bind_all tcp_bind_listeners,
    .unbind_all unbind_all_listeners,
    .enable_all enable_all_listeners,
    .listeners LIST_HEAD_INIT(proto_tcpv4.listeners),                                                                                 
    .nb_listeners 0,
};
 
而tcpv4_add_listener的调用是在读取配置文件的时候,在cfgparse.c 中:
 
static int str2listener(char *str, struct proxy *curproxy)
{
......
tcpv4_add_listener(l);
......
}
 
到此为止,已经清楚了socket接收和发送的函数就是stream_sock_read 和 stream_sock_write。
 
继续往下看,stream_sock_read 和 stream_sock_write 在stream_sock.c 中,stream_sock_read 的代码比较长,其实干的事情比较简单明确:
1、计算接收缓冲区的剩余空间;
2、调用recv进行接收;
3、将fd相关的task放入run queue;
 
放入run queue的task在接下来的 run_poll_loop()里面被处理。到此一次poll完成。

0

阅读 收藏 喜欢 打印举报/Report
  

新浪BLOG意见反馈留言板 欢迎批评指正

新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 产品答疑

新浪公司 版权所有