在进行项目开发时,有这么一个需求:一个生产者不断产生消息,并将消息异步投递给消费者,异步投递不受任何限制,消费者进程内部将消息写入文件或者通过套接字发送该消息。 看到这个需求,我想也没想,熟练地写下了如下代码:
API:- start_link(FileName, DsgHost, DstPort) ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [FileName, DsgHost, DstPort], []).
-
-
- deliver_to_file(Msg) -> deliver_to(file, Msg).
- deliver_to_net(Msg) -> deliver_to(net, Msg).
-
- deliver_to(Type, Msg) ->
- gen_server:cast(?SERVER, {deliver_to, Type, Msg}).
复制代码 gen_server callback: - init([FileName, DsgHost, DstPort]) ->
- {ok, FD} = file:open(FileName,
- [binary, raw, read, write, append]),
- {ok, Socket} = gen_tcp:connect(DsgHost, DstPort,
- [inet, binary, {active, false}, {packet, 4}, {reuseaddr, true}]),
- {ok, #state{file = FD, socket = Socket}}.
-
-
-
- handle_cast({deliver_to, Type, Msg}, State) ->
- handle_msg(Type, Msg, State),
- {noreply, State};
-
-
- handle_msg(Type, Msg, #state{file = File, socket = Socket}) when is_binary(Msg) ->
- case Type of
- file -> file:write(File, Msg);
- net -> gen_tcp:send(Socket, Msg)
- end;
- handle_msg(Type, Msg, State) ->
- handle_msg(Type, term_to_binary(Msg), State).
复制代码
压测函数编写如下:- deliver_to_with_len_and_times(Type, Len, Times) ->
- Msg = makeup_log_by_len(Len),
- deliver_to_times(Type, Msg, Times).
-
- deliver_to_times(Type, Msg, Times) when Times > 0 ->
- deliver_to(Type, Msg);
- deliver_to_times(_Type, _Msg, 0) ->
- ok.
-
- makeup_log_by_len(Len) ->
- list_to_binary(lists:duplicate(Len, $a)).
复制代码
函数主要目的为异步地向consumer投递Times个长度为Len的消息。 这段看似简单的代码,却在压测的时候产生了意想不到的效果:当投递消息个数较小时(<1000),handle_msg的处理qps上万,但是随着投递消息数目的增加(>10000),性能却急剧下降,最后仅有300+,使用vmstat、ifstat和top工具观察,磁盘写入和网络发送数据量均不大,但是cpu消耗却不小,这是肿么回事呢?
未完待续...
|