ITPub博客

首页 > 数据库 > NoSQL > mongodb内核源码实现、性能调优、最佳运维实践系列-网络传输层模块源码实现二

mongodb内核源码实现、性能调优、最佳运维实践系列-网络传输层模块源码实现二

原创 NoSQL 作者:y123456yzzyz 时间:2020-10-26 12:41:09 0 删除 编辑

mongodb 网络传输层模块源码实现二

关于作者

  前滴滴出行技术专家,现任OPPO 文档数据库 mongodb 负责人,负责 oppo 千万级峰值 TPS/ 十万亿级数据量文档数据库 mongodb 内核研发及运维工作,一直专注于分布式缓存、高性能服务端、数据库、中间件等相关研发。后续持续分享《 MongoDB 内核源码设计、性能优化、最佳运维实践》, Github 账号地址 : https://github.com/y123456yz

1. 说明

在之前的 <<Mongodb 网络传输处理源码实现及性能调优 - 体验内核性能极致设计 >> 一文中分析了如何阅读百万级大工程源码、Asio 网络库实现、 transport 传输层网络模块中线程模型实现,但是由于篇幅原因,传输层网络模块中的以下模块实现原理没有分析,本文降将继续分析遗留的以下子模块:

transport_layer 套接字处理及传输层管理子模块

session 会话子模块

Ticket 数据收发子模块

service_entry_point 服务入口点子模块

service_state_machine 状态机子模块 ( 该《模块在网络传输层模块源码实现三》中分析 )

service_executor 线程模型子模块 ( 该《模块在网络传输层模块源码实现四》中分析 )

 

2.  transport_layer 套接字处理及传输层管理子模块

transport_layer 套接字处理及传输层管理子模块功能包括套接字相关初始化处理、结合 asio 库实现异步 accept 处理、不同线程模型管理及初始化等,该模块的源码实现主要由以下几个文件实现:

上图是套接字处理及传输层管理子模块源码实现的相关文件,其中mock test 文件主要用于模拟测试等,所以真正核心的代码实现只有下表的几个文件,对应源码文件功能说明如下表所示:

文件名

功能

transport_layer.h

transport_layer.cpp

该子模块基类,通过该类把本模块和 Ticket 数据分发模块衔接起来,具体类实现在 transport_layer_legacy.cpp transport_layer_asio.cpp

transport_layer_legacy.h

transport_layer_legacy.cpp

早期的传输模块实现方式,现在已淘汰,不在分析

transport_layer_asio.h

transport_layer_asio.cpp

传输模块 Asio 网络 IO 处理实现,同时衔接 ServiceEntryPoint 服务入口模块和 Ticket 数据分发模块

2.1 核心代码实现

该子模块核心代码主要由 TransportLayerManager 类和TransportLayerASIO 类相关接口实现。

2.1.1   TransportLayerManager 类核心代码实现

TransportLayerManager 类主要成员及接口如下:

1. //网络会话链接,消息处理管理相关的类,在createWithConfig构造该类存入_tls  
2. class TransportLayerManager final : public TransportLayer {  
3.     //以下四个接口真正实现在TransportLayerASIO类中具体实现  
4.     Ticket sourceMessage(...) override;  
5.     Ticket sinkMessage(...) override;      
6.     Status wait(Ticket&& ticket) override;  
7.     void asyncWait(...) override;  
8.     //配置初始化实现  
9.     std::unique_ptr<TransportLayer> createWithConfig(...);  
10.   
11.     //createWithConfig中赋值,对应TransportLayerASIO,  
12.     //实际上容器中就一个成员,就是TransportLayerASIO  
13.     std::vector<std::unique_ptr<TransportLayer>> _tls;  
14. };

  TransportLayerManager 类包含一个 _tls 成员,该类最核心的 createWithConfig 接口代码实现如下:

1. //根据配置构造相应类信息  _initAndListen中调用  
2. std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig(...) {  
3.     std::unique_ptr<TransportLayer> transportLayer;  
4.     //服务类型,也就是本实例是mongos还是mongod  
5.     //mongos对应ServiceEntryPointMongod,mongod对应ServiceEntryPointMongos  
6.     auto sep = ctx->getServiceEntryPoint();  
7.     //net.transportLayer配置模式,默认asio, legacy模式已淘汰  
8.     if (config->transportLayer == "asio") {  
9.          //同步方式还是异步方式,默认synchronous  
10.         if (config->serviceExecutor == "adaptive") {  
11.             //动态线程池模型,也就是异步模式  
12.             opts.transportMode = transport::Mode::kAsynchronous;  
13.         } else if (config->serviceExecutor == "synchronous") {  
14.             //一个链接一个线程模型,也就是同步模式  
15.             opts.transportMode = transport::Mode::kSynchronous;  
16.         }   
17.         //如果配置是asio,构造TransportLayerASIO类  
18.         auto transportLayerASIO = stdx::make_unique<transport::TransportLayerASIO>(opts, sep);  
19.         if (config->serviceExecutor == "adaptive") { //异步方式  
20.              //构造动态线程模型对应的执行器ServiceExecutorAdaptive  
21.             ctx->setServiceExecutor(stdx::make_unique<ServiceExecutorAdaptive>(  
22.                 ctx, transportLayerASIO->getIOContext()));  
23.          } else if (config->serviceExecutor == "synchronous") { //同步方式  
24.             //构造一个链接一个线程模型对应的执行器ServiceExecutorSynchronous  
25.             ctx->setServiceExecutor(stdx::make_unique<ServiceExecutorSynchronous>(ctx));  
26.          }  
27.          //transportLayerASIO转换为transportLayer类  
28.          transportLayer = std::move(transportLayerASIO);  
29.     }   
30.    //transportLayer转存到对应retVector数组中并返回  
31.     std::vector<std::unique_ptr<TransportLayer>> retVector;  
32.     retVector.emplace_back(std::move(transportLayer));  
33.     return stdx::make_unique<TransportLayerManager>(std::move(retVector));  
34. }

     createWithConfig 函数根据配置文件来确定对应的TransportLayer ,如果 net.transportLayer 配置为 asio ,则选用TransportLayerASIO 类来进行底层的网络 IO 处理,如果配置为 legacy ,则选用TransportLayerLegacy legacy 模式当前已淘汰,本文只分析 asio 模式实现。

