ITPub博客

首页 > 区块链 > 区块链 > Envoy原始码分析之调度员

Envoy原始码分析之调度员

区块链 作者:zjgc002 时间:2020-08-28 15:28:20 0 删除 编辑

调度员

在特使代码的中  Dispatcher  的英文随处可见的,可以说在特使中有着举足轻重的地位,一个  Dispatcher  就是一个  ,其承担了任务队列,网络事件处理,定时器,信号处理等核心功能。在  特使线程模型  这篇文章所提到的  EventLoop  (  Each worker thread runs a “non-blocking” event loop  )指的就是这个  Dispatcher  对象。这个部分的代码相对较独立,和其他模块关联也比较少,但实际上却不言而喻。下面是与  Dispatcher  相关的类图,在接下来的会议对其中的关键概念进行介绍。

dispatch.png

调度员和Libevent

Dispatcher  本质上就是一个  ,特使并没有重新实现,而是复用了LIBEVENT中的  event_base  ,在LIBEVENT的基础上进行了二次封装并抽象出一些事件类 ,比如  FileEvent  ,  SignalEvent  ,  Timer  等.Libevent是一个Ç库,而Envoy是C ++,为了避免手动管理这些C结构的内存,Envoy通过继承  unique_ptr  的方式重新封装了这些libevent暴露出来的C结构。

template <class T,void(* deleter)(T *)> class CSmartPtr:public std :: unique_ptr <T,void(*)(T *)> {public:  CSmartPtr():std :: unique_ptr <T,void(*)(T *)>(nullptr,deleter){}  CSmartPtr(T * object):std :: unique_ptr <T,void(*)(T *)>(object,deleter){}};

通过  CSmartPtr  就可以将Libevent中的一些C数据结构的内存通过  机制自动管理起来,使用方式如下:

extern“ C” {void event_base_free(event_base *);} struct evbuffer; extern“ C” {void evbuffer_free(evbuffer *);}..... typedef CSmartPtr <event_base,event_base_free> BasePtr; typedef CSmartPtr <evbuffer,evbuffer_free> BufferPtr; typedef CSmartPtr <bufferevent,bufferevent_free> BufferEventPtr; typedef CSmartPtr <evconnlistener,evconnlistener_free> ListenerPtr;

在Libevent中无论是计时器到期,收到信号,还是文件可读写等都是事件,统一使用  event  类型来表示,Envoy中则将  event  作为  ImplBase  的成员,然后让所有的事件类型的对象都继承  ImplBase  ,从而实现了事件的抽象。

ImplBase类{受保护:  〜ImplBase(); 事件raw_event_;};

SignalEvent

SignalEvent的实现很简单,通过  evsignal_assign  来初始化事件,然后通过  evsignal_add  添加事件使事件成为未决状态(关于Libevent事件状态见附录)。

