ITPub博客

首页 > 数据库 > NoSQL > mongodb内核源码实现、性能调优、最佳运维实践系列-网络传输层模块源码实现三

mongodb内核源码实现、性能调优、最佳运维实践系列-网络传输层模块源码实现三

原创 NoSQL 作者:y123456yzzyz 时间:2020-10-28 14:00:11 0 删除 编辑

transport_layer 网络传输层模块源码实现三

关于作者

前滴滴出行技术专家,现任OPPO 文档数据库 mongodb 负责人,负责 oppo 千万级峰值 TPS/ 十万亿级数据量文档数据库 mongodb 研发和运维工作,一直专注于分布式缓存、高性能服务端、数据库、中间件等相关研发。 Github 账号地址 : https://github.com/y123456yz

1. 说明

   在之前的 <<Mongodb 网络传输处理源码实现及性能调优 - 体验内核性能极致设计 >> << transport_layer 网络传输层模块源码实现 >> 一文中分析了如何阅读百万级大工程源码、 Asio 网络库实现、线程模型、 transport_layer 套接字处理及传输层管理子模块、 session 会话子模块、 Ticket 数据收发子模块、 service_entry_point 服务入口点子模块。

本文将继续分析网络传输层模块中service_state_machine 状态机调度子模块内核源码实现。

请提前阅读系列的前两篇文章:

mongodb 内核源码实现、性能调优、最佳运维实践系列 - 网络传输层模块源码实现一

mongodb 内核源码实现、性能调优、最佳运维实践系列 - 网络传输层模块源码实现二

2. service_state_machine 状态机调度子模块

service_state_machine 状态机处理模块主要复杂一次完整请求的状态转换,确保请求可以按照指定的流程顺利进行,最终实现客户端的正常 mongodb 访问。该模块核心代码实现主要由以下三个源码文件实现( test 为测试相关,可以忽略):

2.1 核心代码实现

service_entry_point 服务入口点子模块分析中,当接收到一个新的链接后,在 ServiceEntryPointImpl::startSession(...) 回调函数中会构造一个 ServiceStateMachine   ssm 类,从而实现了新链接、 session ssm 的一一映射关系。其中, ServiceStateMachine  类实现对ThreadGuard( 线程守护 ) 有较多的依赖,因此本文从这两个类核心代码实现来分析整个状态机调度模块的内部设计细节。

2.1.1 ThreadGuard 线程守护类核心代码实现

ThreadGuard 也就是 线程守护 类,该类主要用于工作线程名的管理维护、ssm 归属管理、该 ssm 对应 session 链接的回收处理等。该类核心成员及接口实现如下:

