RocketMQ原理分享

之前在组内分享过一次RocketMQ,虽然写了PPT,还是口述+黑板画图,现场分享效果更佳。RocketMQ架构清晰,只有NameServer、Broker、Producer和Consumer四个组件,通过共享commitlog,即使数十万topic,依然能保持高性能;是通过工程巧妙解决业务难题的典范。RocketMQ真正做到了“简单”,例如重复消费这种功能,完全抛给消费侧;通过构建逻辑队列,每个消息都会指定唯一队列,解决消息顺序的问题。

下面介绍整体架构和四个组件,然后分析一条消息从诞生到被消费的全过程,从中了解RocketMQ设计的精妙。

thrift 编码解码详解

thrift结构体按照一定格式编码成字符串,供网络传输;通常上层会对解码和编码的调用做封装,把object编码为string,从string解码object:

boost::shared_ptr<TMemoryBuffer> mem_buffer(new TMemoryBuffer());
mem_buffer->resetBuffer(
    reinterpret_cast<uint8_t*>(const_cast<char*>(buffer.c_str())),
    buffer.size());
TBinaryProtocol protocol(mem_buffer);
data.read(&protocol);

boost::shared_ptr<TMemoryBuffer> mem_buffer(new TMemoryBuffer());
TBinaryProtocol protocol(mem_buffer);
data.write(&protocol);
buffer = mem_buffer->getBufferAsString();

spdlog阅读记录

spdlog_impl.h: 用户接口

registry.h: logger封装接口

using registry =  registry_t<std::mutex> 
class registry_t {
private:
std::unordered_map<std::string, std::shared_ptr<logger>> _loggers; // name -> logger
}

poolboy——Erlang进程池

poolboy是Erlang进程池,应用广泛,代码短小精悍(400行不到)。PostgreSQL的Erlang客户端就用到了poolboy,其他DB库也有用,比如redis等。由于足够简洁,所以坑不少,这篇文章做了总结。任何进程池的目的都是很明确的——预先创建特定数目的进程,执行反复的一次性工作,随后接受直接提供服务。用Erlang实现进程池更是易如反掌,只需要按照OTP规范写一个监控树就行。

poolboy使用简单,官方README和例子说的很清楚。简要列一下工作流程:

poolboy:start/2(start_link/2) -> poolboy gen_server process
poolboy:init -> poolboy_sup -> N worker_process
poolboy:transaction -> select one worker_process from pool and execute function

下面详细看源码。

HTTP请求处理cowboy_rest

cowboy_rest以REST方式,允许用户模块介入HTTP请求处理。看过nginx源码的同学都知道,nginx回调模块可以介入http请求处理,依靠请求过程中划分的十几个阶段,实现资源重定向、权限控制等。cowboy的做法略有不同,前一篇提到过中间件的概念。给middlewares设置实现了execute回调的模块,接收到请求头和数据包后介入处理,实现nginx类似功能,官方有个markdown的例子,回头分析。一般来说,我们关注正常请求,在用户回调模块中实现cowboy_rest模块的可选回调即可。

cowboy_handler作为请求处理的最后一环,根据路由规则回调用户模块,如果返回{cowboy_rest, Req, State},就会调用cowboy_rest:upgrade/6:

execute(Req, Env=#{handler := Handler, handler_opts := HandlerOpts}) ->
	try Handler:init(Req, HandlerOpts) of
		...
		{Mod, Req2, State} ->
			Mod:upgrade(Req2, Env, Handler, State, infinity, run);
		...

Erlang HTTP服务器cowboy

cowboy是基于ranch的http服务器框架,提供用户自定义路由、REST接口等便利功能。由于ranch是很完善的TCP池,所以在此之上的cowboy代码很容易支持http/https。虽说如此,http毕竟是很复杂的协议,好多细节不参考RFC根本搞不清楚。借此机会详细了解如何实现完整的HTTP服务器。首先了解cowboy完整的流程以及使用方法。

最简单的例子:

start(_Type, _Args) ->
	Dispatch = cowboy_router:compile([
		{'_', [
			{"/", toppage_handler, []}
		]}
	]),
	{ok, _} = cowboy:start_clear(http, 100, [{port, 8080}], #{
		env => #{dispatch => Dispatch}
	}),

tcp连接池ranch

ranch 是Erlang/OTP实现的 TCP 连接池,我在看cowboy源码的时候意外发现了这个组件。让人欣喜的是它本身就非常强大,只需要短短几行代码,就可以实现强大的tcp服务器功能,而socket管理的细节都被ranch封装起来。

运行anch_app:start(,)启动服务,然后就可以使用自己实现的服务。晚上看了一下它的源码,非常短,但是很成熟,是学习的好例子。

ranch的启动大致如下流程:

ranch_app:start(_,_) %% application
-> ranch_sup:start_link() %% supervisor 
-> ranch_sup:init() %% supervisor callback
-> ets:new(ranch_server, ...) %% one_for_one, ranch_server
-> ranch_server:start_link() %% gen_server

ranch_server提供tcp连接池管理,其它模块通过和ranch_server交互,维护自己的连接信息、socket选项等。

supervisor源码阅读

supervisor是监控树中最重要的组成部分,负责启动、停止和监控子进程,异常时重启等。supervisor模块应该尽可能简单,保持功能单一——负责监控。supervisor模块本身基于gen_server,所以它实现了gen_server的六个主要回调函数,是一个gen_server进程。通过阅读supervisor模块源码,了解实现supervisor回调的过程以及实现进程监控器的步骤。

先写一个简单的worker_sup.erl,实现了supervisor行为模式,配合worker.erl模块使用:

%% supervisor
-module(worker_sup).

-behaviour(supervisor).

-export([start_link/0]).
-export([init/1]).

start_link() ->
    supervisor:start_link(worker_sup, []).

init(_Args) ->
    SupFlags = #{strategy => one_for_one, intensity => 1, period => 5},
    ChildSpecs = [#{id => worker, start => {worker, start_link, []}, restart => permanent, shutdown => brutal_kill, type => worker, modules => [worker]}],
    {ok, {SupFlags, ChildSpecs}}.

gen_server源码阅读

gen_server是Erlang/OTP的通用服务器框架,使用最为广泛。源代码很少,只有几百行。抽空详细梳理了一遍流程,以简单的gen_server应用worker为例。

先看我们自己的worker.erl文件,根据OTP设计规范,既是回调模块,又是用户接口。对外导出 start_link/0,1 , query/1 , set/1 , show/1 , stop/1,同时实现gen_server行为模式的6个必要的回调函数: