在这个版本中,需要提供如下功能:
- 使用 otp 的 supervisor 监控树,保证服务可靠性。
- 添加日志功能,通过定制 sasl alarm_handler 来记录警告事件。
- 将名称服务打包为 application,暂且叫 vsns 吧,very stabilization name server 呵呵。
- 开放 socket 服务 (使用半阻塞的混合模式),使用 vsns://verb /param 自定义协议对外提供访问支持。
最终验证性的功能测试用例如下,主要的测试代码位于 test/0 方法中,其上的几个方法都用于 socket 通信:
- -module(vsns_tcp_client).
- -author(lzy).
- -email(lzy.dev@gmail.com).
- -date("2009.02.06").
- -vsn(0.11).
- -compile(export_all).
- conn() ->
- {ok, Socket} = gen_tcp:connect("localhost", 8304,
- [binary, {packet, 2}, {reuseaddr, true}, {active, once}]),
- Socket.
- eval(Socket, Args, AssertVal) ->
- ok = gen_tcp:send(Socket, Args),
- receive
- {tcp, _, AssertVal} ->
- io:format("Ok. ~p = ~p.~n", [Args, AssertVal]);
- {tcp_closed, _} ->
- case Args of
- <<"vsns://kernel_oops">> ->
- io:format("Ok. kernel_oops = tcp_closed.~n");
- _Other ->
- io:format("Connection abort by server.~n")
- end;
- Other ->
- io:format("Assert faild. ~p != ~p.~n", [Other, AssertVal])
- end,
- inet:setopts(Socket, [{active, once}]).
- close(Socket) ->
- gen_tcp:close(Socket).
- test() ->
- S = conn(),
- eval(S, <<"vsns://remove_all">>, <<"ack">>),
- eval(S, <<"vsns://save/abc/123">>, <<"">>),
- eval(S, <<"vsns://save/abc/456">>, <<"123">>),
- eval(S, <<"vsns://save/abc/789">>, <<"456">>),
- eval(S, <<"vsns://load_all">>, <<"ack">>),
- eval(S, <<"vsns://remove/abc">>, <<"789">>),
- eval(S, <<"vsns://remove/not_value">>, <<"">>),
- eval(S, <<"foo">>, <<"unknow">>),
- eval(S, <<"vsns://kernel_oops">>, <<"">>),
- ok = close(S),
- pass.
- %% File end.
实际实现 supervisor 监控树、日志和警告事件功能的过程,也是学习 《Erlang 程序设计》的过程。
首先,为名称服务添加监控进程。erlang otp 监控树很简单,只需要实现一个 supervisor behaviour module 提供给 otp supervisor 模块就可以,前面版本的名称服务是通过 erlang shell 启动的,在以后将由这个监控进程来启动她,主要的启动代码在 init/1 方法中,监控模块代码如下:
- -module(name_server_sup).
- -author(lzy).
- -email(lzy.dev@gmail.com).
- -date("2009.02.04").
- -vsn(0.1).
- -behaviour(supervisor).
- %% gen_supervisor behaviour callback functions.
- -export([init/1]).
- %% Interface functions.
- -export([start/0, start_in_shell/0, start_link/1]).
- start() ->
- spawn(fun() -> supervisor:start_link({local, ?MODULE}, ?MODULE, _Arg = []) end).
- start_in_shell() ->
- {ok, Pid} = supervisor:start_link({local, ?MODULE}, ?MODULE, _Arg = []),
- unlink(Pid).
- start_link(Args) ->
- supervisor:start_link({local, ?MODULE}, ?MODULE, Args).
- init([]) ->
- gen_event:swap_handler(alarm_handler, {alarm_handler, swap}, {vsns_alarm_handler, foo}),
- {ok, {
- {one_for_one, 3, 10},
- [{
- vsns_name_server,
- {name_server, start_link, []},
- permanent,
- 1,
- worker,
- [name_server]
- }]
- }}.
- %% File end.
有了这个 name_server_sup 就不怕 name_server 崩溃了,supervisor 进程会负责重新启动,对于描述监控策略的数据结构可参考 erlang doc。其中的 vsns_alarm_handler 是定制的警告事件处理模块,负责将服务中的报警记录到 erlang sasl 日志中,后期可以使用 rb 工具来查看处理。接下来就是警告日志处理模块代码:
- -module(vsns_alarm_handler).
- -author(lzy).
- -email(lzy.dev@gmail.com).
- -date("2009.02.04").
- -vsn(0.11).
- -behaviour(gen_event).
- %% gen_event behaviour callback functions.
- -export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]).
- init(Args) ->
- io:format("vsns_alarm_handler init : ~p.~n", [Args]),
- {ok, Args}.
- handle_event({set_alarm, {remove_all, From}}, _State) ->
- error_logger:error_msg("vsns depot clear by ~p started.~n.", [From]),
- {ok, _State};
- handle_event({clear_alarm, {remove_all, From}}, _State) ->
- error_logger:error_msg("vsns depot clear by ~p done.~n.", [From]),
- {ok, _State};
- handle_event(Event, State) ->
- error_logger:error_msg("unmatched event: ~p.~n", [Event, State]),
- {ok, State}.
- handle_call(_Req, State) ->
- {ok, State, State}.
- handle_info(_Info, State) ->
- {ok, State}.
- terminate(_Reason, _State) ->
- ok.
- code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
- %% File end.
归根到底,就是通过 error_logger:error_msg 调用来记录日志。当然还涉及到 erlang sasl 的配置:
- %% file name: sasl_log.config
- %% auther: lzy
- %% email: lzy.dev@gmail.com
- %% date: 2009.02.04
- %% version: 0.1
- [{sasl, [
- {sasl_error_logger, false},
- {errlog_type, error},
- {error_logger_mf_dir, "./logs"},
- %% 10M per log file.
- {error_logger_mf_maxbytes, 1048760},
- {error_logger_mf_maxfiles, 5}
- ]}].
- %% File end.
该配置文件可以通过 erlang shell 的 启动启动参数指定。-boot start_sasl -config .\sasl_log。再接下来就是打包 vsns application,这需要一个 application 描述文件和一个 application behavior 模块,很简单具体配置参数语意可参考 erlang doc。
- %% file name: vsns.app
- %% auther: lzy
- %% email: lzy.dev@gmail.com
- %% date: 2009.02.05
- %% version: 0.1
- {
- application, vsns,
- [
- {description, "very stabilization name service."},
- {vsn, "1.0a"},
- {modules, [vsns_app, vsns_supervisor, name_server, vsns_alarm_handler]},
- {registered, [vsns_supervisor, name_server]},
- {applications, [kernel, stdlib]},
- {mod, {vsns_app, []}},
- {start_phases, []}
- ]
- }.
- %% File end.
- -module(vsns_app).
- -author(lzy).
- -email(lzy.dev@gmail.com).
- -date("2009.02.05").
- -vsn(0.1).
- -behavior(application).
- -export([start/2, stop/1]).
- start(_Type, Args) ->
- name_server_sup:start_link(Args).
- stop(_State) ->
- void.
- %% File end.
经过这样的包装,就可以通过 application:start(vsns) 调用来启动 vsns 服务。通过 appmon 工具可以看到如下进程树:
到这里,我们就可以通过 erlang 来使用 vsns 了。
- C:\Program Files\erl5.6.4\usr\lzy_app\vsns>..\..\..\bin\erl.exe -sname vsns +P 1
- 02400 -smp enable +S 1 -boot start_sasl -config sasl_log
- Eshell V5.6.4 (abort with ^G)
- (vsns@srclzy)1> application:start(vsns).
- vsns_alarm_handler init : {foo,{alarm_handler,[]}}.
- name_server starting.
- ok
- (vsns@srclzy)2> name_server:save(abc, 123).
- undefined
- (vsns@srclzy)3> name_server:load_all().
- [{abc,123}]
最后还需要一个 socket tcp 服务器,来将 vsns 暴露出来,允许其它 client 来使用服务。otp 中没有类似的 socket server behavior,但可以通过 gen_server 来实现,当然甚至可以实现一个非 otp 相关的 socket 服务器。这里 Serge Aleynikov 实现了一个很好 tcp 服务器,基于有限状态机模式来处理请求,在此做了很好的阐述:Building a Non-blocking TCP server using OTP principles ,不过恐怕需要代理来打开连接。在他给出的代码中,我添加了几行代码,将 socket server 提供的服务是做为可配置的,通过 application 环境来配置 socket server 使用的 gen_fsm behaviour module,大约位于 tcp_server_app 模块的 15 和 27 行。
- -module(tcp_server_app).
- ... ...
- -define(DEF_SERVICE, tcp_echo_fsm).
- ... ...
- start(_Type, _Args) ->
- ListenPort = get_app_env(listen_port, ?DEF_PORT),
- ServiceMod = get_app_env(service_mod, ?DEF_SERVICE),
- supervisor:start_link({local, ?MODULE}, ?MODULE, [ListenPort, ServiceMod]).
- ... ...
在 saleyn_tcp_server 中提供的是 echo 服务。为了将 saleyn_tcp_server 服务指定成 vsns,除了上面的修改外,剩下就只需要实现一个调用 vsns 的 gen_fsm behaviour module 了,代码很简单,是基于 tcp_echo_fsm 修改得来的,呵呵。
- -module(vsns_tcp_fsm).
- -author(lzy).
- -email(lzy.dev@gmail.com).
- -date("2009.02.06").
- -vsn(0.1).
- -remark("vsns_tcp_fsm used by saleyn_tcp_server appliction to support vsns socket server.").
- -remark("It referenced from saleyn_tcp_server/tcp_echo_fsm module.").
- -behaviour(gen_fsm).
- -export([start_link/0, set_socket/2]).
- %% gen_fsm callbacks
- -export([init/1, handle_event/3,
- handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).
- %% FSM States
- -export([
- 'WAIT_FOR_SOCKET'/2,
- 'WAIT_FOR_DATA'/2
- ]).
- -record(state, {
- socket, % client socket
- addr % client address
- }).
- -define(TIMEOUT, 120000).
- %%%------------------------------------------------------------------------
- %%% API
- %%%------------------------------------------------------------------------
- %%-------------------------------------------------------------------------
- %% @spec (Socket) -> {ok,Pid} | ignore | {error,Error}
- %% @doc To be called by the supervisor in order to start the server.
- %% If init/1 fails with Reason, the function returns {error,Reason}.
- %% If init/1 returns {stop,Reason} or ignore, the process is
- %% terminated and the function returns {error,Reason} or ignore,
- %% respectively.
- %% @end
- %%-------------------------------------------------------------------------
- start_link() ->
- gen_fsm:start_link(?MODULE, [], []).
- set_socket(Pid, Socket) when is_pid(Pid), is_port(Socket) ->
- gen_fsm:send_event(Pid, {socket_ready, Socket}).
- %%%------------------------------------------------------------------------
- %%% Callback functions from gen_server
- %%%------------------------------------------------------------------------
- %%-------------------------------------------------------------------------
- %% Func: init/1
- %% Returns: {ok, StateName, StateData} |
- %% {ok, StateName, StateData, Timeout} |
- %% ignore |
- %% {stop, StopReason}
- %% @private
- %%-------------------------------------------------------------------------
- init([]) ->
- process_flag(trap_exit, true),
- {ok, 'WAIT_FOR_SOCKET', #state{}}.
- %%-------------------------------------------------------------------------
- %% Func: StateName/2
- %% Returns: {next_state, NextStateName, NextStateData} |
- %% {next_state, NextStateName, NextStateData, Timeout} |
- %% {stop, Reason, NewStateData}
- %% @private
- %%-------------------------------------------------------------------------
- 'WAIT_FOR_SOCKET'({socket_ready, Socket}, State) when is_port(Socket) ->
- % Now we own the socket
- inet:setopts(Socket, [binary, {packet, 2}, {reuseaddr, true}, {active, once}]),
- {ok, {IP, _Port}} = inet:peername(Socket),
- {next_state, 'WAIT_FOR_DATA', State#state{socket=Socket, addr=IP}, ?TIMEOUT};
- 'WAIT_FOR_SOCKET'(Other, State) ->
- error_logger:error_msg("State: 'WAIT_FOR_SOCKET'. Unexpected message: ~p\n", [Other]),
- %% Allow to receive async messages
- {next_state, 'WAIT_FOR_SOCKET', State}.
- %% Notification event coming from client
- 'WAIT_FOR_DATA'({data, Data}, #state{socket=S} = State) ->
- ok = handle_data(S, string:tokens(binary_to_list(Data), "/")),
- inet:setopts(S, [{active, once}]),
- {next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT};
- 'WAIT_FOR_DATA'(timeout, State) ->
- error_logger:error_msg("~p Client connection timeout - closing.\n", [self()]),
- {stop, normal, State};
- 'WAIT_FOR_DATA'(Data, State) ->
- io:format("~p Ignoring data: ~p\n", [self(), Data]),
- {next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT}.
- %%-------------------------------------------------------------------------
- %% Func: handle_event/3
- %% Returns: {next_state, NextStateName, NextStateData} |
- %% {next_state, NextStateName, NextStateData, Timeout} |
- %% {stop, Reason, NewStateData}
- %% @private
- %%-------------------------------------------------------------------------
- handle_event(Event, StateName, StateData) ->
- {stop, {StateName, undefined_event, Event}, StateData}.
- %%-------------------------------------------------------------------------
- %% Func: handle_sync_event/4
- %% Returns: {next_state, NextStateName, NextStateData} |
- %% {next_state, NextStateName, NextStateData, Timeout} |
- %% {reply, Reply, NextStateName, NextStateData} |
- %% {reply, Reply, NextStateName, NextStateData, Timeout} |
- %% {stop, Reason, NewStateData} |
- %% {stop, Reason, Reply, NewStateData}
- %% @private
- %%-------------------------------------------------------------------------
- handle_sync_event(Event, _From, StateName, StateData) ->
- {stop, {StateName, undefined_event, Event}, StateData}.
- %%-------------------------------------------------------------------------
- %% Func: handle_info/3
- %% Returns: {next_state, NextStateName, NextStateData} |
- %% {next_state, NextStateName, NextStateData, Timeout} |
- %% {stop, Reason, NewStateData}
- %% @private
- %%-------------------------------------------------------------------------
- handle_info({tcp, Socket, Bin}, StateName, #state{socket=Socket} = StateData) ->
- % Flow control: enable forwarding of next TCP message
- inet:setopts(Socket, [{active, once}]),
- ?MODULE:StateName({data, Bin}, StateData);
- handle_info({tcp_closed, Socket}, _StateName,
- #state{socket=Socket, addr=Addr} = StateData) ->
- error_logger:info_msg("~p Client ~p disconnected.\n", [self(), Addr]),
- {stop, normal, StateData};
- handle_info(_Info, StateName, StateData) ->
- {noreply, StateName, StateData}.
- %%-------------------------------------------------------------------------
- %% Func: terminate/3
- %% Purpose: Shutdown the fsm
- %% Returns: any
- %% @private
- %%-------------------------------------------------------------------------
- terminate(_Reason, _StateName, #state{socket=Socket}) ->
- (catch gen_tcp:close(Socket)),
- ok.
- %%-------------------------------------------------------------------------
- %% Func: code_change/4
- %% Purpose: Convert process state when code is changed
- %% Returns: {ok, NewState, NewStateData}
- %% @private
- %%-------------------------------------------------------------------------
- code_change(_OldVsn, StateName, StateData, _Extra) ->
- {ok, StateName, StateData}.
- handle_data(S, ["vsns:", "save", Key, Value]) ->
- gen_tcp:send(S, list_to_binary(swap_undefined(name_server:save(Key, Value))));
- handle_data(S, ["vsns:", "load", Key]) ->
- gen_tcp:send(S, list_to_binary(swap_undefined(name_server:load(Key))));
- handle_data(S, ["vsns:", "load_all"]) ->
- name_server:load_all(),
- gen_tcp:send(S, <<"ack">>); % list_to_binary(name_server:load_all())
- handle_data(S, ["vsns:", "remove", Key]) ->
- gen_tcp:send(S, list_to_binary(swap_undefined(name_server:remove(Key))));
- handle_data(S, ["vsns:", "remove_all"]) ->
- name_server:remove_all(),
- gen_tcp:send(S, <<"ack">>); % list_to_binary(name_server:remove_all())
- handle_data(S, ["vsns:", "kernel_oops"]) ->
- gen_tcp:send(S, list_to_binary(name_server:kernel_oops()));
- handle_data(S, _Data) ->
- gen_tcp:send(S, <<"unknow">>).
- swap_undefined(undefined) ->
- "";
- swap_undefined(Other) ->
- Other.
- % File end.