1. class ServiceStateMachine::ThreadGuard {  
2.     ......  
3. public:  
4.     // create a ThreadGuard which will take ownership of the SSM in this thread.  
5.     //ThreadGuard初始化,标记ssm所有权属于本线程  
6.     explicit ThreadGuard(ServiceStateMachine* ssm) : _ssm{ssm} {  
7.         //获取ssm状态机所有权  
8.         auto owned = _ssm->_owned.compareAndSwap(Ownership::kUnowned, Ownership::kOwned);  
9.         //如果是一个链接一个线程模型,则条件满足  
10.         if (owned == Ownership::kStatic) {   
11.         //一个链接一个线程模型,由于链接始终由同一个线程处理,因此该链接对应的ssm始终归属于同一个线程  
12.             dassert(haveClient());  
13.             dassert(Client::getCurrent() == _ssm->_dbClientPtr);  
14.             //标识归属权已确定  
15.             _haveTakenOwnership = true;  
16.             return;  
17.         }  
18.          ......
19.         //adaptive 动态线程模式走下面的模式  
20.   
21.         //把当前线程的线程名零时保存起来  
22.         auto oldThreadName = getThreadName();   
23.         //ssm保存的线程名和当前线程名不同  
24.         if (oldThreadName != _ssm->_threadName) {  
25.             //即将修改线程名了,把修改前的线程名保存到_oldThreadName  
26.             _ssm->_oldThreadName = getThreadName().toString();  
27.            //把运行本ssm状态机的线程名改为conn-x线程  
28.             setThreadName(_ssm->_threadName); //把当前线程改名为_threadName  
29.         }  
30.   
31.         //设置本线程对应client信息,一个链接对应一个client,标识本client当前归属于本线程处理  
32.         Client::setCurrent(std::move(_ssm->_dbClient));  
33.          //本状态机ssm所有权有了,归属于运行本ssm的线程  
34.         _haveTakenOwnership = true;  
35.     }  
36.     ......  
37.     //重新赋值  
38.     ThreadGuard& operator=(ThreadGuard&& other) {  
39.         if (this != &other) {  
40.             _ssm = other._ssm;  
41.             _haveTakenOwnership = other._haveTakenOwnership;  
42.             //原来的other所有权失效  
43.             other._haveTakenOwnership = false;  
44.         }  
45.     //返回  
46.         return *this;  
47.     };  
48.   
49.     //析构函数  
50.     ~ThreadGuard() {  
51.     //ssm所有权已确定,则析构的时候,调用release处理,恢复线程原有线程名  
52.         if (_haveTakenOwnership)  
53.             release();  
54.     }  
55.   
56.     //一个链接一个线程模型,标记_owned为kStatic,也就是线程名永远不会改变  
57.     void markStaticOwnership() {  
58.         dassert(static_cast<bool>(*this));  
59.         _ssm->_owned.store(Ownership::kStatic);  
60.     }  
61.   
62.     //恢复原有线程名,同时把client信息从调度线程归还给状态机  
63.     void release() {  
64.          auto owned = _ssm->_owned.load();  
65.          //adaptive异步线程池模式满足if条件,表示SSM固定归属于某个线程  
66.         if (owned != Ownership::kStatic) {  
67.         //本线程拥有currentClient信息,于是把它归还给SSM状态机  
68.             if (haveClient()) {  
69.                 _ssm->_dbClient = Client::releaseCurrent();  
70.             }  
71.              //恢复到以前的线程名  
72.             if (!_ssm->_oldThreadName.empty()) {  
73.                 //恢复到老线程名  
74.                 setThreadName(_ssm->_oldThreadName);   
75.             }  
76.         }  
77.         //状态机状态进入end,则调用对应回收hook处理  
78.         if (_ssm->state() == State::Ended) {  
79.             //链接关闭的回收处理 ServiceStateMachine::setCleanupHook  
80.             auto cleanupHook = std::move(_ssm->_cleanupHook);  
81.             if (cleanupHook)  
82.                 cleanupHook();  
83.             return;  
84.         }  
85.   
86.         //归属权失效  
87.         _haveTakenOwnership = false;  
88.         //归属状态变为未知  
89.         if (owned == Ownership::kOwned) {  
90.             _ssm->_owned.store(Ownership::kUnowned);  
91.         }  
92.     }  
93.   
94. private:  
95.     //本线程守护当前对应的ssm  
96.     ServiceStateMachine* _ssm;  
97.     //默认false,标识该状态机ssm不归属于任何线程  
98.     bool _haveTakenOwnership = false;  
99. }

从上面的代码分析可以看出线程守护类作用比较明确,就是守护当前线程的归属状态,并记录状态机ssm 不同状态变化前后的线程名。此外,状态机 ssm 对应的 session 链接如果进入 end 状态,则该链接的资源回收释放也由该类完成。

查看mongod 或者 mongos 实例,如果启动实例的时候配置了 serviceExecutor: adaptive 会发现这些进程下面有很多线程名为 conn-x worker-x 线程,同时同一个线程线程名可能发生改变,这个过程就是由ThreadGuard 线程守护类来实现。 synchronous 一个链接一个线程模型只有 conn-x 线程,adaptive 线程模型将同时存在有线程名为 conn-x worker-x 的线程,具体原理讲在后面继续分析,如下图:

说明: synchronous 线程模式对应 worker 初始线程名为 conn-x adaptive 线程模型对应 worker 初始线程名为 worker-x

ThreadGuard 线程守护类和状态机 ssm(service_state_machine) 关联紧密,客户端请求处理的内部 ssm 状态转换也和该类密切关联,请看后续分析。

2.1.2 ServiceStateMachine  类核心代码实现

