Erlang中文论坛

标题: erlang NIF部分接口实现(五)复用driver功能的接口 [打印本页]

作者: dreamxyp    时间: 2013-10-21 19:21
标题: erlang NIF部分接口实现(五)复用driver功能的接口
NIF除了自身提供的功能外,还封装了一系列driver的功能,这些功能与操作系统平台紧密相关,主要包括:
系统信息, 操作系统线程及线程私有资源 ,条件变量、信号量、读写锁等,这些功能本身与erlang的进程体系无关,本身也是线程安全的,因此可以直接复用到NIF中,统一NIF的接口。
此处简单的分析一个操作系统线程创建的接口,其余的类似。

  1. int enif_thread_create(char *name, ErlNifTid *tid, void* (*func)(void *),
  2.                        void *args, ErlNifThreadOpts *opts) {
  3.     return erl_drv_thread_create(name,tid,func,args,(ErlDrvThreadOpts*)opts);
  4. }
  5. enif_thread_create直接调用了对应的driver接口erl_drv_thread_create,其它复用driver功能的接口也类似。
  6. int
  7. erl_drv_thread_create(char *name,
  8.                       ErlDrvTid *tid,
  9.                       void* (*func)(void*),
  10.                       void* arg,
  11.                       ErlDrvThreadOpts *opts)
  12. {
  13. #ifdef USE_THREADS
  14.     int res;
  15.     struct ErlDrvTid_ *dtid;
  16.     ethr_thr_opts ethr_opts;
  17.     ethr_thr_opts *use_opts;

  18.     if (!opts)
  19.         use_opts = NULL;
  20.     else {
  21.         sys_memcpy((void *) ðr_opts,
  22.                    (void *) &def_ethr_opts,
  23.                    sizeof(ethr_thr_opts));
  24.         ethr_opts.suggested_stack_size = opts->suggested_stack_size;
  25.         use_opts = ðr_opts;
  26.     }

  27.     dtid = erts_alloc_fnf(ERTS_ALC_T_DRV_TID,
  28.                           (sizeof(struct ErlDrvTid_)
  29.                            + (name ? sys_strlen(name) + 1 : 0)));
  30.     /* 分配一个ErlDrvTid_结构,保存线程描述符 */
  31.     /*
  32.         struct ErlDrvTid_ {
  33.             ethr_tid tid;
  34.             void* (*func)(void*);
  35.             void* arg;
  36.             int drv_thr;
  37.             Uint tsd_len;
  38.             void **tsd;
  39.             char *name;
  40.         };
  41.      */

  42.     if (!dtid)
  43.         return ENOMEM;

  44.     dtid->drv_thr = 1;
  45.     dtid->func = func;
  46.     dtid->arg = arg;
  47.     dtid->tsd = NULL;
  48.     dtid->tsd_len = 0;

  49.     /* 填充线程描述符,func为线程执行入口点,arg为其参数,tsd为线程私有数据结构,可以保存线程私有数据,与线程紧密相关 */

  50.     if (!name)
  51.         dtid->name = no_name;
  52.     else {
  53.         dtid->name = ((char *) dtid) + sizeof(struct ErlDrvTid_);
  54.         sys_strcpy(dtid->name, name);
  55.     }
  56.     res = ethr_thr_create(&dtid->tid, erl_drv_thread_wrapper, dtid, use_opts);

  57.     /* 创建一个操作系统线程,该线程并不直接以用户提供的函数为入口点,而是提供了一层外覆函数, 外覆 函数为erl_drv_thread_wrapper */

  58.     if (res != 0) {
  59.         erts_free(ERTS_ALC_T_DRV_TID, dtid);
  60.         return res;
  61.     }

  62.     *tid = (ErlDrvTid) dtid;

  63.     /* 返回创建的线程描述符 */

  64.     return 0;
  65. #else
  66.     return ENOTSUP;
  67. #endif
  68. }
  69. int ethr_thr_create(ethr_tid *tid, void * (*func)(void *), void *arg, ethr_thr_opts *opts)
  70. {
  71.     ethr_thr_wrap_data__ twd;
  72.     pthread_attr_t attr;
  73.     int res, dres;
  74.     int use_stack_size = (opts && opts->suggested_stack_size >= 0
  75.                           ? opts->suggested_stack_size
  76.                           : -1 /* Use system default */);

  77. #ifdef ETHR_MODIFIED_DEFAULT_STACK_SIZE
  78.     if (use_stack_size < 0)
  79.         use_stack_size = ETHR_MODIFIED_DEFAULT_STACK_SIZE;
  80. #endif

  81. #if ETHR_XCHK
  82.     if (ethr_not_completely_inited__) {
  83.         ETHR_ASSERT(0);
  84.         return EACCES;
  85.     }
  86.     if (!tid || !func) {
  87.         ETHR_ASSERT(0);
  88.         return EINVAL;
  89.     }
  90. #endif

  91.     ethr_atomic32_init(&twd.result, (ethr_sint32_t) -1);
  92.     twd.tse = ethr_get_ts_event();
  93.     twd.thr_func = func;
  94.     twd.arg = arg;

  95.     res = pthread_attr_init(&attr);
  96.     if (res != 0)
  97.         return res;

  98.     /* Error cleanup needed after this point */

  99.     /* Schedule child thread in system scope (if possible) ... */
  100.     res = pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
  101.     if (res != 0 && res != ENOTSUP)
  102.         goto error;

  103.     if (use_stack_size >= 0) {
  104.         size_t suggested_stack_size = (size_t) use_stack_size;
  105.         size_t stack_size;
  106. #ifdef ETHR_DEBUG
  107.         suggested_stack_size /= 2; /* Make sure we got margin */
  108. #endif
  109. #ifdef ETHR_STACK_GUARD_SIZE
  110.         /* The guard is at least on some platforms included in the stack size
  111.            passed when creating threads */
  112.         suggested_stack_size += ETHR_B2KW(ETHR_STACK_GUARD_SIZE);
  113. #endif
  114.         if (suggested_stack_size < ethr_min_stack_size__)
  115.             stack_size = ETHR_KW2B(ethr_min_stack_size__);
  116.         else if (suggested_stack_size > ethr_max_stack_size__)
  117.             stack_size = ETHR_KW2B(ethr_max_stack_size__);
  118.         else
  119.             stack_size = ETHR_PAGE_ALIGN(ETHR_KW2B(suggested_stack_size));
  120.         (void) pthread_attr_setstacksize(&attr, stack_size);
  121.     }

  122. #ifdef ETHR_STACK_GUARD_SIZE
  123.     (void) pthread_attr_setguardsize(&attr, ETHR_STACK_GUARD_SIZE);
  124. #endif

  125.     /* Detached or joinable... */
  126.     res = pthread_attr_setdetachstate(&attr,
  127.                                       (opts && opts->detached
  128.                                        ? PTHREAD_CREATE_DETACHED
  129.                                        : PTHREAD_CREATE_JOINABLE));
  130.     if (res != 0)
  131.         goto error;

  132.     /* Call prepare func if it exist */
  133.     if (ethr_thr_prepare_func__)
  134.         twd.prep_func_res = ethr_thr_prepare_func__();
  135.     else
  136.         twd.prep_func_res = NULL;

  137.     res = pthread_create((pthread_t *) tid, &attr, thr_wrapper, (void*) &twd);

  138.     if (res == 0) {
  139.         int spin_count = child_wait_spin_count;

  140.         /* Wait for child to initialize... */
  141.         while (1) {
  142.             ethr_sint32_t result;
  143.             ethr_event_reset(&twd.tse->event);

  144.             result = ethr_atomic32_read(&twd.result);
  145.             if (result == 0)
  146.                 break;

  147.             if (result > 0) {
  148.                 res = (int) result;
  149.                 goto error;
  150.             }

  151.             res = ethr_event_swait(&twd.tse->event, spin_count);
  152.             if (res != 0 && res != EINTR)
  153.                 goto error;
  154.             spin_count = 0;
  155.         }
  156.     }

  157.     /* Cleanup... */

  158. error:
  159.     dres = pthread_attr_destroy(&attr);
  160.     if (res == 0)
  161.         res = dres;
  162.     if (ethr_thr_parent_func__)
  163.         ethr_thr_parent_func__(twd.prep_func_res);
  164.     return res;
  165. }