asio 模式包含两种线程模型: adaptive ( 动态线程模型 ) synchronous ( 同步线程模型 ) adaptive 模式线程设计采用动态线程方式,线程数和 mongodb 压力直接相关,如果 mongodb 压力大,则线程数增加;如果 mongodb 压力变小,则线程数自动减少。同步线程模式也就是一个链接一个线程模型,线程数的多少和链接数的多少成正比,链接数越多则线程数也越大。

Mongodb 内核实现中通过 opts.transportMode 来标记asio 的线程模型,这两种模型对应标记如下:

线程模型

内核 transportMode 标记

说明

对应线程模型由那个类实现

adaptive

KAsynchronous( 异步 )

adaptive 模型也可以称为异步模型

ServiceExecutorAdaptive

synchronous

KSynchronous( 同步 )

synchronous 模型也可以称为同步模型

ServiceExecutorSynchronous

     说明:adaptive 线程模型被标记为 KAsynchronous synchronous 被标记为 KSynchronous 是有原因的, adaptive 动态线程模型网络 IO 处理借助 epoll 异步实现,而 synchronous 一个链接一个线程模型网络IO 处理是同步读写操作。 Mongodb 网络线程模型具体实现及各种优缺点可以参考: Mongodb 网络传输处理源码实现及性能调优 - 体验内核性能极致设计

2.1.2 TransportLayerASIO 类核心代码实现

TransportLayerASIO 类核心成员及接口如下:

1. class TransportLayerASIO final : public TransportLayer {  
2.     //以下四个接口主要和套接字数据读写相关  
3.     Ticket sourceMessage(...);  
4.     Ticket sinkMessage(...);  
5.     Status wait(Ticket&& ticket);  
6.     void asyncWait(Ticket&& ticket, TicketCallback callback);  
7.     void end(const SessionHandle& session);  
8.     //新链接处理  
9.     void _acceptConnection(GenericAcceptor& acceptor);  
10.       
11.     //adaptive线程模型网络IO上下文处理  
12.     std::shared_ptr<asio::io_context> _workerIOContext;   
13.     //accept接收客户端链接对应的IO上下文  
14.     std::unique_ptr<asio::io_context> _acceptorIOContext;    
15.     //bindIp配置中的ip地址列表,用于bind监听,accept客户端请求  
16.     std::vector<std::pair<SockAddr, GenericAcceptor>> _acceptors;  
17.     //listener线程负责接收客户端新链接  
18.     stdx::thread _listenerThread;  
19.     //服务类型,也就是本实例是mongos还是mongod  
20.     //mongos对应ServiceEntryPointMongod,mongod对应ServiceEntryPointMongos  
21.     ServiceEntryPoint* const _sep = nullptr;  
22.     //当前运行状态  
23.     AtomicWord<bool> _running{false};  
24.     //listener处理相关的配置信息  
25.     Options _listenerOptions;  
26. }  
     从上面的类结构可以看出,该类主要通过listenerThread线程完成bind绑定及listen监听操作,同时部分接口实现新连接上的数据读写。
套接字初始化代码实现如下:
1. Status TransportLayerASIO::setup() {  
2.     std::vector<std::string> listenAddrs;  
3.     //如果没有配置bindIp,则默认监听"127.0.0.1:27017"
4.     if (_listenerOptions.ipList.empty()) {  
5.         listenAddrs = {"127.0.0.1"};  
6.     } else {  
7.         //配置文件中的bindIp:1.1.1.1,2.2.2.2,以逗号分隔符获取ip列表存入ipList  
8.         boost::split(listenAddrs, _listenerOptions.ipList, boost::is_any_of(","), boost::token_compress_on);  
9.     }  
10.     //遍历ip地址列表  
11.     for (auto& ip : listenAddrs) {  
12.         //根据IP和端口构造对应SockAddr结构  
13.         const auto addrs = SockAddr::createAll(  
14.             ip, _listenerOptions.port, _listenerOptions.enableIPv6 ? AF_UNSPEC : AF_INET);  
15.         ......  
16.         //根据addr构造endpoint  
17.         asio::generic::stream_protocol::endpoint endpoint(addr.raw(), addr.addressSize);  
18.         //_acceptorIOContext和_acceptors关联  
19.         GenericAcceptor acceptor(*_acceptorIOContext);  
20.         //epoll注册,也就是fd和epoll关联  
21.         //basic_socket_acceptor::open  
22.         acceptor.open(endpoint.protocol());   
23.          //SO_REUSEADDR配置 basic_socket_acceptor::set_option  
24.         acceptor.set_option(GenericAcceptor::reuse_address(true));  
25.         //非阻塞设置 basic_socket_acceptor::non_blocking  
26.         acceptor.non_blocking(true, ec);    
27.         //bind绑定    
28.         acceptor.bind(endpoint, ec);   
29.         if (ec) {  
30.             return errorCodeToStatus(ec);  
31.         }  
32.     }  
33. }

    从上面的分析可以看出,代码实现首先解析出配置文件中bindIP 中的 ip:port 列表,然后遍历列表绑定所有服务端需要监听的 ip:port ,每个 ip:port 对应一个 GenericAcceptor  ,所有 acceptor 和全局accept IO 上下文 _acceptorIOContext 关联,同时 bind() 绑定所有 ip:port

     Bind() 绑定所有配置文件中的 Ip:port 后,然后通过 TransportLayerASIO::start() 完成后续处理,该接口代码实现如下:

1. //_initAndListen中调用执行   
2. Status TransportLayerASIO::start() { //listen线程处理  
3.     ......  
4.     //这里专门起一个线程做listen相关的accept事件处理  
5.     _listenerThread = stdx::thread([this] {  
6.         //修改线程名  
7.         setThreadName("listener");   
8.         //该函数中循环处理accept事件  
9.         while (_running.load()) {  
10.             asio::io_context::work work(*_acceptorIOContext);   
11.             try {  
12.                 //accept事件调度处理  
13.                  _acceptorIOContext->run();    
14.             } catch (...) { //异常处理  
15.                 severe() << "Uncaught exception in the listener: " << exceptionToStatus();  
16.                 fassertFailed(40491);  
17.             }  
18.         }  
19.     });   
20.    遍历_acceptors,进行listen监听处理  
21.    for (auto& acceptor : _acceptors) {   
22.         acceptor.second.listen(serverGlobalParams.listenBacklog);  
23.         //异步accept回调注册在该函数中  
24.         _acceptConnection(acceptor.second);       
25.     }  
26. }