service_state_machine 状态机处理模块核心代码实现通过 ServiceStateMachine 类完成,该类的核心结构成员和函数接口如下:

1. //一个新链接对应一个ServiceStateMachine保存到ServiceEntryPointImpl._sessions中  
2. class ServiceStateMachine : public std::enable_shared_from_this<ServiceStateMachine> {  
3.    ......  
4. public:  
5.    ......  
6.    static std::shared_ptr<ServiceStateMachine> create(...);  
7.    ServiceStateMachine(...);  
8.    //状态机所属状态
9.    enum class State {  
10.        //ServiceStateMachine::ServiceStateMachine构造函数初始状态  
11.        Created,        
12.        //ServiceStateMachine::_runNextInGuard开始进入接收网络数据状态  
13.        //标记本次客户端请求已完成(已经发送DB获取的数据给客户端),等待调度进行该链接的  
14.        //下一轮请求,该状态对应处理流程为_sourceMessage  
15.        Source,         
16.        //等待获取客户端的数据  
17.        SourceWait,     
18.        //接收到一个完整mongodb报文后进入该状态  
19.        Process,        
20.        //等待数据发送成功  
21.        SinkWait,       
22.        //接收或者发送数据异常、链接关闭,则进入该状态  _cleanupSession  
23.        EndSession,     
24.        //session回收处理进入该状态  
25.        Ended           
26.    };  
27.    //所有权状态,主要用来判断是否需要在状态转换中跟新线程名,只对动态线程模型生效  
28.    enum class Ownership {   
29.        //该状态表示本状态机SSM处于非活跃状态  
30.        kUnowned,    
31.        //该状态标识本状态机SSM归属于某个工作worker线程,处于活跃调度运行状态  
32.        kOwned,   
33.        //表示SSM固定归属于某个线程  
34.        kStatic   
35.    };  
36.      
37.    ......  
38. private:  
39.    //ThreadGuard可以理解为线程守护,后面在ThreadGuard类中单独说明  
40.    class ThreadGuard;  
41.    friend class ThreadGuard;  
42.      
43.    ......  
44.    //获取session信息  
45.    const transport::SessionHandle& _session()  
46.    //以下两个接口为任务task调度相关接口  
47.    void _scheduleNextWithGuard(...);  
48.    void _runNextInGuard(ThreadGuard guard);  
49.    //接收到一个完整mongodb报文后的处理  
50.    inline void _processMessage(ThreadGuard guard);  
51.    //以下四个接口完成底层数据读写及其对应回调处理  
52.    void _sourceCallback(Status status);  
53.    void _sinkCallback(Status status);  
54.    void _sourceMessage(ThreadGuard guard);  
55.    void _sinkMessage(ThreadGuard guard, Message toSink);  
56.      
57.    //一次客户端请求,当前mongodb服务端所处的状态  
58.    AtomicWord<State> _state{State::Created};  
59.    //服务入口,ServiceEntryPointMongod ServiceEntryPointMongos mongod及mongos入口点  
60.    ServiceEntryPoint* _sep;  
61.    //synchronous及adaptive模式,也就是线程模型是一个链接一个线程还是动态线程池  
62.    transport::Mode _transportMode;  
63.    //ServiceContextMongoD(mongod)或者ServiceContextNoop(mongos)服务上下文  
64.    ServiceContext* const _serviceContext;  
65.    //也就是本ssm对应的session信息,默认对应ASIOSession   
66.    transport::SessionHandle _sessionHandle;   
67.    //根据session构造对应client信息,ServiceStateMachine::ServiceStateMachine赋值  
68.    //也就是本次请求对应的客户端信息  
69.    ServiceContext::UniqueClient _dbClient;  
70.    //指向上面的_dbClient  
71.    const Client* _dbClientPtr;  
72.     //该SSM当前处理线程的线程名,因为adaptive线程模型一次请求中的不同状态会修改线程名
73.    const std::string _threadName;  
74.    //修改线程名之前的线程名称  
75.    std::string _oldThreadName;  
76.    //ServiceEntryPointImpl::startSession->ServiceStateMachine::setCleanupHook中设置赋值  
77.    //session链接回收处理  
78.    stdx::function<void()> _cleanupHook;  
79.    //接收处理的message信息  一个完整的报文就记录在该msg中  
80.    Message _inMessage;   
81.    //默认初始化kUnowned,标识本SSM状态机处于非活跃状态,  
82.    //主要用来判断是否需要在状态转换中跟新线程名,只对动态线程模型生效  
83.    AtomicWord<Ownership> _owned{Ownership::kUnowned};  
84. }

