设为首页收藏本站

Erlang中文论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 27753|回复: 0

erlang NIF部分接口实现(四)消息发送

[复制链接]
发表于 2013-10-21 19:19:59 | 显示全部楼层 |阅读模式
erlang中不能没有消息和异步过程,NIF也必须有此项能力,这个能力是通过enif_send实现的,它可以在NIF中向一个进程发送消息,但由于消息本身需要跨进程传递,消息的生命周期可能很长,而在erlang NIF部分接口实现(一)中可以看到,NIF每次调用所使用的ErlNifEnv结构是位于process_main函数的栈上的,由这个ErlNifEnv结构分配消息所占用的内存是不可能的,因此需要一个长期存在的ErlNifEnv结构来回收消息的内存,而ErlNifEnv结构是附着于一个进程的,同时也需要一个Process结构,产生分配内存的堆。
为了构建这个长期存在的ErlNifEnv结构,需要能够动态的分配ErlNifEnv结构:

  1. struct enif_msg_environment_t
  2. {
  3.     ErlNifEnv env;
  4.     Process phony_proc;
  5. };


  6. ErlNifEnv* enif_alloc_env(void)
  7. {
  8.     struct enif_msg_environment_t* msg_env =
  9.         erts_alloc_fnf(ERTS_ALC_T_NIF, sizeof(struct enif_msg_environment_t));
  10.     Eterm* phony_heap = (Eterm*) msg_env; /* dummy non-NULL ptr */
  11.        
  12.     msg_env->env.hp = phony_heap;
  13.     msg_env->env.hp_end = phony_heap;
  14.     msg_env->env.heap_frag = NULL;
  15.     msg_env->env.mod_nif = NULL;
  16.     msg_env->env.tmp_obj_list = NULL;
  17.     msg_env->env.proc = &msg_env->phony_proc;
  18.     memset(&msg_env->phony_proc, 0, sizeof(Process));
  19.     HEAP_START(&msg_env->phony_proc) = phony_heap;
  20.     HEAP_TOP(&msg_env->phony_proc) = phony_heap;
  21.     HEAP_LIMIT(&msg_env->phony_proc) = phony_heap;
  22.     HEAP_END(&msg_env->phony_proc) = phony_heap;
  23.     MBUF(&msg_env->phony_proc) = NULL;
  24.     msg_env->phony_proc.id = ERTS_INVALID_PID;
  25. #ifdef FORCE_HEAP_FRAGS
  26.     msg_env->phony_proc.space_verified = 0;
  27.     msg_env->phony_proc.space_verified_from = NULL;
  28. #endif
  29.     return &msg_env->env;
  30. }
复制代码


该函数将在内存中产生一个enif_msg_environment_t结构,它包含两个成员,env和 phony_proc,env即为长期ErlNifEnv结构,而phony_proc即为env附着的伪Process结构,env借助phony_proc的堆分配消息所占的内存。
任何需要发送的消息的term,都必须通过这个动态产生的env分配,而之后调用enif_send发送消息时,消息及其动态env必须一致。
可以通过enif_free_env来释放enif_msg_environment_t结构:
  1. void enif_free_env(ErlNifEnv* env)
  2. {
  3.     enif_clear_env(env);
  4.     erts_free(ERTS_ALC_T_NIF, env);
  5. }
复制代码




也可以通过enif_clear_env来清洗一个ErlNifEnv结构以重用:

  1. void enif_clear_env(ErlNifEnv* env)
  2. {
  3.     struct enif_msg_environment_t* menv = (struct enif_msg_environment_t*)env;
  4.     Process* p = &menv->phony_proc;
  5.     ASSERT(p == menv->env.proc);
  6.     ASSERT(p->id == ERTS_INVALID_PID);
  7.     ASSERT(MBUF(p) == menv->env.heap_frag);
  8.     if (MBUF(p) != NULL) {
  9.         erts_cleanup_offheap(&MSO(p));
  10.         clear_offheap(&MSO(p));
  11.         free_message_buffer(MBUF(p));
  12.         MBUF(p) = NULL;
  13.         menv->env.heap_frag = NULL;
  14.     }
  15.     ASSERT(HEAP_TOP(p) == HEAP_END(p));
  16.     menv->env.hp = menv->env.hp_end = HEAP_TOP(p);

  17.     ASSERT(!is_offheap(&MSO(p)));
  18.     free_tmp_objs(env);
  19. }
复制代码