从上面的 TransportLayerASIO::start() 接口可以看出,mongodb 特地创建了一个 listener 线程用于客户端accept 事件处理,然后借助 ASIO 网络库的 _acceptorIOContext->run() 接口来调度,当有新链接到来的时候,就会执行相应的accept 回调处理, accept 回调注册到 io_context 的流程由 acceptConnection () 完成,该接口核心源码实现如下:

1. //accept新连接到来的回调注册 
2. void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) {  
3.       //新链接到来时候的回调函数,服务端接收到新连接都会执行该回调
4.     //注意这里面是递归执行,保证所有accept事件都会一次处理完毕
5.     auto acceptCb = [this, &acceptor](const std::error_code& ec, GenericSocket peerSocket) mutable {  
6.         if (!_running.load())  
7.             return;  
8.   
9.         ......  
10.         //每个新的链接都会new一个新的ASIOSession  
11.         std::shared_ptr<ASIOSession> session(new ASIOSession(this, std::move(peerSocket)));  
12.         //新的链接处理ServiceEntryPointImpl::startSession,  
13.         //和ServiceEntryPointImpl服务入口点模块关联起来  
14.         _sep->startSession(std::move(session));  
15.         //递归,直到处理完所有的网络accept事件  
16.         _acceptConnection(acceptor);   
17.     };  
18.     //accept新连接到来后服务端的回调处理在这里注册  
19.     acceptor.async_accept(*_workerIOContext, std::move(acceptCb));  
}

TransportLayerASIO::_acceptConnection 的新连接处理过程借助ASIO 库实现,通过 acceptor.async_accept 实现所有监听的 acceptor 回调异步注册。

当服务端接收到客户端新连接事件通知后,会触发执行 acceptCb () 回调,该回调中底层 ASIO 库通过 epoll_wait 获取到所有的 accept 事件,每获取到一个 accept 事件就代表一个新的客户端链接,然后调用 ServiceEntryPointImpl::startSession () 接口处理这个新的链接事件,整个过程递归执行,保证一次可以处理所有的客户端 accept 请求信息。

每个链接都会构造一个唯一的session 信息,该 session 就代表一个唯一的新连接,链接和 session 一一对应。此外,最终会调用 ServiceEntryPointImpl::startSession () 进行真正的 accept() 处理,从而获取到一个新的链接。

注意 TransportLayerASIO::_acceptConnection () 中实现了 TransportLayerASIO 类和 ServiceEntryPointImpl 类的关联,这两个类在该接口实现了关联。

此外,从前面的 TransportLayerASIO 类结构中可以看出,该类还包含如下四个接口:sourceMessage(...) sinkMessage(...) wait(Ticket&& ticket) asyncWait(Ticket&& ticket, TicketCallback callback) ,这四个接口入参都和 Ticket 数据分发子模块相关联,具体核心代码实现如下:

1. //根据asioSession, expiration, message三个信息构造数据接收类ASIOSourceTicket  
2. Ticket TransportLayerASIO::sourceMessage(...) {  
3.     ......  
4.     auto asioSession = checked_pointer_cast<ASIOSession>(session);  
5.     //根据asioSession, expiration, message三个信息构造ASIOSourceTicket  
6.     auto ticket = stdx::make_unique<ASIOSourceTicket>(asioSession, expiration, message);  
7.     return {this, std::move(ticket)};  
8. }  
9.   
10. //根据asioSession, expiration, message三个信息构造数据发送类ASIOSinkTicket  
11. Ticket TransportLayerASIO::sinkMessage(...) {  
12.     auto asioSession = checked_pointer_cast<ASIOSession>(session);  
13.     auto ticket = stdx::make_unique<ASIOSinkTicket>(asioSession, expiration, message);  
14.     return {this, std::move(ticket)};  
15. }  
16.   
17. //同步接收或者发送,最终调用ASIOSourceTicket::fill 或者 ASIOSinkTicket::fill  
18. Status TransportLayerASIO::wait(Ticket&& ticket) {  
19.     //获取对应Ticket,接收对应ASIOSourceTicket,发送对应ASIOSinkTicket  
20.     auto ownedASIOTicket = getOwnedTicketImpl(std::move(ticket));  
21.     auto asioTicket = checked_cast<ASIOTicket*>(ownedASIOTicket.get());  
22.     ......  
23.     //调用对应fill接口 同步接收ASIOSourceTicket::fill 或者 同步发送ASIOSinkTicket::fill  
24.     asioTicket->fill(true, [&waitStatus](Status result) { waitStatus = result; });  
25.     return waitStatus;  
26. }  
27. //异步接收或者发送,最终调用ASIOSourceTicket::fill 或者 ASIOSinkTicket::fill  
28. void TransportLayerASIO::asyncWait(Ticket&& ticket, TicketCallback callback) {  
29.     //获取对应数据收发的Ticket,接收对应ASIOSourceTicket,发送对应ASIOSinkTicket  
30.     auto ownedASIOTicket = std::shared_ptr<TicketImpl>(getOwnedTicketImpl(std::move(ticket)));  
31.     auto asioTicket = checked_cast<ASIOTicket*>(ownedASIOTicket.get());  
32.   
33.    //调用对应ASIOTicket::fill  
34.     asioTicket->fill(  
35.         false,   [ callback = std::move(callback),  
36.         ownedASIOTicket = std::move(ownedASIOTicket) ](Status status) { callback(status); });  
37. }

  上面四个接口中的前两个接口主要通过 Session, expiration, message 这三个参数来获取对应的 Ticket  信息,实际上mongodb 内核实现中把接收数据的 Ticket 和发送数据的 Ticket 分别用不同的继承类ASIOSourceTicket ASIOSinkTicket 来区分,三个参数的作用如下表所示:

参数名

作用

Session

代表一个链接,一个 session 和一个链接意义对应

expiration

数据收发超时相关设置

message

数据内容

数据收发包括同步收发和异步收发,同步收发通过 TransportLayerASIO::wait () 实现,异步收发通过 TransportLayerASIO::asyncWait () 实现。

 

注意: 以上四个接口把 TransportLayerASIO 类和 Ticket  数据收发类的关联。    

2.2 总结
      transport_layer 套接字处理及传输层管理子模块主要由 transport_layer_manager transport_layer_asio 两个核心类组成,这两个类的核心接口功能总结如下表所示:

类命

接口名

功能说明

 

 

transport_layer_manager

TransportLayerManager::createWithConfig()

根据配置文件选择不同的 TransportLayer serviceExecutor

TransportLayerManager::setup()

已废弃

TransportLayerManager::start()