该类核心成员功能说明如下表:

  我们知道,链接、session SSM 状态机一一对应,他们也拥有对应的归属权,这里的归属权指的就是当前 SSM 归属于那个线程,也就是当前 SSM 状态机调度模块由那个线程实现。归属权通过 Ownership 类标记,该类保护如下几种状态,每种状态功能说明如下:

  Mongodb 服务端接收到客户端请求后的数据接收、协议解析、从 db 层获取数据、发送数据给客户端都是通过 SSM 状态机进行有序的状态转换处理, SSM 调度处理过程中保护多个状态,每个状态对应一个状态码,具体状态码及其功能说明如下表所示:

以上是SSM 处理请求过程的状态码信息,状态转换的具体实现过程请参考后面的核心代码分析。 listerner 线程接收到新的客户端链接后会调用通过 service_entry_point 服务入口点子模块的 ssm->start() 接口进入 SSM 状态机调度模块,该接口相关的源码实现如下:

1. //ServiceEntryPointImpl::startSession中执行  启动SSM状态机  
2. void ServiceStateMachine::start(Ownership ownershipModel) {  
3.     //直接调用_scheduleNextWithGuard接口  
4.     _scheduleNextWithGuard(   
5.     //listener线程暂时性的变为conn线程名,在_scheduleNextWithGuard中任  
6.     //务入队完成后,在下面的_scheduleNextWithGuard调用guard.release()恢复listener线程名  
7.         ThreadGuard(this), transport::ServiceExecutor::kEmptyFlags, ownershipModel);  
8. }  
9.   
10. void ServiceStateMachine::_scheduleNextWithGuard(...) {  
11.     //该任务func实际上由worker线程运行,worker线程从asio库的全局队列获取任务调度执行  
12.     auto func = [ ssm = shared_from_this(), ownershipModel ] {  
13.         //构造ThreadGuard  
14.         ThreadGuard guard(ssm.get());    
15.         //说明是sync mode,即一个链接一个线程模式, 归属权明确,属于指定线程  
16.         if (ownershipModel == Ownership::kStatic)   
17.             guard.markStaticOwnership();  
18.         //对应:ServiceStateMachine::_runNextInGuard,在这里面完成状态调度转换  
19.         ssm->_runNextInGuard(std::move(guard));    
20.     };  
21.     //恢复之前的线程名,如果该SSM进入Ended状态,则开始资源回收处理  
22.     guard.release();  
23.     //ServiceExecutorAdaptive::schedule(adaptive)   ServiceExecutorSynchronous::schedule(synchronous)  
24.     //第一次进入该函数的时候在这里面创建新线程,不是第一次则把task任务入队调度  
25.     Status status = _serviceContext->getServiceExecutor()->schedule(std::move(func), flags);  
26.     if (status.isOK()) {  
27.         return;  
28.     }  
29.     //异常处理  
30.     ......  
31. }

ServiceStateMachine::start() 接口调用 ServiceStateMachine::_scheduleNextWithGuard( ) 来启动状态机运行。 _scheduleNextWithGuard( ) 接口最核心的作用就是调用 service_executor 服务运行子模块 ( 线程模型子模块 ) schedule() 接口来把状态机调度任务入队到 ASIO 网络库的一个全局队列 (adaptive 动态线程模型 ) ,如果是一个链接一个线程模型,则任务入队到线程级私有队列。

adaptive 线程模型,任务入队以及工作线程调度任务执行的流程将在后续的线程模型子模块中分析,也可以参考: <<Mongodb 网络传输处理源码实现及性能调优 - 体验内核性能极致设计 >>

   此外, _scheduleNextWithGuard( ) 入队到全局队列的任务就是本模块后面需要分析的SSM 状态机任务,这些 task 任务通过本函数接口的 func  (...) 进行封装,然后通过线程模型子模块入队到一个全局队列。 Func(...) 这个 task 任务中会直接调用 _runNextInGuard() 接口来进行状态转换处理,该接口也就是入队到 ASIO 全局队列的任务,核心代码功能如下:

1. void ServiceStateMachine::_runNextInGuard(ThreadGuard guard) {  
2.     //获取当前SSM状态  
3.     auto curState = state();  
4.     // If this is the first run of the SSM, then update its state to Source  
5.     //如果是第一次运行该SSM,则状态为Created,到这里标记可以准备接收数据了  
6.     if (curState == State::Created) {   
7.         //进入Source等待接收数据  
8.         curState = State::Source;  
9.         _state.store(curState);  
10.     }  
11.     //各状态对应处理  
12.     try {  
13.         switch (curState) {   
14.             //接收数据  
15.             case State::Source:    
16.                 _sourceMessage(std::move(guard));  
17.                 break;  
18.             //以及接收到完整的一个mongodb报文,可以内部处理(解析+命令处理+应答客户端)  
19.             case State::Process:  
20.                 _processMessage(std::move(guard));  
21.                 break;  
22.             //链接异常或者已经关闭,则开始回收处理  
23.             case State::EndSession:  
24.                 _cleanupSession(std::move(guard));  
25.                 break;  
26.             default:  
27.                 MONGO_UNREACHABLE;  
28.         }  
29.          return;
30.     } catch (...) {  
31.         //异常打印  
32.     }  
33.     //异常处理  
34.     ......  
35.     //进入EndSession状态  
36.     _state.store(State::EndSession);  
37.     _cleanupSession(std::move(guard));  
38. }

从上面的代码实现可以看出,真正入队到全局队列中的任务类型只有三个,分别是:

1)  接收mongodb 数据的 task 任务,简称为 readTask

2)  接收到一个完整mongodb 数据后的后续处理 ( 包括协议解析、命令处理、 DB 获取数据、发送数据给客户端等 ) ,简称为 dealTask

3)  接收或者发送数据异常、链接关闭等引起的后续资源释放,简称为cleanTask

下面针对这三种task 任务核心代码实现进行分析:

readTask 任务核心代码实现

readTask 任务核心代码实现由 _sourceMessage() 接口实现,具体代码如下:

1. //接收客户端数据  
2. void ServiceStateMachine::_sourceMessage(ThreadGuard guard) {  
3.     ......  
4.     //获取本session接收数据的ticket,也就是ASIOSourceTicket  
5.     auto ticket = _session()->sourceMessage(&_inMessage);   
6.     //进入等等接收数据状态  
7.     _state.store(State::SourceWait);    
8.     //release恢复worker线程原有的线程名,synchronous线程模型为"conn-xx",adaptive对应worker线程名为"conn-xx"  
9.     guard.release();  
10.     //线程模型默认同步方式,也就是一个链接一个线程  
11.     if (_transportMode == transport::Mode::kSynchronous) {  
12.          //同步方式,读取到一个完整mongodb报文后执行_sourceCallback回调  
13.          _sourceCallback([this](auto ticket) {  
14.             MONGO_IDLE_THREAD_BLOCK;  
15.             return _session()->getTransportLayer()->wait(std::move(ticket));  
16.         }(std::move(ticket)));   
17.     } else if (_transportMode == transport::Mode::kAsynchronous) {  
18.         //adaptive线程模型,异步读取一个mongodb报文后执行_sourceCallback回调  
19.         _session()->getTransportLayer()->asyncWait(   
20.             ////TransportLayerASIO::ASIOSourceTicket::_bodyCallback读取到一个完整报文后执行该回调  
21.             std::move(ticket), [this](Status status) { _sourceCallback(status); });  
22.     }  
23. }  
24.   
25. //接收到一个完整mongodb报文后的回调处理  
26. void ServiceStateMachine::_sourceCallback(Status status) {  
27.     //构造ThreadGuard,修改执行本SSM接口的线程名为conn-xx  
28.     ThreadGuard guard(this);   
29.   
30.     //状态检查  
31.     dassert(state() == State::SourceWait);  
32.     //获取链接session远端信息  
33.     auto remote = _session()->remote();   
34.     if (status.isOK()) {  
35.     //等待调度,进入处理消息阶段  _processMessage  
36.         _state.store(State::Process);  
37.         //注意kMayRecurse标识State::Process阶段的处理还是由本线程执行,这是一个递归标记  
38.         return _scheduleNextWithGuard(std::move(guard), ServiceExecutor::kMayRecurse);  
39.     }  
40.     ......  
41.     //异常流程调用  
42.     _runNextInGuard(std::move(guard));  
43. }

