gen_server源码阅读
gen_server是Erlang/OTP的通用服务器框架,使用最为广泛。源代码很少,只有几百行。抽空详细梳理了一遍流程,以简单的gen_server应用worker为例。
先看我们自己的worker.erl文件,根据OTP设计规范,既是回调模块,又是用户接口。对外导出 start_link/0,1 , query/1 , set/1 , show/1 , stop/1,同时实现gen_server行为模式的6个必要的回调函数:
%% gen_server
-module(worker).
-behaviour(gen_server).
-export([start_link/1, start_link/0, query/1, set/1, show/1, stop/0]).
-export([init/1,handle_call/3,handle_cast/2,handle_info/2,terminate/2,code_change/3]).
-record(state, {key::term(), value::term()}).
-define(SERVER, ?MODULE).
-define(PARAM, {'0x01f', 'test'}).
%% export
start_link() ->
start_link(?PARAM).
start_link(Param) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [Param], [{timeout, 10}]).
stop() ->
gen_server:stop(?SERVER).
query(Key) ->
gen_server:call(?SERVER, {query, Key}).
set({Key, Value}) ->
gen_server:call(?SERVER, {set, Key, Value}).
show(Key) ->
gen_server:cast(?SERVER, {show, Key}).
%% callbacks
init([{Key, Value}]) ->
io:format("Saving key: ~p with value ~p, self: ~p~n", [Key, Value, self()]),
{ok, #state{key=Key, value=Value}, 0}.
handle_call({query,Key}, From, State=#state{key=Key, value=Value}) ->
io:format("query from ~p, self: ~p~n", [From, self()]),
{reply, {ok, Value}, State};
handle_call({set,Key,Value}, From, State) ->
io:format("set from ~p, self: ~p~n", [From, self()]),
{reply, {ok, {Key, Value}}, State#state{key=Key, value=Value}}.
handle_cast({show, Key}, State=#state{key=Key, value=Value}) ->
io:format("show, key ~p value ~p, self: ~p~n", [Key, Value, self()]),
{noreply, State}.
handle_info(timeout, State) ->
io:format("Timeout~n"),
{noreply, State}.
terminate(Reason, _State) ->
io:format("terminate reason ~p, self: ~p~n", [Reason, self()]),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
有必要先看一下gen_server的注释和回调函数定义,简单介绍了gen_server设计理念和调用流程。源码永远比任何解释要精确。
%% gen_server.erl
%%% ---------------------------------------------------
%%%
%%% The idea behind THIS server is that the user module
%%% provides (different) functions to handle different
%%% kind of inputs.
%%% If the Parent process terminates the Module:terminate/2
%%% function is called.
%%%
%%% The user module should export:
%%%
%%% init(Args)
%%% ==> {ok, State}
%%% {ok, State, Timeout}
%%% ignore
%%% {stop, Reason}
%%%
%%% handle_call(Msg, {From, Tag}, State)
%%%
%%% ==> {reply, Reply, State}
%%% {reply, Reply, State, Timeout}
%%% {noreply, State}
%%% {noreply, State, Timeout}
%%% {stop, Reason, Reply, State}
%%% Reason = normal | shutdown | Term terminate(State) is called
%%%
%%% handle_cast(Msg, State)
%%%
%%% ==> {noreply, State}
%%% {noreply, State, Timeout}
%%% {stop, Reason, State}
%%% Reason = normal | shutdown | Term terminate(State) is called
%%%
%%% handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ...
%%%
%%% ==> {noreply, State}
%%% {noreply, State, Timeout}
%%% {stop, Reason, State}
%%% Reason = normal | shutdown | Term, terminate(State) is called
%%%
%%% terminate(Reason, State) Let the user module clean up
%%% always called when server terminates
%%%
%%% ==> ok
%%%
%%%
%%% The work flow (of the server) can be described as follows:
%%%
%%% User module Generic
%%% ----------- -------
%%% start -----> start
%%% init <----- .
%%%
%%% loop
%%% handle_call <----- .
%%% -----> reply
%%%
%%% handle_cast <----- .
%%%
%%% handle_info <----- .
%%%
%%% terminate <----- .
%%%
%%% -----> reply
%%%
%%%
%%% ---------------------------------------------------
-callback init(Args :: term()) ->
{ok, State :: term()} | {ok, State :: term(), timeout() | hibernate} |
{stop, Reason :: term()} | ignore.
-callback handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: term()) ->
{reply, Reply :: term(), NewState :: term()} |
{reply, Reply :: term(), NewState :: term(), timeout() | hibernate} |
{noreply, NewState :: term()} |
{noreply, NewState :: term(), timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: term()} |
{stop, Reason :: term(), NewState :: term()}.
-callback handle_cast(Request :: term(), State :: term()) ->
{noreply, NewState :: term()} |
{noreply, NewState :: term(), timeout() | hibernate} |
{stop, Reason :: term(), NewState :: term()}.
-callback handle_info(Info :: timeout | term(), State :: term()) ->
{noreply, NewState :: term()} |
{noreply, NewState :: term(), timeout() | hibernate} |
{stop, Reason :: term(), NewState :: term()}.
-callback terminate(Reason :: (normal | shutdown | {shutdown, term()} |
term()),
State :: term()) ->
term().
-callback code_change(OldVsn :: (term() | {down, term()}), State :: term(),
Extra :: term()) ->
{ok, NewState :: term()} | {error, Reason :: term()}.
-callback format_status(Opt, StatusData) -> Status when
Opt :: 'normal' | 'terminate',
StatusData :: [PDict | State],
PDict :: [{Key :: term(), Value :: term()}],
State :: term(),
Status :: term().
-optional_callbacks([format_status/2]).
gen_server(或者其他模式)的核心思想就是用户提供回调函数,gen_server框架提供进程管理、消息分发等,尽可能的把功能无关的代码封装在背后。
现在开始解释我们的五个导出函数如何和gen_server框架以及Erlang交互。
启动
gen_server:start_link({local, ?SERVER}, ?MODULE, [Param], [{timeout, 10}]).
调用gen_server模块的start_link/4,具体到代码:
%% gen_server.erl
start_link(Name, Mod, Args, Options) ->
gen:start(?MODULE, link, Name, Mod, Args, Options).
又进一步调用了gen模块的start/6函数,link 表示链接程序,gen_server:start/3,4 调用时会传入nolink。注意传给gen:start的第一个参数是 ?MODULE, 即 gen_server原子。
%% gen.erl
%%-----------------------------------------------------------------
%% Starts a generic process.
%% start(GenMod, LinkP, Mod, Args, Options)
%% start(GenMod, LinkP, Name, Mod, Args, Options)
%% GenMod = atom(), callback module implementing the 'real' fsm
%% LinkP = link | nolink
%% Name = {local, atom()} | {global, term()} | {via, atom(), term()}
%% Args = term(), init arguments (to Mod:init/1)
%% Options = [{timeout, Timeout} | {debug, [Flag]} | {spawn_opt, OptionList}]
%% Flag = trace | log | {logfile, File} | statistics | debug
%% (debug == log && statistics)
%% Returns: {ok, Pid} | ignore |{error, Reason} |
%% {error, {already_started, Pid}} |
%% The 'already_started' is returned only if Name is given
%%-----------------------------------------------------------------
-spec start(module(), linkage(), emgr_name(), module(), term(), options()) ->
start_ret().
start(GenMod, LinkP, Name, Mod, Args, Options) ->
case where(Name) of
undefined ->
do_spawn(GenMod, LinkP, Name, Mod, Args, Options);
Pid ->
{error, {already_started, Pid}}
end.
where({global, Name}) -> global:whereis_name(Name);
where({via, Module, Name}) -> Module:whereis_name(Name);
where({local, Name}) -> whereis(Name).
start函数首先判断Name是否已经被注册,这里判断的是worker原子,提供注册名的gen_server进程只能存在一个。
随后调用do_spawn/6函数,参数不变。GenMode是gen_server、LinkP是link、Name是{local, worker}、Mod是我们自己的模块名worker、Args我们传入的参数[Param]、Options是[{timeout, 10}]。
%% gen.erl
do_spawn(GenMod, link, Name, Mod, Args, Options) ->
Time = timeout(Options),
proc_lib:start_link(?MODULE, init_it,
[GenMod, self(), self(), Name, Mod, Args, Options],
Time,
spawn_opts(Options));
timeout(Options) ->
case lists:keyfind(timeout, 1, Options) of
{_,Time} ->
Time;
false ->
infinity
end.
gen:do_spawn调用proc_lib:start_link/5,传入的?MODULE即gen,Time为10,我们自己通过worker传入的参数被放到第三个列表中。
%% proc_lib.erl
-spec start_link(Module, Function, Args, Time, SpawnOpts) -> Ret when
Module :: module(),
Function :: atom(),
Args :: [term()],
Time :: timeout(),
SpawnOpts :: [spawn_option()],
Ret :: term() | {error, Reason :: term()}.
start_link(M,F,A,Timeout,SpawnOpts) when is_atom(M), is_atom(F), is_list(A) ->
Pid = ?MODULE:spawn_opt(M, F, A, ensure_link(SpawnOpts)),
sync_wait(Pid, Timeout).
sync_wait(Pid, Timeout) ->
receive
{ack, Pid, Return} ->
Return;
{'EXIT', Pid, Reason} ->
{error, Reason}
after Timeout ->
unlink(Pid),
exit(Pid, kill),
flush(Pid),
{error, timeout}
end.
ensure_link(SpawnOpts) ->
case lists:member(link, SpawnOpts) of
true ->
SpawnOpts;
false ->
[link|SpawnOpts]
end.
这里首先通过调用proc_lib:spawn_opt/4创建一个进程,然后调用sync_wait(Pid, Timeout)同步调用,ensure_link确保link选项存在。sync_wait的receive消息接收我们回头介绍,先看进程的创建。
%% proc_lib.erl
-spec spawn_opt(Module, Function, Args, SpawnOpts) -> pid() when
Module :: module(),
Function :: atom(),
Args :: [term()],
SpawnOpts :: [spawn_option()].
spawn_opt(M, F, A, Opts) when is_atom(M), is_atom(F), is_list(A) ->
Parent = get_my_name(),
Ancestors = get_ancestors(),
check_for_monitor(Opts),
erlang:spawn_opt(?MODULE, init_p, [Parent,Ancestors,M,F,A], Opts).
get_my_name() ->
case proc_info(self(),registered_name) of
{registered_name,Name} -> Name;
_ -> self()
end.
-spec get_ancestors() -> [pid()].
get_ancestors() ->
case get('$ancestors') of
A when is_list(A) -> A;
_ -> []
end.
proc_info(Pid,Item) when node(Pid) =:= node() ->
process_info(Pid,Item);
proc_info(Pid,Item) ->
case lists:member(node(Pid),nodes()) of
true ->
check(rpc:call(node(Pid), erlang, process_info, [Pid, Item]));
_ ->
hidden
end.
check({badrpc,nodedown}) -> undefined;
check({badrpc,Error}) -> Error;
check(Res) -> Res.
check_for_monitor(SpawnOpts) ->
case lists:member(monitor, SpawnOpts) of
true ->
erlang:error(badarg);
false ->
false
end.
get_my_name返回当前进程pid或注册名,get_ancestors返回祖先进程列表,check_for_monitor确保monitor选项不存在。然后调用系统函数erlang:spawn_opt生成新进程。新的进程执行proc_lib:init_p(Parent, Ancestors, M, F, A)。
%% proc_lib.erl
-spec init_p(pid(), [pid()], atom(), atom(), [term()]) -> term().
init_p(Parent, Ancestors, M, F, A) when is_atom(M), is_atom(F), is_list(A) ->
put('$ancestors', [Parent|Ancestors]),
put('$initial_call', trans_init(M, F, A)),
init_p_do_apply(M, F, A).
init_p_do_apply(M, F, A) ->
try
apply(M, F, A)
catch
Class:Reason ->
exit_p(Class, Reason)
end.
%% -----------------------------------------------------
%% Translate the initial call to some useful information.
%% -----------------------------------------------------
trans_init(gen,init_it,[gen_server,_,_,supervisor,{_,Module,_},_]) ->
{supervisor,Module,1};
trans_init(gen,init_it,[gen_server,_,_,_,supervisor,{_,Module,_},_]) ->
{supervisor,Module,1};
trans_init(gen,init_it,[gen_server,_,_,supervisor_bridge,[Module|_],_]) ->
{supervisor_bridge,Module,1};
trans_init(gen,init_it,[gen_server,_,_,_,supervisor_bridge,[Module|_],_]) ->
{supervisor_bridge,Module,1};
trans_init(gen,init_it,[gen_event|_]) ->
{gen_event,init_it,6};
trans_init(gen,init_it,[_GenMod,_,_,Module,_,_]) when is_atom(Module) ->
{Module,init,1};
trans_init(gen,init_it,[_GenMod,_,_,_,Module|_]) when is_atom(Module) ->
{Module,init,1};
trans_init(M, F, A) when is_atom(M), is_atom(F) ->
{M,F,length(A)}.
init_p首先更新新进程的祖先进程'$ancestors’环境变量和'$initial_call',trans_init只是做一个简单的转换,这里的M 是 gen,F 是 init_it,A 是上面详细介绍过的参数[GenMod, self(), self(), Name, Mod, Args, Options],因此返回{Module, init, 1}。
init_p_do_apply直接调用M:F(A),即gen:init_it(GenMod, self(), self(), Name, Mod, Args, Options),注意这里的self()并不是新进程的pid,而是父进程pid,也就是调用sync_wait进程的pid,sync_wait的第一个参数Pid才是执行gen:init_it的新进程的pid。目前为止,其实只涉及到两个进程——运行gen_server框架的进程、新生成的进程。
回到gen模块,在新进程上下文执行init_it/7。
%%-----------------------------------------------------------------
%% Initiate the new process.
%% Register the name using the Rfunc function
%% Calls the Mod:init/Args function.
%% Finally an acknowledge is sent to Parent and the main
%% loop is entered.
%%-----------------------------------------------------------------
init_it(GenMod, Starter, Parent, Mod, Args, Options) ->
init_it2(GenMod, Starter, Parent, self(), Mod, Args, Options).
init_it(GenMod, Starter, Parent, Name, Mod, Args, Options) ->
case register_name(Name) of
true ->
init_it2(GenMod, Starter, Parent, Name, Mod, Args, Options);
{false, Pid} ->
proc_lib:init_ack(Starter, {error, {already_started, Pid}})
end.
init_it2(GenMod, Starter, Parent, Name, Mod, Args, Options) ->
GenMod:init_it(Starter, Parent, Name, Mod, Args, Options).
这里做了简单解释——初始化新进程、注册为{local, worker}、给父进程发送确认、进入loop循环。
init_it2调用GenMod:init_it(Starter, Parent, Name, Mod, Args, Options),Starter是父进程pid,而Parent在链接时跟Starter一样,不链接时是原子self。GenMod是gen_server,回到gen_server模块。
%%% ---------------------------------------------------
%%% Initiate the new process.
%%% Register the name using the Rfunc function
%%% Calls the Mod:init/Args function.
%%% Finally an acknowledge is sent to Parent and the main
%%% loop is entered.
%%% ---------------------------------------------------
init_it(Starter, self, Name, Mod, Args, Options) ->
init_it(Starter, self(), Name, Mod, Args, Options);
init_it(Starter, Parent, Name0, Mod, Args, Options) ->
Name = gen:name(Name0),
Debug = gen:debug_options(Name, Options),
case catch Mod:init(Args) of
{ok, State} ->
proc_lib:init_ack(Starter, {ok, self()}),
loop(Parent, Name, State, Mod, infinity, Debug);
{ok, State, Timeout} ->
proc_lib:init_ack(Starter, {ok, self()}),
loop(Parent, Name, State, Mod, Timeout, Debug);
{stop, Reason} ->
%% For consistency, we must make sure that the
%% registered name (if any) is unregistered before
%% the parent process is notified about the failure.
%% (Otherwise, the parent process could get
%% an 'already_started' error if it immediately
%% tried starting the process again.)
gen:unregister_name(Name0),
proc_lib:init_ack(Starter, {error, Reason}),
exit(Reason);
ignore ->
gen:unregister_name(Name0),
proc_lib:init_ack(Starter, ignore),
exit(normal);
{'EXIT', Reason} ->
gen:unregister_name(Name0),
proc_lib:init_ack(Starter, {error, Reason}),
exit(Reason);
Else ->
Error = {bad_return_value, Else},
proc_lib:init_ack(Starter, {error, Error}),
exit(Error)
end.
调用gen:name返回注册名,在这里是worker;gen:debug_options和调试相关,先不讨论。有意思的是这里用到了老的catch风格代码,Mod:init(Args)即worker:init([Param]),终于回调到了我们自己写的代码。根据返回值,调用proc_lib:init_ack给Starter进程发送消息。
%% proc_lib.erl
-spec init_ack(Parent, Ret) -> 'ok' when
Parent :: pid(),
Ret :: term().
init_ack(Parent, Return) ->
Parent ! {ack, self(), Return},
ok.
sync_wait(Pid, Timeout) ->
receive
{ack, Pid, Return} ->
Return;
{'EXIT', Pid, Reason} ->
{error, Reason}
after Timeout ->
unlink(Pid),
exit(Pid, kill),
flush(Pid),
{error, timeout}
end.
早先提到的proc_lib:start_link调用sync_wait等待确认,这时会根据收到的消息执行不同的操作,直接返回或者终止进程。我们传入的超时值,在这里发挥作用。
至此gen_server进程启动。简单总结:worker:start_link -> gen_server:start_link -> gen:start -> gen:do_spawn -> proc_lib:start_link -> proc_lib:spawn_opt -> erlang:spawn_opt => new process -> proc_lib:init_p -> gen:init_it -> gen_server:init_it -> worker:init -> send ack to parent -> gen_server:loop。
loop循环
总体来说,gen_server的启动还是很简单的,只是一连串的调用流程,最后进入gen_server 的loop循环。
loop(Parent, Name, State, Mod, Time, Debug) ->
Msg = receive
Input ->
Input
after Time ->
timeout
end,
decode_msg(Msg, Parent, Name, State, Mod, Time, Debug, false).
decode_msg(Msg, Parent, Name, State, Mod, Time, Debug, Hib) ->
case Msg of
{system, From, Req} ->
sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
[Name, State, Mod, Time], Hib);
{'EXIT', Parent, Reason} ->
terminate(Reason, Name, Msg, Mod, State, Debug);
_Msg when Debug =:= [] ->
handle_msg(Msg, Parent, Name, State, Mod);
_Msg ->
Debug1 = sys:handle_debug(Debug, fun print_event/3,
Name, {in, Msg}),
handle_msg(Msg, Parent, Name, State, Mod, Debug1)
end.
新进程会阻塞在loop的receive,等到消息到来。我们假设已经收到消息,看下面的处理过程。对于{system, From, Req}系统消息,调用sys:handle_system_msg处理,用于在暂停、恢复或者终止进程等;{‘EXIT’, Parent, Reason}退出信号导致进程退出,并调用terminate函数;其他消息都会进入handle_msg函数。
进程退出
先来看进程退出,即gen_server:terminate函数。
-spec terminate(_, _, _, _, _, _) -> no_return().
terminate(Reason, Name, Msg, Mod, State, Debug) ->
terminate(Reason, Reason, Name, Msg, Mod, State, Debug).
-spec terminate(_, _, _, _, _, _, _) -> no_return().
terminate(ExitReason, ReportReason, Name, Msg, Mod, State, Debug) ->
Reply = try_terminate(Mod, ExitReason, State),
case Reply of
{'EXIT', ExitReason1, ReportReason1} ->
FmtState = format_status(terminate, Mod, get(), State),
error_info(ReportReason1, Name, Msg, FmtState, Debug),
exit(ExitReason1);
_ ->
case ExitReason of
normal ->
exit(normal);
shutdown ->
exit(shutdown);
{shutdown,_}=Shutdown ->
exit(Shutdown);
_ ->
FmtState = format_status(terminate, Mod, get(), State),
error_info(ReportReason, Name, Msg, FmtState, Debug),
exit(ExitReason)
end
end.
try_terminate(Mod, Reason, State) ->
try
{ok, Mod:terminate(Reason, State)}
catch
throw:R ->
{ok, R};
error:R ->
Stacktrace = erlang:get_stacktrace(),
{'EXIT', {R, Stacktrace}, {R, Stacktrace}};
exit:R ->
Stacktrace = erlang:get_stacktrace(),
{'EXIT', R, {R, Stacktrace}}
end.
terminate函数会回调我们的worker:terminate,并调用format_status格式化输出信息。这里也可以看到可选回调format_status的作用。
format_status(Opt, Mod, PDict, State) ->
DefStatus = case Opt of
terminate -> State;
_ -> [{data, [{"State", State}]}]
end,
case erlang:function_exported(Mod, format_status, 2) of
true ->
case catch Mod:format_status(Opt, [PDict, State]) of
{'EXIT', _} -> DefStatus;
Else -> Else
end;
_ ->
DefStatus
end.
为了完整的讨论进程终止,我们顺便介绍gen_server:stop函数:
stop(Name) ->
gen:stop(Name).
stop(Name, Reason, Timeout) ->
gen:stop(Name, Reason, Timeout).
简单调用gen:stop函数:
stop(Process) ->
stop(Process, normal, infinity).
stop(Process, Reason, Timeout)
when Timeout =:= infinity; is_integer(Timeout), Timeout >= 0 ->
Fun = fun(Pid) -> proc_lib:stop(Pid, Reason, Timeout) end,
do_for_proc(Process, Fun).
%% Local or remote by pid
do_for_proc(Pid, Fun) when is_pid(Pid) ->
Fun(Pid);
%% Local by name
do_for_proc(Name, Fun) when is_atom(Name) ->
case whereis(Name) of
Pid when is_pid(Pid) ->
Fun(Pid);
undefined ->
exit(noproc)
end;
do_for_proc会以pid为参数调用第二个参数fun函数。fun即 proc_lib:stop(Pid, Reason, Timeout)。
%% proc_lib.erl
-spec stop(Process, Reason, Timeout) -> 'ok' when
Process :: pid() | RegName | {RegName,node()},
RegName :: atom(),
Reason :: term(),
Timeout :: timeout().
stop(Process, Reason, Timeout) ->
{Pid, Mref} = erlang:spawn_monitor(do_stop(Process, Reason)),
receive
{'DOWN', Mref, _, _, Reason} ->
ok;
{'DOWN', Mref, _, _, {noproc,{sys,terminate,_}}} ->
exit(noproc);
{'DOWN', Mref, _, _, CrashReason} ->
exit(CrashReason)
after Timeout ->
exit(Pid, kill),
receive
{'DOWN', Mref, _, _, _} ->
exit(timeout)
end
end.
-spec do_stop(Process, Reason) -> Fun when
Process :: pid() | RegName | {RegName,node()},
RegName :: atom(),
Reason :: term(),
Fun :: fun(() -> no_return()).
do_stop(Process, Reason) ->
fun() ->
Mref = erlang:monitor(process, Process),
ok = sys:terminate(Process, Reason, infinity),
receive
{'DOWN', Mref, _, _, ExitReason} ->
exit(ExitReason)
end
end.
这里会多次用到进程监控。proc_lib:stop生成并监控新进程 proc_lib:do_stop。do_stop也会监控要终止的进程,即worker,然后调用sys:terminate,它并不会直接终止进程,而是发消息:
terminate(Name, Reason, Timeout) ->
send_system_msg(Name, {terminate, Reason}, Timeout).
send_system_msg(Name, Request, Timeout) ->
case catch gen:call(Name, system, Request, Timeout) of
{ok,Res} -> Res;
{'EXIT', Reason} -> exit({Reason, mfa(Name, Request, Timeout)})
end.
利用gen:call函数给Name - {local, worker}进程发消息,由之前介绍过的loop消息循环接收并处理,会收到一个系统消息,然后回调gen_server:system_terminate,然后回到gen_server:terminate。这里先略过详细流程,只要知道最后还是会由gen_server:terminate来处理。为什么要搞这么多监控呢?把复杂的任务放到新进程中,然后用一个简单的进程(简单到几乎不会崩溃)监控它,是Erlang最根本的哲学。
gen_server:call
worker.erl程序中,导出函数query和set调用了gen_server:call:
%% gen_server.erl
call(Name, Request) ->
case catch gen:call(Name, '$gen_call', Request) of
{ok,Res} ->
Res;
{'EXIT',Reason} ->
exit({Reason, {?MODULE, call, [Name, Request]}})
end.
call(Name, Request, Timeout) ->
case catch gen:call(Name, '$gen_call', Request, Timeout) of
{ok,Res} ->
Res;
{'EXIT',Reason} ->
exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
end.
会以参数[{local, worker}, ‘$gen_call’, Request]调用到gen:call——之前有过介绍,这里详细看一下代码:
call(Process, Label, Request) ->
call(Process, Label, Request, ?default_timeout).
call(Process, Label, Request, Timeout)
when Timeout =:= infinity; is_integer(Timeout), Timeout >= 0 ->
Fun = fun(Pid) -> do_call(Pid, Label, Request, Timeout) end,
do_for_proc(Process, Fun).
do_call(Process, Label, Request, Timeout) ->
try erlang:monitor(process, Process) of
Mref ->
catch erlang:send(Process, {Label, {self(), Mref}, Request},
[noconnect]),
receive
{Mref, Reply} ->
erlang:demonitor(Mref, [flush]),
{ok, Reply};
{'DOWN', Mref, _, _, noconnection} ->
Node = get_node(Process),
exit({nodedown, Node});
{'DOWN', Mref, _, _, Reason} ->
exit(Reason)
after Timeout ->
erlang:demonitor(Mref, [flush]),
exit(timeout)
end
catch
error:_ ->
Node = get_node(Process),
monitor_node(Node, true),
receive
{nodedown, Node} ->
monitor_node(Node, false),
exit({nodedown, Node})
after 0 ->
Tag = make_ref(),
Process ! {Label, {self(), Tag}, Request},
wait_resp(Node, Tag, Timeout)
end
end.
do_call执行真正的调用,也就是消息发送。首先monitor进程,然后调用erlang:send发送消息。如果进程在其他节点,需要监控节点的进程。erlang:send发送的消息由loop循环接收,gen_server:handle_msg最后详细介绍。
gen_server:cast
worker:show会调用gen_server:cast,发送异步消息:
cast({global,Name}, Request) ->
catch global:send(Name, cast_msg(Request)),
ok;
cast({via, Mod, Name}, Request) ->
catch Mod:send(Name, cast_msg(Request)),
ok;
cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) ->
do_cast(Dest, Request);
cast(Dest, Request) when is_atom(Dest) ->
do_cast(Dest, Request);
cast(Dest, Request) when is_pid(Dest) ->
do_cast(Dest, Request).
do_cast(Dest, Request) ->
do_send(Dest, cast_msg(Request)),
ok.
cast_msg(Request) -> {'$gen_cast',Request}.
do_send(Dest, Msg) ->
case catch erlang:send(Dest, Msg, [noconnect]) of
noconnect ->
spawn(erlang, send, [Dest,Msg]);
Other ->
Other
end.
异步消息只需要发送,不关心进程的响应。所以很简单,erlang:send发送'$gen_cast’标记的消息。
gen_server:handle_msg
消息收发的关键函数时handle_msg,所以现在介绍。
handle_msg({'$gen_call', From, Msg}, Parent, Name, State, Mod) ->
Result = try_handle_call(Mod, Msg, From, State),
case Result of
{ok, {reply, Reply, NState}} ->
reply(From, Reply),
loop(Parent, Name, NState, Mod, infinity, []);
{ok, {reply, Reply, NState, Time1}} ->
reply(From, Reply),
loop(Parent, Name, NState, Mod, Time1, []);
{ok, {noreply, NState}} ->
loop(Parent, Name, NState, Mod, infinity, []);
{ok, {noreply, NState, Time1}} ->
loop(Parent, Name, NState, Mod, Time1, []);
{ok, {stop, Reason, Reply, NState}} ->
{'EXIT', R} =
(catch terminate(Reason, Name, Msg, Mod, NState, [])),
reply(From, Reply),
exit(R);
Other -> handle_common_reply(Other, Parent, Name, Msg, Mod, State)
end;
handle_msg(Msg, Parent, Name, State, Mod) ->
Reply = try_dispatch(Msg, Mod, State),
handle_common_reply(Reply, Parent, Name, Msg, Mod, State).
try_handle_call(Mod, Msg, From, State) ->
try
{ok, Mod:handle_call(Msg, From, State)}
catch
throw:R ->
{ok, R};
error:R ->
Stacktrace = erlang:get_stacktrace(),
{'EXIT', {R, Stacktrace}, {R, Stacktrace}};
exit:R ->
Stacktrace = erlang:get_stacktrace(),
{'EXIT', R, {R, Stacktrace}}
end.
reply({To, Tag}, Reply) ->
catch To ! {Tag, Reply}.
首先处理'$gen_call’消息,try_handle_call是try保护的回调调用,调用worker:handle_call,我们自己编写的回调函数,看到其实可以在handle_call中直接throw返回结果。调用reply发回响应,From的形式是{To, Tag},To表示pid或者注册名,Tag是monitor返回的引用值,这个消息会被之前介绍的gen:do_call接收,并返回{ok, Reply}到我们的应用程序。
返回其他值走向handle_common_reply。
其他消息走到第二个分支,调用try_dispatch。
try_dispatch({'$gen_cast', Msg}, Mod, State) ->
try_dispatch(Mod, handle_cast, Msg, State);
try_dispatch(Info, Mod, State) ->
try_dispatch(Mod, handle_info, Info, State).
try_dispatch(Mod, Func, Msg, State) ->
try
{ok, Mod:Func(Msg, State)}
catch
throw:R ->
{ok, R};
error:R ->
Stacktrace = erlang:get_stacktrace(),
{'EXIT', {R, Stacktrace}, {R, Stacktrace}};
exit:R ->
Stacktrace = erlang:get_stacktrace(),
{'EXIT', R, {R, Stacktrace}}
end.
handle_cast和handle_info的使用在这里体现,它们返回的结果都会交由handle_common_reply处理:
handle_common_reply(Reply, Parent, Name, Msg, Mod, State) ->
case Reply of
{ok, {noreply, NState}} ->
loop(Parent, Name, NState, Mod, infinity, []);
{ok, {noreply, NState, Time1}} ->
loop(Parent, Name, NState, Mod, Time1, []);
{ok, {stop, Reason, NState}} ->
terminate(Reason, Name, Msg, Mod, NState, []);
{'EXIT', ExitReason, ReportReason} ->
terminate(ExitReason, ReportReason, Name, Msg, Mod, State, []);
{ok, BadReply} ->
terminate({bad_return_value, BadReply}, Name, Msg, Mod, State, [])
end.
这里没什么特殊的,除了注意超时值,如果某次返回没有超时,就会更新为infinity;这也很好理解,尾递归情况,之前返回的参数不会影响后续调用。还有,handle_cast和handle_info的返回值只能是{ok, {noreply, NState…}}。随后接着进入loop循环。
最后
实际运行worker看一下效果:
Erlang/OTP 19 [erts-8.0] [64-bit] [smp:8:8] [async-threads:10]
Eshell V8.0 (abort with ^G)
1> worker:start_link().
Saving key: '0x01f' with value test, self: <0.58.0>
Timeout
{ok,<0.58.0>}
2> worker:show('0x01f').
show, key '0x01f' value test, self: <0.58.0>
ok
3> worker:query('0x01f').
query from {<0.56.0>,#Ref<0.0.1.59>}, self: <0.58.0>
{ok,test}
4> self().
<0.56.0>
5>
唯一没有介绍的回调函数code_change由gen_server:system_code_change回调:
system_code_change([Name, State, Mod, Time], _Module, OldVsn, Extra) ->
case catch Mod:code_change(OldVsn, State, Extra) of
{ok, NewState} -> {ok, [Name, NewState, Mod, Time]};
Else -> Else
end.
总结一下。虽然篇幅很长,但几乎都是列源代码,只有少部分简单解释。因为Erlang实在是太简单了——这里简单的意思是简洁。很容易就能理解它的所有能力,而这种能力又非常稳定和强大。