已废弃

 

 

 

 

 

 

 

TransportLayerASIO

TransportLayerASIO::Options::Options()

Net 相关配置

TransportLayerASIO::TransportLayerASIO()

初始化构造

TransportLayerASIO::sourceMessage()

获取数据接收的 Ticket

TransportLayerASIO::sinkMessage()

获取数据发送的 Ticket

TransportLayerASIO::wait()

同步发送接收或者发送数据

TransportLayerASIO::asyncWait()

异步方式发送接收或者发送数据

TransportLayerASIO::end()

关闭链接

TransportLayerASIO::setup()

创建套接字并 bind 绑定

TransportLayerASIO::start()

创建 listener 线程做监听操作,同时注册 accept 回调

TransportLayerASIO::shutdown()

shutdown 处理及资源回收

TransportLayerASIO::_acceptConnection()

Accept 回调注册

T ransport_layer_manager 中初始化TransportLayer serviceExecutor net.TransportLayer 配置可以为 legacy asio ,其中 legacy 已经淘汰,当前内核只支持 asio 模式。 asio 配置对应的 TransportLayer TransportLayerASIO 实现,对应的 serviceExecutor 线程模型可以是 adaptive 动态线程模型,也可以是 synchronous 同步线程模型。

套接字创建、bind() 绑定、 listen() 监听、 accept 事件注册等都由本类实现,同时数据分发 Ticket 模块也与本模块关联,一起配合完成整个后续 Ticket 模块模块的同步及异步数据读写流程。此外,本模块还通过 ServiceEntryPoint 服务入口子模块联动,保证了套接字初始化、accept 事件注册完成后,服务入口子模块能有序的进行新连接接收处理。

 

接下来继续分析本模块相关联的 ServiceEntryPoint 服务入口子模块和 Ticket 数据分发子模块实现。

3.  service_entry_point 服务入口点子模块

service_entry_point 服务入口点子模块主要负责如下功能:新连接处理、 Session 会话管理、接收到一个完整报文后的回调处理 ( 含报文解析、认证、引擎层处理等 )

该模块的源码实现主要包含以下几个文件:

  service_entry_point 开头的代码文件都和本模块相关,其中 service_entry_point_utils* 负责工作线程创建, service_entry_point_impl* 完成新链接回调处理及 sesseion 会话管理。

3.1 核心源码实现

     服务入口子模块相关代码实现比较简洁,主要由ServiceEntryPointImpl 类和 service_entry_point_utils 中的线程创建函数组成。

3.1.1 ServiceEntryPointImpl 类核心代码实现

ServiceEntryPointImpl 类主要成员和接口如下:

1. class ServiceEntryPointImpl : public ServiceEntryPoint {  
2.     MONGO_DISALLOW_COPYING(ServiceEntryPointImpl);  
3. public:  
4.     //构造函数  
5.     explicit ServiceEntryPointImpl(ServiceContext* svcCtx);     
6.     //以下三个接口进行session会话处理控制  
7.     void startSession(transport::SessionHandle session) final;  
8.     void endAllSessions(transport::Session::TagMask tags) final;  
9.     bool shutdown(Milliseconds timeout) final;  
10.     //session会话统计  
11.     Stats sessionStats() const final;  
12.     ......  
13. private:  
14.     //该list结构管理所有的ServiceStateMachine信息  
15.     using SSMList = stdx::list<std::shared_ptr<ServiceStateMachine>>;  
16.     //SSMList对应的迭代器  
17.     using SSMListIterator = SSMList::iterator;  
18.     //赋值ServiceEntryPointImpl::ServiceEntryPointImpl  
19.     //对应ServiceContextMongoD(mongod)或者ServiceContextNoop(mongos)类  
20.     ServiceContext* const _svcCtx;   
21.     //该成员变量在代码中没有使用  
22.     AtomicWord<std::size_t> _nWorkers;  
23.     //锁  
24.     mutable stdx::mutex _sessionsMutex;  
25.     //一个新链接对应一个ssm保存到ServiceEntryPointImpl._sessions中  
26.     SSMList _sessions;  
27.     //最大链接数控制  
28.     size_t _maxNumConnections{DEFAULT_MAX_CONN};  
29.     //当前的总链接数,不包括关闭的链接  
30.     AtomicWord<size_t> _currentConnections{0};  
31.     //所有的链接,包括已经关闭的链接  
32.     AtomicWord<size_t> _createdConnections{0};  
};

该类的几个接口主要是session 相关控制处理,该类中的变量成员说明如下:

成员名

功能说明

_svcCtx

服务上下文, mongod 实例对应 ServiceContextMongoD 类, mongos 代理实例对应 ServiceContextNoop

_sessionsMutex

_sessions 锁保护

_sessions

一个新链接对应一个 ssm 保存到 ServiceEntryPointImpl._sessions

_maxNumConnections

最大链接数,默认 1000000 ,可以通过 maxConns 配置

_currentConnections

当前的在线链接数,不包括以前关闭的链接

_createdConnections

所有的链接,包括已经关闭的链接

ServiceEntryPointImpl 类最核心的 startSession () 接口负责每个新连接到来后的内部回调处理,具体实现如下:

1. //新链接到来后的回调处理  
2. void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {   
3.     //获取该新连接对应的服务端和客户端地址信息  
4.     const auto& remoteAddr = session->remote().sockAddr();  
5.     const auto& localAddr = session->local().sockAddr();  
6.     //服务端和客户端地址记录到session中  
7.     auto restrictionEnvironment =  stdx::make_unique<RestrictionEnvironment>(*remoteAddr, *localAddr);  
8.     RestrictionEnvironment::set(session, std::move(restrictionEnvironment));  
9.     ......  
10.   
11.     //获取transportMode,kAsynchronous或者kSynchronous  
12.     auto transportMode = _svcCtx->getServiceExecutor()->transportMode();  
13.     //构造ssm  
14.     auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode);  
15.     {//该{}体内实现链接计数,同时把ssm统一添加到_sessions列表管理  
16.         stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);  
17.         connectionCount = _sessions.size() + 1; //连接数自增  
18.         if (connectionCount <= _maxNumConnections) {  
19.             //新来的链接对应的session保存到_sessions链表    
20.             //一个新链接对应一个ssm保存到ServiceEntryPointImpl._sessions中  
21.             ssmIt = _sessions.emplace(_sessions.begin(), ssm);  
22.             _currentConnections.store(connectionCount);  
23.             _createdConnections.addAndFetch(1);  
24.         }  
25.     }  
26.     //链接超限,直接退出  
27.     if (connectionCount > _maxNumConnections) {   
28.         ......  
29.         return;  
30.     }  
31.     //链接关闭的回收处理  
32.     ssm->setCleanupHook([ this, ssmIt, session = std::move(session) ] {  
33.          ......  
34.     });  
35.     //获取transport模式为同步模式还是异步模式,也就是adaptive线程模式还是synchronous线程模式  
36.     auto ownership = ServiceStateMachine::Ownership::kOwned;  
37.     if (transportMode == transport::Mode::kSynchronous) {  
38.         ownership = ServiceStateMachine::Ownership::kStatic;  
39.     }  
40.     //ServiceStateMachine::start,这里和服务状态机模块衔接起来  
41.     ssm->start(ownership);  
42. }

  该接口拿到该链接对应的服务端和客户端地址后,记录到该链接对应session 中,然后根据该 session transportMode _svcCtx 构建一个服务状态机 ssm(ServiceStateMachine) 。一个新链接对应一个唯一 session ,一个 session 对应一个唯一的服务状态机 ssm ,这三者保持唯一的一对一关系。

      最终, startSession () 让服务入口子模块、 session 会话子模块、 ssm 状态机子模块关联起来。   