SSM 调度的第一个任务就是 readTask 任务,从上面的源码分析可以看出,该任务就是通过 ticket 数据分发模块从 ASIO 网络库读取一个完整长度的 mongodb 报文,然后执行 _sourceCallback 回调。进入该回调函数后,即刻设置SSM 状态为 State::Process 状态,然后调用 _scheduleNextWithGuard (...) dealTask 任务入队到 ASIO 的全局队列 (adaptive 线程模型 ) ,或者入队到线程级私有队列 ( synchronous 线程模型) 等待 worker 线程调度执行。

这里有个细节,在把dealTask 入队的时候,携带了 kMayRecurse 标记,该标记标识该任务可以递归调用,也就是该任务可以由当前线程继续执行,这样也就可以保证同一个请求的taskRead 任务和 dealTask 任务由同一个线程处理。任务递归调度,可以参考后面的线程模型子模块源码实现。

dealTask 任务核心代码实现

当读取到一个完整长度的mongodb 报文后,就会把 dealTask 任务入队到全局队列,然后由 worker 线程调度执行该 task 任务。 dealTask 任务的核心代码实现如下:

1. //dealTask处理  
2. void ServiceStateMachine::_processMessage(ThreadGuard guard) {  
3.     ......  
4.     //入口流量计数  
5.     networkCounter.hitLogicalIn(_inMessage.size());  
6.     //获取一个唯一的UniqueOperationContext,一个客户端对应一个UniqueOperationContext  
7.     auto opCtx = Client::getCurrent()->makeOperationContext();  
8.     //ServiceEntryPointMongod::handleRequest  ServiceEntryPointMongos::handleRequest请求处理  
9.     //command处理、DB访问后的数据通过dbresponse返回  
10.     DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage);  
11.     //释放opCtx,这样currentop就看不到了  
12.     opCtx.reset();  
13.     //需要发送给客户端的应答数据  
14.     Message& toSink = dbresponse.response;  
15.     //应答数据存在  
16.     if (!toSink.empty()) {    
17.         ......  
18.         //发送数据 ServiceStateMachine::_sinkMessage()  
19.         _sinkMessage(std::move(guard), std::move(toSink));  
20.   
21.     } else {  
22.        //如果不需要应答客户端的处理  
23.        ......  
24.     }  
25. }  
26.   
27. //调用Sinkticket发送数据  
28. void ServiceStateMachine::_sinkMessage(ThreadGuard guard, Message toSink) {  
29.     //获取发送数据的ASIOSinkTicket  
30.     auto ticket = _session()->sinkMessage(toSink);  
31.     //进入sink发送等待状态  
32.     _state.store(State::SinkWait);  
33.     //恢复原有的worker线程名  
34.     guard.release();  
35.     //synchronous线程模型,同步发送  
36.     if (_transportMode == transport::Mode::kSynchronous) {  
37.         //最终在ASIOSinkTicket同步发送数据成功后执行_sinkCallback  
38.         _sinkCallback(_session()->getTransportLayer()->wait(std::move(ticket)));  
39.     } else if (_transportMode == transport::Mode::kAsynchronous) {  
40.         //最终在ASIOSinkTicket异步发送数据成功后执行_sinkCallback  
41.         _session()->getTransportLayer()->asyncWait(  
42.             std::move(ticket), [this](Status status) { _sinkCallback(status); });  
43.     }  
44. }  
45.   
46. //sink数据发送  
47. void ServiceStateMachine::_sinkCallback(Status status) {  
48.     //SSM归属于新的guard,同时修改当前线程名为conn-xx  
49.     ThreadGuard guard(this);  
50.     //状态检查  
51.     dassert(state() == State::SinkWait);  
52.     if (!status.isOK()) {  
53.         //进入EndSession状态  
54.         _state.store(State::EndSession);  
55.         //异常情况调用  
56.         return _runNextInGuard(std::move(guard));  
57.     } else if (_inExhaust) { //_inExhaust方式   
58.         //注意这里的状态是process   _processMessage   还需要继续进行Process处理  
59.         _state.store(State::Process);   
60.     } else {   
61.         //正常流程始终进入该分支 _sourceMessage    这里继续进行递归接收数据处理  
62.         //注意这里的状态是Source,继续接收客户端请求  
63.         _state.store(State::Source);  
64.     }  
65.     //本链接对应的一次mongo访问已经应答完成,需要继续要一次调度了  
66.     return _scheduleNextWithGuard(std::move(guard),  
67.                                   ServiceExecutor::kDeferredTask |  
68.                                       ServiceExecutor::kMayYieldBeforeSchedule);  
69. }