class SignalEventImpl:public SignalEvent,ImplBase {public:// signal_num:要设置的信号值  // cb:信号事件的处理函数  SignalEventImpl(DispatcherImpl&dispatcher,int signal_num,SignalCb cb);私有:  SignalCb cb_;};SignalEventImpl :: SignalEventImpl(DispatcherImpl&dispatcher,                                  int signal_num,SignalCb cb):cb_(cb){  evsignal_assign(      &raw_event_,&dispatcher.base(),signal_num,      [](evutil_socket_t,short,void * arg)-> void {           static_cast <SignalEventImpl *>(arg)-> cb_(); },      这个);  evsignal_add(&raw_event_,nullptr);}

计时器

Timer事件暴露了两个接口一个用于关闭Timer,另外一个则用于启动Timer,需要传递一个时间来设置Timer的到期时间间隔。

类Timer {public:virtual〜Timer(){} virtual void disableTimer()PURE; 虚拟无效enableTimer(const std :: chrono :: milliseconds&d)PURE;};

创建Timer的时候会通过  evtimer_assgin  对事件进行初始化,这个时候事件还处于未决状态而不会触发,需要通过  event_add  添加到  Dispatcher  中才能被触发。

class TimerImpl:public Timer,ImplBase {public:  TimerImpl(Libevent :: BasePtr&libevent,TimerCb cb); //计时器  void disableTimer()覆盖;void enableTimer(const std :: chrono :: milliseconds&d)覆盖;私有:  TimerCb cb_;};TimerImpl :: TimerImpl(DispatcherImpl&dispatcher,TimerCb cb):cb_(cb){  ASSERT(cb_);  evtimer_assign(      &raw_event_,&dispatcher.base(),      [](evutil_socket_t,short,void * arg)-> void {           static_cast <TimerImpl *>(arg)-> cb_(); },       这个);}

disableTimer  被调用时其内部会召唤  event_del  删除事件,使事件成为非未决状态,  enableTimer  被调用时则间接调用  event_add  使事件成为未决状态,这样一旦超时时间到了就会触发超时事件。

无效的TimerImpl :: disableTimer(){event_del(&raw_event_); }无效的TimerImpl :: enableTimer(const std :: chrono :: milliseconds&d){if(d.count()== 0){    event_active(&raw_event_,EV_TIMEOUT,0);  }其他{    std :: chrono :: microseconds我们=           std :: chrono :: duration_cast <std :: chrono :: microseconds>(d);    定时电视;    tv.tv_sec = us.count()/ 1000000;    tv.tv_usec = us.count()%1000000;    event_add(&raw_event_,&tv);  }}

上面的代码在计算  timer  时间  timeval  的时候实现的不确定优雅,应该避免使用像  1000000  这样的不连续的性的数字常量,社区中有人建议可以改成如下的形式。

自动秒= std :: chrono :: duration_cast <std :: chrono :: seconds>(d);自动usecs =   std :: chrono :: duration_cast <std :: chrono :: microseconds>(d-秒);tv.tv_secs = secs.count();tv.tv_usecs = usecs.count();

FileEvent

socket  专有相关事件被封装为  FileEvent  ,其上暴露了二个接口:  activate  用于主动触发事件,典型的使用场景场景:触发EventLoop,Write Buffer有数据,可以主动触发下可写事件(Envoy中的典型使用场景)等;  setEnabled  用于设置事件类型,将事件添加到  EventLoop  中转换成为未决状态。

void FileEventImpl :: activate(uint32_t events){int libevent_events = 0; 如果(事件&FileReadyType :: Read){    libevent_events | = EV_READ;  } if(events&FileReadyType :: Write){    libevent_events | = EV_WRITE;  } if(events&FileReadyType :: Closed){    libevent_events | = EV_CLOSED;  }  ASSERT(libevent_events);  event_active(&raw_event_,libevent_events,0);} void FileEventImpl :: setEnabled(uint32_t events){  event_del(&raw_event_);  AssignEvents(事件);  event_add(&raw_event_,nullptr);}

任务等级

Dispatcher  的内部有一个任务类别,也会创建一个线程专有的人处理任务类别中的任务。通过  Dispatcher  的  post  方法可以将任务投递到任务类别中,交给内部  Dispatcher  的线程去处理。

无效DispatcherImpl :: post(std :: function <void()>回调){bool do_post;  {    线程:: LockGuard lock(post_lock_);    do_post = post_callbacks_.empty();    post_callbacks_.push_back(callback);  }如果(do_post){    post_timer _-> enableTimer(std :: chrono ::毫秒(0));  }}

post  方法将传递传入来的  callback  所代表的任务,添加到  post_callbacks_  所代表的类型为  vector<callback>  的成员表变量中。如果  post_callbacks_  为空的话,说明背后的处理线程是处于非活动状态,这时通过  post_timer_  设置一个超时时间为0方式来唤醒它。  post_timer_  在构造的时候就已经设置好对应的  callback  为  runPostCallbacks  ,对应代码如下:

DispatcherImpl :: DispatcherImpl(TimeSystem&time_system,                               Buffer :: WatermarkFactoryPtr &&工厂)    :......      post_timer_(createTimer([this]()-> void {runPostCallbacks();})),      current_to_delete _(&to_delete_1_){  RELEASE_ASSERT(Libevent :: Global :: initialized(),“”);}

runPostCallbacks  是一个同时循环,都每次从  post_callbacks_  中取出一个  callback  所代表的任务去运行,直到  post_callbacks_  为空。运行每次  runPostCallbacks  都会确保所有的任务都执行完。显然,在  runPostCallbacks  被线程执行的期间如果  post  进来了新的任务,那么新任务直接追加到  post_callbacks_  尾部即可,而无需做重启线程这一动作。

无效DispatcherImpl :: runPostCallbacks(){而(true){std :: function <void()>回调;    {      线程:: LockGuard lock(post_lock_); 如果(post_callbacks_.empty()){返回;      }      回调= post_callbacks_.front();      post_callbacks_.pop_front();    }    打回来();  }}

延迟删除

最后讲一下  Dispatcher  中比较难理解也很重要的  DeferredDeletable  ,它是一个空接口,所有要进行递减析构的对象都要继承自这个空接口。在特使的代码中像下面这样继承自  DeferredDeletable  的类随处可见。

class DeferredDeletable {公共:  虚拟〜DeferredDeletable(){}};

那何为递减析出构呢?用在这场景呢?递归析出指的是将析出构形的动作交由  Dispatcher  来完成,所以  DeferredDeletable  和  Dispatcher  相关的。  Dispatcher  对象有一个  vector  保存了所有要 重复析  构的对象。

class DispatcherImpl:public Dispatcher {  ...... 私人的:  ........ std :: vector <DeferredDeletablePtr> to_delete_1_; std :: vector <DeferredDeletablePtr> to_delete_2_; std :: vector <DeferredDeletablePtr> * current_to_delete_; }

to_delete_1_  和  to_delete_2_  就是放置存放所有的要逐步析构的对象,这里使用两个  vector  放置,为什么要这样做呢?。  current_to_delete_  始终指向当前正要析构的对象列表,每次执行完析构后就交替指向另外一个对象列表,来回交替。

无效DispatcherImpl :: clearDeferredDeleteList(){  ASSERT(isThreadSafe()); std :: vector <DeferredDeletablePtr> * to_delete = current_to_delete_; size_t num_to_delete = to_delete-> size(); 如果(deferred_deleting_ ||!num_to_delete){返回;  }  ENVOY_LOG(trace,“清除延迟的删除列表(size = {})”,num_to_delete); 如果(current_to_delete_ ==&to_delete_1_){    current_to_delete_ =&to_delete_2_;  }其他{    current_to_delete_ =&to_delete_1_;  }  deferred_deleting_ = true; 对于(size_t i = 0; i <num_to_delete; i ++){    (* to_delete)[i] .reset();  }  to_delete-> clear();  deferred_deleting_ = false;}

上面的代码在执行对象析构的时候先使用  to_delete  来指向当前正要析构的对象列表,然后将  current_to_delete_  指向另外一个列表,这样在添加重复删除的对象时,就可以做到安全的把对象添加到列表中了。因为  deferredDelete  和  clearDeferredDeleteList  都是在同一个线程中运行,所以  current_to_delete_  是一个普通的指针,可以安全的更改指针指向另外一个,而不用担心有线程安全问题。

无效DispatcherImpl :: deferredDelete(DeferredDeletablePtr && to_delete){  ASSERT(isThreadSafe());  current_to_delete _-> emplace_back(std :: move(to_delete));  ENVOY_LOG(trace,“项目添加到延迟删除列表(size = {})”,current_to_delete _-> size()); 如果(1 == current_to_delete _-> size()){    deferred_delete_timer _-> enableTimer(std :: chrono :: milliseconds(0));  }}

当有要进行递减析出  deferredDelete  构图 的对象时,调用  立即,这个函数内部会通过  current_to_delete_  把对象放到要分解析构的列表中,最后判断下当前要延迟析构的列表大小是否是1,如果是1表明这是第一次添加延迟析构的对象 ,就那么需要通过  deferred_delete_timer_  把背后的线程唤醒执行  clearDeferredDeleteList  函数。这样做的原因是避免多次唤醒,因为有一种情况是线程已经唤醒 了正在执行  clearDeferredDeleteList  ,在这个过程中又有其他的对象需要析构而加入到  vector  中。

deferdelete.png

到此为止  deferredDelete  的实现原理就基本分析完了,可以研磨它的实现和任务类别的实现很类似,只不过一个是循环执行  callback  所代表的任务,另一个是对对象进行析构。最后我们来看一下  deferredDelete  的应用场景,却“为什么要进行中继析构?”在Envoy的源代码中经常会看到像下面这样的代码片段。

ConnectionImpl :: ConnectionImpl(Event :: Dispatcher&dispatcher,                                ConnectionSocketPtr &&套接字,                               TransportSocketPtr && transport_socket,                               布尔连接){......  } //传递裸指针到任意中  file_event_ = dispatcher_.createFileEvent(      fd(),[此](uint32_t事件)-> void {onFileEvent(events ;; },       事件:: FileTriggerType :: Edge,      事件:: FileReadyType :: Read | Event :: FileReadyType :: Write);    ......}

传递给  Dispatcher  的  callback  都是通过裸指针的方式进行,如果进行进行的时候对象已经析出了,就会出现野指针的问题,我相信C ++水平还可以的同学都会研磨这个问题,除非能在逻辑上上保证  Dispatcher  的生命周期比所有对象都短,这样就能保证在不同的时候对象肯定不会析构,但是这不可能成立的,因为  Dispatcher  是  EventLoop  的核心。

线程一个运行一个  EventLoop  直到线程结束,  Dispatcher  对象才会析构,意味着这  Dispatcher  对象的生命周期的英文最长的。所以从逻辑上没办法保证进行回调的时候对象没有析构 。可能有人会有疑问,对象在析析的时候把注册的事件取消不就可以避免野指针的问题吗? 那如果事件已经触发了,  callback  正在等待运行呢?或者或者  callback  运行了一半呢?前者libevent是可以保证的,在调用  event_del  的时候可以把正在等待运行的事件取消掉,但是上面只是无能为力了,这个时候如果并且这个想法想一想,是不是只要保证对象析构的时候没有  callback  正在运行就可以解决问题了呢?是的,只要保证所有在执行中的  callback  执行可以利用  Dispatcher  是顺序执行所有  callback  的特点,向  Dispatcher  中插入一个任务就是使用对象析出的,那么当这个任务执行的时候是可以保证没有其他任何  callback  在运行。通过这个方法就完美解决了这里遇到的野指针问题了。

或许有人又会想,这里是不是可以用  和  来解这个呢? 是的,这是解决多线程环境下对象析构的秘密武器,通过延长对象的生命周期,把对象的生命周期延长到和  callback  一样,等  callback  执行完再进行析构,同样可以达到效果,但是这带来了两个问题,第一就是对象生命周期被无限拉长,虽然延迟析构也拉长了生命周期,但是时间是可预期的,一旦  EventLoop  执行了  clearDeferredDeleteList  任务就会立刻被回收,而通过  shared_ptr  的方式其生命周期周期性  callback  何时运行,而  callback  何时运行这个是没办法保证的,  socket  某种 一个等待  的事件事件进行,如果对端一直不发送数据,那么  callback  就一直不会被运行,对象就一直一直无法被第二就是在使用方式上侵入性位移,需要强制使用  shared_ptr  的方式创建对象。

总结

Dispatcher  总的来说其实现还是比较简单明了的,比较容易验证其正确性,同样功能也相对较弱,和chromium的  MessageLoop  ,boost的  asio  都是相似的用途,但是功能上差得比较多。另一个一个我觉得比较奇怪的是,为什么在  DeferredDeletable  的实现中要用  to_delete_1_  和  to_delete_2_  两个交替放置的地方,其实按照我的理解一个级别即可,因为  clearDeferredDeleteList  和  deferredDelete  是保证在同一个线程中执行的,就和  Dispatcher  的任务类别一样,用一个蓄存所有要执行的任务,循环的执行即可。但是Envoy中没有这种,我理解这样设计的原因可能是因为原因在原因上在任务上至少存在一定数量的重分配构象,而在另一方面,大量对象的析构如果保存在一个地方中循环的进行析构势必会影响其他关键任务的执行,所以这里分割成两个物体,多个任务交替的执行,就好比把一个大任务分解成好几个小任务顺序来执行。


来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/69982815/viewspace-2715545/,如需转载,请注明出处,否则将追究法律责任。

上一篇: 没有了~
下一篇: 没有了~
请登录后发表评论 登录
全部评论
专注软件系统开发

注册时间:2020-08-28

  • 博文量
    1
  • 访问量
    380