首页 > 区块链 > 区块链 > Envoy原始码分析之调度员
在特使代码的中
Dispatcher
的英文随处可见的,可以说在特使中有着举足轻重的地位,一个
Dispatcher
就是一个
,其承担了任务队列,网络事件处理,定时器,信号处理等核心功能。在
特使线程模型
这篇文章所提到的
EventLoop
(
Each worker thread runs a “non-blocking” event loop
)指的就是这个
Dispatcher
对象。这个部分的代码相对较独立,和其他模块关联也比较少,但实际上却不言而喻。下面是与
Dispatcher
相关的类图,在接下来的会议对其中的关键概念进行介绍。
Dispatcher
本质上就是一个
,特使并没有重新实现,而是复用了LIBEVENT中的
event_base
,在LIBEVENT的基础上进行了二次封装并抽象出一些事件类
,比如
FileEvent
,
SignalEvent
,
Timer
等.Libevent是一个Ç库,而Envoy是C ++,为了避免手动管理这些C结构的内存,Envoy通过继承
unique_ptr
的方式重新封装了这些libevent暴露出来的C结构。
template <class T,void(* deleter)(T *)> class CSmartPtr:public std :: unique_ptr <T,void(*)(T *)> {public: CSmartPtr():std :: unique_ptr <T,void(*)(T *)>(nullptr,deleter){} CSmartPtr(T * object):std :: unique_ptr <T,void(*)(T *)>(object,deleter){}};
通过
CSmartPtr
就可以将Libevent中的一些C数据结构的内存通过
机制自动管理起来,使用方式如下:
extern“ C” {void event_base_free(event_base *);} struct evbuffer; extern“ C” {void evbuffer_free(evbuffer *);}..... typedef CSmartPtr <event_base,event_base_free> BasePtr; typedef CSmartPtr <evbuffer,evbuffer_free> BufferPtr; typedef CSmartPtr <bufferevent,bufferevent_free> BufferEventPtr; typedef CSmartPtr <evconnlistener,evconnlistener_free> ListenerPtr;
在Libevent中无论是计时器到期,收到信号,还是文件可读写等都是事件,统一使用
event
类型来表示,Envoy中则将
event
作为
ImplBase
的成员,然后让所有的事件类型的对象都继承
ImplBase
,从而实现了事件的抽象。
ImplBase类{受保护: 〜ImplBase(); 事件raw_event_;};
SignalEvent的实现很简单,通过
evsignal_assign
来初始化事件,然后通过
evsignal_add
添加事件使事件成为未决状态(关于Libevent事件状态见附录)。
class SignalEventImpl:public SignalEvent,ImplBase {public:// signal_num:要设置的信号值 // cb:信号事件的处理函数 SignalEventImpl(DispatcherImpl&dispatcher,int signal_num,SignalCb cb);私有: SignalCb cb_;};SignalEventImpl :: SignalEventImpl(DispatcherImpl&dispatcher, int signal_num,SignalCb cb):cb_(cb){ evsignal_assign( &raw_event_,&dispatcher.base(),signal_num, [](evutil_socket_t,short,void * arg)-> void { static_cast <SignalEventImpl *>(arg)-> cb_(); }, 这个); evsignal_add(&raw_event_,nullptr);}
Timer事件暴露了两个接口一个用于关闭Timer,另外一个则用于启动Timer,需要传递一个时间来设置Timer的到期时间间隔。
类Timer {public:virtual〜Timer(){} virtual void disableTimer()PURE; 虚拟无效enableTimer(const std :: chrono :: milliseconds&d)PURE;};
创建Timer的时候会通过
evtimer_assgin
对事件进行初始化,这个时候事件还处于未决状态而不会触发,需要通过
event_add
添加到
Dispatcher
中才能被触发。
class TimerImpl:public Timer,ImplBase {public: TimerImpl(Libevent :: BasePtr&libevent,TimerCb cb); //计时器 void disableTimer()覆盖;void enableTimer(const std :: chrono :: milliseconds&d)覆盖;私有: TimerCb cb_;};TimerImpl :: TimerImpl(DispatcherImpl&dispatcher,TimerCb cb):cb_(cb){ ASSERT(cb_); evtimer_assign( &raw_event_,&dispatcher.base(), [](evutil_socket_t,short,void * arg)-> void { static_cast <TimerImpl *>(arg)-> cb_(); }, 这个);}
disableTimer
被调用时其内部会召唤
event_del
删除事件,使事件成为非未决状态,
enableTimer
被调用时则间接调用
event_add
使事件成为未决状态,这样一旦超时时间到了就会触发超时事件。
无效的TimerImpl :: disableTimer(){event_del(&raw_event_); }无效的TimerImpl :: enableTimer(const std :: chrono :: milliseconds&d){if(d.count()== 0){ event_active(&raw_event_,EV_TIMEOUT,0); }其他{ std :: chrono :: microseconds我们= std :: chrono :: duration_cast <std :: chrono :: microseconds>(d); 定时电视; tv.tv_sec = us.count()/ 1000000; tv.tv_usec = us.count()%1000000; event_add(&raw_event_,&tv); }}
上面的代码在计算
timer
时间timeval
的时候实现的不确定优雅,应该避免使用像1000000
这样的不连续的性的数字常量,社区中有人建议可以改成如下的形式。
自动秒= std :: chrono :: duration_cast <std :: chrono :: seconds>(d);自动usecs = std :: chrono :: duration_cast <std :: chrono :: microseconds>(d-秒);tv.tv_secs = secs.count();tv.tv_usecs = usecs.count();
socket
专有相关事件被封装为
FileEvent
,其上暴露了二个接口:
activate
用于主动触发事件,典型的使用场景场景:触发EventLoop,Write Buffer有数据,可以主动触发下可写事件(Envoy中的典型使用场景)等;
setEnabled
用于设置事件类型,将事件添加到
EventLoop
中转换成为未决状态。
void FileEventImpl :: activate(uint32_t events){int libevent_events = 0; 如果(事件&FileReadyType :: Read){ libevent_events | = EV_READ; } if(events&FileReadyType :: Write){ libevent_events | = EV_WRITE; } if(events&FileReadyType :: Closed){ libevent_events | = EV_CLOSED; } ASSERT(libevent_events); event_active(&raw_event_,libevent_events,0);} void FileEventImpl :: setEnabled(uint32_t events){ event_del(&raw_event_); AssignEvents(事件); event_add(&raw_event_,nullptr);}
Dispatcher
的内部有一个任务类别,也会创建一个线程专有的人处理任务类别中的任务。通过
Dispatcher
的
post
方法可以将任务投递到任务类别中,交给内部
Dispatcher
的线程去处理。
无效DispatcherImpl :: post(std :: function <void()>回调){bool do_post; { 线程:: LockGuard lock(post_lock_); do_post = post_callbacks_.empty(); post_callbacks_.push_back(callback); }如果(do_post){ post_timer _-> enableTimer(std :: chrono ::毫秒(0)); }}
post
方法将传递传入来的
callback
所代表的任务,添加到
post_callbacks_
所代表的类型为
vector<callback>
的成员表变量中。如果
post_callbacks_
为空的话,说明背后的处理线程是处于非活动状态,这时通过
post_timer_
设置一个超时时间为0方式来唤醒它。
post_timer_
在构造的时候就已经设置好对应的
callback
为
runPostCallbacks
,对应代码如下:
DispatcherImpl :: DispatcherImpl(TimeSystem&time_system, Buffer :: WatermarkFactoryPtr &&工厂) :...... post_timer_(createTimer([this]()-> void {runPostCallbacks();})), current_to_delete _(&to_delete_1_){ RELEASE_ASSERT(Libevent :: Global :: initialized(),“”);}
runPostCallbacks
是一个同时循环,都每次从
post_callbacks_
中取出一个
callback
所代表的任务去运行,直到
post_callbacks_
为空。运行每次
runPostCallbacks
都会确保所有的任务都执行完。显然,在
runPostCallbacks
被线程执行的期间如果
post
进来了新的任务,那么新任务直接追加到
post_callbacks_
尾部即可,而无需做重启线程这一动作。
无效DispatcherImpl :: runPostCallbacks(){而(true){std :: function <void()>回调; { 线程:: LockGuard lock(post_lock_); 如果(post_callbacks_.empty()){返回; } 回调= post_callbacks_.front(); post_callbacks_.pop_front(); } 打回来(); }}
最后讲一下
Dispatcher
中比较难理解也很重要的
DeferredDeletable
,它是一个空接口,所有要进行递减析构的对象都要继承自这个空接口。在特使的代码中像下面这样继承自
DeferredDeletable
的类随处可见。
class DeferredDeletable {公共: 虚拟〜DeferredDeletable(){}};
那何为递减析出构呢?用在这场景呢?递归析出指的是将析出构形的动作交由
Dispatcher
来完成,所以
DeferredDeletable
和
Dispatcher
相关的。
Dispatcher
对象有一个
vector
保存了所有要
重复析
构的对象。
class DispatcherImpl:public Dispatcher { ...... 私人的: ........ std :: vector <DeferredDeletablePtr> to_delete_1_; std :: vector <DeferredDeletablePtr> to_delete_2_; std :: vector <DeferredDeletablePtr> * current_to_delete_; }
to_delete_1_
和
to_delete_2_
就是放置存放所有的要逐步析构的对象,这里使用两个
vector
放置,为什么要这样做呢?。
current_to_delete_
始终指向当前正要析构的对象列表,每次执行完析构后就交替指向另外一个对象列表,来回交替。
无效DispatcherImpl :: clearDeferredDeleteList(){ ASSERT(isThreadSafe()); std :: vector <DeferredDeletablePtr> * to_delete = current_to_delete_; size_t num_to_delete = to_delete-> size(); 如果(deferred_deleting_ ||!num_to_delete){返回; } ENVOY_LOG(trace,“清除延迟的删除列表(size = {})”,num_to_delete); 如果(current_to_delete_ ==&to_delete_1_){ current_to_delete_ =&to_delete_2_; }其他{ current_to_delete_ =&to_delete_1_; } deferred_deleting_ = true; 对于(size_t i = 0; i <num_to_delete; i ++){ (* to_delete)[i] .reset(); } to_delete-> clear(); deferred_deleting_ = false;}
上面的代码在执行对象析构的时候先使用
to_delete
来指向当前正要析构的对象列表,然后将
current_to_delete_
指向另外一个列表,这样在添加重复删除的对象时,就可以做到安全的把对象添加到列表中了。因为
deferredDelete
和
clearDeferredDeleteList
都是在同一个线程中运行,所以
current_to_delete_
是一个普通的指针,可以安全的更改指针指向另外一个,而不用担心有线程安全问题。
无效DispatcherImpl :: deferredDelete(DeferredDeletablePtr && to_delete){ ASSERT(isThreadSafe()); current_to_delete _-> emplace_back(std :: move(to_delete)); ENVOY_LOG(trace,“项目添加到延迟删除列表(size = {})”,current_to_delete _-> size()); 如果(1 == current_to_delete _-> size()){ deferred_delete_timer _-> enableTimer(std :: chrono :: milliseconds(0)); }}
当有要进行递减析出
deferredDelete
构图
的对象时,调用
立即,这个函数内部会通过
current_to_delete_
把对象放到要分解析构的列表中,最后判断下当前要延迟析构的列表大小是否是1,如果是1表明这是第一次添加延迟析构的对象
,就那么需要通过
deferred_delete_timer_
把背后的线程唤醒执行
clearDeferredDeleteList
函数。这样做的原因是避免多次唤醒,因为有一种情况是线程已经唤醒 了正在执行
clearDeferredDeleteList
,在这个过程中又有其他的对象需要析构而加入到
vector
中。
到此为止
deferredDelete
的实现原理就基本分析完了,可以研磨它的实现和任务类别的实现很类似,只不过一个是循环执行
callback
所代表的任务,另一个是对对象进行析构。最后我们来看一下
deferredDelete
的应用场景,却“为什么要进行中继析构?”在Envoy的源代码中经常会看到像下面这样的代码片段。
ConnectionImpl :: ConnectionImpl(Event :: Dispatcher&dispatcher, ConnectionSocketPtr &&套接字, TransportSocketPtr && transport_socket, 布尔连接){...... } //传递裸指针到任意中 file_event_ = dispatcher_.createFileEvent( fd(),[此](uint32_t事件)-> void {onFileEvent(events ;; }, 事件:: FileTriggerType :: Edge, 事件:: FileReadyType :: Read | Event :: FileReadyType :: Write); ......}
传递给
Dispatcher
的
callback
都是通过裸指针的方式进行,如果进行进行的时候对象已经析出了,就会出现野指针的问题,我相信C ++水平还可以的同学都会研磨这个问题,除非能在逻辑上上保证
Dispatcher
的生命周期比所有对象都短,这样就能保证在不同的时候对象肯定不会析构,但是这不可能成立的,因为
Dispatcher
是
EventLoop
的核心。
线程一个运行一个
EventLoop
直到线程结束,
Dispatcher
对象才会析构,意味着这
Dispatcher
对象的生命周期的英文最长的。所以从逻辑上没办法保证进行回调的时候对象没有析构
。可能有人会有疑问,对象在析析的时候把注册的事件取消不就可以避免野指针的问题吗?
那如果事件已经触发了,
callback
正在等待运行呢?或者或者
callback
运行了一半呢?前者libevent是可以保证的,在调用
event_del
的时候可以把正在等待运行的事件取消掉,但是上面只是无能为力了,这个时候如果并且这个想法想一想,是不是只要保证对象析构的时候没有
callback
正在运行就可以解决问题了呢?是的,只要保证所有在执行中的
callback
执行可以利用
Dispatcher
是顺序执行所有
callback
的特点,向
Dispatcher
中插入一个任务就是使用对象析出的,那么当这个任务执行的时候是可以保证没有其他任何
callback
在运行。通过这个方法就完美解决了这里遇到的野指针问题了。
或许有人又会想,这里是不是可以用
和
来解这个呢?
是的,这是解决多线程环境下对象析构的秘密武器,通过延长对象的生命周期,把对象的生命周期延长到和
callback
一样,等
callback
执行完再进行析构,同样可以达到效果,但是这带来了两个问题,第一就是对象生命周期被无限拉长,虽然延迟析构也拉长了生命周期,但是时间是可预期的,一旦
EventLoop
执行了
clearDeferredDeleteList
任务就会立刻被回收,而通过
shared_ptr
的方式其生命周期周期性
callback
何时运行,而
callback
何时运行这个是没办法保证的,
socket
某种
一个等待
的事件事件进行,如果对端一直不发送数据,那么
callback
就一直不会被运行,对象就一直一直无法被第二就是在使用方式上侵入性位移,需要强制使用
shared_ptr
的方式创建对象。
Dispatcher
总的来说其实现还是比较简单明了的,比较容易验证其正确性,同样功能也相对较弱,和chromium的
MessageLoop
,boost的
asio
都是相似的用途,但是功能上差得比较多。另一个一个我觉得比较奇怪的是,为什么在
DeferredDeletable
的实现中要用
to_delete_1_
和
to_delete_2_
两个交替放置的地方,其实按照我的理解一个级别即可,因为
clearDeferredDeleteList
和
deferredDelete
是保证在同一个线程中执行的,就和
Dispatcher
的任务类别一样,用一个蓄存所有要执行的任务,循环的执行即可。但是Envoy中没有这种,我理解这样设计的原因可能是因为原因在原因上在任务上至少存在一定数量的重分配构象,而在另一方面,大量对象的析构如果保存在一个地方中循环的进行析构势必会影响其他关键任务的执行,所以这里分割成两个物体,多个任务交替的执行,就好比把一个大任务分解成好几个小任务顺序来执行。
来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/69982815/viewspace-2715545/,如需转载,请注明出处,否则将追究法律责任。