当前位置:首页 > 网站旧栏目 > 学习园地 > 设计软件教程 > gen_server tasting 之超简单名称服务(续)

gen_server tasting 之超简单名称服务(续)
2010-01-13 23:16:25  作者:  来源:
    前几天写了篇《gen_server tasting 之超简单名称服务 》东西,亲身体验了 erlang otp 的强悍威力。不过正所谓“超简单”,那个版本还是很初级的,所以这两天边继续研究边动手,开发迭代版本的名称服务。

 

在这个版本中,需要提供如下功能:

 

  1. 使用 otp 的 supervisor 监控树,保证服务可靠性。
  2. 添加日志功能,通过定制 sasl alarm_handler 来记录警告事件。
  3. 将名称服务打包为 application,暂且叫 vsns 吧,very stabilization name server 呵呵。
  4. 开放 socket 服务 (使用半阻塞的混合模式),使用 vsns://verb /param 自定义协议对外提供访问支持。

最终验证性的功能测试用例如下,主要的测试代码位于 test/0 方法中,其上的几个方法都用于 socket 通信:

 

Erlang代码 复制代码
  1. -module(vsns_tcp_client).   
  2.   
  3. -author(lzy).   
  4. -email(lzy.dev@gmail.com).   
  5. -date("2009.02.06").   
  6. -vsn(0.11).   
  7.   
  8. -compile(export_all).   
  9.   
  10. conn() ->   
  11.     {ok, Socket} = gen_tcp:connect("localhost"8304,   
  12.         [binary, {packet, 2}, {reuseaddr, true}, {active, once}]),   
  13.     Socket.   
  14.   
  15. eval(Socket, Args, AssertVal) ->   
  16.     ok = gen_tcp:send(Socket, Args),   
  17.     receive   
  18.         {tcp, _, AssertVal} ->   
  19.             io:format("Ok. ~p = ~p.~n", [Args, AssertVal]);   
  20.         {tcp_closed, _} ->   
  21.             case Args of   
  22.                 <<"vsns://kernel_oops">> ->   
  23.                     io:format("Ok. kernel_oops = tcp_closed.~n");   
  24.                 _Other ->   
  25.                     io:format("Connection abort by server.~n")   
  26.             end;   
  27.         Other  ->   
  28.             io:format("Assert faild. ~p != ~p.~n", [Other, AssertVal])   
  29.     end,   
  30.     inet:setopts(Socket, [{active, once}]).   
  31.   
  32. close(Socket) ->   
  33.     gen_tcp:close(Socket).   
  34.   
  35. test() ->   
  36.     S = conn(),   
  37.   
  38.     eval(S, <<"vsns://remove_all">>, <<"ack">>),   
  39.   
  40.     eval(S, <<"vsns://save/abc/123">>, <<"">>),   
  41.     eval(S, <<"vsns://save/abc/456">>, <<"123">>),   
  42.     eval(S, <<"vsns://save/abc/789">>, <<"456">>),   
  43.   
  44.     eval(S, <<"vsns://load_all">>, <<"ack">>),   
  45.   
  46.     eval(S, <<"vsns://remove/abc">>, <<"789">>),   
  47.     eval(S, <<"vsns://remove/not_value">>, <<"">>),   
  48.   
  49.     eval(S, <<"foo">>, <<"unknow">>),   
  50.   
  51.     eval(S, <<"vsns://kernel_oops">>, <<"">>),   
  52.   
  53.     ok = close(S),   
  54.   
  55.     pass.   
  56.   
  57. %% File end.  

 

          实际实现 supervisor 监控树、日志和警告事件功能的过程,也是学习 《Erlang 程序设计》的过程。

 

          首先,为名称服务添加监控进程。erlang otp 监控树很简单,只需要实现一个 supervisor behaviour module 提供给 otp supervisor 模块就可以,前面版本的名称服务是通过 erlang shell 启动的,在以后将由这个监控进程来启动她,主要的启动代码在 init/1 方法中,监控模块代码如下:

 