readTask 通过 ticket 数据分发子模块读取一个完整长度的 mongodb 报文后,开始 dealTask 任务逻辑,该任务也就是 _processMessage(...) 。该接口中核心实现就是调用 mongod mongos 实例对应的服务入口类的 handleRequest(...) 接口来完成后续的 command 命令处理、 DB 层数据访问等,访问到的数据存储到 DbResponse 中,最终在通过 _sinkMessage(...) 把数据发送出去。

真正的mongodb 内部处理流程实际上就是通过该 dealTask 任务完成,该任务也是处理整个请求中资源耗费最重的一个环节。在该 task 任务中,当数据成功发送给客户端后,该 session 链接对应的 SSM 状态机进入 State::Source 状态,继续等待worker 线程调度来完成后续该链接的新请求。

cleanTask 任务

在数据读写过程、客户端链接关闭、访问DB 数据层等任何一个环节异常,则会进入 State::EndSession 状态。该状态对应得任务实现相对比较简单,具体代码实现如下:

1. //session对应链接回收处理  
2. void ServiceStateMachine::_cleanupSession(ThreadGuard guard) {  
3.     //进入这个状态后在~ThreadGuard::release中进行ssm _cleanupHook处理,该hook在ServiceEntryPointImpl::startSession  
4.     _state.store(State::Ended);  
5.     //清空message buffer  
6.     _inMessage.reset();  
7.     //释放链接对应client资源  
8.     Client::releaseCurrent();  
}

进入该状态后直接由本线程进行session 资源回收和 client 资源释放处理,而无需状态机调度 worker 线程来回收。

2.2 关于 worker 线程名和 guardthread 线程守护类

前面得分析我们知道,当线程模型为adaptive 动态线程模型的时候, mongod mongos 实例对应的子线程中有很多名为“ conn-xx ”和 worker-xx 的线程,而且同一个线程可能一会儿线程名为conn-xx ”,下一次又变为了 worker-xx 。这个线程名的初始命名和线程名更改与ServiceStateMachine 状态机调度类、 guardthread 线程守护类、 worker 线程模型等都有关系。

Worker 线程由 ServiceExecutor 线程模型子模块创建,请参考后续线程模型子模块相关章节。默认初始化线程名为 conn-x ,初始化代码实现如下:

1. //ServiceStateMachine::create调用,ServiceStateMachine类初始化构造  
2. ServiceStateMachine::ServiceStateMachine(...)  
3.     ......  
4.     //线程名初始化:conn-xx,xx代码session id  
5.     _threadName{str::stream() << "conn-" << _session()->id()} {}   
6. }  
7.   
8. class Session {  
9.     ......  
10.     //sessionID,自增生成  
11.     const Id _id;  
12. }  
13.   
14. //全局sessionIdCounter计数器,初始化为0  
15. AtomicUInt64 sessionIdCounter(0);  
16.   
17. //session id自增  
18. Session::Session() : _id(sessionIdCounter.addAndFetch(1)) {}