3.1.2  service_entry_point_utils 核心代码实现

service_entry_point_utils 源码文件只有 launchServiceWorkerThread 一个接口,该接口主要负责工作线程创建,并设置每个工作线程的线程栈大小,如果系统默认栈大于 1M ,则每个工作线程的线程栈大小设置为 1M ,如果系统栈大小小于 1M ,则以系统堆栈大小为准,同时 warning 打印提示。该函数实现如下:   

1. Status launchServiceWorkerThread(stdx::function<void()> task) {  
2.         static const size_t kStackSize = 1024 * 1024;  //1M  
3.         struct rlimit limits;  
4.         //或者系统堆栈大小  
5.         invariant(getrlimit(RLIMIT_STACK, &limits) == 0);  
6.         //如果系统堆栈大小大于1M,则默认设置线程栈大小为1M  
7.         if (limits.rlim_cur > kStackSize) {  
8.             size_t stackSizeToSet = kStackSize;  
9.             int failed = pthread_attr_setstacksize(&attrs, stackSizeToSet);  
10.             if (failed) {  
11.                 const auto ewd = errnoWithDescription(failed);  
12.                 warning() << "pthread_attr_setstacksize failed: " << ewd;  
13.             }  
14.         } else if (limits.rlim_cur < 1024 * 1024) {  
15.             //如果系统栈大小小于1M,则已系统堆栈为准,同时给出告警  
16.             warning() << "Stack size set to " << (limits.rlim_cur / 1024) << "KB. We suggest 1MB";  
17.         }}  
18.         ......  
19.         //task参数传递给新建线程  
20.         auto ctx = stdx::make_unique<stdx::function<void()>>(std::move(task));  
21.         int failed = pthread_create(&thread, &attrs, runFunc, ctx.get());   
22.         ......  
23. }

3.2 总结

service_entry_point 服务入口点子模块主要负责新连接后的回调处理及工作线程创建,该模块和后续的 session 会话模块、 SSM 服务状态机模块衔接配合,完成数据收发的正常逻辑转换处理。上面的分析只列出了服务入口点子模块的核心接口实现,下表总结该模块所有的接口功能:

接口

功能说明

 

 

 

 

ServiceEntryPointImpl

ServiceEntryPointImpl::ServiceEntryPointImpl()

构造初始化,最大链接数限制

ServiceEntryPointImpl::startSession()

新链接的回调处理

ServiceEntryPointImpl::endAllSessions()

Ssm 服务状态机及 session 会话回收处理

ServiceEntryPointImpl::shutdown()

实例下线处理

ServiceEntryPointImpl::sessionStats()

获取链接统计信息

service_entry_point_utils

launchServiceWorkerThread

创建工作线程,同时限制每个线程对应线程栈大小

 

3.  Ticket 数据收发子模块

    Ticket 数据收发子模块主要功能如下:调用 session 子模块进行底层 asio 库处理、拆分数据接收和数据发送到两个类、完整 mongodb 报文读取 、接收或者发送 mongodb 报文后的回调处理。

3.1 ASIOTicket 类核心代码实现

Ticket 数据收发模块相关实现主要由 ASIOTicket 类完成,该类结构如下:

1. //下面的ASIOSinkTicket和ASIOSourceTicket继承该类,用于控制数据的发送和接收  
2. class TransportLayerASIO::ASIOTicket : public TicketImpl {  
3. public:  
4.     //初始化构造  
5.     explicit ASIOTicket(const ASIOSessionHandle& session, Date_t expiration);  
6.     //获取sessionId  
7.     SessionId sessionId() const final {  
8.         return _sessionId;  
9.     }  
10.     //asio模式没用,针对legacy模型  
11.     Date_t expiration() const final {  
12.         return _expiration;  
13.     }  
14. 
15.     //以下四个接口用于数据收发相关处理  
16.     void fill(bool sync, TicketCallback&& cb);  
17. protected:  
18.     void finishFill(Status status);  
19.     bool isSync() const;  
20.     virtual void fillImpl() = 0;  
21. private:  
22.     //会话信息,一个链接一个session  
23.     std::weak_ptr<ASIOSession> _session;  
24.     //每个session有一个唯一id  
25.     const SessionId _sessionId;  
26.     //asio模型没用,针对legacy生效  
27.     const Date_t _expiration;  
28.     //数据发送或者接收成功后的回调处理  
29.     TicketCallback _fillCallback;  
30.     //同步方式还是异步方式进行数据处理,默认异步  
31.     bool _fillSync;  
32. };

该类保护多个成员变量,这些成员变量功能说明如下:

成员名

作用

_session

Session 会话信息,一个链接对应一个 session

_sessionId

每个 session 都有一个对应的唯一 ID

_expiration

Legacy 模式使用,当前都是用 asio ,该成员已淘汰

_fillCallback

发送或者接收一个完整 mongodb 报文后的回调处理

_fillSync

同步还是异步方式收发数据。 adaptive 线程模型为异步, synchronous 线程模型为同步读写方式

mongodb 在具体实现上,数据接收和数据发送分开实现,分别是数据接收类 ASIOSourceTicket 和数据发送类 ASIOSinkTicket ,这两个类都继承自 ASIOTicket 类,这两个类的主要结构如下:

1. //数据接收的ticket  
2. class TransportLayerASIO::ASIOSourceTicket : public TransportLayerASIO::ASIOTicket {  
3. public:  
4.     //初始化构造  
5.     ASIOSourceTicket(const ASIOSessionHandle& session, Date_t expiration, Message* msg);  
6. protected:  
7.     //数据接收Impl  
8.     void fillImpl() final;  
9. private:  
10.     //接收到mongodb头部数据后的回调处理  
11.     void _headerCallback(const std::error_code& ec, size_t size);  
12.     //接收到mongodb包体数据后的回调处理    
13.     void _bodyCallback(const std::error_code& ec, size_t size);  
14.   
15.     //存储数据的buffer,网络IO读取到的原始数据内容  
16.     SharedBuffer _buffer;  
17.     //数据Message管理,数据来源为_buffer  
18.     Message* _target;  
19. };  
1. 
2.   
20. //数据发送的ticket  
21. class TransportLayerASIO::ASIOSinkTicket : public TransportLayerASIO::ASIOTicket {  
22.  public:  
23.     //初始化构造  
24.     ASIOSinkTicket(const ASIOSessionHandle& session, Date_t expiration, const Message& msg);  
25. protected:  
26.     //数据发送Impl  
27.     void fillImpl() final;  
28. private:  
29.     //发送数据完成的回调处理  
30.     void _sinkCallback(const std::error_code& ec, size_t size);  
31.     //需要发送的数据message信息  
32.     Message _msgToSend;  
33. };

   从上面的代码实现可以看出, ASIOSinkTicket  ASIOSourceTicket  类接口及成员实现几乎意义,只是具体的实现方法不同,下面对ASIOSourceTicket ASIOSinkTicket  相关核心代码实现进行分析。

3.1.2 ASIOSourceTicket  数据接收核心代码实现

数据接收过程核心代码如下:

1. //数据接收的fillImpl接口实现  
2. void TransportLayerASIO::ASIOSourceTicket::fillImpl() {    
3.     //获取对应session信息  
4.     auto session = getSession();  
5.     if (!session)  
6.         return;  
7.     //收到读取mongodb头部数据,头部数据长度是固定的kHeaderSize字节  
8.     const auto initBufSize = kHeaderSize;  
9.     _buffer = SharedBuffer::allocate(initBufSize);  
10.   
11.     //调用TransportLayerASIO::ASIOSession::read读取底层数据存入_buffer  
12.     //读完头部数据后执行对应的_headerCallback回调函数  
13.     session->read(isSync(),  
14.                   asio::buffer(_buffer.get(), initBufSize), //先读取头部字段出来  
15.                   [this](const std::error_code& ec, size_t size) { _headerCallback(ec, size); });  
16. }  
17.   
18. //读取到mongodb header头部信息后的回调处理  
19. void TransportLayerASIO::ASIOSourceTicket::_headerCallback(const std::error_code& ec, size_t size) {  
20.     ......  
21.     //获取session信息  
22.     auto session = getSession();  
23.     if (!session)  
24.         return;  
25.     //从_buffer中获取头部信息  
26.     MSGHEADER::View headerView(_buffer.get());  
27.     //获取message长度  
28.     auto msgLen = static_cast<size_t>(headerView.getMessageLength());  
29.     //长度太小或者太大,直接报错  
30.     if (msgLen < kHeaderSize || msgLen > MaxMessageSizeBytes) {  
31.         .......  
32.         return;  
33.     }  
34.     ....  
35.    //内容还不够一个mongo协议报文,继续读取body长度字节的数据,读取完毕后开始body处理  
36.    //注意这里是realloc,保证头部和body在同一个buffer中  
37.     _buffer.realloc(msgLen);   
38.     MsgData::View msgView(_buffer.get());  
39.   
40.     //调用底层TransportLayerASIO::ASIOSession::read读取数据body   
41.     session->read(isSync(),  
42.       //数据读取到该buffer                  
43.       asio::buffer(msgView.data(), msgView.dataLen()),  
44.       //读取成功后的回调处理  
45.       [this](const std::error_code& ec, size_t size) { _bodyCallback(ec, size); });  
46. }  
47.   
48. //_headerCallback对header读取后解析header头部获取到对应的msg长度,然后开始body部分处理  
49. void TransportLayerASIO::ASIOSourceTicket::_bodyCallback(const std::error_code& ec, size_t size) {  
50.     ......  
51.     //buffer转存到_target中  
52.     _target->setData(std::move(_buffer));  
53.     //流量统计  
54.     networkCounter.hitPhysicalIn(_target->size());  
55.     //TransportLayerASIO::ASIOTicket::finishFill    
56.     finishFill(Status::OK()); //包体内容读完后,开始下一阶段的处理    
57.     //报文读取完后的下一阶段就是报文内容处理,开始走ServiceStateMachine::_processMessage  
58. }

Mongodb 协议由 msg header + msg body 组成,一个完整的 mongodb 报文内容格式如下:

上图所示各个字段及body 内容部分功能说明如下表:

Header or body

字段名

功能说明

 

msg header

messageLength

整个 message 长度,包括 header 长度和 body 长度

requestID

该请求 id 信息

responseTo

应答 id

opCode

操作类型: OP_UPDATE OP_INSERT OP_QUERY OP_DELETE

msg body

Body

不同 opCode 对应的包体内容

     ASIOSourceTicket 类的几个核心接口都是围绕这一原则展开,整个mongodb 数据接收流程如下:

1.  读取mongodb 头部 header 数据,解析出 header 中的 messageLength 字段。

2.  检查messageLength 字段是否在指定的合理范围,该字段不能小于 Header 整个头部大小,也不能超过 MaxMessageSizeBytes 最大长度。

3.  Header len 检查通过,说明读取 header 数据完成,于是执行 _headerCallback 回调。

4.  realloc 更多的空间来存储 body 内容。

5.  继续读取body len 长度数据,读取 body 完成后,执行 _bodyCallback 回调处理。

3.1.3 ASIOSinkTicket 数据发送类核心代码实现

    ASIOSinkTicket 发送类相比接收类,没有数据解析相关的流程,因此实现过程会更加简单,具体源码实现如下:

1. //发送数据成功后的回调处理  
2. void TransportLayerASIO::ASIOSinkTicket::_sinkCallback(const std::error_code& ec, size_t size) {  
3.     //发送的网络字节数统计  
4.     networkCounter.hitPhysicalOut(_msgToSend.size());   
5.     //执行SSM中对应的状态流程  
6.     finishFill(ec ? errorCodeToStatus(ec) : Status::OK());  
7. }  
8.   
9. //发送数据的fillImpl  
10. void TransportLayerASIO::ASIOSinkTicket::fillImpl() {  
11.     //获取对应session  
12.     auto session = getSession();  
13.     if (!session)  
14.         return;  
15.   
16.     //调用底层TransportLayerASIO::ASIOSession::write发送数据,发送成功后执行_sinkCallback回调   
17.     session->write(isSync(),  
18.        asio::buffer(_msgToSend.buf(), _msgToSend.size()),  
19.        //发送数据成功后的callback回调  
20.        [this](const std::error_code& ec, size_t size) { _sinkCallback(ec, size); });  
21. }

3.2 总结

从上面的分析可以看出,Ticket 数据收发模块主要调用 session 会话模块来进行底层数据的读写、解析,当读取或者发送一个完整的 mongodb 报文后最终交由 SSM 服务状态机模块调度处理。

ticket 模块主要接口功能总结如下表所示:

类命

接口名

功能说明

 

 

ASIOTicket

ASIOTicket::getSession()

获取 session 信息

ASIOTicket::isSync()

判断是同步收发还是异步收发

ASIOTicket::finishFill()

执行 _fillCallback 回调

ASIOTicket::fill()

_fillCallback 赋值

ASIOTicket::ASIOTicket()

ASIOTicket 构造初始化

 

ASIOSourceTicket

ASIOSourceTicket::ASIOSourceTicket()

ASIOSourceTicket 初始化

ASIOSourceTicket::_bodyCallback()

接收到 mesg header+body 后的回调处理

ASIOSourceTicket::_headerCallback

接收到 msg header 后的回调处理

 

ASIOSinkTicket

ASIOSinkTicket::ASIOSinkTicket()

ASIOSinkTicket 初始化构造

ASIOSourceTicket::fillImpl()

发送指定 msg 数据,发送完成后执行回调

ASIOSinkTicket::_sinkCallback

发送 msg 成功后的回调处理

前面的分析也可以看出,Ticket 数据收发模块会调用 session 处理模块来进行真正的数据读写,同时接收或者发送完一个完整 mongodb 报文后的后续回调处理讲交由 SSM 服务状态机模块处理。

4.  Session 会话子模块

Session 会话模块功能主要如下:负责记录 HostAndPort 、和底层 asio 库直接互动,实现数据的同步或者异步收发。一个新连接 fd 对应一个唯一的 session ,对 fd 的操作直接映射为 session 操作。 Session 会话子模块主要代码实现相关文件如下:

4.1 session 会话子模块核心代码实现

1. class TransportLayerASIO::ASIOSession : public Session {  
2.     //初始化构造  
3.     ASIOSession(TransportLayerASIO* tl, GenericSocket socket);  
4.     //获取本session使用的tl  
5.     TransportLayer* getTransportLayer();  
6.     //以下四个接口套接字相关,本端/对端地址获取,获取fd,关闭fd等  
7.     const HostAndPort& remote();  
8.     const HostAndPort& local();  
9.     GenericSocket& getSocket();  
10.     void shutdown();  
11.   
12.     //以下四个接口调用asio网络库实现数据的同步收发和异步收发  
13.     void read(...)  
14.     void write(...)  
15.     void opportunisticRead(...)  
16.     void opportunisticWrite(...)  
17.   
18.     //远端地址信息  
19.     HostAndPort _remote;  
20.     //本段地址信息  
21.     HostAndPort _local;  
22.     //赋值见TransportLayerASIO::_acceptConnection  
23.     //也就是fd,一个新连接对应一个_socket  
24.     GenericSocket _socket;  
25.     //SSL相关不做分析,  
26. #ifdef MONGO_CONFIG_SSL  
27.     boost::optional<asio::ssl::stream<decltype(_socket)>> _sslSocket;  
28.     bool _ranHandshake = false;  
29. #endif  
30.   
31.     //本套接字对应的tl,赋值建TransportLayerASIO::_acceptConnection(...)  
32.     TransportLayerASIO* const _tl;  
33. }

该类最核心的三个接口ASIOSession(...) opportunisticRead(...) opportunisticWrite(..) 分别完成套接字处理、调用 asio 库接口实现底层数据读和底层数据写。这三个核心接口源码实现如下:

1. //初始化构造 TransportLayerASIO::_acceptConnection调用  
2. ASIOSession(TransportLayerASIO* tl, GenericSocket socket)  
3.     //fd描述符及TL初始化赋值  
4.     : _socket(std::move(socket)), _tl(tl) {  
5.     std::error_code ec;  
6.   
7.     //异步方式设置为非阻塞读  
8.     _socket.non_blocking(_tl->_listenerOptions.transportMode == Mode::kAsynchronous, ec);  
9.     fassert(40490, ec.value() == 0);  
10.   
11.     //获取套接字的family  
12.     auto family = endpointToSockAddr(_socket.local_endpoint()).getType();  
13.     //满足AF_INET
14.     if (family == AF_INET || family == AF_INET6) {  
15.         //no_delay keep_alive套接字系统参数设置  
16.         _socket.set_option(asio::ip::tcp::no_delay(true));  
17.         _socket.set_option(asio::socket_base::keep_alive(true));  
18.         //KeepAlive系统参数设置  
19.         setSocketKeepAliveParams(_socket.native_handle());  
20.     }  
21.   
22.     //获取本端和对端地址  
23.     _local = endpointToHostAndPort(_socket.local_endpoint());  
24.     _remote = endpointToHostAndPort(_socket.remote_endpoint(ec));  
25.     if (ec) {  
26.         LOG(3) << "Unable to get remote endpoint address: " << ec.message();  
27.     }  
28. }

该类初始化的时候完成新连接_socket 相关的初始化设置,包括阻塞读写还是非阻塞读写。如果是同步线程模型 ( 一个链接一个线程 ) ,则读写方式位阻塞读写;如果是异步线程模型 (adaptive 动态线程模型 ) ,则调用 asio 网络库接口实现异步读写。

此外,该链接_socket 对应的客户端 ip:port 和服务端 ip:port 也在该初始化类中获取,最终保存到本 session _remote _local 成员中。

数据读取核心代码实现如下:

1. //读取指定长度数据,然后执行handler回调  
2. void opportunisticRead(...) {  
3.     std::error_code ec;  
4.     //如果是异步线程模型,在ASIOSession构造初始化的时候会设置non_blocking非阻塞模式  
5.     //异步线程模型这里实际上是非阻塞读取,如果是同步线程模型,则没有non_blocking设置,也就是阻塞读取  
6.     auto size = asio::read(stream, buffers, ec);    
7.   
8.     //如果是异步读,并且read返回would_block或者try_again说明指定长度的数据还没有读取完毕  
9.     if ((ec == asio::error::would_block || ec == asio::error::try_again) && !sync) {  
10.         //buffers有大小size,实际读最多读size字节  
11.         MutableBufferSequence asyncBuffers(buffers);  
12.         if (size > 0) {  
13.             asyncBuffers += size; //buffer offset向后移动  
14.         }  
15.   
16.         //继续异步方式读取数据,读取到指定长度数据后执行handler回调处理  
17.         asio::async_read(stream, asyncBuffers, std::forward<CompleteHandler>(handler));  
18.     } else {   
19.         //阻塞方式读取read返回后可以保证读取到了size字节长度的数据  
20.         //直接read获取到size字节数据,则直接执行handler   
21.         handler(ec, size);  
22.     }  
23. }

opportunisticRead 首先调用asio::read(stream, buffers, ec) 读取 buffers 对应 size 长度的数据, buffers 空间大小就是需要读取的数据 size 大小。如果是同步线程模型,则这里为阻塞式读取,直到读到 size 字节才会返回;如果是异步线程模型,这这里是非阻塞读取,非阻塞读取当内核网络协议栈数据读取完毕后,如果还没有读到 size 字节,则继续进行 async_read 异步读取。

buffers 指定的 size 字节全部读取完整后,不管是同步模式还是异步模式,最终都会执行 handler 回调,开始后续的数据解析及处理流程。

发送数据核心代码实现如下:

1. //发送数据  
2. void opportunisticWrite(...) {  
3.     std::error_code ec;  
4.     //如果是同步模式,则阻塞写,直到全部写成功。异步模式则非阻塞写  
5.     auto size = asio::write(stream, buffers, ec);   
6.   
7.     //异步写如果返回try_again说明数据还没有发送完,则继续异步写发送  
8.     if ((ec == asio::error::would_block || ec == asio::error::try_again) && !sync) {  
9.         ConstBufferSequence asyncBuffers(buffers);  
10.         if (size > 0) {  //buffer中数据指针偏移计数
11.             asyncBuffers += size;  
12.         }  
13.         //异步写发送完成,执行handler回调  
14.         asio::async_write(stream, asyncBuffers, std::forward<CompleteHandler>(handler));  
15.     } else {  
16.         //同步写成功,则直接执行handler回调处理  
17.         handler(ec, size);  
18.     }  
19. }

数据发送流程和数据接收流程类似,也分位同步模式发送和异步模式发送,同步模式发送为阻塞式写,只有当所有数据通过 asio::write () 发送成功后才返回;异步模式发送为非阻塞写, asio::write () 不一定全部发送出去,因此需要再次调用 asio 库的 asio::async_write( ) 进行异步发送。

不管是同步模式还是异步模式发送数据,最终数据发送成功后,都会调用 handler () 回调来执行后续的流程。

4.2 总结

从上面的代码分析可以看出,session 会话模块最终直接和 asio 网络库交互实现数据的读写操作。该模块核心接口功能总结如下表:

类名

接口名

功能说明

 

 

 

 

ASIOSession

ASIOSession(...)

新连接 fd 相关处理,如是否非阻塞、 delay 设置、 keep_alive 设置、两端地址获取等

getTransportLayer()

获取对应 TL

remote()

获取该链接对应的对端 Ip:port 信息

local()

获取该链接对应的对端 Ip:port 信息

getSocket()

获取该链接的 fd

shutdown()

fd 回收处理

opportunisticRead()

同步或者异步数据读操作

opportunisticWrite()

同步或者异步数据写操作

5.  总结

     Mongodb 网络传输处理源码实现及性能调优 - 体验内核性能极致设计》 一文对 mongodb 网络传输模块中的 ASIO 网络库实现、 service_executor 服务运行子模块 ( 即线程模型子模块 ) 进行了详细的分析,加上本文的 transport_layer 套接字处理及传输层管理子模块、 session 会话子模块、 Ticket 数据收发子模块、 service_entry_point 服务入口点子模块。

    transport_layer 套接字处理及传输层管理子模块主要由 transport_layer_manager transport_layer_asio 两个核心类组成。分别完成 net 相关的配置文件初始化操作,套接字初始化及处理,最终 transport_layer_asio 的相应接口实现了和 ticket 数据分发子模块、服务入口点子模块的关联。

     服务入口子模块主要由ServiceEntryPointImpl 类和 service_entry_point_utils 中的线程创建函数组成,实现新连接 accept 处理及控制。该模块通过 startSession() 让服务入口子模块、 session 会话子模块、 ssm 状态机子模块关联起来。

    ticket 数据收发子模块主要功能如下:调用 session 子模块进行底层 asio 库处理、拆分数据接收和数据发送到两个类、完整 mongodb 报文读取 、接收或者发送 mongodb 报文后的回调处理 , 回调处理由 SSM 服务状态机模块管理,当读取或者发送一个完整的 mongodb 报文后最终交由 SSM 服务状态机模块调度处理。。

    Session 会话模块功能主要如下:负责记录 HostAndPort 、和底层 asio 库直接互动,实现数据的同步或者异步收发。一个新连接 fd 对应一个唯一的 session ,对 fd 的操作直接映射为 session 操作。

到这里,整个mongodb 网络传输层模块分析只差 service_state_machine 状态机调度子模块,状态机调度子模块相比本文分析的几个子模块更加复杂,因此将在下期《 mongodb 网络传输层模块源码分析三》中单独分析。

本文所有源码注释分析详见如下链接: mongodb 网络传输模块详细源码分析


来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/69984922/viewspace-2729717/,如需转载,请注明出处,否则将追究法律责任。

请登录后发表评论 登录
全部评论
前滴滴出行专家工程师,OPPO文档数据库mongodb负责人,负责千万级峰值TPS/十万亿级数据量文档数据库mongodb研发和运维工作,专注于分布式缓存、高性能中间件、数据库等研发,持续分享《MongoDB内核源码设计、性能优化、最佳运维实践》。git账号:y12345yz

注册时间:2020-10-04

  • 博文量
    24
  • 访问量
    11945