加载中…
个人资料
  • 博客等级:
  • 博客积分:
  • 博客访问:
  • 关注人气:
  • 获赠金笔:0支
  • 赠出金笔:0支
  • 荣誉徽章:
正文 字体大小:

小型消息订阅/发布系统

(2022-04-12 16:43:06)
分类: unix
    在有些场景下,对性能要求不高,资源比较缺乏,又需要多个进程间建立ipc通信,可以考虑该设计,而不用引入kafka这样大型的分布式系统。

    总体设计:
    服务端与客户端建立tcp连接。
    服务进程:用来接收客户端socket连接及消息数据。
    提供so给客户端:用来连接服务端socket,可以订阅消息/取消订阅消息/发布消息。

    服务端类设计:
    
    IPCManager: 管理类,包含一个map管理IPCChannel列表和一个set集合IPCPeer列表
    IPCChannel: 管理channel,一个消息id对应一个channel,使用set成员来管理订阅了该channel的客户端socket fd。当一个客户端往服务端发布消息时,服务端接收消息,通过消息id查找channel,然后遍历channel里的set集合IPCPeer发送消息,达到发布目的。
    IPCPeer: 管理与客户端连接的socket fd。每一个客户端连接上来时,创建一个IPCPeer。

    服务端无限循环select监听的socket fd与所有IPCPeer的socket fd,如果是监听socket fd可读,调用accept()建立新socket连接,创建新IPCPeer。如果是某IPCPeer的socket fd可读,调用recv读取消息,对消息进行处理。
    消息设计:
    固定头部长度(12字节 消息部长度+命令类型+消息类型)+变长消息体
    服务端根据消息头部的命令类型(订阅消息/取消订阅消息/发布消息)进行处理:
    订阅消息:根据消息类型找到对应channel,往channel里的所有IPCPeer集合加入fd(对应一个客户端socket连接),表示该fd订阅了该消息。
    取消订阅消息:根据消息类型找到对应channel,从channel的IPCPeer集合移除fd,表示该fd取消了订阅该消息。
    发布消息:根据消息类型找到对应channel,遍历channel的IPCPeer集合,调用sendMsg()发送消息到该fd,达到将数据发送到各个应用的目的。

    客户端设计:
    设计几个有用的api:
    ipc_initialize(): 设置收到服务端数据的回调函数,创建与服务端的socket连接。
    ipc_subscribe(): 创建ipc消息,只有头部,通过socket发送给服务端,订阅消息。
    ipc_unsubscribe():创建ipc消息,只有头部,通过socket发送给服务端,取消订阅消息。
    ipc_publish(): 创建ipc消息,通过socket发送给服务端,由服务端发送给其它订阅该消息的应用。
    ipc_loop():无限循环select客户端socket fd,fd可读时,读出消息,调用注册的回调函数进行处理。
    客户端应该另起一个线程来运行ipc_loop(),回调函数应由客户端自己给出,出来处理收到的ipc消息。

    

0

阅读 收藏 喜欢 打印举报/Report
  

新浪BLOG意见反馈留言板 欢迎批评指正

新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 产品答疑

新浪公司 版权所有