在團隊協同工具 Worktile的使用過程中,你會發現無論是右上角的消息通知,還是在任務面板中拖動任務,還有用戶的在線狀態,都是實時刷新。Worktile中的推送服務是採用的是基於XMPP協議、Erlang語言實現的Ejabberd,並在其源碼基礎上,結合我們的業務,對源碼作了修改以適配我們自身的需求。另外,基於AMQP協議也可以作為實時消息推送的一種選擇,踢踢網就是採用 RabbitMQ+STOMP協議實現的消息推送服務。本文將結合我在Worktile和踢踢網的項目實踐,介紹下消息推送服務的具體實現。
實時推送的幾種實現方式相較於手機端的消息推送(一般都是以Socket方式實現),WEB端是基於HTTP協議,很難像TCP一樣保持長連接。但隨著技術的發展,出現了WebSocket,Comet等新的技術可以達到類似長連接的效果,這些技術大體可分為以下幾類:
1)短輪詢。頁面端通過JS定時異步刷新,這種方式實時效果較差。
2)長輪詢。頁面端通過JS異步請求服務端,服務端在接收到請求後,如果該次請求沒有數據,則掛起這次請求,直到有數據到達或時間片(服務端設定)到,則返回本次請求,客戶端接著下一次請求。示例如下:
3)WebSocket。瀏覽器通過WebSocket協議連接服務端,實現了瀏覽器和伺服器端的全雙工通信。需要服務端和瀏覽器都支持WebSocket協議。
以上幾種方式中,方式1實現較簡單,但效率和實時效果較差。方式2對服務端實現的要求比較高,尤其是並發量大的情況下,對服務端的壓力很大。方式3效率較高,但對較低版本的瀏覽器不支持,另外服務端也需要有支持WebSocket的實現。Worktile的WEB端實時消息推送,採用的是XMPP擴展協議XEP-0124 BOSH( http://xmpp.org/extensions/xep-0124.html),本質是採用方式2長輪詢的方式。踢踢網則採用了WebSocket連接RabbitMQ的方式實現,下面我會具體介紹如何用這兩種方式實現Server Push。
運行時環境準備服務端的實現中,無論採用Ejabberd還是RabbitMQ,都是基於Erlang語言開發的,所以必須安裝Erlang運行時環境。Erlang是一種函數式語言,具有容錯、高並發的特點,藉助OTP的函數庫,很容易構建一個健壯的分布式系統。目前,基於Erlang開發的產品有,資料庫方面:Riak(Dynamo實現)、CouchDB, Webserver方面:Cowboy、Mochiweb, 消息中間件有RabbitMQ等。對於服務端程式設計師來說,Erlang提供的高並發、容錯、熱部署等特性是其他語言無法達到的。無論在實時通信還是在遊戲程序中,用Erlang可以很容易為每一個上線用戶創建一個對應的Process,對一臺4核8個G的伺服器來說,承載上百萬個這樣的Process是非常輕鬆的事。下圖是Erlang程序發起Process的一般性示意圖:
如圖所示,Session Manager(or Gateway)負責為每個用戶(UID)創建相對應的Process, 並把這個對應關係(MAP)存放到數據表中。每個Process則對應用戶數據,並且他們之間可以相互發送消息。Erlang的優勢就是在內存足夠的情況下創建上百萬個這樣的Process,而且它的創建和銷毀比JAVA的Thread要輕量的多,兩者不是一個數量級的。
好了,我們現在開始著手Erlang環境的搭建(實驗的系統為Ubuntu 12.04, 4核8個G內存):
1、依賴庫安裝
sudo apt-get install build-essentialsudo apt-get install libncurses5-devsudo apt-get install libssl-dev libyaml-devsudo apt-get install m4sudo apt-get install unixodbc unixodbc-devsudo apt-get install freeglut3-dev libwxgtk2.8-devsudo apt-get install xsltprocsudo apt-get install fop tk8.5 libxml2-utils2、官網下載OTP源碼包(
http://www.erlang.org/download.html), 解壓並安裝:
tar zxvf otpsrcR16B01.tar.gzcd otpsrcR16B01configuremake & make install至此,erlang運行環境就完成了。下面將分別介紹rabbitmq和ejabberd構建實時消息服務。
基於RabbitMQ的實時消息服務RabbitMQ是在業界廣泛應用的消息中間件,也是對AMQP協議實現最好的一種中間件。AMQP協議中定義了Producer、 Consumer、MessageQueue、Exchange、Binding、Virtual Host等實體,他們的關係如下圖所示:
消息發布者(Producer)連接交換器(Exchange), 交換器和消息隊列(Message Queue)通過KEY進行Binding,Binding是根據Exchange的類型(分為Fanout、Direct、Topic、Header)分別對消息作不同形式的派發。Message Queue又分為Durable、Temporary、Auto-Delete三種類型,Durable Queue是持久化隊列,不會因為服務ShutDown而消失,Temporary Queue則服務重啟後會消失,Auto-Delete則是在沒有Consumer連接時自動刪除。另外RabbitMQ有很多第三方插件,可以基於AMQP協議基礎之上做出很多擴展的應用。下面我們將介紹WEB STOMP插件構建基於AMQP之上的STOMP文本協議,通過瀏覽器WebSocket達到實時的消息傳輸。系統的結構如圖:
如圖所示,WEB端我們使用STOMP.JS和SockJS.JS與RabbitMQ的WEB STOMP Plugin通信,手機端可以用STOMPj, Gozirra(Android)或者Objc-STOMP(IOS)通過STOMP協議與RabbitMQ收發消息。因為我們是實時消息系統通常都是要與已有的用戶系統結合,RabbitMQ可以通過第三方插件RabbitMQ-AYTH-Backend-HTTP來適配已有的用戶系統,這個插件可以通過HTTP接口完成用戶連接時的認證過程。當然,認證方式還有LDAP等其他方式。下面介紹具體步驟:
從官網( http://rabbitmq.com/download.html)下載最新版本的源碼包,解壓並安裝:
tar zxf rabbitmq-server-x.x.x.tar.gzcd rabbitmq-server-x.x.xmake & make install為RabbitMQ安裝WEB-STOMP插件
cd /path/to/your/rabbitmq./sbin/rabbitmq-plugins enable rabbitmq_web_stomp./sbin/rabbitmq-plugins enable rabbitmq_web_stomp_examples./sbin/rabbitmqctl stop./sbin/rabbitmqctl start./sbin/rabbitmqctl status將會顯示下圖所示的運行的插件列表
安裝用戶授權插件
cd /path/to/your/rabbitmq/pluginswget <a href="http://www.rabbitmq.com/community-plugins/v3.3.x/rabbitmq_auth_backend_http-3.3.x-e7ac6289.ez">http://www.rabbitmq.com/community-plugins/v3.3.x/rabbitmq_auth_backend_http-3.3.x-e7ac6289.ez</a>cd .../sbin/rabbitmq-plugins enable rabbitmq_auth_backend_http編輯RabbitMQ.Config文件(默認存放於/ECT/RabbitMQ/下),添加:
[ ... {rabbit, [{auth_backends, [rabbit_auth_backend_http]}]}, ... {rabbitmq_auth_backend_http, [{user_path, 「http://your-server/auth/user」}, {vhost_path, 「http://your-server/auth/vhost」}, {resource_path, 「http://your-server/auth/resource」} ]} ...].其中,User_Path是根據用戶名密碼進行校驗,VHOST_Path是校驗是否有權限訪問VHOST, Resource_Path是校驗用戶對傳入的Exchange、Queue是否有權限。我下面的代碼是用Node.js實現的這三個接口的示例:
var express = require('express'); var app = express(); app.get('/auth/user', function(req, res){ var name = req.query.username; var pass = req.query.password; console.log("name : " + name + ", pass : " + pass); if(name === 'guest' && pass === "guest"){ console.log("allow"); res.send("allow"); }else{ res.send('deny'); } }); app.get('/auth/vhost', function(req, res){ console.log("/auth/vhost"); res.send("allow"); }); app.get('/auth/resource', function(req, res){ console.log("/auth/resource"); res.send("allow"); }); app.listen(3000);瀏覽器端JS實現,示例代碼如下:
...... var ws = new SockJS('http://' + window.location.hostname + ':15674/stomp'); var client = Stomp.over(ws); // SockJS does not support heart-beat: disable heart-beats client.heartbeat.outgoing = 0; client.heartbeat.incoming = 0; client.debug = pipe('#second'); var print_first = pipe('#first', function(data) { client.send('/exchange/feed/user_x', {"content-type":"text/plain"}, data); }); var on_connect = function(x) { id = client.subscribe("/exchange/feed/user_x", function(d) { print_first(d.body); }); }; var on_error = function() { console.log('error'); }; client.connect('guest1', 'guest1', on_connect, on_error, '/'); ......需要說明的時,在這裡我們首先要在RabbitMQ實例中創建Feed這個Exchange,我們用STOMP.JS連接成功後,根據當前登陸用戶的ID(user_x)綁定到這個Exchange,即Subscribe(「/exchange/feed/user_x」, …) 這個操作的行為,這樣在向RabbitMQ中Feed Exchange發送消息並指定用戶ID(user_x)為KEY,頁面端就會通過WEB Socket實時接收到這條消息。
到目前為止,基於RabbitMQ+STOMP實現WEB端消息推送就已經完成,其中很多的細節需要小夥伴們親自去實踐了,這裡就不多說了。實踐過程中可以參照官方文檔:
http://rabbitmq.com/stomp.html
http://rabbitmq.com/web-stomp.html
https://github.com/simonmacmullen/rabbitmq-auth-backend-http以上的實現是我本人在踢踢網時採用的方式,下面接著介紹一下現在在Worktile中如何通過Ejabberd實現消息推送。
基於Ejabberd的實時消息推送與RabbitMQ不同,Ejabberd是XMPP協議的一種實現,與AMQP相比,XMPP廣泛應用於即時通信領域。XMPP協議的實現有很多種,比如JAVA的OpenFire,但相較其他實現,Ejabberd的並發性能無疑使最優秀的。XMPP協議的前身是Jabber協議,早期的Jabber協議主要包括在線狀態(Presence)、好友花名冊(Roster)、IQ(Info/Query)幾個部分。現在Jabber已經成為RFC的官方標準,如RFC2799,RFC4622,RFC6121,以及XMPP的擴展協議(XEP)。Worktile Web端的消息提醒功能就是基於XEP-0124、XEP-0206定義的BOSH擴展協議。
由於自身業務的需要,我們對Ejabberd的用戶認證和好友列表模塊的源碼進行修改,通過Redis保存用戶的在線狀態,而不是Mnesia和MySQL。另外好友這塊我們是從已有的資料庫中(MongoDB)中獲取項目或團隊的成員。Web端通過Strophe.JS來連接(HTTP-BIND),Strophe.JS可以以長輪詢和WebSocket兩種方式來連接,由於Ejabberd還沒有好的WebSocket的實現,就採用了BOSH的方式模擬長連接。整個系統的結構如下:
Web端用Strophe.JS通過HTTP-BIND進行連接Nginx代理,Nginx反向代理EjabberdCluster。iOS用XMPP-FramWork連接, Android可以用Smack直接連Ejabberd伺服器集群。這些都是現有的庫,無需對Client進行開發。在線狀態根據用戶UID作為KEY定義了在線、離線、忙等狀態存放於Redis中。好友列表從MongoDB的Project表中獲取。用戶認證直接修改了Ejabberd_Auth_Internal.erl文件,通過MongoDB驅動連接用戶庫,在線狀態等功能是新加了模塊,其部分代碼如下:
-module(wt_mod_proj). -behaviour(gen_mod). -behaviour(gen_server). -include("ejabberd.hrl"). -include("logger.hrl"). -include("jlib.hrl"). -define(SUPERVISOR, ejabberd_sup). ... -define(ONLINE, 1). -define(OFFLINE, 0). -define(BUSY, 2). -define(LEAVE, 3). ... %% API -export([start_link/2, get_proj_online_users/2]). %% gen_mod callbacks -export([start/2, stop/1]). %% gen_server callbacks -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]). %% Hook callbacks -export([user_available/1, unset_presence/3, set_presence/4]). -export([get_redis/1, remove_online_user/3, append_online_user/3]). ... -record(state,{host = <<"">>, server_host, rconn, mconn}).start_link(Host, Opts) -> Proc = gen_mod:get_module_proc(Host, ?MODULE), gen_server:start_link({local, Proc}, ?MODULE, [Host, Opts], []). user_available(New) -> LUser = New#jid.luser, LServer = New#jid.lserver, Proc = gen_mod:get_module_proc(LServer, ?MODULE), gen_server:cast(Proc, {user_available, LUser, LServer}).append_online_user(Uid, Proj, Host) -> Proc = gen_mod:get_module_proc(Host, ?MODULE), gen_server:call(Proc, {append_online_user, Uid, Proj}). remove_online_user(Uid, Proj, Host) -> Proc = gen_mod:get_module_proc(Host, ?MODULE), gen_server:call(Proc, {remove_online_user, Uid, Proj}). ... set_presence(User, Server, Resource, Packet) -> Proc = gen_mod:get_module_proc(Server, ?MODULE), gen_server:cast(Proc, {set_presence, User, Server, Resource, Packet}). ...start(Host, Opts) -> Proc = gen_mod:get_module_proc(Host, ?MODULE), ChildSpec = {Proc, {?MODULE, start_link, [Host, Opts]}, transient, 2000, worker, [?MODULE]}, supervisor:start_child(?SUPERVISOR, ChildSpec).stop(Host) -> Proc = gen_mod:get_module_proc(Host, ?MODULE), gen_server:call(Proc, stop), supervisor:delete_child(?SUPERVISOR, Proc).init([Host, Opts]) -> MyHost = gen_mod:get_opt_host(Host, Opts, <<"wtmuc.@HOST@">>), RedisHost = gen_mod:get_opt(redis_host, Opts, fun(B) -> B end,?REDIS_HOST), RedisPort = gen_mod:get_opt(redis_port, Opts, fun(I) when is_integer(I), I>0 -> I end, ?REDIS_PORT), ejabberd_hooks:add(set_presence_hook, Host, ?MODULE, set_presence, 100), ejabberd_hooks:add(user_available_hook, Host, ?MODULE, user_available, 50), ejabberd_hooks:add(sm_remove_connection_hook, Host, ?MODULE, unset_presence, 50), MongoHost = gen_mod:get_opt(mongo_host, Opts, fun(B) -> binary_to_list(B) end, ?MONGO_HOST), MongoPort = gen_mod:get_opt(mongo_port, Opts, fun(I) when is_integer(I), I>0 -> I end, ?MONGO_PORT), {ok, Mongo} = mongo_connection:start_link({MongoHost, MongoPort}), C = c(RedisHost, RedisPort), ejabberd_router:register_route(MyHost), {ok, #state{host = Host, server_host = MyHost, rconn = C, mconn = Mongo}}. terminate(_Reason, #state{host = Host, rconn = C, mconn = Mongo}) -> ejabberd_hooks:delete(set_presence_hook, Host, ?MODULE, set_presence, 100), ejabberd_hooks:delete(user_available_hook, Host, ?MODULE, user_available, 50), ejabberd_hooks:delete(unset_presence_hook, Host, ?MODULE, unset_presence, 50), eredis:stop(C), ok. ... handle_call({append_online_user, Uid, ProjId}, _From, State) -> C = State#state.rconn, Key = <<!--?PRE_RPOJ_ONLINE_USERS /binary, ProjId/binary-->>, Resp = eredis:q(C, ["SADD", Key, Uid]), {reply, Resp, State}; handle_call({remove_online_user, Uid, ProjId}, _From, State) -> ... handle_call({get_proj_online_users, ProjId}, _From, State) -> ... handle_cast({set_presence, User, Server, Resource, Packet}, #state{mconn = Mongo} = State) -> C = State#state.rconn, Key = <<!--?USER_PRESENCE /binary, User/binary-->>, Pids = get_user_projs(User, Mongo), Cmd = get_proj_key(Pids, ["SUNION"]), case xml:get_subtag_cdata(Packet, <<"show">>) of <<"away">> -> eredis:q(C, ["SET", Key, ?LEAVE]); <<"offline">> -> ... handle_cast(_Msg, State) -> {noreply, State}.handle_info({route, From, To, Packet}, #state{host = Host, server_host = MyHost, rconn = RedisConn, mconn = Mongo} = State) -> case catch do_route(Host, MyHost, From, To, Packet, RedisConn, Mongo) of {'EXIT', Reason} -> ?ERROR_MSG("~p", [Reason]); _ -> ok end, {noreply, State};handle_info(_Info, State) -> {noreply, State}.code_change(_OldVsn, State, _Extra) -> {ok, State}. ...其中,User\_Available\_HOOK和SM\_Remove\_Connection\_HOOK 就是用戶上線和用戶斷開連接觸發的事件,Ejabberd 中正是由於這些HOOK,才能很容易擴展功能。
在用Tsung對Ejabberd進行壓力測試,測試機器為4核心8G內存的普通PC,以3臺客戶機模擬用戶登錄、設置在線狀態、發送一條文本消息、關閉連接操作,在同時在線達到30w時,CPU佔用不到3%,內存大概到3個G左右,隨著用戶數增多,主要內存的損耗較大。由於壓力測試比較耗時,再等到有時間的時候,會在做一些更深入的測試。
免費訂閱「CSDN雲計算(左)和CSDN大數據(右)」微信公眾號,實時掌握第一手雲中消息,了解最新的大數據進展!
CSDN發布虛擬化、Docker、OpenStack、CloudStack、數據中心等相關雲計算資訊, 分享Hadoop、Spark、NoSQL/NewSQL、HBase、Impala、內存計算、流計算、機器學習和智能算法等相關大數據觀點,提供雲計算和大數據技術、平臺、實踐和產業信息等服務。