SSM 状态处理过程中,会把一个完整的请求过程 = readTask 任务 + dealTask 任务,这两个任务都是通过 SSM 状态机和 ServiceExecutor 线程模型子模块的 worker 线程配合调度完成,在任务处理过程中处理同一个任务的线程可能会有多次线程名更改,这个就是结合 guardthread 线程守护类来完成,以一个线程名切换更改伪代码实现为例:

1. worker_thread_run_task(...)  
2. {  
3.     //如果是adaptive线程模型,当前worker线程名为"worker-xx"  
4.     print(threadName)  
5.     //业务逻辑处理1  
6.     ......  
7.       
8.     //初始化构造ThreadGuard,这里面修改线程名为_ssm->_threadName,也就是"conn-xx",  
9.     //同时保存原来的线程名"worker-xx"到_ssm->_oldThreadName中  
10.     ThreadGuard guard(this);  
11.     //如果是adaptive线程模型,线程名打印内容为"conn-xx"  
12.     print(threadName)  
13.     //业务逻辑处理2  
14.     ......  
15.     //恢复_ssm->_oldThreadName保存的线程名"worker-xx"  
16.     guard.release();  
17.       
18.     //如果是adaptive线程模型,线程名恢复为"worker-xx"  
19.     print(threadName)  
20. }

从上面的伪代码可以看出,adaptive 线程模型对应 worker 线程名为 worker ,在进入 ThreadGuard guard(this) 流程后,线程名更改为 conn-xx 线程,当 guard.release() 释放后恢复原有 worker-xx 线程名。

结合前面的SSM 状态处理流程, adaptive 线程模型可以得到如下总结:底层网络 IO 数据读写过程, worker 线程名会改为 worker-xx ,其他非网络IO mongodb 内部逻辑处理线程名为 conn-xx 。所以,如果查看mongod 或者 mongos 进程所有线程名的时候,如果发现线程名为 worker-xx ,说明当前线程在处理网络IO ;如果发现线程名为 conn-xx ,则说明当前线程在处理内部逻辑处理,对于mongod 实例可以理解为主要处理磁盘 IO

由于 synchronous 同步线程模型,同一链接对应的所有客户端请求至始至终都有同一线程处理,所以整个处理线程名不会改变,也没必要修改线程名,整个过程都是 conn-xx 线程名。

2.3 该模块函数接口总结大全

   前面分析了主要核心接口源码实现,很多其他接口没有一一列举详细分析,该模块u 所有接口功能总结如下,更多接口代码实现详见 Mongodb 内核源码详细注释分析

3. 总结

本文主要分析了service_state_machine 状态机子模块,该模块把 session 对应的客户端请求转换为 readTask 任务、 dealTask 任务和 cleanTask 任务,前两个任务通过 worker 线程完成调度处理, cleanTask 任务在内部处理异常或者链接关闭的时候由本线程直接执行,而不是通过 worker 线程调度执行。

这三个任务处理过程会分别对应到Created Source SourceWait Process SinkWait EndSession Ended 七种状态的一种或者多种,具体详见前面的状态码分析。一个正常的客户端请求状态转换过程如下 :

1)  链接刚建立的第一次请求状态转换过程:

Created->Source -> SourceWait -> Process -> SinkWait -> Source

2)  该链接后续的请求状态转换过程:

 Source -> SourceWait -> Process -> SinkWait -> Source

此外,SSM 状态机调度模块通过 ServiceStateMachine::_scheduleNextWithGuard(...) 接口和线程模型子模块关联起来。 SSM 通过该接口完成 worker 线程初始创建、 task 任务入队处理,下期将分析 << 网络线程模型子模块 >> 详细源码实现。

说明:

该模块更多接口实现细节详见Mongodb 内核源码注释: Mongodb 内核源码详细注释分析



来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/69984922/viewspace-2730526/,如需转载,请注明出处,否则将追究法律责任。

请登录后发表评论 登录
全部评论
前滴滴出行专家工程师,OPPO文档数据库mongodb负责人,负责近千万级峰值TPS/数万亿级数据量文档数据库mongodb研发和运维工作,专注于分布式缓存、高性能中间件、数据库等研发,持续分享《MongoDB内核源码设计、性能优化、最佳运维实践》。git账号:y12345yz

注册时间:2020-10-04

  • 博文量
    33
  • 访问量
    25625