复制代码


可以看出,线程创建过程是一个标准的pthread线程创建框架。
  1. static void *erl_drv_thread_wrapper(void *vdtid)
  2. {
  3.     int res;
  4.     struct ErlDrvTid_ *dtid = (struct ErlDrvTid_ *) vdtid;
  5.     res = ethr_tsd_set(tid_key, vdtid);
  6.     if (res != 0)
  7.         fatal_error(res, "erl_drv_thread_wrapper()");
  8.     return (*dtid->func)(dtid->arg);
  9. }
  10. int
  11. ethr_tsd_set(ethr_tsd_key key, void *value)
  12. {
  13. #if ETHR_XCHK
  14.     if (ethr_not_inited__) {
  15.         ETHR_ASSERT(0);
  16.         return EACCES;
  17.     }
  18. #endif
  19.     return pthread_setspecific((pthread_key_t) key, value);
  20. }
复制代码



外覆函数通过pthread_setspecific为线程设置kv私有数据结构,这个数据结构是线程的erlang线程描述符ErlDrvTid,然后以用户给出的参数调用用户的函数。
进入NIF世界后,就可以借助很多erlang虚拟机的功能来探索erlang世界了,此时既可以通过erlang来实现功能,也可以通过c来实现功能,可以将各式各样的c系统移植如erlang虚拟机,扩充和丰富erlang的功能,同时减少开发的难度,性能也将得到很大的提高。






欢迎光临 Erlang中文论坛 (https://bbs.erldoc.com/) Powered by Discuz! X3.3