Erlang HTTP服务器cowboy
cowboy是基于ranch的http服务器框架,提供用户自定义路由、REST接口等便利功能。由于ranch是很完善的TCP池,所以在此之上的cowboy代码很容易支持http/https。虽说如此,http毕竟是很复杂的协议,好多细节不参考RFC根本搞不清楚。借此机会详细了解如何实现完整的HTTP服务器。首先了解cowboy完整的流程以及使用方法。
最简单的例子:
start(_Type, _Args) ->
Dispatch = cowboy_router:compile([
{'_', [
{"/", toppage_handler, []}
]}
]),
{ok, _} = cowboy:start_clear(http, 100, [{port, 8080}], #{
env => #{dispatch => Dispatch}
}),
cowboy_router.erl模块用于路由匹配,用户传入路由列表,并将其作为cowboy:start_clear的第四个参数的env值。类型定义和解析非常具有Erlang特色,新手看绝对会头晕,甚至完全看不懂。我也是配合shell才完全理解。'_‘代表host,随后的列表是path元组,匹配host之下的路径,支持[]可选模式和_通配符等,匹配成功后回调指定的模块,这里是toppage_handler。
cowboy:start_clear开始http服务器:
-spec start_clear(ranch:ref(), non_neg_integer(), ranch_tcp:opts(),
cowboy_protocol:opts()) -> {ok, pid()} | {error, any()}.
start_clear(Ref, NbAcceptors, TransOpts0, ProtoOpts)
when is_integer(NbAcceptors), NbAcceptors > 0 ->
TransOpts = [connection_type(ProtoOpts)|TransOpts0],
ranch:start_listener(Ref, NbAcceptors, ranch_tcp, TransOpts, cowboy_clear, ProtoOpts).
ranch accept成功后,回调cowboy_clear模块的start_link:
-spec start_link(ranch:ref(), inet:socket(), module(), cowboy:opts()) -> {ok, pid()}.
start_link(Ref, Socket, Transport, Opts) ->
Pid = proc_lib:spawn_link(?MODULE, proc_lib_hack, [self(), Ref, Socket, Transport, Opts]),
{ok, Pid}.
ranch引用值、客户端socket、参数等为参数创建proc_lib_hack进程,它会在try中执行init:
-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts()) -> ok.
init(Parent, Ref, Socket, Transport, Opts) ->
ok = ranch:accept_ack(Ref),
init(Parent, Ref, Socket, Transport, Opts, cowboy_http).
init(Parent, Ref, Socket, Transport, Opts, Protocol) ->
{Handler, Type} = maps:get(stream_handler, Opts, {cowboy_stream_h, supervisor}),
_ = case Type of
worker -> ok;
supervisor -> process_flag(trap_exit, true)
end,
Protocol:init(Parent, Ref, Socket, Transport, Opts, Handler).
ranch:accpet_ack同步ranch服务中心的shoot消息,表示当前socket准备完毕,可以收发数据。init中,注意Handler默认是cowboy_stream_h,之后的代码中会出现无数的handler、state、pid等信息,稍不小心就搞混。Protocol:init即cowboy_http:init。进入真正的http模块。
到了http模块框架其实就走完了,接下来的无非就是接收数据、解析http请求行、http头和请求包体,然后根据路由规则处理。说来简单,实现的时候却涉及到相当多的技巧和细节。
cowboy使用半自动模式,即每次决定接收数据时将socket设置为{active,once};消息循环loop开始前,先设置最大长连接数默认为100,并启动一个默认值为5s的定时器。http实现时默认是系统进程,需要处理进程退出、OTP回调等消息。主要关心数据接收、连接关闭和http数据流,这也是cowboy的核心功能。
数据接收和解析由cowboy_http:parse/2完成,内部状态state有一个值表示当前解析阶段——请求行、请求头、数据包体等。解析完后,生成一个map数据结构,维护当前请求状态。类似nginx的请求上下文,很重要的结构:
Req = #{
ref => Ref,
pid => self(),
streamid => StreamID,
peer => Peer,
method => Method,
scheme => Scheme,
host => Host,
port => Port,
path => Path,
qs => Qs,
version => Version,
headers => maps:remove(<<"transfer-encoding">>, Headers),
has_body => HasBody,
body_length => BodyLength
}
随后把状态更新为body接收,并取消定时器(这里的处理是否会有DOS风险?)。至此parse完成,并调用after_parse,后者回调Handler:init/3,默认是cowboy_stream_h:init:
-spec init(_,_,_) -> _.
init(_StreamID, Req=#{ref := Ref}, Opts) ->
Env = maps:get(env, Opts, #{}),
Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]),
Shutdown = maps:get(shutdown, Opts, 5000),
Pid = proc_lib:spawn_link(?MODULE, proc_lib_hack, [Req, Env, Middlewares]),
{[{spawn, Pid, Shutdown}], #state{ref=Ref, pid=Pid}}.
它接收stream id、请求结构体和http全局结构体的opts选项为参数。middlewares中间件默认有cowboy_router路由模块和cowboy_handler回调处理模块,我们可以自由添加,只需在cowboy:start_clear时传入合适的middlewares值。这点看来,比nginx的介入处理功能还要强大。
这里又会创建新进程proc_lib_hack,它会安顺西调用middlewares列表的模块的execute/2导出函数,cowboy_router和cowboy_handler的execute如下:
%% cowboy_router.erl
execute(Req=#{host := Host, path := Path}, Env=#{dispatch := Dispatch}) ->
case match(Dispatch, Host, Path) of
{ok, Handler, HandlerOpts, Bindings, HostInfo, PathInfo} ->
{ok, Req#{
host_info => HostInfo,
path_info => PathInfo,
bindings => Bindings
}, Env#{
handler => Handler,
handler_opts => HandlerOpts
}};
...
end.
match函数匹配Host和Path,并返回Handler和HandlerOpts,这里分别是toppage_handler和[]。再看最后的中间件cowboy_handler:
%% cowboy_handler.erl
execute(Req, Env=#{handler := Handler, handler_opts := HandlerOpts}) ->
try Handler:init(Req, HandlerOpts) of
{ok, Req2, State} ->
Result = terminate(normal, Req2, State, Handler),
{ok, Req2, [{result, Result}|Env]};
{Mod, Req2, State} ->
Mod:upgrade(Req2, Env, Handler, State, infinity, run);
...
这里用户的回调模块便会发挥作用:
%% toppage_handler.erl
init(Req0, Opts) ->
Req = cowboy_req:reply(200, #{
<<"content-type">> => <<"text/plain">>
}, <<"Hello world!">>, Req0),
{ok, Req, Opts}.
hello_world例子中,直接调用cowboy_req:reply发送响应,然后返回ok结束http。
先回到after_parse,cowboy_stream_h:init的返回值为 {Commands = [{spawn, Pid, Shutdown}], StreamState = #state{ref=Ref, pid=Pid}}. 。http模块更新state状态的streams键,以及短连接时更新last_streamid。然后调用commands:
commands(State, _, []) ->
State;
commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) ->
commands(State#state{children=[{Pid, StreamID, Shutdown}|Children]}, StreamID, Tail);
将同一连接的子进程加入state的children状态。GET请求一般到这里就结束了,因为没有数据包。http连接终止时,会把子进程信息从children中取出。具体的终止在下面的响应包发送的时候提及。
做个小总结:每次http访问维护一个全局状态,而每个http长连接维护一组用stream id标识的streams,即基于cowboy_stream_h模块的进程,回调用户模块处理请求。
有一个很重要的功能还没介绍——发送响应。通过导出接口cowboy_req:reply实现:
reply(Status, Headers, Body, Req) when is_integer(Status); is_binary(Status) ->
do_reply(Status, Headers#{<<"content-length">> => integer_to_binary(iolist_size(Body))}, Body, Req).
do_reply(Status, Headers, Body, Req=#{pid := Pid, streamid := StreamID}) ->
Pid ! {{Pid, StreamID}, {response, Status, response_headers(Headers, Req), Body}},
done_replying(Req, true).
向请求上下文记录的pid发送{Pid,StreamID}开头的消息,pid指向http进程:
%% cowboy_http.erl loop
loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
handler=_Handler, timer=TimerRef, children=Children}, Buffer) ->
{OK, Closed, Error} = Transport:messages(),
receive
%% Messages pertaining to a stream.
{{Pid, StreamID}, Msg} when Pid =:= self() ->
loop(info(State, StreamID, Msg), Buffer);
...
http调用info函数,info首先找出stream对应的状态,并回调stream处理模块cowboy_stream_h的info函数,info返回{Command, NewStreamState},http调用commands函数处理命令并更新流状态:
%% cowboy_stream_h.erl
%% Response.
info(_StreamID, Response = {response, _, _, _}, State) ->
{[Response], State};
%% normal exit
info(_StreamID, {'EXIT', Pid, normal}, State=#state{pid=Pid}) ->
{[stop], State};
这里顺便把连接关闭时的info处理过程放上,处理都是类似的。commands函数的{response,…}分支处理响应包。
commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams}, StreamID,
[{response, StatusCode, Headers0, Body}|Tail]) ->
#stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
{State, Headers} = connection(State0, Headers0, StreamID, Version),
Response = cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)),
case Body of
{sendfile, O, B, P} ->
Transport:send(Socket, Response),
commands(State#state{out_state=done}, StreamID, [{sendfile, fin, O, B, P}|Tail]);
_ ->
Transport:send(Socket, [Response, Body]),
maybe_terminate(State#state{out_state=done}, StreamID, Tail, fin)
end;
Transport是ranch_tcp,发送响应。
cowboy是一款功能强大的http服务器,类似nginx,轻量、高度可定制,如果用来实现负载均衡器,应该也是不错的。更具体的细节我还没看,包括REST支持、介入请求处理流程等。熟悉之后再写更多的分析文章。