设为首页收藏本站

Erlang中文论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 29464|回复: 1

异步gen_server进行port访问时性能严重下降的原因和应对方法...

[复制链接]
发表于 2013-10-21 00:54:26 | 显示全部楼层 |阅读模式
上文介绍了简单的问题场景,现在来分析下产生的原因。
consumer进程里面调用file:write和gen_tcp:send来处理消息:

  1. handle_msg(Type, Msg, #state{file = File, socket = Socket}) when is_binary(Msg) ->
  2.     case Type of
  3.         file -> file:write(File, Msg);
  4.         net -> gen_tcp:send(Socket, Msg)
  5.     end;
  6. handle_msg(Type, Msg, State) ->
  7.     handle_msg(Type, term_to_binary(Msg), State).
复制代码


这两个函数都是通过erlang的port体系实现的:
filepen返回一个文件描述符,如果选项包含raw,那么这个描述符将是一个record,定义为:
kernel/include/file.hrl

  1. -record(file_descriptor,
  2.         {module :: module(),     % Module that handles this kind of file
  3.          data   :: term()}).     % Module dependent data
复制代码


通常为#file_descriptor{module = prim_file, data = {Port, Number}},其中Port为文件对应的erlang port,Number为文件在erlang虚拟机进程中的描述符,原进程内部调用prim_file的各个接口;
如果选项不包含raw,这个描述符为pid(),这个描述符Pid对应了一个file_io_server进程,该进程代理原进程调用prim_file的各个接口;
gen_tcp:connect返回一个套接字描述符,这个套接字本身就是一个erlang port。
在erlang虚拟机中,文件对应的port_driver为efile,描述符为efile_driver_entry,而tcp套接字对应的port_driver为tcp_inet,描述符为tcp_inet_driver_entry。
通过port体系实现的接口,都需要先打开port,在实现具体功能时,都需要两步动作完成一个请求:
  1. erlang:port_command(Port, Data).
  2. receive
  3.     {MsgType, MsgBody} -> handle_port_return(MsgType, MsgBody)
  4. end.
复制代码


上述动作都需要在调用者进程中完成。
例如,file:write最终定位到prim_file:write,其实现如下:
prim_file.erl
  1. write(#file_descriptor{module = ?MODULE, data = {Port, _}}, Bytes) ->
  2.     case drv_command(Port, [?FILE_WRITE,Bytes]) of
  3.         {ok, _Size} ->
  4.             ok;
  5.         Error ->
  6.             Error
  7.     end.
  8. drv_command(Port, Command) -> drv_command(Port, Command, undefined).
  9. drv_command(Port, Command, R) when is_binary(Command) ->
  10.     drv_command(Port, Command, true, R);
  11. drv_command(Port, Command, R) ->
  12.     try erlang:iolist_size(Command) of
  13.         _ -> drv_command(Port, Command, true, R)
  14.     catch error:Reason -> {error, Reason}
  15.     end.
  16. drv_command(Port, Command, Validated, R) when is_port(Port) ->
  17.     try erlang:port_command(Port, Command) of
  18.         true -> drv_get_response(Port, R)
  19.     catch
  20.         error:badarg when Validated -> {error, einval};
  21.         error:badarg ->
  22.             try erlang:iolist_size(Command) of
  23.                 _ ->  {error, einval}
  24.             catch error:_ -> {error, badarg}
  25.             end;
  26.         error:Reason -> {error, Reason}
  27.     end;
  28. drv_command({Driver, Portopts}, Command, Validated, R) ->
  29.     case drv_open(Driver, Portopts) of
  30.         {ok, Port} ->
  31.             Result = drv_command(Port, Command, Validated, R),
  32.             drv_close(Port),
  33.             Result;
  34.         Error ->
  35.             Error
  36.     end.

  37. drv_get_response(Port, R) when is_list(R) ->
  38.     case drv_get_response(Port) of
  39.         ok ->
  40.             {ok, R};
  41.         {ok, Name} ->
  42.             drv_get_response(Port, [Name|R]);
  43.         Error ->
  44.             Error
  45.     end;
  46. drv_get_response(Port, _) ->
  47.     drv_get_response(Port).
  48. drv_get_response(Port) ->
  49.     erlang:bump_reductions(100),
  50.     receive
  51.         {Port, {data, [Response|Rest] = Data}} ->
  52.             try translate_response(Response, Rest)
  53.             catch
  54.                 error:Reason ->
  55.                     {error, {bad_response_from_port, Data, {Reason, erlang:get_stacktrace()}}}
  56.             end;
  57.         {'EXIT', Port, Reason} -> {error, {port_died, Reason}}
  58.     end.
复制代码



而gen_tcp:send最终定位到prim_inet:send,其实现如下:

  1. send(S, Data, OptList) when is_port(S), is_list(OptList) ->
  2.     ?DBG_FORMAT("prim_inet:send(~p, ~p)~n", [S,Data]),
  3.     try erlang:port_command(S, Data, OptList) of
  4.         false -> % Port busy and nosuspend option passed
  5.             ?DBG_FORMAT("prim_inet:send() -> {error,busy}~n", []),
  6.             {error,busy};
  7.         true ->
  8.             receive
  9.                 {inet_reply,S,Status} ->
  10.                     ?DBG_FORMAT("prim_inet:send() -> ~p~n", [Status]),
  11.                     Status
  12.             end
  13.     catch
  14.         error:_Error ->
  15.             ?DBG_FORMAT("prim_inet:send() -> {error,einval}~n", []),
  16.              {error,einval}
  17.     end.
  18. send(S, Data) ->
  19.     send(S, Data, []).
复制代码



他们都遵从了上面的编写模式。这个模式看起来并没有什么,但是应用在异步环境里面,却会引起一些问题。
上述模式最根本的问题在于,port发送完命令后,紧接着进行了一次receive,而该receive过程中包含了一个match过程,这个match过程是对进程的消息队列进行遍历,直到找到可以匹配到的消息。
为了更清晰的看清楚这个问题,编写两段段简单的代码,使用erlc +"'S'" x.erl编译,生成它们的抽象码:

  1. receive_match() ->
  2.     receive
  3.         {Type, Data} -> {Type, Data}
  4.     end.

  5. receive_fetch() ->
  6.     receive
  7.         Msg -> Msg
  8.     end.
复制代码


抽象码为:

01{function, receive_match, 0, 2}.
02  {label,1}.
03    {func_info,{atom,rc},{atom,receive_match},0}.
04  {label,2}.
05    {loop_rec,{f,4},{x,0}}.
06    {test,is_tuple,{f,3},[{x,0}]}.
07    {test,test_arity,{f,3},[{x,0},2]}.       
08    {get_tuple_element,{x,0},0,{x,1}}.
09    {get_tuple_element,{x,0},1,{x,2}}.
10    remove_message.
11    {test_heap,3,3}.
12    {put_tuple,2,{x,0}}.
13    {put,{x,1}}.
14    {put,{x,2}}.
15    return.
16  {label,3}.
17    {loop_rec_end,{f,2}}.
18  {label,4}.
19    {wait,{f,2}}.
20
21{function, receive_fetch, 0, 6}.
22  {label,5}.
23    {func_info,{atom,rc},{atom,receive_fetch},0}.
24  {label,6}.
25    {loop_rec,{f,7},{x,0}}.
26    remove_message.
27    return.
28  {label,7}.
29    {wait,{f,6}}.

首先看receive_match的抽象码,其抽象码解释如下:
05 loop_rec指令接收消息,若进程消息队列没有消息,跳至{label,4}处wait,若有消息,将消息移动到进程堆上,绑定到变量{x,0}
06 测试消息是否为元组tuple,若不是跳至{label,3}处
07 测试消息是否为二元元组,若不是跳至{label,3}处
08-09 此时消息已经匹配,可以做后续处理,将元组的两个元绑定到{x,1}和{x,2}上
10 消息已经匹配,将其从消息队列中移除
11-15 正常处理流程
16-17 此处表示,消息队列中有消息,但是并不是最近一次receive需要的消息,此时要做的是将该消息放回到进程消息队列上,并跳转回05的{label,2}处重新取下一条消息
18-19 此处表示,消息队列中没有消息,需要重新跳转回05的{label,2}处取下一条消息
这里可以发现,receive_match函数需要扫描一遍进程消息队列,从中取出符合要求的消息;
接着看receive_fetch的抽象码,其抽象码解释如下:
24-29 若消息队列中没有消息,则继续等待,若有消息,则取出消息,将消息移动到进程堆上,绑定到变量{x,0};
这里可以发现,receive_fetch不会扫描进程消息队列,直接取出下一条消息。
上述抽象码对应的erlang虚拟机c代码此处不再分析,其执行过程即如上所述,有兴趣的读者可以自行查阅。

这里便可以解释之前场景的问题了:
consumer的进程内部调用file:write或gen_tcp:send,然后将形成一个receive_match的模式,遍历进程消息队列,然后取出需要的消息;另一方面,生产者却没有任何顾忌的往consumer进程中投递消息,如果处理(也即调用file:write或gen_tcp:send)速度慢于消息投递速度(通常情况下总是如此,gen_server:cast的qps在本地可以轻松达到40-80w),则consumer的消息队列将不断增大,receive_match模式却仍然需要遍历整个消息队列,从而导致处理速度进一步下降,消息队列进一步增大,陷入一个恶性循环。
实际应用中,也有会遭遇这种典型的场景:进程允许异步投递, 但进程内部有调用port(receive_match)的模式出现 。

未完待续...

回复

使用道具 举报

发表于 2019-10-16 16:31:05 | 显示全部楼层
把消费者进程的消息邮箱的消息先全部读出来,缓存起来,自己模拟个消息队列来处理,可以减少port发送完后,receive 对消息邮箱的扫描量。
性能应该会有所提升。
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

QQ|Archiver|手机版|小黑屋|Erldoc.com  

GMT+8, 2024-3-29 01:22 , Processed in 0.189420 second(s), 7 queries , File On.

Powered by Discuz! X3.3

© 2001-2017 Comsenz Inc.

快速回复 返回顶部 返回列表