Erlang代码 复制代码
  1. -module(name_server_sup).   
  2.   
  3. -author(lzy).   
  4. -email(lzy.dev@gmail.com).   
  5. -date("2009.02.04").   
  6. -vsn(0.1).   
  7.   
  8. -behaviour(supervisor).   
  9.   
  10. %% gen_supervisor behaviour callback functions.   
  11. -export([init/1]).   
  12.   
  13. %% Interface functions.   
  14. -export([start/0, start_in_shell/0, start_link/1]).   
  15.   
  16. start() ->   
  17.     spawn(fun() -> supervisor:start_link({local, ?MODULE}, ?MODULE, _Arg = []) end).   
  18.   
  19. start_in_shell() ->   
  20.     {ok, Pid} = supervisor:start_link({local, ?MODULE}, ?MODULE, _Arg = []),   
  21.     unlink(Pid).   
  22.   
  23. start_link(Args) ->   
  24.     supervisor:start_link({local, ?MODULE}, ?MODULE, Args).   
  25.   
  26. init([]) ->   
  27.     gen_event:swap_handler(alarm_handler, {alarm_handler, swap}, {vsns_alarm_handler, foo}),   
  28.   
  29.     {ok, {   
  30.             {one_for_one, 310},   
  31.             [{   
  32.                 vsns_name_server,   
  33.                 {name_server, start_link, []},   
  34.                 permanent,   
  35.                 1,   
  36.                 worker,   
  37.                 [name_server]   
  38.             }]         
  39.     }}.   
  40.   
  41. %% File end.  

 

          有了这个 name_server_sup 就不怕 name_server 崩溃了,supervisor 进程会负责重新启动,对于描述监控策略的数据结构可参考 erlang doc。其中的 vsns_alarm_handler 是定制的警告事件处理模块,负责将服务中的报警记录到 erlang sasl 日志中,后期可以使用 rb 工具来查看处理。接下来就是警告日志处理模块代码:

 

Erlang代码 复制代码
  1. -module(vsns_alarm_handler).   
  2.   
  3. -author(lzy).   
  4. -email(lzy.dev@gmail.com).   
  5. -date("2009.02.04").   
  6. -vsn(0.11).   
  7.   
  8. -behaviour(gen_event).   
  9.   
  10. %% gen_event behaviour callback functions.   
  11. -export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]).   
  12.   
  13. init(Args) ->   
  14.     io:format("vsns_alarm_handler init : ~p.~n", [Args]),   
  15.     {ok, Args}.   
  16.   
  17. handle_event({set_alarm, {remove_all, From}}, _State) ->   
  18.     error_logger:error_msg("vsns depot clear by ~p started.~n.", [From]),   
  19.     {ok, _State};   
  20.   
  21. handle_event({clear_alarm, {remove_all, From}}, _State) ->   
  22.     error_logger:error_msg("vsns depot clear by ~p done.~n.", [From]),   
  23.     {ok, _State};   
  24.   
  25. handle_event(Event, State) ->   
  26.     error_logger:error_msg("unmatched event: ~p.~n", [Event, State]),   
  27.     {ok, State}.   
  28.   
  29. handle_call(_Req, State) ->   
  30.     {ok, State, State}.   
  31.        
  32. handle_info(_Info, State) ->   
  33.     {ok, State}.   
  34.   
  35. terminate(_Reason, _State) ->   
  36.     ok.   
  37.   
  38. code_change(_OldVsn, State, _Extra) ->   
  39.     {ok, State}.   
  40.   
  41. %% File end.  
 

          归根到底,就是通过 error_logger:error_msg 调用来记录日志。当然还涉及到 erlang sasl 的配置:

 

Config代码 复制代码
  1. %% file name: sasl_log.config   
  2. %% auther: lzy   
  3. %% email: lzy.dev@gmail.com   
  4. %% date: 2009.02.04  
  5. %% version: 0.1  
  6.   
  7. [{sasl, [   
  8.     {sasl_error_logger, false},    
  9.     {errlog_type, error},   
  10.     {error_logger_mf_dir, "./logs"},   
  11.     %% 10M per log file.   
  12.     {error_logger_mf_maxbytes, 1048760},   
  13.     {error_logger_mf_maxfiles, 5}   
  14. ]}].   
  15.   
  16. %% File end.  
 

          该配置文件可以通过 erlang shell 的 启动启动参数指定。-boot start_sasl -config .\sasl_log。再接下来就是打包 vsns application,这需要一个 application 描述文件和一个 application behavior 模块,很简单具体配置参数语意可参考 erlang doc。

 

