您好,欢迎访问一九零五行业门户网

deno通信实现的方法(附代码)

本篇文章给大家带来的内容是关于deno通信实现的方法(附代码),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。
通信方式
deno执行代码和node相似,包含同步和异步的方式, 异步方式通过promise.then实现。
typescript/javascript调用rust
在上一节中讲到deno的启动时会初始化v8 isolate实例,在初始化的过程中,会将c++的函数绑定到v8 isolate的实例上,在v8执行javascript代码时,可以像调用javascript函数一样调用这些绑定的函数。具体的绑定实现如下:
void initializecontext(v8::isolate* isolate, v8::local<v8::context> context) {  v8::handlescope handle_scope(isolate);  v8::context::scope context_scope(context);  auto global = context->global();  auto deno_val = v8::object::new(isolate);  check(global->set(context, deno::v8_str(libdeno), deno_val).fromjust());  auto print_tmpl = v8::functiontemplate::new(isolate, print);  auto print_val = print_tmpl->getfunction(context).tolocalchecked();  check(deno_val->set(context, deno::v8_str(print), print_val).fromjust());  auto recv_tmpl = v8::functiontemplate::new(isolate, recv);  auto recv_val = recv_tmpl->getfunction(context).tolocalchecked();  check(deno_val->set(context, deno::v8_str(recv), recv_val).fromjust());  auto send_tmpl = v8::functiontemplate::new(isolate, send);  auto send_val = send_tmpl->getfunction(context).tolocalchecked();  check(deno_val->set(context, deno::v8_str(send), send_val).fromjust());  auto eval_context_tmpl = v8::functiontemplate::new(isolate, evalcontext);  auto eval_context_val =      eval_context_tmpl->getfunction(context).tolocalchecked();  check(deno_val->set(context, deno::v8_str(evalcontext), eval_context_val)            .fromjust());  auto error_to_json_tmpl = v8::functiontemplate::new(isolate, errortojson);  auto error_to_json_val =      error_to_json_tmpl->getfunction(context).tolocalchecked();  check(deno_val->set(context, deno::v8_str(errortojson), error_to_json_val)            .fromjust());  check(deno_val->setaccessor(context, deno::v8_str(shared), shared)            .fromjust());}
在完成绑定之后,在typescript中可以通过如下代码实现c++方法和typescript方法的映射
libdeno.tsinterface libdeno {  recv(cb: messagecallback): void;  send(control: arraybufferview, data?: arraybufferview): null | uint8array;  print(x: string, iserr?: boolean): void;  shared: arraybuffer;  /** evaluate provided code in the current context.   * it differs from eval(...) in that it does not create a new context.   * returns an array: [output, errinfo].   * if an error occurs, `output` becomes null and `errinfo` is non-null.   */  // eslint-disable-next-line @typescript-eslint/no-explicit-any  evalcontext(code: string): [any, evalerrorinfo | null];  errortojson: (e: error) => string;}export const libdeno = window.libdeno as libdeno;
在执行typescript代码时,只需要引入libdeno,就直接调用c++方法,例如:
import { libdeno } from ./libdeno;function sendinternal(  builder: flatbuffers.builder,  innertype: msg.any,  inner: flatbuffers.offset,  data: undefined | arraybufferview,  sync = true): [number, null | uint8array] {  const cmdid = nextcmdid++;  msg.base.startbase(builder);  msg.base.addinner(builder, inner);  msg.base.addinnertype(builder, innertype);  msg.base.addsync(builder, sync);  msg.base.addcmdid(builder, cmdid);  builder.finish(msg.base.endbase(builder));  const res = libdeno.send(builder.asuint8array(), data);  builder.inuse = false;  return [cmdid, res];}
调用libdeno.send方法可以将数据传给c++,然后通过c++去调用rust代码实现具体的工程操作。
typescript层同步异步实现同步在typescript中只需要设置sendinternal方法的sync参数为true即可,在rust中会根据sync参数去判断是执行同步或者异步操作,如果sync为true,libdeono.send方法会返回执行的结果,rust和typescript之间传递数据需要将数据序列化,这里序列化操作使用的是flatbuffer库。
const [cmdid, resbuf] = sendinternal(builder, innertype, inner, data, true);
异步实现同理,实现异步方式,只需要设置sync参数为false即可,但是异步操作和同步相比,多了回掉方法,在执行异步通信时,libdeno.send方法会返回一个唯一的cmdid标志这次调用操作。同时在异步通信完成后,会创建一个promise对象,将cmdid作为key,promise作为value,加入map中。代码如下:
const [cmdid, resbuf] = sendinternal(builder, innertype, inner, data, false);  util.assert(resbuf == null);  const promise = util.createresolvable<msg.base>();  promisetable.set(cmdid, promise);  return promise;
rust实现同步和异步当在typescript中调用libdeno.send方法时,调用了c++文件binding.cc中的send方法,该方法是在deno初始化时绑定到v8 isolate上去的。在send方法中去调用了ops.rs文件中的dispatch方法,该方法实现了消息到函数的映射。每个类型的消息对应了一种函数,例如读文件消息对应了读文件的函数。
pub fn dispatch(  isolate: &isolate,  control: libdeno::deno_buf,  data: libdeno::deno_buf,) -> (bool, box<op>) {  let base = msg::get_root_as_base(&control);  let is_sync = base.sync();  let inner_type = base.inner_type();  let cmd_id = base.cmd_id();  let op: box<op> = if inner_type == msg::any::settimeout {    // settimeout is an exceptional op: the global timeout field is part of the    // isolate state (not the isolatestate state) and it must be updated on the    // main thread.    assert_eq!(is_sync, true);    op_set_timeout(isolate, &base, data)  } else {    // handle regular ops.    let op_creator: opcreator = match inner_type {      msg::any::accept => op_accept,      msg::any::chdir => op_chdir,      msg::any::chmod => op_chmod,      msg::any::close => op_close,      msg::any::fetchmodulemetadata => op_fetch_module_meta_data,      msg::any::copyfile => op_copy_file,      msg::any::cwd => op_cwd,      msg::any::dial => op_dial,      msg::any::environ => op_env,      msg::any::exit => op_exit,      msg::any::fetch => op_fetch,      msg::any::formaterror => op_format_error,      msg::any::listen => op_listen,      msg::any::maketempdir => op_make_temp_dir,      msg::any::metrics => op_metrics,      msg::any::mkdir => op_mkdir,      msg::any::open => op_open,      msg::any::readdir => op_read_dir,      msg::any::readfile => op_read_file,      msg::any::readlink => op_read_link,      msg::any::read => op_read,      msg::any::remove => op_remove,      msg::any::rename => op_rename,      msg::any::replreadline => op_repl_readline,      msg::any::replstart => op_repl_start,      msg::any::resources => op_resources,      msg::any::run => op_run,      msg::any::runstatus => op_run_status,      msg::any::setenv => op_set_env,      msg::any::shutdown => op_shutdown,      msg::any::start => op_start,      msg::any::stat => op_stat,      msg::any::symlink => op_symlink,      msg::any::truncate => op_truncate,      msg::any::workergetmessage => op_worker_get_message,      msg::any::workerpostmessage => op_worker_post_message,      msg::any::write => op_write,      msg::any::writefile => op_write_file,      msg::any::now => op_now,      msg::any::istty => op_is_tty,      msg::any::seek => op_seek,      msg::any::permissions => op_permissions,      msg::any::permissionrevoke => op_revoke_permission,      _ => panic!(format!(        unhandled message {},        msg::enum_name_any(inner_type)      )),    };    op_creator(&isolate, &base, data)  };  // ...省略多余的代码}
在每个类型的函数中会根据在typescript中调用libdeo.send方法时传入的sync参数值去判断同步执行还是异步执行。
let (is_sync, op) = dispatch(isolate, control_buf, zero_copy_buf);
同步执行在执行dispatch方法后,会返回is_sync的变量,如果is_sync为true,表示该方法是同步执行的,op表示返回的结果。rust代码会调用c++文件api.cc中的deno_respond方法,将执行结果同步回去,deno_respond方法中根据current_args_的值去判断是否为同步消息,如果current_args_存在值,则直接返回结果。
异步执行在deno中,执行异步操作是通过rust的tokio模块来实现的,在调用dispatch方法后,如果是异步操作,is_sync的值为false,op不再是执行结果,而是一个执行函数。通过tokio模块派生一个线程程异步去执行该函数。
    let task = op      .and_then(move |buf| {        let sender = tx; // tx is moved to new thread        sender.send((zero_copy_id, buf)).expect(tx.send error);        ok(())      }).map_err(|_| ());    tokio::spawn(task);
在deno初始化时,会创建一个管道,代码如下:
let (tx, rx) = mpsc::channel::<(usize, buf)>();
管道可以实现不同线程之间的通信,由于异步操作是创建了一个新的线程去执行的,所以子线程无法直接和主线程之间通信,需要通过管道的机制去实现。在异步代码执行完成后,调用tx.send方法将执行结果加入管道里面,event loop会每次从管道里面去读取结果返回回去。
event loop由于异步操作依赖事件循环,所以先解释一下deno中的事件循环,其实事件循环很简单,就是一段循环执行的代码,当达到条件后,事件循环会结束执行,deno中主要的事件循环代码实现如下:
pub fn event_loop(&self) -> result<(), jserror> {    // main thread event loop.    while !self.is_idle() {      match recv_deadline(&self.rx, self.get_timeout_due()) {        ok((zero_copy_id, buf)) => self.complete_op(zero_copy_id, buf),        err(mpsc::recvtimeouterror::timeout) => self.timeout(),        err(e) => panic!(recv_deadline() failed: {:?}, e),      }      self.check_promise_errors();      if let some(err) = self.last_exception() {        return err(err);      }    }    // check on done    self.check_promise_errors();    if let some(err) = self.last_exception() {      return err(err);    }    ok(())  }
self.is_idle方法用来判断是否所有的异步操作都执行完毕,当所有的异步操作都执行完毕后,停止事件循环,is_idle方法代码如下:
fn is_idle(&self) -> bool {    self.ntasks.get() == 0 && self.get_timeout_due().is_none()  }
当产生一次异步方法调用时,会调用下面的方法,使ntasks内部的值加1,
fn ntasks_increment(&self) {    assert!(self.ntasks.get() >= 0);    self.ntasks.set(self.ntasks.get() + 1);  }
在event loop循环中,每次从管道中去取值,这里event loop充消费者,执行异步方法的子线程充当生产者。如果在一次事件循环中,获取到了一次执行结果,那么会调用ntasks_decrement方法,使ntasks内部的值减1,当ntasks的值为0的时候,事件循环会退出执行。在每次循环中,将管道中取得的值作为参数,调用complete_op方法,将结果返回回去。
rust中将异步操作结果返回回去在初始化v8实例时,绑定的c++方法中有一个recv方法,该方法的作用时暴露一个typescript的函数给rust,在deno的io.ts文件的start方法中执行libdeno.recv(handleasyncmsgfromrust),将handleasyncmsgfromrust函数通过c++方法暴露给rust。具体实现如下:
export function start(source?: string): msg.startres {  libdeno.recv(handleasyncmsgfromrust);  // first we send an empty `start` message to let the privileged side know we  // are ready. the response should be a `startres` message containing the cli  // args and other info.  const startresmsg = sendstart();  util.setlogdebug(startresmsg.debugflag(), source);  setglobals(startresmsg.pid(), startresmsg.nocolor(), startresmsg.execpath()!);  return startresmsg;}
当异步操作执行完成后,可以在rust中直接调用handleasyncmsgfromrust方法,将结果返回给typescript。先看一下handleasyncmsgfromrust方法的实现细节:
export function handleasyncmsgfromrust(ui8: uint8array): void {  // if a the buffer is empty, recv() on the native side timed out and we  // did not receive a message.  if (ui8 && ui8.length) {    const bb = new flatbuffers.bytebuffer(ui8);    const base = msg.base.getrootasbase(bb);    const cmdid = base.cmdid();    const promise = promisetable.get(cmdid);    util.assert(promise != null, `expecting promise in table. ${cmdid}`);    promisetable.delete(cmdid);    const err = errors.maybeerror(base);    if (err != null) {      promise!.reject(err);    } else {      promise!.resolve(base);    }  }  // fire timers that have become runnable.  firetimers();}
从代码handleasyncmsgfromrust方法的实现中可以知道,首先通过flatbuffer反序列化返回的结果,然后获取返回结果的cmdid,根据cmdid获取之前创建的promise对象,然后调用promise.resolve方法触发promise.then中的代码执行。
以上就是deno通信实现的方法(附代码)的详细内容。
其它类似信息

推荐信息