有了这个长期存在的ErlNifEnv结构,就可以利用它来产生消息所需要的内存并发送该消息了:

  1. int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid, ErlNifEnv* msg_env, ERL_NIF_TERM msg)
  2. {
  3.     struct enif_msg_environment_t* menv = (struct enif_msg_environment_t*)msg_env;
  4.     ErtsProcLocks rp_locks = 0;
  5.     Process* rp;
  6.     Process* c_p;
  7.     ErlHeapFragment* frags;
  8. #if defined(ERTS_ENABLE_LOCK_CHECK) && defined(ERTS_SMP)
  9.     ErtsProcLocks rp_had_locks;
  10. #endif
  11.     Eterm receiver = to_pid->pid;
  12.     int flush_me = 0;

  13.     if (env != NULL) {
  14.         c_p = env->proc;
  15.         if (receiver == c_p->id) {
  16.             rp_locks = ERTS_PROC_LOCK_MAIN;
  17.             flush_me = 1;
  18.         }
  19.     }
  20.     else {
  21. #ifdef ERTS_SMP
  22.         c_p = NULL;
  23. #else
  24.         erl_exit(ERTS_ABORT_EXIT,"enif_send: env==NULL on non-SMP VM");
  25. #endif
  26.     }   

  27. #if defined(ERTS_ENABLE_LOCK_CHECK) && defined(ERTS_SMP)
  28.     rp_had_locks = rp_locks;
  29. #endif
  30.     rp = erts_pid2proc_opt(c_p, ERTS_PROC_LOCK_MAIN,
  31.                            receiver, rp_locks, ERTS_P2P_FLG_SMP_INC_REFC);

  32.     /* 临时增加消息目的进程的引用计数,防止在发送途中目的进程被销毁。 */

  33.     if (rp == NULL) {
  34.         ASSERT(env == NULL || receiver != c_p->id);
  35.         return 0;
  36.     }
  37.     flush_env(msg_env);
  38.     frags = menv->env.heap_frag;
  39.     ASSERT(frags == MBUF(&menv->phony_proc));
  40.     if (frags != NULL) {
  41.         /* Move all offheap's from phony proc to the first fragment.
  42.            Quick and dirty, but erts_move_msg_mbuf_to_heap doesn't care. */
  43.         ASSERT(!is_offheap(&frags->off_heap));
  44.         frags->off_heap = MSO(&menv->phony_proc);
  45.         clear_offheap(&MSO(&menv->phony_proc));
  46.         menv->env.heap_frag = NULL;
  47.         MBUF(&menv->phony_proc) = NULL;
  48.     }
  49.     ASSERT(!is_offheap(&MSO(&menv->phony_proc)));

  50.     if (flush_me) {       
  51.         flush_env(env); /* Needed for ERTS_HOLE_CHECK */
  52.     }
  53.     erts_queue_message(rp, &rp_locks, frags, msg, am_undefined
  54. #ifdef USE_VM_PROBES
  55.                        , NIL
  56. #endif
  57.                        );

  58.     /* 这是erlang内部由于将消息投递到目的进程消息队列的函数 */

  59.     if (rp_locks) {       
  60.         ERTS_SMP_LC_ASSERT(rp_locks == (rp_had_locks | (ERTS_PROC_LOCK_MSGQ |
  61.                                                         ERTS_PROC_LOCK_STATUS)));
  62.         erts_smp_proc_unlock(rp, (ERTS_PROC_LOCK_MSGQ | ERTS_PROC_LOCK_STATUS));
  63.     }
  64.     erts_smp_proc_dec_refc(rp);
  65.     if (flush_me) {
  66.         cache_env(env);
  67.     }
  68.     return 1;
  69. }
复制代码


该接口的to_pid参数即为消息的目的进程,msg_env即为通过enif_alloc_env所产生的长期ErlNifEnv,通过前面的代码分析可以发现,该结构实际嵌入一个enif_msg_environment_t结构,msg参数是需要发送的消息,它是由msg_env所分配的term,消息将通过erts_queue_message转移到目的进程的消息队列中。
在有了异步消息投递能力后,NIF可以的功能将极大的丰富。

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

QQ|Archiver|手机版|小黑屋|Erldoc.com  

GMT+8, 2022-5-19 17:22 , Processed in 0.540954 second(s), 7 queries , File On.

Powered by Discuz! X3.3

© 2001-2017 Comsenz Inc.

快速回复 返回顶部 返回列表