supervisor源码阅读
supervisor是监控树中最重要的组成部分,负责启动、停止和监控子进程,异常时重启等。supervisor模块应该尽可能简单,保持功能单一——负责监控。supervisor模块本身基于gen_server,所以它实现了gen_server的六个主要回调函数,是一个gen_server进程。通过阅读supervisor模块源码,了解实现supervisor回调的过程以及实现进程监控器的步骤。
先写一个简单的worker_sup.erl,实现了supervisor行为模式,配合worker.erl模块使用:
%% supervisor
%% worker_sup: a minimal one_for_one supervisor that starts and watches a
%% single `worker` child (worker.erl itself is not shown in this article).
-module(worker_sup).
-behaviour(supervisor).

-export([start_link/0]).
-export([init/1]).

%% Start an unregistered supervisor linked to the caller; the [] argument
%% is handed to init/1 below.
start_link() ->
    supervisor:start_link(?MODULE, []).

%% supervisor callback: return the supervisor flags and the child specs.
%% At most 1 restart is tolerated within a 5-second window; one more and
%% the supervisor gives up and exits.
init(_Args) ->
    Flags = #{strategy => one_for_one,
              intensity => 1,
              period => 5},
    Worker = #{id => worker,
               start => {worker, start_link, []},
               restart => permanent,
               shutdown => brutal_kill,
               type => worker,
               modules => [worker]},
    {ok, {Flags, [Worker]}}.
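效果如下。注意worker进程每次退出后都被自动重启了(self从<0.64.0>依次变为<0.68.0>、<0.71.0>)——虽然它是以normal原因退出的,但restart为permanent的子进程总会被重启;当period时间内的重启次数超过intensity后,监控器放弃重启并以shutdown退出,由于shell通过start_link与监控器链接,shell也收到了这个退出信号: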
Eshell V8.0 (abort with ^G)
1> c(worker_sup).
{ok,worker_sup}
2> worker_sup:start_link().
Saving key: '0x01f' with value test, self: <0.64.0>
Timeout
{ok,<0.63.0>}
3> worker:
code_change/3 handle_call/3 handle_cast/2 handle_info/2 init/1
module_info/0 module_info/1 query/1 set/1 show/1
start_link/0 start_link/1 stop/0 terminate/2
3> worker:show('0x01f').
show, key '0x01f' value test, self: <0.64.0>
ok
4> worker:stop().
terminate reason normal, self: <0.64.0>
Saving key: '0x01f' with value test, self: <0.68.0>
Timeout
ok
5> worker:stop().
terminate reason normal, self: <0.68.0>
Saving key: '0x01f' with value test, self: <0.71.0>
Timeout
ok
6> worker:stop().
terminate reason normal, self: <0.71.0>
** exception exit: shutdown
所有行为模式模块都要实现必需的回调函数,supervisor只需要模块实现init回调:
%% The only callback a supervisor callback module has to implement.
%% It returns {ok, {SupFlags, ChildSpecs}} -- a 2-tuple whose second
%% element is itself a 2-tuple -- or ignore.
-callback init(Args :: term()) ->
    {ok, {SupFlags :: sup_flags(), [ChildSpec :: child_spec()]}} | ignore.
init回调返回二元组{ok, {SupFlags, [ChildSpec]}}(第二个元素又是由监控标志SupFlags和子进程规范列表组成的二元组),或者ignore。之前翻译的OTP规范文档有较详细的介绍,但还是从源码里看一下SupFlags和ChildSpec的定义。
监控策略
源码中策略的定义:
%% Restart strategies accepted by validStrategy/1.
-type strategy() :: 'one_for_all' | 'one_for_one' | 'rest_for_one' | 'simple_one_for_one'.

%% Supervisor flags: either a map (every key optional; missing keys are
%% filled from ?default_flags by check_flags/1) or the legacy 3-tuple.
-type sup_flags() :: #{strategy => strategy(),         % optional
                       intensity => non_neg_integer(), % optional
                       period => pos_integer()}        % optional
                   | {RestartStrategy :: strategy(),
                      Intensity :: non_neg_integer(),
                      Period :: pos_integer()}.

%% Defaults merged underneath a user-supplied flags map.
-define(default_flags, #{strategy => one_for_one,
                         intensity => 1,
                         period => 5}).
strategy表示监控器的重启策略,决定一个子进程退出后哪些子进程需要重启;intensity和period共同限定重启强度:在period秒内最多允许重启intensity次。一旦超过这个限制,无论strategy是什么,监控器都会放弃重启并自行退出,同时终止其子进程(见下文的restart/2和add_restart)。
子进程规范
子进程规范定义和类型声明以及默认值:
%% Child identifier; any term is accepted (validName/1 always succeeds).
-type child_id() :: term().
%% Start function {Module, Function, Args}. Args is undefined only for
%% temporary children whose arguments were dropped by save_child/2.
-type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | undefined}.
-type modules() :: [module()] | 'dynamic'.
-type restart() :: 'permanent' | 'transient' | 'temporary'.
%% brutal_kill, or a timeout() value (an integer or infinity).
-type shutdown() :: 'brutal_kill' | timeout().
-type worker() :: 'worker' | 'supervisor'.

%% Child specification: a map (id and start mandatory) or the legacy 6-tuple.
-type child_spec() :: #{id := child_id(),       % mandatory
                        start := mfargs(),      % mandatory
                        restart => restart(),   % optional
                        shutdown => shutdown(), % optional
                        type => worker(),       % optional
                        modules => modules()}   % optional
                    | {Id :: child_id(),
                       StartFunc :: mfargs(),
                       Restart :: restart(),
                       Shutdown :: shutdown(),
                       Type :: worker(),
                       Modules :: modules()}.

%% Defaults merged underneath a child spec map. The shutdown and modules
%% defaults are computed in do_check_childspec/1 instead, because they
%% depend on the child type and on the start function.
-define(default_child_spec, #{restart => permanent, type => worker}).
监控器内部用id标识进程,start表示要启动的子进程模块、函数和参数。其余参数均为可选参数。
supervisor内部状态
由于supervisor是一个gen_server进程,因此需要维护服务器内部状态:
%% Value of the pid slot of a running or stopped child.
-type child() :: 'undefined' | pid().

%% One validated child spec plus its runtime pid. While a failed restart
%% is waiting to be retried, pid holds {restarting, OldPid}.
-record(child, {pid = undefined :: child() | {restarting, pid() | undefined} | [pid()], % pid is undefined when child is not running
                name :: child_id(),
                mfargs :: mfargs(),
                restart_type :: restart(),
                shutdown :: shutdown(),
                child_type :: worker(),
                modules = [] :: modules()}).
-type child_rec() :: #child{}.

%% Containers used for the dynamic children of a simple_one_for_one
%% supervisor (see save_dynamic_child/4 and dynamics_db/2).
-define(DICTS, dict).
-define(DICT, dict:dict).
-define(SETS, sets).
-define(SET, sets:set).

%% gen_server state of a supervisor process.
-record(state, {name,                                   % result of supname/2
                strategy :: strategy() | 'undefined',
                children = [] :: [child_rec()],         % newest child first
                dynamics :: {'dict', ?DICT(pid(), list())} | {'set', ?SET(pid())} | 'undefined',
                intensity :: non_neg_integer() | 'undefined',
                period :: pos_integer() | 'undefined',
                restarts = [],                          % restart timestamps in the current period, newest first
                dynamic_restarts = 0 :: non_neg_integer(), % failed simple_one_for_one restarts awaiting retry
                module,                                 % callback module
                args}).                                 % argument passed to Module:init/1
-type state() :: #state{}.
state保存了supervisor的参数以及所有子进程的信息。
supervisor启动
调用supervisor:start_link启动监控器,以模块名和参数列表为参数,参数列表会传给回调的init函数。
%% worker_sup:start_link/0 (repeated from above): start the supervisor with
%% worker_sup as callback module and [] as the argument for its init/1.
start_link() ->
    supervisor:start_link(worker_sup, []).
看一下supervisor:start_link/2的定义:
%% Start an unregistered supervisor process linked to the caller. The
%% 'self' tag is turned into the name {self(), Mod} by supname/2.
-spec start_link(Module, Args) -> startlink_ret() when
      Module :: module(),
      Args :: term().
start_link(Mod, Args) ->
    gen_server:start_link(supervisor, {self, Mod, Args}, []).

%% Same, but the gen_server is registered under SupName.
-spec start_link(SupName, Module, Args) -> startlink_ret() when
      SupName :: sup_name(),
      Module :: module(),
      Args :: term().
start_link(SupName, Mod, Args) ->
    gen_server:start_link(SupName, supervisor, {SupName, Mod, Args}, []).
通过gen_server:start_link启动进程,上文已经介绍过gen_server的启动流程——创建一个新进程并最终回调supervisor:init函数。
%% True when the supervisor runs the simple_one_for_one strategy.
-define(is_simple(State), State#state.strategy =:= simple_one_for_one).

-type init_sup_name() :: sup_name() | 'self'.

%% Reasons with which init/1 refuses to start the supervisor.
-type stop_rsn() :: {'shutdown', term()}
                  | {'bad_return', {module(),'init', term()}}
                  | {'bad_start_spec', term()}
                  | {'start_spec', term()}
                  | {'supervisor_data', term()}.

%% gen_server init/1 callback of the supervisor process.
%% Trap exits so that child exits arrive as {'EXIT', Pid, Reason}
%% messages (see handle_info/2), call Mod:init(Args), validate the flags,
%% then either store the single simple_one_for_one spec (init_dynamic/2)
%% or start all static children (init_children/2).
-spec init({init_sup_name(), module(), [term()]}) ->
                  {'ok', state()} | 'ignore' | {'stop', stop_rsn()}.
init({SupName, Mod, Args}) ->
    process_flag(trap_exit, true),
    case Mod:init(Args) of
        {ok, {SupFlags, StartSpec}} ->
            case init_state(SupName, SupFlags, Mod, Args) of
                {ok, State} when ?is_simple(State) ->
                    init_dynamic(State, StartSpec);
                {ok, State} ->
                    init_children(State, StartSpec);
                Error ->
                    %% Invalid supervisor flags.
                    {stop, {supervisor_data, Error}}
            end;
        ignore ->
            ignore;
        Error ->
            %% Mod:init/1 returned neither {ok, {_, _}} nor ignore.
            {stop, {bad_return, {Mod, init, Error}}}
    end.
首先调用process_flag(trap_exit, true)把自己设置为系统进程(捕获退出信号):链接的子进程退出时,退出信号会被转换为{'EXIT', Pid, Reason}消息投递到监控器的邮箱,而不会导致监控器自身退出。然后回调Mod:init,即我们自己实现的worker_sup:init。
%% worker_sup:init/1 (repeated from above): the Mod:init(Args) call in
%% supervisor:init/1 ends up here.
init(_Args) ->
    SupFlags = #{strategy => one_for_one, intensity => 1, period => 5},
    ChildSpecs = [#{id => worker, start => {worker, start_link, []}, restart => permanent, shutdown => brutal_kill, type => worker, modules => [worker]}],
    {ok, {SupFlags, ChildSpecs}}.
init_state接收监控器名、监控标志(即init回调返回的SupFlags)、回调模块Mod及其init参数Args,返回supervisor进程的内部状态:
%%-----------------------------------------------------------------
%% Func: init_state/4
%% Args: SupName = {local, atom()} | {global, atom()} | self
%%       Type = sup_flags(): a flags map or {Strategy, MaxIntensity, Period}
%%         Strategy = one_for_one | one_for_all | simple_one_for_one |
%%                    rest_for_one
%%         MaxIntensity = integer() >= 0
%%         Period = integer() > 0
%%       Mod = atom(), the callback module
%%       Args = term(), the argument given to Mod:init/1
%% Purpose: Check that Type is of correct type (!) and build the
%%          initial #state{}.
%% Returns: {ok, state()} | Error
%%          Error is a term thrown by check_flags/1 or a valid*/1 check.
%%-----------------------------------------------------------------
init_state(SupName, Type, Mod, Args) ->
    set_flags(Type, #state{name = supname(SupName,Mod), module = Mod, args = Args}).

%% Validate the flags and copy them into State. Validation failures are
%% thrown as tagged tuples and returned as-is (a bare catch pattern only
%% catches throws).
set_flags(Flags, State) ->
    try check_flags(Flags) of
        #{strategy := Strategy, intensity := MaxIntensity, period := Period} ->
            {ok, State#state{strategy = Strategy, intensity = MaxIntensity, period = Period}}
    catch
        Thrown -> Thrown
    end.

%% Normalise the flags to a complete map (missing keys come from
%% ?default_flags; the legacy tuple is converted to a map) and validate it.
check_flags(SupFlags) when is_map(SupFlags) ->
    do_check_flags(maps:merge(?default_flags,SupFlags));
check_flags({Strategy, MaxIntensity, Period}) ->
    check_flags(#{strategy => Strategy, intensity => MaxIntensity, period => Period});
check_flags(What) ->
    throw({invalid_type, What}).

do_check_flags(#{strategy := Strategy, intensity := MaxIntensity, period := Period} = Flags) ->
    validStrategy(Strategy),
    validIntensity(MaxIntensity),
    validPeriod(Period),
    Flags.

%% Each valid*/1 check returns true or throws {invalid_..., Value}.
validStrategy(simple_one_for_one) -> true;
validStrategy(one_for_one) -> true;
validStrategy(one_for_all) -> true;
validStrategy(rest_for_one) -> true;
validStrategy(What) -> throw({invalid_strategy, What}).

validIntensity(Max) when is_integer(Max),
                         Max >= 0 -> true;
validIntensity(What) -> throw({invalid_intensity, What}).

validPeriod(Period) when is_integer(Period),
                         Period > 0 -> true;
validPeriod(What) -> throw({invalid_period, What}).

%% Name stored in #state.name: {self(), Mod} for an unregistered
%% supervisor, otherwise the registration name as given.
supname(self, Mod) -> {self(), Mod};
supname(N, _) -> N.
这一段代码很有Erlang风格,很好的利用Erlang的模式匹配,检查init返回的重启策略参数,并初始化state相关的值。
如果监控标志中strategy键的值是simple_one_for_one,则调用init_dynamic,否则调用init_children。
simple_one_for_one和init_dynamic
simple_one_for_one用于动态生成子进程,初始化时没有任何子进程,通过调用supervisor:start_child,会按照子进程规范启动新的子进程。调用init_dynamic初始化。
%% simple_one_for_one: exactly one child spec is allowed. It is only
%% validated and stored in #state.children; no child is started here.
init_dynamic(State, [StartSpec]) ->
    case check_startspec([StartSpec]) of
        {ok, Children} ->
            {ok, State#state{children = Children}};
        Error ->
            {stop, {start_spec, Error}}
    end;
init_dynamic(_State, StartSpec) ->
    %% Zero specs, more than one spec, or not a list at all.
    {stop, {bad_start_spec, StartSpec}}.
init_dynamic的第二个参数必须是只包含一个子进程规范的列表,否则返回{stop, {bad_start_spec, StartSpec}}。check_startspec检查子进程启动参数:
%%% ------------------------------------------------------
%%% Check that the children start specification is valid.
%%% Input: [child_spec()]
%%% Returns: {ok, [child_rec()]} | Error
%%% The records keep the input order. The first invalid spec, or the
%%% first duplicate id, aborts the check.
%%% ------------------------------------------------------
check_startspec(Children) -> check_startspec(Children, []).

check_startspec([ChildSpec|T], Res) ->
    case check_childspec(ChildSpec) of
        {ok, Child} ->
            case lists:keymember(Child#child.name, #child.name, Res) of
                %% The error message duplicate_child_name is kept for
                %% backwards compatibility, although
                %% duplicate_child_id would be more correct.
                true -> {duplicate_child_name, Child#child.name};
                false -> check_startspec(T, [Child | Res])
            end;
        Error -> Error
    end;
check_startspec([], Res) ->
    %% Res was accumulated in reverse.
    {ok, lists:reverse(Res)}.
check_startspec遍历子进程规范列表,对每个规范调用check_childspec检查参数,并确保子进程id不会重复(重复时返回{duplicate_child_name, Id})。
%% Validate one child spec and convert it to a #child{} record.
%% Returns {ok, #child{}} or an error term. For a map, ?default_child_spec
%% is merged underneath it first, and whatever do_check_childspec/1 throws
%% becomes the result of the old-style catch. A legacy 6-tuple is converted
%% to a map and checked again.
check_childspec(ChildSpec) when is_map(ChildSpec) ->
    catch do_check_childspec(maps:merge(?default_child_spec,ChildSpec));
check_childspec({Name, Func, RestartType, Shutdown, ChildType, Mods}) ->
    check_childspec(#{id => Name,
                      start => Func,
                      restart => RestartType,
                      shutdown => Shutdown,
                      type => ChildType,
                      modules => Mods});
check_childspec(X) -> {invalid_child_spec, X}.
check_childspec把子进程规范转换成map,并调用do_check_childspec,真正的参数检查:
%% Validate a child spec map and build the #child{} record. The map has
%% already been merged with ?default_child_spec, so restart and type are
%% present. Problems are reported with throw/1; check_childspec/1 catches
%% them. Defaults computed here: shutdown is 5000 for workers and infinity
%% for supervisors, and modules is [M] taken from the start MFA.
do_check_childspec(#{restart := RestartType, type := ChildType} = ChildSpec)->
    Name = case ChildSpec of
               #{id := N} -> N;
               _ -> throw(missing_id)
           end,
    Func = case ChildSpec of
               #{start := F} -> F;
               _ -> throw(missing_start)
           end,
    validName(Name),
    validFunc(Func),
    validRestartType(RestartType),
    validChildType(ChildType),
    %% ChildType was validated above, so one of the type clauses matches.
    Shutdown = case ChildSpec of
                   #{shutdown := S} -> S;
                   #{type := worker} -> 5000;
                   #{type := supervisor} -> infinity
               end,
    validShutdown(Shutdown),
    Mods = case ChildSpec of
               #{modules := Ms} -> Ms;
               %% Func was validated as {M, F, A} above.
               _ -> {M,_,_} = Func, [M]
           end,
    validMods(Mods),
    {ok, #child{name = Name, mfargs = Func, restart_type = RestartType,
                shutdown = Shutdown, child_type = ChildType, modules = Mods}}.

%% Each valid*/1 check returns true or throws a tagged error term.
validChildType(supervisor) -> true;
validChildType(worker) -> true;
validChildType(What) -> throw({invalid_child_type, What}).

%% Any term is accepted as a child id.
validName(_Name) -> true.

validFunc({M, F, A}) when is_atom(M),
                          is_atom(F),
                          is_list(A) -> true;
validFunc(Func) -> throw({invalid_mfa, Func}).

validRestartType(permanent) -> true;
validRestartType(temporary) -> true;
validRestartType(transient) -> true;
validRestartType(RestartType) -> throw({invalid_restart_type, RestartType}).

%% A positive integer, infinity or brutal_kill; note that 0 is rejected.
validShutdown(Shutdown)
  when is_integer(Shutdown), Shutdown > 0 -> true;
validShutdown(infinity) -> true;
validShutdown(brutal_kill) -> true;
validShutdown(Shutdown) -> throw({invalid_shutdown, Shutdown}).

%% dynamic, or a list whose every element is an atom.
validMods(dynamic) -> true;
validMods(Mods) when is_list(Mods) ->
    lists:foreach(fun(Mod) ->
                          if
                              is_atom(Mod) -> ok;
                              true -> throw({invalid_module, Mod})
                          end
                  end,
                  Mods);
validMods(Mods) -> throw({invalid_modules, Mods}).
do_check_childspec检查各参数是否合法,为缺省的参数补上默认值(shutdown对worker默认为5000、对supervisor默认为infinity;modules默认为[M]),并返回{ok, Child}。因此check_startspec会按照子进程规范列表的顺序,返回服务器状态所需的child列表。真正的启动过程将在后续start_child讨论中解释。
正常的子进程启动
非simple_one_for_one类型的初始化过程会调用init_children:
%% Non-simple strategies: validate all specs, then start the children in
%% order. If a child fails to start, terminate_children/2 (not shown) is
%% applied to the resulting child list and init/1 stops with
%% {shutdown, Reason}.
init_children(State, StartSpec) ->
    SupName = State#state.name,
    case check_startspec(StartSpec) of
        {ok, Children} ->
            case start_children(Children, SupName) of
                {ok, NChildren} ->
                    {ok, State#state{children = NChildren}};
                {error, NChildren, Reason} ->
                    _ = terminate_children(NChildren, SupName),
                    {stop, {shutdown, Reason}}
            end;
        Error ->
            {stop, {start_spec, Error}}
    end.
和init_dynamic类似,首先检查参数。随后调用start_children启动子进程,同时会传入SupName:
%%-----------------------------------------------------------------
%% Func: start_children/2
%% Args: Children = [child_rec()] in start order
%%       SupName = {local, atom()} | {global, atom()} | {pid(), Mod}
%% Purpose: Start all children.  The new list contains #child's
%%          with pids.
%% Returns: {ok, NChildren} | {error, NChildren, Reason}
%%          NChildren = [child_rec()] in termination order (reversed
%%                        start order)
%% Notes: a temporary child whose start function returned ignore is
%%        dropped from the list; any other ignored child is kept with
%%        pid = undefined. On the first failure the error is reported,
%%        the remaining children are not started, and the returned list
%%        still holds every child (unstarted ones included) in
%%        termination order.
%%-----------------------------------------------------------------
start_children(Children, SupName) -> start_children(Children, [], SupName).

start_children([Child|Chs], NChildren, SupName) ->
    case do_start_child(SupName, Child) of
        {ok, undefined} when Child#child.restart_type =:= temporary ->
            start_children(Chs, NChildren, SupName);
        {ok, Pid} ->
            start_children(Chs, [Child#child{pid = Pid}|NChildren], SupName);
        {ok, Pid, _Extra} ->
            start_children(Chs, [Child#child{pid = Pid}|NChildren], SupName);
        {error, Reason} ->
            report_error(start_error, Reason, Child, SupName),
            {error, lists:reverse(Chs) ++ [Child | NChildren], {failed_to_start_child,Child#child.name,Reason}}
    end;
start_children([], NChildren, _SupName) ->
    {ok, NChildren}.
start_children会按照子进程规范列表的顺序启动子进程,并把结果逆序存放在列表中,因此服务器状态中#state.children的存储顺序和启动顺序相反(即终止顺序)。真正的启动函数是do_start_child:
%% Call the child's start function {M, F, Args}.
%% Returns {ok, Pid} | {ok, Pid, Extra} | {ok, undefined} (the start
%% function returned ignore) | {error, Reason}. Any other return value, or
%% an exception caught by the old-style catch, becomes {error, What}.
%% Successful starts are passed to report_progress/2 (not shown).
do_start_child(SupName, Child) ->
    #child{mfargs = {M, F, Args}} = Child,
    case catch apply(M, F, Args) of
        {ok, Pid} when is_pid(Pid) ->
            NChild = Child#child{pid = Pid},
            report_progress(NChild, SupName),
            {ok, Pid};
        {ok, Pid, Extra} when is_pid(Pid) ->
            NChild = Child#child{pid = Pid},
            report_progress(NChild, SupName),
            {ok, Pid, Extra};
        ignore ->
            {ok, undefined};
        {error, What} -> {error, What};
        What -> {error, What}
    end.
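do_start_child用apply调用子进程规范中的启动函数。这个函数应当创建一个新进程、与调用者(即监控器)链接并返回{ok, Pid}——通常是gen_server、gen_statem或supervisor等行为模式模块的start_link,比如worker:start_link。它也可以返回ignore,此时不会生成进程,do_start_child返回{ok, undefined}。其他返回值或异常都会被转换为{error, Reason}。启动成功后调用report_progress把进程信息写入日志。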
因为supervisor进程本身也是一个gen_server进程,所以supervisor的初始化过程很简单:supervisor:start_link -> gen_server:start_link -> gen_server:init_it -> supervisor:init -> Mod:init -> init_state -> init_dynamic/init_children -> check_startspec -> start_children(simple_one_for_one初始化时不调用start_children)。
supervisor:start_child
start_child启动子进程:
%% Dynamically add a child. For a simple_one_for_one supervisor, ChildSpec
%% is a list of extra arguments appended to the stored spec's arguments;
%% otherwise it is a full child spec (see handle_call/3).
-spec start_child(SupRef, ChildSpec) -> startchild_ret() when
      SupRef :: sup_ref(),
      ChildSpec :: child_spec() | (List :: [term()]).
start_child(Supervisor, ChildSpec) ->
    call(Supervisor, {start_child, ChildSpec}).

%% Synchronous request to the supervisor, with an infinite timeout.
call(Supervisor, Req) ->
    gen_server:call(Supervisor, Req, infinity).
gen_server的call调用会回调handle_call函数:
%% start_child for simple_one_for_one: append the extra arguments to the
%% arguments of the single stored spec and start a new child. The child is
%% tracked in #state.dynamics, not in #state.children.
handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) ->
    Child = hd(State#state.children),
    #child{mfargs = {M, F, A}} = Child,
    Args = A ++ EArgs,
    case do_start_child_i(M, F, Args) of
        {ok, undefined} ->
            %% The start function returned ignore: nothing to track.
            {reply, {ok, undefined}, State};
        {ok, Pid} ->
            NState = save_dynamic_child(Child#child.restart_type, Pid, Args, State),
            {reply, {ok, Pid}, NState};
        {ok, Pid, Extra} ->
            NState = save_dynamic_child(Child#child.restart_type, Pid, Args, State),
            {reply, {ok, Pid, Extra}, NState};
        What ->
            {reply, What, State}
    end;

%% start_child for the other strategies: ChildSpec is a full child spec.
handle_call({start_child, ChildSpec}, _From, State) ->
    case check_childspec(ChildSpec) of
        {ok, Child} ->
            {Resp, NState} = handle_start_child(Child, State),
            {reply, Resp, NState};
        What ->
            {reply, {error, What}, State}
    end;
首先处理simple_one_for_one的子进程创建:取子进程列表中唯一的那份规范,把start_child传入的额外参数追加到规范参数列表的末尾,然后调用do_start_child_i启动进程,并通过save_dynamic_child保存动态子进程信息:
%% Like do_start_child/2, but takes the MFA directly and does no progress
%% reporting; used for simple_one_for_one children.
do_start_child_i(M, F, A) ->
    case catch apply(M, F, A) of
        {ok, Pid} when is_pid(Pid) ->
            {ok, Pid};
        {ok, Pid, Extra} when is_pid(Pid) ->
            {ok, Pid, Extra};
        ignore ->
            {ok, undefined};
        {error, Error} ->
            {error, Error};
        What ->
            {error, What}
    end.

%% Record a dynamically started child. temporary children are never
%% restarted, so only their pid is kept (in a set); other children are
%% stored as Pid => Args in a dict so they can be restarted with the same
%% arguments.
save_dynamic_child(temporary, Pid, _, #state{dynamics = Dynamics} = State) ->
    DynamicsDb = dynamics_db(temporary, Dynamics),
    State#state{dynamics = {set, ?SETS:add_element(Pid, DynamicsDb)}};
save_dynamic_child(RestartType, Pid, Args, #state{dynamics = Dynamics} = State) ->
    DynamicsDb = dynamics_db(RestartType, Dynamics),
    State#state{dynamics = {dict, ?DICTS:store(Pid, Args, DynamicsDb)}}.

%% Return the existing container, or create one on first use: a set for
%% temporary children, a dict otherwise.
dynamics_db(temporary, undefined) ->
    ?SETS:new();
dynamics_db(_, undefined) ->
    ?DICTS:new();
dynamics_db(_, {_Tag, DynamicsDb}) ->
    DynamicsDb.
temporary子进程只把pid保存到sets集合中;其他子进程则以Pid为键、启动参数为值保存在dict字典中,以便重启时使用相同的参数。
其它类型的子进程生成略有不同。首先传入的参数是一个子进程规范——而不是额外参数,说明可以生成一个和init返回规范不同的子进程。通过调用handle_start_child实现:
%% Start a new child from a full spec (non-simple supervisors).
%% Fails with already_started if a running child has the same id, and with
%% already_present if a stopped child with that id is still stored.
%% get_child/2 is not shown; presumably it looks the id up in
%% #state.children and returns {value, Child} | false.
handle_start_child(Child, State) ->
    case get_child(Child#child.name, State) of
        false ->
            case do_start_child(State#state.name, Child) of
                {ok, undefined} when Child#child.restart_type =:= temporary ->
                    %% Ignored temporary child: nothing worth remembering.
                    {{ok, undefined}, State};
                {ok, Pid} ->
                    %% Also covers a non-temporary ignored child (Pid = undefined).
                    {{ok, Pid}, save_child(Child#child{pid = Pid}, State)};
                {ok, Pid, Extra} ->
                    {{ok, Pid, Extra}, save_child(Child#child{pid = Pid}, State)};
                {error, What} ->
                    {{error, {What, Child}}, State}
            end;
        {value, OldChild} when is_pid(OldChild#child.pid) ->
            {{error, {already_started, OldChild#child.pid}}, State};
        {value, _OldChild} ->
            {{error, already_present}, State}
    end.
首先检查子进程id是否已经存在,然后调用do_start_child。如果子进程是temporary,且启动函数返回了ignore(例如其init回调返回ignore),就不保存任何信息——尽可能保持轻量;其余情况都会调用save_child(非temporary子进程即使返回ignore,也会以pid = undefined的形式保存):
%% Prepend the child record to #state.children (newest first). A temporary
%% child is never restarted, so its start arguments are replaced by
%% undefined instead of being kept.
save_child(#child{restart_type = temporary, mfargs = {M, F, _}} = Child, #state{children = Children} = State) ->
    State#state{children = [Child#child{mfargs = {M, F, undefined}} |Children]};
save_child(Child, #state{children = Children} = State) ->
    State#state{children = [Child |Children]}.
save_child会省略temporary的参数信息。
子进程退出和重启
supervisor监控器存在的意义,就是在子进程退出时根据监控策略和子进程规范选择重启或者停止进程。由于监控器设置了trap_exit,子进程退出时监控进程会收到{'EXIT', Pid, Reason}消息,看一下处理过程:
%% A linked process exited. The exit arrives as a message because init/1
%% set trap_exit. Restart according to the child spec; if the restart
%% intensity is exceeded, restart_child/3 returns {shutdown, _} and the
%% supervisor stops with reason shutdown.
handle_info({'EXIT', Pid, Reason}, State) ->
    case restart_child(Pid, Reason, State) of
        {ok, State1} ->
            {noreply, State1};
        {shutdown, State1} ->
            {stop, shutdown, State1}
    end;
restart_child重启进程:
%% Handle the exit of Pid.
%% simple_one_for_one: look up the exited child's start arguments in
%% #state.dynamics (dynamic_child_args/3, not shown) and rebuild its MFA
%% from the single stored spec. Other strategies: find the child record in
%% #state.children by pid. Exits of unknown pids are ignored.
restart_child(Pid, Reason, #state{children = [Child]} = State) when ?is_simple(State) ->
    RestartType = Child#child.restart_type,
    case dynamic_child_args(Pid, RestartType, State#state.dynamics) of
        {ok, Args} ->
            {M, F, _} = Child#child.mfargs,
            NChild = Child#child{pid = Pid, mfargs = {M, F, Args}},
            do_restart(RestartType, Reason, NChild, State);
        error ->
            {ok, State}
    end;
restart_child(Pid, Reason, State) ->
    Children = State#state.children,
    case lists:keyfind(Pid, #child.pid, Children) of
        #child{restart_type = RestartType} = Child ->
            do_restart(RestartType, Reason, Child, State);
        false ->
            {ok, State}
    end.
对于simple_one_for_one进程,调用dynamic_child_args从动态进程列表中获取参数;其它进程,则直接从Children列表中查找子进程规范。最后都会调用do_restart:
%% Decide whether an exited child is restarted. Clause order matters:
%%  - permanent: always reported and restarted, whatever the Reason;
%%  - transient/temporary exiting with normal, shutdown or {shutdown, _}:
%%    removed without a report;
%%  - transient with any other reason: reported and restarted;
%%  - temporary with any other reason: reported and removed.
do_restart(permanent, Reason, Child, State) ->
    report_error(child_terminated, Reason, Child, State#state.name),
    restart(Child, State);
do_restart(_, normal, Child, State) ->
    NState = state_del_child(Child, State),
    {ok, NState};
do_restart(_, shutdown, Child, State) ->
    NState = state_del_child(Child, State),
    {ok, NState};
do_restart(_, {shutdown, _Term}, Child, State) ->
    NState = state_del_child(Child, State),
    {ok, NState};
do_restart(transient, Reason, Child, State) ->
    report_error(child_terminated, Reason, Child, State#state.name),
    restart(Child, State);
do_restart(temporary, Reason, Child, State) ->
    report_error(child_terminated, Reason, Child, State#state.name),
    NState = state_del_child(Child, State),
    {ok, NState}.
do_restart以重启类型、退出原因、子进程规范和服务器状态为参数,选择重启策略:
- permanent 进程一定会被重启
- 退出原因是normal、shutdown、{shutdown, term()}的进程不会被重启
- transient 进程在其他情况退出会重启
- 临时进程不会被重启
state_del_child会在dynamic列表或者字典中删除子进程的信息。
最后的步骤是 restart 重启。这是 supervisor 功能核心,根据重启策略的 strategy 键值,确定重启行为;
%% Restart Child after recording the restart in the intensity window.
%% - Within the limit: delegate to restart/3 for the current strategy. If
%%   that asks to try again, queue an asynchronous try_again_restart to
%%   self() and return, instead of retrying in a loop here.
%% - Limit exceeded: report reached_max_restart_intensity and return
%%   {shutdown, State} with the child removed (remove_child/2, not shown);
%%   callers turn this into {stop, shutdown, State}.
restart(Child, State) ->
    case add_restart(State) of
        {ok, NState} ->
            case restart(NState#state.strategy, Child, NState) of
                {try_again,NState2} ->
                    %% Leaving control back to gen_server before
                    %% trying again. This way other incoming requests
                    %% for the supervisor can be handled - e.g. a
                    %% shutdown request for the supervisor or the
                    %% child.
                    Id = if ?is_simple(State) -> Child#child.pid;
                            true -> Child#child.name
                         end,
                    ok = try_again_restart(self(), Id),
                    {ok,NState2};
                {try_again, NState2, #child{name=ChName}} ->
                    %% A child other than Child failed to start
                    %% (rest_for_one / one_for_all): retry that one.
                    ok = try_again_restart(self(), ChName),
                    {ok,NState2};
                Other ->
                    Other
            end;
        {terminate, NState} ->
            report_error(shutdown, reached_max_restart_intensity,
                         Child, State#state.name),
            {shutdown, remove_child(Child, NState)}
    end.
首先调用 add_restart 检查重启频率,这也是重启策略的重启强度参数起作用的时候:
%% Record a restart at the current time and drop timestamps older than
%% Period. The time comes from the monotonic clock in whole seconds (time
%% unit 1 = one part per second). Returns {terminate, State1} when more
%% than MaxIntensity restarts fall inside the window, else {ok, State1}.
add_restart(State) ->
    I = State#state.intensity,
    P = State#state.period,
    R = State#state.restarts,
    Now = erlang:monotonic_time(1),
    R1 = add_restart([Now|R], Now, P),
    State1 = State#state{restarts = R1},
    case length(R1) of
        CurI when CurI =< I ->
            {ok, State1};
        _ ->
            {terminate, State1}
    end.

%% Keep the leading (newest-first) timestamps that are still inside the
%% window. Stop at the first one that is too old, because every timestamp
%% after it is older still.
add_restart([R|Restarts], Now, Period) ->
    case inPeriod(R, Now, Period) of
        true ->
            [R|add_restart(Restarts, Now, Period)];
        _ ->
            []
    end;
add_restart([], _, _) ->
    [].

%% True while Then + Period has not yet passed.
inPeriod(Then, Now, Period) ->
    Now =< Then + Period.
add_restart 维护period时间窗口内所有重启的时间戳。如果窗口内的重启次数超过intensity,就返回terminate,restart随即返回{shutdown, …},让整个监控树终止。没有超过重启强度时,调用restart/3按照策略执行具体的重启:
one_for_one
%% one_for_one: only the exited child is restarted. On failure its pid is
%% marked as restarting and try_again is returned so that restart/2
%% schedules a retry.
restart(one_for_one, Child, State) ->
    OldPid = Child#child.pid,
    case do_start_child(State#state.name, Child) of
        {ok, Pid} ->
            NState = replace_child(Child#child{pid = Pid}, State),
            {ok, NState};
        {ok, Pid, _Extra} ->
            NState = replace_child(Child#child{pid = Pid}, State),
            {ok, NState};
        {error, Reason} ->
            NState = replace_child(Child#child{pid = restarting(OldPid)}, State),
            report_error(start_error, Reason, Child, State#state.name),
            {try_again, NState}
    end;
one_for_one 是最简单的情况,子进程之间互不影响,所以这里只是调用do_start_child重新启动该子进程,并替换服务器状态中保存的子进程记录;如果启动失败,则把它的pid标记为restarting并返回try_again,稍后重试。
rest_for_one
%% rest_for_one: ChAfter holds the exited child and every child started
%% after it (see split_child/2). These are terminated with
%% terminate_children/2 (not shown) and started again; ChBefore is left
%% untouched. If the exited child itself fails to start, its pid is marked
%% as restarting and try_again is returned. If another child fails, that
%% child is marked instead and returned as a third element, so restart/2
%% retries it by name.
restart(rest_for_one, Child, State) ->
    {ChAfter, ChBefore} = split_child(Child#child.pid, State#state.children),
    ChAfter2 = terminate_children(ChAfter, State#state.name),
    case start_children(ChAfter2, State#state.name) of
        {ok, ChAfter3} ->
            {ok, State#state{children = ChAfter3 ++ ChBefore}};
        {error, ChAfter3, {failed_to_start_child, ChName, _Reason}}
          when ChName =:= Child#child.name ->
            NChild = Child#child{pid=restarting(Child#child.pid)},
            NState = State#state{children = ChAfter3 ++ ChBefore},
            {try_again, replace_child(NChild,NState)};
        {error, ChAfter3, {failed_to_start_child, ChName, _Reason}} ->
            NChild = lists:keyfind(ChName, #child.name, ChAfter3),
            NChild2 = NChild#child{pid=?restarting(undefined)},
            NState = State#state{children = ChAfter3 ++ ChBefore},
            {try_again, replace_child(NChild2,NState), NChild2}
    end;
rest_for_one 策略下,一个子进程退出时,会先终止所有在它之后启动的子进程,然后按启动顺序重新启动这些子进程(包括退出的那个子进程本身)。
首先调用 split_child 把子进程列表分开:
%% Split a newest-first child list at the child identified by Name, which
%% may be a child id or a pid. Returns {After, Before}:
%%  - After is the matching child (with its pid reset to undefined) plus
%%    every child started after it, still newest first;
%%  - Before holds the older children.
%% If nothing matches, After is the whole list and Before is [].
%% Example:
%% Chs = [S4, S3, Ch, S1, S0]
%% Ret: {[S4, S3, Ch], [S1, S0]}
split_child(Name, Chs) ->
    split_child(Name, Chs, []).

split_child(Name, [Ch|Chs], After) when Ch#child.name =:= Name ->
    {lists:reverse([Ch#child{pid = undefined} | After]), Chs};
split_child(Pid, [Ch|Chs], After) when Ch#child.pid =:= Pid ->
    {lists:reverse([Ch#child{pid = undefined} | After]), Chs};
split_child(Name, [Ch|Chs], After) ->
    split_child(Name, Chs, [Ch | After]);
split_child(_, [], After) ->
    {lists:reverse(After), []}.
ChBefore是不需要重启的。首先调用terminate_children终止ChAfter进程,需要注意的是因为temporary进程不会被重启,因此这里也会跳过temporary进程;终止子进程时可以有超时值,允许子进程执行某些清理工作。
start_children会按启动顺序重新启动这些子进程;如果某个子进程启动失败,restart/3会把它的pid标记为restarting,并返回{try_again, ...}。
在restart/2中,针对{try_again, …}会调用 try_again_restart ,然后返回。
%%% Called by restart/2
%%% Asynchronously ask Supervisor to retry a failed restart. Child is the
%%% child id, or, for simple_one_for_one, the old pid (see restart/2).
%%% cast/2 is not shown; presumably a thin wrapper around gen_server:cast/2.
-spec try_again_restart(SupRef, Child) -> ok when
      SupRef :: sup_ref(),
      Child :: child_id() | pid().
try_again_restart(Supervisor, Child) ->
    cast(Supervisor, {try_again_restart, Child}).
发送一个异步请求,然后返回;这样做是为了处理其他的消息——如终止监控树等。这是很好的编程范例。这里顺便看一下handle_cast处理try_again_restart的过程:
%% Retry a restart that failed earlier (queued by try_again_restart/2).
%% Exceeding the restart intensity here also stops the supervisor.
-spec handle_cast({try_again_restart, child_id() | pid()}, state()) ->
                         {'noreply', state()} | {stop, shutdown, state()}.

%% simple_one_for_one: the message carries the old pid. When the restart
%% failed, restart(simple_one_for_one, ...) stored the arguments under
%% restarting(Pid) in #state.dynamics, so they are looked up by that key.
handle_cast({try_again_restart,Pid}, #state{children=[Child]}=State)
  when ?is_simple(State) ->
    RT = Child#child.restart_type,
    RPid = restarting(Pid),
    case dynamic_child_args(RPid, RT, State#state.dynamics) of
        {ok, Args} ->
            {M, F, _} = Child#child.mfargs,
            NChild = Child#child{pid = RPid, mfargs = {M, F, Args}},
            case restart(NChild,State) of
                {ok, State1} ->
                    {noreply, State1};
                {shutdown, State1} ->
                    {stop, shutdown, State1}
            end;
        error ->
            {noreply, State}
    end;

%% Other strategies: retry only if the named child is still marked as
%% restarting; otherwise the request is ignored.
handle_cast({try_again_restart,Name}, State) ->
    case lists:keyfind(Name,#child.name,State#state.children) of
        Child = #child{pid=?restarting(_)} ->
            case restart(Child,State) of
                {ok, State1} ->
                    {noreply, State1};
                {shutdown, State1} ->
                    {stop, shutdown, State1}
            end;
        _ ->
            {noreply,State}
    end.
只是再次执行restart流程。
one_for_all
%% one_for_all: terminate every other child and start all children again.
%% del_child/2 (not shown) presumably updates the exited child's entry in
%% the list. Failure handling mirrors rest_for_one: the failed child is
%% marked as restarting and try_again is returned, with a third element
%% when the failed child is not the one that exited.
restart(one_for_all, Child, State) ->
    Children1 = del_child(Child#child.pid, State#state.children),
    Children2 = terminate_children(Children1, State#state.name),
    case start_children(Children2, State#state.name) of
        {ok, NChs} ->
            {ok, State#state{children = NChs}};
        {error, NChs, {failed_to_start_child, ChName, _Reason}}
          when ChName =:= Child#child.name ->
            NChild = Child#child{pid=restarting(Child#child.pid)},
            NState = State#state{children = NChs},
            {try_again, replace_child(NChild,NState)};
        {error, NChs, {failed_to_start_child, ChName, _Reason}} ->
            NChild = lists:keyfind(ChName, #child.name, NChs),
            NChild2 = NChild#child{pid=?restarting(undefined)},
            NState = State#state{children = NChs},
            {try_again, replace_child(NChild2,NState), NChild2}
    end.
类似 rest_for_one,只不过这次会终止所有进程,然后再重启。
simple_one_for_one
%% simple_one_for_one: restart one dynamic child with its stored arguments
%% A. The old pid's entry is erased from the dynamics dict and the new pid
%% is stored. If the start fails, the arguments are stored under
%% restarting(OldPid) and dynamic_restarts is incremented; when a child
%% that is already ?restarting(_) is retried, the counter is decremented
%% first.
restart(simple_one_for_one, Child, State0) ->
    #child{pid = OldPid, mfargs = {M, F, A}} = Child,
    State = case OldPid of
                ?restarting(_) ->
                    NRes = State0#state.dynamic_restarts - 1,
                    State0#state{dynamic_restarts = NRes};
                _ ->
                    State0
            end,
    Dynamics = ?DICTS:erase(OldPid, dynamics_db(Child#child.restart_type,
                                                 State#state.dynamics)),
    case do_start_child_i(M, F, A) of
        {ok, Pid} ->
            DynamicsDb = {dict, ?DICTS:store(Pid, A, Dynamics)},
            NState = State#state{dynamics = DynamicsDb},
            {ok, NState};
        {ok, Pid, _Extra} ->
            DynamicsDb = {dict, ?DICTS:store(Pid, A, Dynamics)},
            NState = State#state{dynamics = DynamicsDb},
            {ok, NState};
        {error, Error} ->
            NRestarts = State#state.dynamic_restarts + 1,
            DynamicsDb = {dict, ?DICTS:store(restarting(OldPid), A, Dynamics)},
            NState = State#state{dynamic_restarts = NRestarts,
                                 dynamics = DynamicsDb},
            report_error(start_error, Error, Child, State#state.name),
            {try_again, NState}
    end;
simple_one_for_one是最特殊的,因为所有子进程都按照一份子进程规范动态生成。所以子进程退出时只是简单的新建一个进程,并更新动态子进程列表。
总体来说,supervisor 行为模式的实现很简单,这是理所当然的,监控器做的事要尽可能少。另外,它基于gen_server,通过服务器状态维护子进程和动态子进程信息,根据用户设置的重启策略和子进程规范,实现健壮的监控模型。