App代码 复制代码
  1. %% file name: vsns.app   
  2. %% auther: lzy   
  3. %% email: lzy.dev@gmail.com   
  4. %% date: 2009.02.05  
  5. %% version: 0.1  
  6.   
  7. {   
  8.     application, vsns,   
  9.     [   
  10.         {description, "very stabilization name service."},   
  11.         {vsn, "1.0a"},   
  12.         {modules, [vsns_app, vsns_supervisor, name_server, vsns_alarm_handler]},   
  13.         {registered, [vsns_supervisor, name_server]},   
  14.         {applications, [kernel, stdlib]},   
  15.         {mod, {vsns_app, []}},   
  16.         {start_phases, []}   
  17.     ]   
  18. }.   
  19.   
  20. %% File end.  
 
Erlang代码 复制代码
  1. -module(vsns_app).   
  2.   
  3. -author(lzy).   
  4. -email(lzy.dev@gmail.com).   
  5. -date("2009.02.05").   
  6. -vsn(0.1).   
  7.   
  8. -behavior(application).   
  9.   
  10. -export([start/2, stop/1]).   
  11.   
  12. start(_Type, Args) ->   
  13.     name_server_sup:start_link(Args).   
  14.   
  15. stop(_State) ->   
  16.     void.   
  17.   
  18. %% File end.  

 

          经过这样的包装,就可以通过 application:start(vsns) 调用来启动 vsns 服务。通过 appmon 工具可以看到如下进程树:

 

vsns 进程树

 

到这里,我们就可以通过 erlang 来使用 vsns 了。

 

Erlang代码 复制代码
  1. C:\Program Files\erl5.6.4\usr\lzy_app\vsns>..\..\..\bin\erl.exe -sname vsns +P 1  
  2. 02400 -smp enable +S 1 -boot start_sasl -config sasl_log   
  3. Eshell V5.6.4  (abort with ^G)   
  4. (vsns@srclzy)1> application:start(vsns).   
  5. vsns_alarm_handler init : {foo,{alarm_handler,[]}}.   
  6. name_server starting.   
  7. ok   
  8. (vsns@srclzy)2> name_server:save(abc, 123).   
  9. undefined   
  10. (vsns@srclzy)3> name_server:load_all().   
  11. [{abc,123}]  
 

          最后还需要一个 socket tcp 服务器,来将 vsns 暴露出来,允许其它 client 来使用服务。otp 中没有类似的 socket server behavior,但可以通过 gen_server 来实现,当然甚至可以实现一个非 otp 相关的 socket 服务器。这里 Serge Aleynikov 实现了一个很好 tcp 服务器,基于有限状态机模式来处理请求,在此做了很好的阐述:Building a Non-blocking TCP server using OTP principles ,不过恐怕需要代理来打开连接。在他给出的代码中,我添加了几行代码,将 socket server 提供的服务是做为可配置的,通过 application 环境来配置 socket server 使用的 gen_fsm behaviour module,大约位于 tcp_server_app 模块的 15 和 27 行。

 

Erlang代码 复制代码
  1. -module(tcp_server_app).   
  2.   
  3. ... ...   
  4.   
  5. -define(DEF_SERVICE, tcp_echo_fsm).   
  6.   
  7. ... ...   
  8.   
  9. start(_Type,  _Args) ->   
  10.     ListenPort = get_app_env(listen_port, ?DEF_PORT),    
  11.     ServiceMod = get_app_env(service_mod, ?DEF_SERVICE),    
  12.     supervisor:start_link({local, ?MODULE}, ?MODULE, [ListenPort, ServiceMod]).   
  13.   
  14. ... ...  
 

          在 saleyn_tcp_server 中提供的是 echo 服务。为了将 saleyn_tcp_server 服务指定成 vsns,除了上面的修改外,剩下就只需要实现一个调用 vsns 的 gen_fsm behaviour module 了,代码很简单,是基于 tcp_echo_fsm 修改得来的,呵呵。

 

Erlang代码 复制代码
  1. -module(vsns_tcp_fsm).   
  2.   
  3. -author(lzy).   
  4. -email(lzy.dev@gmail.com).   
  5. -date("2009.02.06").   
  6. -vsn(0.1).   
  7. -remark("vsns_tcp_fsm used by saleyn_tcp_server appliction to support vsns socket server.").   
  8. -remark("It referenced from saleyn_tcp_server/tcp_echo_fsm module.").   
  9.   
  10. -behaviour(gen_fsm).   
  11.   
  12. -export([start_link/0, set_socket/2]).   
  13.   
  14. %% gen_fsm callbacks   
  15. -export([init/1, handle_event/3,   
  16.          handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).   
  17.   
  18. %% FSM States   
  19. -export([   
  20.     'WAIT_FOR_SOCKET'/2,   
  21.     'WAIT_FOR_DATA'/2  
  22. ]).   
  23.   
  24. -record(state, {   
  25.                 socket,    % client socket   
  26.                 addr       % client address   
  27.                }).   
  28.   
  29. -define(TIMEOUT, 120000).   
  30.   
  31. %%%------------------------------------------------------------------------   
  32. %%% API   
  33. %%%------------------------------------------------------------------------   
  34.   
  35. %%-------------------------------------------------------------------------   
  36. %% @spec (Socket) -> {ok,Pid} | ignore | {error,Error}   
  37. %% @doc To be called by the supervisor in order to start the server.   
  38. %%      If init/1 fails with Reason, the function returns {error,Reason}.   
  39. %%      If init/1 returns {stop,Reason} or ignore, the process is   
  40. %%      terminated and the function returns {error,Reason} or ignore,   
  41. %%      respectively.   
  42. %% @end   
  43. %%-------------------------------------------------------------------------   
  44. start_link() ->   
  45.     gen_fsm:start_link(?MODULE, [], []).   
  46.   
  47. set_socket(Pid, Socket) when is_pid(Pid), is_port(Socket) ->   
  48.     gen_fsm:send_event(Pid, {socket_ready, Socket}).   
  49.   
  50. %%%------------------------------------------------------------------------   
  51. %%% Callback functions from gen_server   
  52. %%%------------------------------------------------------------------------   
  53.   
  54. %%-------------------------------------------------------------------------   
  55. %% Func: init/1  
  56. %% Returns: {ok, StateName, StateData}          |   
  57. %%          {ok, StateName, StateData, Timeout} |   
  58. %%          ignore                              |   
  59. %%          {stop, StopReason}   
  60. %% @private   
  61. %%-------------------------------------------------------------------------   
  62. init([]) ->   
  63.     process_flag(trap_exit, true),   
  64.     {ok, 'WAIT_FOR_SOCKET', #state{}}.   
  65.   
  66. %%-------------------------------------------------------------------------   
  67. %% Func: StateName/2  
  68. %% Returns: {next_state, NextStateName, NextStateData}          |   
  69. %%          {next_state, NextStateName, NextStateData, Timeout} |   
  70. %%          {stop, Reason, NewStateData}   
  71. %% @private   
  72. %%-------------------------------------------------------------------------   
  73. 'WAIT_FOR_SOCKET'({socket_ready, Socket}, State) when is_port(Socket) ->   
  74.     % Now we own the socket   
  75.     inet:setopts(Socket, [binary, {packet, 2}, {reuseaddr, true}, {active, once}]),   
  76.     {ok, {IP, _Port}} = inet:peername(Socket),   
  77.     {next_state, 'WAIT_FOR_DATA', State#state{socket=Socket, addr=IP}, ?TIMEOUT};   
  78.   
  79. 'WAIT_FOR_SOCKET'(Other, State) ->   
  80.     error_logger:error_msg("State: 'WAIT_FOR_SOCKET'. Unexpected message: ~p\n", [Other]),   
  81.     %% Allow to receive async messages   
  82.     {next_state, 'WAIT_FOR_SOCKET', State}.   
  83.   
  84. %% Notification event coming from client   
  85. 'WAIT_FOR_DATA'({data, Data}, #state{socket=S} = State) ->   
  86.     ok = handle_data(S, string:tokens(binary_to_list(Data), "/")),   
  87.     inet:setopts(S, [{active, once}]),   
  88.     {next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT};   
  89.   
  90. 'WAIT_FOR_DATA'(timeout, State) ->   
  91.     error_logger:error_msg("~p Client connection timeout - closing.\n", [self()]),   
  92.     {stop, normal, State};   
  93.   
  94. 'WAIT_FOR_DATA'(Data, State) ->   
  95.     io:format("~p Ignoring data: ~p\n", [self(), Data]),   
  96.     {next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT}.   
  97.   
  98. %%-------------------------------------------------------------------------   
  99. %% Func: handle_event/3  
  100. %% Returns: {next_state, NextStateName, NextStateData}          |   
  101. %%          {next_state, NextStateName, NextStateData, Timeout} |   
  102. %%          {stop, Reason, NewStateData}   
  103. %% @private   
  104. %%-------------------------------------------------------------------------   
  105. handle_event(Event, StateName, StateData) ->   
  106.     {stop, {StateName, undefined_event, Event}, StateData}.   
  107.   
  108. %%-------------------------------------------------------------------------   
  109. %% Func: handle_sync_event/4  
  110. %% Returns: {next_state, NextStateName, NextStateData}            |   
  111. %%          {next_state, NextStateName, NextStateData, Timeout}   |   
  112. %%          {reply, Reply, NextStateName, NextStateData}          |   
  113. %%          {reply, Reply, NextStateName, NextStateData, Timeout} |   
  114. %%          {stop, Reason, NewStateData}                          |   
  115. %%          {stop, Reason, Reply, NewStateData}   
  116. %% @private   
  117. %%-------------------------------------------------------------------------   
  118. handle_sync_event(Event, _From, StateName, StateData) ->   
  119.     {stop, {StateName, undefined_event, Event}, StateData}.   
  120.   
  121. %%-------------------------------------------------------------------------   
  122. %% Func: handle_info/3  
  123. %% Returns: {next_state, NextStateName, NextStateData}          |   
  124. %%          {next_state, NextStateName, NextStateData, Timeout} |   
  125. %%          {stop, Reason, NewStateData}   
  126. %% @private   
  127. %%-------------------------------------------------------------------------   
  128. handle_info({tcp, Socket, Bin}, StateName, #state{socket=Socket} = StateData) ->   
  129.     % Flow control: enable forwarding of next TCP message   
  130.     inet:setopts(Socket, [{active, once}]),   
  131.     ?MODULE:StateName({data, Bin}, StateData);   
  132.   
  133. handle_info({tcp_closed, Socket}, _StateName,   
  134.             #state{socket=Socket, addr=Addr} = StateData) ->   
  135.     error_logger:info_msg("~p Client ~p disconnected.\n", [self(), Addr]),   
  136.     {stop, normal, StateData};   
  137.   
  138. handle_info(_Info, StateName, StateData) ->   
  139.     {noreply, StateName, StateData}.   
  140.   
  141. %%-------------------------------------------------------------------------   
  142. %% Func: terminate/3  
  143. %% Purpose: Shutdown the fsm   
  144. %% Returns: any   
  145. %% @private   
  146. %%-------------------------------------------------------------------------   
  147. terminate(_Reason, _StateName, #state{socket=Socket}) ->   
  148.     (catch gen_tcp:close(Socket)),   
  149.     ok.   
  150.   
  151. %%-------------------------------------------------------------------------   
  152. %% Func: code_change/4  
  153. %% Purpose: Convert process state when code is changed   
  154. %% Returns: {ok, NewState, NewStateData}   
  155. %% @private   
  156. %%-------------------------------------------------------------------------   
  157. code_change(_OldVsn, StateName, StateData, _Extra) ->   
  158.     {ok, StateName, StateData}.   
  159.   
  160. handle_data(S, ["vsns:""save", Key, Value]) ->   
  161.     gen_tcp:send(S, list_to_binary(swap_undefined(name_server:save(Key, Value))));   
  162.   
  163. handle_data(S, ["vsns:""load", Key]) ->   
  164.     gen_tcp:send(S, list_to_binary(swap_undefined(name_server:load(Key))));   
  165.   
  166. handle_data(S, ["vsns:""load_all"]) ->   
  167.     name_server:load_all(),   
  168.     gen_tcp:send(S, <<"ack">>); % list_to_binary(name_server:load_all())   
  169.   
  170. handle_data(S, ["vsns:""remove", Key]) ->   
  171.     gen_tcp:send(S, list_to_binary(swap_undefined(name_server:remove(Key))));   
  172.   
  173. handle_data(S, ["vsns:""remove_all"]) ->   
  174.     name_server:remove_all(),   
  175.     gen_tcp:send(S, <<"ack">>); % list_to_binary(name_server:remove_all())   
  176.   
  177. handle_data(S, ["vsns:""kernel_oops"]) ->   
  178.     gen_tcp:send(S, list_to_binary(name_server:kernel_oops()));   
  179.   
  180. handle_data(S, _Data) ->   
  181.     gen_tcp:send(S, <<"unknow">>).   
  182.   
  183. swap_undefined(undefined) ->   
  184.     "";   
  185.   
  186. swap_undefined(Other) ->   
  187.     Other.   
  188.   
  189. %   File end.