ITPub博客

首页 > 数据库 > NoSQL > mongodb内核源码实现、性能调优、最佳运维实践系列-command命令处理模块源码实现一

mongodb内核源码实现、性能调优、最佳运维实践系列-command命令处理模块源码实现一

原创 NoSQL 作者:y123456yzzyz 时间:2020-11-09 13:35:27 0 删除 编辑

关于作者

前滴滴出行技术专家,现任OPPO 文档数据库 mongodb 负责人,负责 oppo 千万级峰值 TPS/ 十万亿级数据量文档数据库 mongodb 内核研发及运维工作,一直专注于分布式缓存、高性能服务端、数据库、中间件等相关研发。后续持续分享《 MongoDB 内核源码设计、性能优化、最佳运维实践》, Github 账号地址 : https://github.com/y123456yz

1.  背景

    <<transport_layer 网络传输层模块源码实现 >> 中分享了 mongodb 内核底层网络 IO 处理相关实现,包括套接字初始化、一个完整 mongodb 报文的读取、获取到 DB 数据发送给客户端等。 Mongodb 支持多种增、删、改、查、聚合处理、 cluster 处理等操作,每个操作在内核实现中对应一个 command ,每个 command 有不同的功能, mongodb 内核如何进行 command 源码处理将是本文分析的重点

此外,mongodb 提供了 mongostat 工具来监控当前集群的各种操作统计。 Mongostat 监控统计如下图所示:

其中,insert delete update query 这四项统计比较好理解,分别对应增、删、改、查。但是, comand getmore 不是很好理解, command 代表什么统计? getMore 代表什么统计?,这两项相对比较难理解。

此外,通过本文字分析,我们将搞明白这六项统计的具体含义,同时弄清这六项统计由那些操作进行计数。

Command 命令处理模块分为: mongos 操作命令、 mongod 操作命令、 mongodb 集群内部命令,具体定义如下 :

①  mongos 操作命令,客户端可以通过 mongos 访问集群相关的命令。

②  mongod 操作命令:客户端可以通过 mongod 复制集和 cfg server 访问集群的相关命令。

③  mongodb 集群内部命令: mongos mongod mongo-cfg 集群实例之间交互的命令。

    Command 命令处理模块核心代码实现如下:

    command 命令处理模块源码实现》相关文章重点分析命令处理模块核心代码实现,也就是上面截图中的命令处理源码文件实现。

2. <<transport_layer 网络传输层模块源码实现 >> 衔接回顾

<<transport_layer 网络传输层模块源码实现三 >> 一文中,我们对 service_state_machine 状态机调度子模块进行了分析,该模块中的 dealTask 任务进行 mongodb 内部业务逻辑处理,其核心实现如下:

1. //dealTask处理  
2. void ServiceStateMachine::_processMessage(ThreadGuard guard) {  
3.     ......
4.     //command处理、DB访问后的数据通过dbresponse返回  
5.     DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage);  
6.     ......
7. }

上面的_sep 对应 mongod 或者 mongos 实例的服务入口实现,该 _seq 成员分别在如下代码中初始化为 ServiceEntryPointMongod ServiceEntryPointMongod 类实现。 SSM 状态机的 _seq 成员初始化赋值核心代码实现如下:

1. //mongos实例启动初始化  
2. static ExitCode runMongosServer() {  
3.     ......  
4.     //mongos实例对应sep为ServiceEntryPointMongos  
5.     auto sep = stdx::make_unique<ServiceEntryPointMongos>(getGlobalServiceContext());  
6.     getGlobalServiceContext()->setServiceEntryPoint(std::move(sep));  
7.     ......  
8. }  
9.   
10. //mongod实例启动初始化  
11. ExitCode _initAndListen(int listenPort) {  
12.     ......  
13.     //mongod实例对应sep为ServiceEntryPointMongod  
14.     serviceContext->setServiceEntryPoint(  
15.         stdx::make_unique<ServiceEntryPointMongod>(serviceContext));  
16.     ......  
17. }  
18.   
19. //SSM状态机初始化  
20. ServiceStateMachine::ServiceStateMachine(...)  
21.     : _state{State::Created},  
22.       //mongod和mongos实例的服务入口通过这里赋值给_seq成员变量  
23.       _sep{svcContext->getServiceEntryPoint()},  
24.       ......  
}

      通过上面的几个核心接口实现,把mongos mongod 两个实例的服务入口与状态机 SSM( ServiceStateMachine ) 联系起来,最终和下面的 command 命令处理模块关联。

 dealTask 进行一次 mongodb 请求的内部逻辑处理,该处理由 _sep->handleRequest( ) 接口实现。由于 mongos mongod 服务入口分别由 ServiceEntryPointMongos ServiceEntryPointMongo d 两个类实现,因此 dealTask 也就演变为如下接口处理:

①  mongos 实例: ServiceEntryPointMongos :: handleRequest (...)

②  Mongod 实例 : ServiceEntryPointMongo d:: handleRequest (...)

这两个接口入参都是OperationContext Message ,分别对应操作上下文、请求原始数据内容。下文会分析 Message 解析实现、 OperationContext 服务上下文实现将在后续章节分析。

Mongod mongos 实例服务入口类都继承自网络传输模块中的 ServiceEntryPointImpl 类,如下图所示:

Tips: mongos mongod 服务入口类为何要继承网络传输模块服务入口类?

原因是一个请求对应一个链接session ,该 session 对应的请求又和 SSM 状态机唯一对应。所有客户端请求对应的 SSM 状态机信息全部保存再 ServiceEntryPointImpl._sessions 成员中,而 command 命令处理模块为 SSM 状态机任务中的 dealTask 任务,通过该继承关系, ServiceEntryPointMongod ServiceEntryPointMongos 子类也就可以和状态机及任务处理关联起来,同时也可以获取当前请求对应的 session 链接信息。

3.  Mongodb 协议解析

在《transport_layer 网络传输层模块源码实现二》中的数据收发子模块完成了一个完整 mongodb 报文的接收,一个 mongodb 报文由 Header 头部 +opCode 包体组成,如下图所示:

上图中各个字段说明如下表:


opCode 取值比较多,早期版本中 OP_INSERT OP_DELETE OP_UPDATE OP_QUERY 分别针对增删改查请求, Mongodb 3.6 版本开始默认使用 OP_MSG 操作作为默认 opCode ,是一种可扩展的消息格式,旨在包含其他操作码的功能,新版本读写请求协议都对应该操作码。本文以 OP_MSG 操作码对应协议为例进行分析,其他操作码协议分析过程类似, OP_MSG 请求协议格式如下:

1. OP_MSG {  
2.     //mongodb报文头部  
3.     MsgHeader header;            
4.     //位图,用于标识报文是否需要校验 是否需要应答等  
5.     uint32 flagBits;           // message flags  
6.     //报文内容,例如find write等命令内容通过bson格式存在于该结构中  
7.     Sections[] sections;       // data sections  
8.     //报文CRC校验  
9.     optional<uint32> checksum; // optional CRC-32C checksum  
}

OP_MSG 各个字段说明如下表:

一个完整OP_MSG 请求格式如下:

除了通用头部header 外,客户端命令请求实际上都保存于 sections 字段中,该字段存放的是请求的原始 bson 格式数据。 BSON 是由 10gen 开发的一个数据格式,目前主要用于 MongoDB 中,是 MongoDB 的数据存储格式。 BSON 基于 JSON 格式,选择 JSON 进行改造的原因主要是 JSON 的通用性及 JSON schemaless 的特性。 BSON 相比JSON 具有以下特性

①  Lightweight ( 更轻量级 )

②  Traversable ( 易操作 )

③  Efficient ( 高效性能 )

本文重点不是分析bson 协议格式, bson 协议实现细节将在后续章节分享。 bson 协议更多设计细节详见: http://bsonspec.org/

总结:一个完整mongodb 报文由 header+body 组成,其中 header 长度固定为 16 字节, body 长度等于 messageLength -16 Header 部分协议解析由 message.cpp message.h 两源码文件实现, body 部分对应的 OP_MSG 类请求解析由 op_msg.cpp op_msg.h 两源码文件实现。

3.  mongodb 报文通用头部解析及封装源码实现

Header 头部解析由 src/mongo/util/net 目录下 message.cpp和message.h 两文件完成,该类主要完成通用header 头部和 body 部分的解析、封装。因此报文头部核心代码分为以下两类:

①  报文头部内容解析及封装(MSGHEADER 命名空间实现 )

②  头部和body 内容解析及封装 (MsgData 命名空间实现 )

3.1 mongodb 报文头部解析及封装核心代码实现

mongodb 报文头部解析由 namespace MSGHEADER {...} 实现,该类主要成员及接口实现如下:

1. namespace MSGHEADER {  
2. //header头部各个字段信息  
3. struct Layout {  
4.     //整个message长度,包括header长度和body长度  
5.     int32_t messageLength;     
6.     //requestID 该请求id信息  
7.     int32_t requestID;         
8.     //getResponseToMsgId解析  
9.     int32_t responseTo;        
10.     //操作类型:OP_UPDATE、OP_INSERT、OP_QUERY、OP_DELETE、OP_MSG等  
11.     int32_t opCode;  
12. };  
13.   
14. //ConstView实现header头部数据解析  
15. class ConstView {   
16. public:  
17.     ......  
18.     //初始化构造  
19.     ConstView(const char* data) : _data(data) {}  
20.     //获取_data地址  
21.     const char* view2ptr() const {  
22.         return data().view();  
23.     }  
24.     //TransportLayerASIO::ASIOSourceTicket::_headerCallback调用  
25.     //解析header头部的messageLength字段  
26.     int32_t getMessageLength() const {  
27.         return data().read<LittleEndian<int32_t>>(offsetof(Layout, messageLength));  
28.     }  
29.     //解析header头部的requestID字段  
30.     int32_t getRequestMsgId() const {  
31.         return data().read<LittleEndian<int32_t>>(offsetof(Layout, requestID));  
32.     }  
33.     //解析header头部的getResponseToMsgId字段  
34.     int32_t getResponseToMsgId() const {  
35.         return data().read<LittleEndian<int32_t>>(offsetof(Layout, responseTo));  
36.     }  
37.     //解析header头部的opCode字段  
38.     int32_t getOpCode() const {  
39.         return data().read<LittleEndian<int32_t>>(offsetof(Layout, opCode));  
40.     }  
41.   
42. protected:  
43.     //mongodb报文数据起始地址  
44.     const view_type& data() const {  
45.         return _data;  
46.     }  
47. private:  
48.     //数据部分  
49.     view_type _data;  
50. };  
51.   
52. //View填充header头部数据  
53. class View : public ConstView {  
54. public:  
55.     ......  
56.     //构造初始化  
57.     View(char* data) : ConstView(data) {}  
58.     //header起始地址  
59.     char* view2ptr() {  
60.         return data().view();  
61.     }  
62.     //以下四个接口进行header填充  
63.     //填充header头部messageLength字段  
64.     void setMessageLength(int32_t value) {  
65.         data().write(tagLittleEndian(value), offsetof(Layout, messageLength));  
66.     }  
67.     //填充header头部requestID字段  
68.     void setRequestMsgId(int32_t value) {  
69.         data().write(tagLittleEndian(value), offsetof(Layout, requestID));  
70.     }  
71.     //填充header头部responseTo字段  
72.     void setResponseToMsgId(int32_t value) {  
73.         data().write(tagLittleEndian(value), offsetof(Layout, responseTo));  
74.     }  
75.     //填充header头部opCode字段  
76.     void setOpCode(int32_t value) {  
77.         data().write(tagLittleEndian(value), offsetof(Layout, opCode));  
78.     }  
79. private:  
80.     //指向header起始地址  
81.     view_type data() const {  
82.         return const_cast<char*>(ConstView::view2ptr());  
83.     }  
84. };  
85. }

从上面的header 头部解析、填充的实现类可以看出, header 头部解析由 MSGHEADER::ConstView 实现; header 头部填充由 MSGHEADER::View 完成。实际上代码实现上,通过 offsetof 来进行移位,从而快速定位到头部对应字段。

3.2 mongodb 报文头部 +body 解析封装核心代码实现

Namespace MSGHEADER{...} 命名空间只负责 header 头部的处理, namespace MsgData{...} 命名空间相对 MSGHEADER 命名空间更加完善,除了处理头部解析封装外,还负责 body 数据起始地址维护、 body 数据封装、数据长度检查等。 MsgData 命名空间核心代码实现如下:

1. namespace MsgData {  
2. struct Layout {  
3.     //数据填充组成:header部分  
4.     MSGHEADER::Layout header;  
5.     //数据填充组成: body部分,body先用data占位置  
6.     char data[4];  
7. };  
8.   
9. //解析header字段信息及body其实地址信息  
10. class ConstView {  
11. public:  
12.     //初始化构造  
13.     ConstView(const char* storage) : _storage(storage) {}  
14.     //获取数据起始地址  
15.     const char* view2ptr() const {  
16.         return storage().view();  
17.     }  
18.   
19.     //以下四个接口间接执行前面的MSGHEADER中的头部字段解析  
20.     //填充header头部messageLength字段  
21.     int32_t getLen() const {  
22.         return header().getMessageLength();  
23.     }  
24.     //填充header头部requestID字段  
25.     int32_t getId() const {  
26.         return header().getRequestMsgId();  
27.     }  
28.     //填充header头部responseTo字段  
29.     int32_t getResponseToMsgId() const {  
30.         return header().getResponseToMsgId();  
31.     }  
32.     //获取网络数据报文中的opCode字段  
33.     NetworkOp getNetworkOp() const {  
34.         return NetworkOp(header().getOpCode());  
35.     }  
36.     //指向body起始地址  
37.     const char* data() const {  
38.         return storage().view(offsetof(Layout, data));  
39.     }  
40.     //messageLength长度检查,opcode检查  
41.     bool valid() const {  
42.         if (getLen() <= 0 || getLen() > (4 * BSONObjMaxInternalSize))  
43.             return false;  
44.         if (getNetworkOp() < 0 || getNetworkOp() > 30000)  
45.             return false;  
46.         return true;  
47.     }  
48.     ......  
49. protected:  
50.     //获取_storage  
51.     const ConstDataView& storage() const {  
52.         return _storage;  
53.     }  
54.     //指向header起始地址  
55.     MSGHEADER::ConstView header() const {  
56.         return storage().view(offsetof(Layout, header));  
57.     }  
58. private:  
59.     //mongodb报文存储在这里  
60.     ConstDataView _storage;  
61. };  
62.   
63. //填充数据,包括Header和body  
64. class View : public ConstView {  
65. public:  
66.     //构造初始化  
67.     View(char* storage) : ConstView(storage) {}  
68.     ......  
69.     //获取报文起始地址  
70.     char* view2ptr() {  
71.         return storage().view();  
72.     }  
73.   
74.     //以下四个接口间接执行前面的MSGHEADER中的头部字段构造  
75.     //以下四个接口完成msg header赋值  
76.     //填充header头部messageLength字段  
77.     void setLen(int value) {  
78.         return header().setMessageLength(value);  
79.     }  
80.     //填充header头部messageLength字段  
81.     void setId(int32_t value) {  
82.         return header().setRequestMsgId(value);  
83.     }  
84.     //填充header头部messageLength字段  
85.     void setResponseToMsgId(int32_t value) {  
86.         return header().setResponseToMsgId(value);  
87.     }  
88.     //填充header头部messageLength字段  
89.     void setOperation(int value) {  
90.         return header().setOpCode(value);  
91.     }  
92.   
93.     using ConstView::data;  
94.     //指向data  
95.     char* data() {  
96.         return storage().view(offsetof(Layout, data));  
97.     }  
98. private:  
99.     //也就是报文起始地址  
100.     DataView storage() const {  
101.         return const_cast<char*>(ConstView::view2ptr());  
102.     }  
103.     //指向header头部  
104.     MSGHEADER::View header() const {  
105.         return storage().view(offsetof(Layout, header));  
106.     }  
107. };  
108.   
109. ......  
110. //Value为前面的Layout,减4是因为有4字节填充data,所以这个就是header长度  
111. const int MsgDataHeaderSize = sizeof(Value) - 4;  
112.   
113. //除去头部后的数据部分长度  
114. inline int ConstView::dataLen() const {   
115.     return getLen() - MsgDataHeaderSize;  
116. }  
117. }  // namespace MsgData

     MSGHEADER 命名空间相比, MsgData 这个 namespace 命名空间接口实现和前面的 MSGHEADER 命名空间实现大同小异。 MsgData 不仅仅处理 header 头部的解析组装,还负责 body 部分数据头部指针指向、头部长度检查、 opCode 检查、数据填充等。其中, MsgData 命名空间中 header 头部的解析构造底层依赖 MSGHEADER 实现。

3.3 Message/DbMessage 核心代码实现

在《transport_layer 网络传输层模块源码实现二》中,从底层 ASIO 库接收到的 mongodb 报文是存放在 Message 结构中存储,最终存放在ServiceStateMachine._inMessage 成员中。

在前面第2 章我们知道 mongod mongso 实例的服务入口接口 handleRequest (...) 中都带有 Message 入参,也就是接收到的 Message 数据通过该接口处理。Message 类主要接口实现如下:

1. //DbMessage._msg成员为该类型  
2. class Message {  
3. public:  
4.     //message初始化  
5.     explicit Message(SharedBuffer data) : _buf(std::move(data)) {}  
6.     //头部header数据  
7.     MsgData::View header() const {  
8.         verify(!empty());  
9.         return _buf.get();  
10.     }  
11.     //获取网络数据报文中的op字段  
12.     NetworkOp operation() const {  
13.         return header().getNetworkOp();  
14.     }  
15.     //_buf释放为空  
16.     bool empty() const {  
17.         return !_buf;  
18.     }  
19.     //获取报文总长度messageLength  
20.     int size() const {  
21.         if (_buf) {  
22.             return MsgData::ConstView(_buf.get()).getLen();  
23.         }  
24.         return 0;  
25.     }  
26.     //body长度  
27.     int dataSize() const {  
28.         return size() - sizeof(MSGHEADER::Value);  
29.     }  
30.     //buf重置  
31.     void reset() {  
32.         _buf = {};  
33.     }  
34.     // use to set first buffer if empty  
35.     //_buf直接使用buf空间  
36.     void setData(SharedBuffer buf) {  
37.         verify(empty());  
38.         _buf = std::move(buf);  
39.     }  
40.      //把msgtxt拷贝到_buf中  
41.     void setData(int operation, const char* msgtxt) {  
42.         setData(operation, msgtxt, strlen(msgtxt) + 1);  
43.     }  
44.     //根据operation和msgdata构造一个完整mongodb报文  
45.     void setData(int operation, const char* msgdata, size_t len) {  
46.         verify(empty());  
47.         size_t dataLen = len + sizeof(MsgData::Value) - 4;  
48.         _buf = SharedBuffer::allocate(dataLen);  
49.         MsgData::View d = _buf.get();  
50.         if (len)  
51.             memcpy(d.data(), msgdata, len);  
52.         d.setLen(dataLen);  
53.         d.setOperation(operation);  
54.     }  
55.     ......  
56.     //获取_buf对应指针  
57.     const char* buf() const {  
58.         return _buf.get();  
59.     }  
60.   
61. private:  
62.     //存放接收数据的buf  
63.     SharedBuffer _buf;  
64. };

Message 是操作 mongodb 收发报文最直接的实现类,该类主要完成一个完整 mongodb 报文封装。有关 mongodb 报文头后面的 body 更多的解析实现在 DbMessage 类中完成, DbMessage 类包含 Message 类成员 _msg 。实际上, Message 报文信息在 handleRequest (...) 实例服务入口中赋值给 DbMessage._msg ,报文后续的 body 处理继续由 DbMessage 类相关接口完成处理。 DbMessage Message 类关系如下 :

1. class DbMessage {  
2.     ......  
3.     //包含Message成员变量  
4.     const Message& _msg;  
5.     //mongodb报文起始地址
6.     const char* _nsStart; 
7.     //报文结束地址
8.     const char* _theEnd; 
9. }  
10.   
11. DbMessage::DbMessage(const Message& msg) : _msg(msg),   
12.   _nsStart(NULL), _mark(NULL), _nsLen(0) {  
13.     //一个mongodb报文(header+body)数据的结束地址  
14.     _theEnd = _msg.singleData().data() + _msg.singleData().dataLen();  
15.     //报文起始地址 [_nextjsobj, _theEnd ]之间的数据就是一个完整mongodb报文  
16.     _nextjsobj = _msg.singleData().data();  
17.     ......  
18. }

DbMessage . _msg 成员为 DbMessage  类型, DbMessage _nsStart _theEnd 成员分别记录完整mongodb 报文的起始地址和结束地址,通过这两个指针就可以获取一个完整 mongodb 报文的全部内容,包括 header body

注意: DbMessage 是早期mongodb 版本 (version<3.6) 中用于报文 body 解析封装的类,这些类针对 opCode=[dbUpdate, dbDelete] 这个区间的操作。在 mongodb 新版本 (version>=3.6) 中, body 解析及封装由 op_msg.h op_msg.cpp 代码文件中的 clase OpMsgRequest{} 完成处理。

3.4 OpMsg 报文解析封装核心代码实现

     Mongodb 3.6 版本开始默认使用 OP_MSG 操作作为默认 opCode ,是一种可扩展的消息格式,旨在包含其他操作码的功能,新版本读写请求协议都对应该操作码。 OP_MSG 对应 mongodb 报文 body 解析封装处理由 OpMsg 类相关接口完成, OpMsg::parse(Message) Message 中解析出报文 body 内容,其核心代码实现如下:

1. struct OpMsg {   
2.       ......  
3.     //msg解析赋值见OpMsg::parse     
4.     //各种命令(insert update find等)都存放在该body中  
5.     BSONObj body;    
6.     //sequences用法暂时没看懂,感觉没什么用?先跳过  
7.     std::vector<DocumentSequence> sequences; //赋值见OpMsg::parse  
8. }  
 
1. //从message中解析出OpMsg信息  
2. OpMsg OpMsg::parse(const Message& message) try {  
3.     //message不能为空,并且opCode必须为dbMsg  
4.     invariant(!message.empty());  
5.     invariant(message.operation() == dbMsg);  
6.     //获取flagBits  
7.     const uint32_t flags = OpMsg::flags(message);  
8.     //flagBits有效性检查,bit 0-15中只能对第0和第1位操作  
9.     uassert(ErrorCodes::IllegalOpMsgFlag,  
10.             str::stream() << "Message contains illegal flags value: Ob"  
11.                           << std::bitset<32>(flags).to_string(),  
12.             !containsUnknownRequiredFlags(flags));  
13.   
14.     //校验码默认4字节  
15.     constexpr int kCrc32Size = 4;  
16.     //判断该mongo报文body内容是否启用了校验功能  
17.     const bool haveChecksum = flags & kChecksumPresent;  
18.     //如果有启用校验功能,则报文末尾4字节为校验码  
19.     const int checksumSize = haveChecksum ? kCrc32Size : 0;  
20.     //sections字段内容  
21.     BufReader sectionsBuf(message.singleData().data() + sizeof(flags),  
22.                           message.dataSize() - sizeof(flags) - checksumSize);  
23.   
24.     //默认先设置位false  
25.     bool haveBody = false;  
26.     OpMsg msg;  
27.     //解析sections对应命令请求数据  
28.     while (!sectionsBuf.atEof()) {  
29.         //BufReader::read读取kind内容,一个字节  
30.         const auto sectionKind = sectionsBuf.read<Section>();  
31.         //kind为0对应命令请求body内容,内容通过bson报错  
32.         switch (sectionKind) {  
33.             //sections第一个字节是0说明是body  
34.             case Section::kBody: {  
35.                 //默认只能有一个body  
36.                 uassert(40430, "Multiple body sections in message", !haveBody);  
37.                 haveBody = true;  
38.                 //命令请求的bson信息保存在这里  
39.                 msg.body = sectionsBuf.read<Validated<BSONObj>>();  
40.                 break;  
41.             }  
42.   
43.             //DocSequence暂时没看明白,用到的地方很少,跳过,后续等  
44.             //该系列文章主流功能分析完成后,从头再回首分析  
45.             case Section::kDocSequence: {  
46.                   ......  
47.             }  
48.         }  
49.     }  
50.     //OP_MSG必须有body内容  
51.     uassert(40587, "OP_MSG messages must have a body", haveBody);  
52.     //body和sequence去重判断  
53.     for (const auto& docSeq : msg.sequences) {  
54.         ......  
55.     }  
56.     return msg;  
57. }

OpMsg 类被 OpMsgRequest 类继承, OpMsgRequest 类中核心接口就是解析出 OpMsg.body 中的库信息和表信息, OpMsgRequest 类代码实现如下:

1. //协议解析得时候会用到,见runCommands  
2. struct OpMsgRequest : public OpMsg {  
3.     ......  
4.     //构造初始化  
5.     explicit OpMsgRequest(OpMsg&& generic) : OpMsg(std::move(generic)) {}  
6.     //opMsgRequestFromAnyProtocol->OpMsgRequest::parse   
7.     //从message中解析出OpMsg所需成员信息  
8.     static OpMsgRequest parse(const Message& message) {  
9.         //OpMsg::parse  
10.         return OpMsgRequest(OpMsg::parse(message));  
11.     }  
12.     //根据db body extraFields填充OpMsgRequest  
13.     static OpMsgRequest fromDBAndBody(... {  
14.         OpMsgRequest request;  
15.         request.body = ([&] {  
16.             //填充request.body  
17.             ......  
18.         }());  
19.         return request;  
20.     }  
21.     //从body中获取db name  
22.     StringData getDatabase() const {  
23.         if (auto elem = body["$db"])  
24.             return elem.checkAndGetStringData();  
25.         uasserted(40571, "OP_MSG requests require a $db argument");  
26.     }  
27.     //find  insert 等命令信息  body中的第一个elem就是command 名  
28.     StringData getCommandName() const {  
29.         return body.firstElementFieldName();  
30.     }  
31. };

   OpMsgRequest 通过 OpMsg::parse(message) 解析出OpMsg 信息,从而获取到 body 内容, GetCommandName() 接口和 getDatabase() 则分别从 body 中获取库 DB 信息、命令名信息。通过该类相关接口,命令名 (find write update ) DB 库都获取到了。

OpMsg 模块除了 OP_MSG 相关报文解析外,还负责 OP_MSG 报文组装填充,该模块接口功能大全如下表:

4.  Mongod 实例服务入口核心代码实现

Mongod 实例服务入口类 ServiceEntryPointMongod 继承 ServiceEntryPointImpl 类, mongod 实例的报文解析处理、命令解析、命令执行都由该类负责处理。 ServiceEntryPointMongod 核心接口可以细分为: opCode 解析及回调处理、命令解析及查找、命令执行三个子模块。

4.1 opCode 解析及回调处理

    OpCode 操作码解析及其回调处理由 ServiceEntryPointMongod::handleRequest (...) 接口实现,核心代码实现如下 :

1. //mongod服务对于客户端请求的处理    
2. //通过状态机SSM模块的如下接口调用:ServiceStateMachine::_processMessage  
3. DbResponse ServiceEntryPointMongod::handleRequest(OperationContext* opCtx, const Message& m) {  
4.     //获取opCode,3.6版本对应客户端默认使用OP_MSG  
5.     NetworkOp op = m.operation();   
6.     ......  
7.     //根据message构造DbMessage  
8.     DbMessage dbmsg(m);  
9.     //根据操作上下文获取对应的client  
10.     Client& c = *opCtx->getClient();    
11.     ......  
12.     //获取库.表信息,注意只有dbUpdate<opCode<dbDelete的opCode请求才通过dbmsg直接获取库和表信息  
13.     const char* ns = dbmsg.messageShouldHaveNs() ? dbmsg.getns() : NULL;  
14.     const NamespaceString nsString = ns ? NamespaceString(ns) : NamespaceString();  
15.     ....  
16.     //CurOp::debug 初始化opDebug,慢日志相关记录  
17.     OpDebug& debug = currentOp.debug();  
18.     //慢日志阀值  
19.     long long logThresholdMs = serverGlobalParams.slowMS;  
20.     //时mongodb将记录这次慢操作,1为只记录慢操作,即操作时间大于了设置的配置,2表示记录所有操作    
21.     bool shouldLogOpDebug = shouldLog(logger::LogSeverity::Debug(1));  
22.     DbResponse dbresponse;  
23.     if (op == dbMsg || op == dbCommand || (op == dbQuery && isCommand)) {  
24.         //新版本op=dbMsg,因此走这里  
25.         //从DB获取数据,获取到的数据通过dbresponse返回  
26.         dbresponse = runCommands(opCtx, m);     
27.     } else if (op == dbQuery) {  
28.         ......   
29.         //早期mongodb版本查询走这里  
30.         dbresponse = receivedQuery(opCtx, nsString, c, m);  
31.     } else if (op == dbGetMore) {    
32.         //早期mongodb版本查询走这里  
33.         dbresponse = receivedGetMore(opCtx, m, currentOp, &shouldLogOpDebug);  
34.     } else {  
35.         ......  
36.         //早期版本增 删 改走这里处理  
37.          if (op == dbInsert) {  
38.               receivedInsert(opCtx, nsString, m); //插入操作入口   新版本CmdInsert::runImpl  
39.          } else if (op == dbUpdate) {  
40.               receivedUpdate(opCtx, nsString, m); //更新操作入口    
41.          } else if (op == dbDelete) {  
42.               receivedDelete(opCtx, nsString, m); //删除操作入口    
43.          }   
44.     }  
45.     //获取runCommands执行时间,也就是内部处理时间  
46.     debug.executionTimeMicros = durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses());  
47.     ......  
48.     //慢日志记录  
49.     if (shouldLogOpDebug || (shouldSample && debug.executionTimeMicros > logThresholdMs * 1000LL)) {  
50.         Locker::LockerInfo lockerInfo;    
51.         //OperationContext::lockState  LockerImpl<>::getLockerInfo  
52.         opCtx->lockState()->getLockerInfo(&lockerInfo);   
53.   
54.     //OpDebug::report 记录慢日志到日志文件  
55.         log() << debug.report(&c, currentOp, lockerInfo.stats);   
56.     }  
57.     //各种统计信息  
58.     recordCurOpMetrics(opCtx);  
59. }

Mongod handleRequest() 接口主要完成以下工作:

①  Message 中获取 OpCode ,早期版本每个命令又对应取值,例如增删改查早期版本分别对应: dbInsert dbDelete dbUpdate dbQuery Mongodb 3.6 开始,默认请求对应 OpCode 都是 OP_MSG ,本文默认只分析 OpCode=OP_MSG 相关的处理。

②  获取本操作对应的Client 客户端信息。

③  如果是早期版本,通过Message 构造 DbMessage ,同时解析出库 . 表信息。

④  根据不同OpCode 执行对应回调操作, OP_MSG 对应操作为 runCommands(...) ,获取的数据通过 dbresponse 返回。

⑤  获取到db 层返回的数据后,进行慢日志判断,如果 db 层数据访问超过阀值,记录慢日志。

⑥  设置debug 的各种统计信息。

4.2 命令解析及查找

从上面的分析可以看出,接口最后调用runCommands(...) ,该接口核心代码实现如下所示:

1. //message解析出对应command执行  
2. DbResponse runCommands(OperationContext* opCtx, const Message& message) {  
3.     //获取message对应的ReplyBuilder,3.6默认对应OpMsgReplyBuilder  
4.     //应答数据通过该类构造  
5.     auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(message));  
6.     [&] {  
7.         OpMsgRequest request;  
8.         try {  // Parse.  
9.             //协议解析 根据message获取对应OpMsgRequest  
10.             request = rpc::opMsgRequestFromAnyProtocol(message);  
11.         }   
12.     }  
13.     try {  // Execute.  
14.         //opCtx初始化  
15.         curOpCommandSetup(opCtx, request);  
16.         //command初始化为Null  
17.         Command* c = nullptr;  
18.         //OpMsgRequest::getCommandName查找  
19.         if (!(c = Command::findCommand(request.getCommandName()))) {   
20.              //没有找到相应的command的后续异常处理  
21.              ......  
22.         }  
23.         //执行command命令,获取到的数据通过replyBuilder.get()返回  
24.         execCommandDatabase(opCtx, c, request, replyBuilder.get());  
25.     }  
26.     //OpMsgReplyBuilder::done对数据进行序列化操作  
27.     auto response = replyBuilder->done();  
28.     //responseLength赋值  
29.     CurOp::get(opCtx)->debug().responseLength = response.header().dataLen();  
30.     // 返回  
31.     return DbResponse{std::move(response)};  
32. }

RunCommands(...) 接口从 message 中解析出 OpMsg 信息,然后获取该 OpMsg 对应的 command 命令信息,最后执行该命令对应的后续处理操作。主要功能说明如下:

①  获取该OpCode 对应 replyBuilder OP_MSG 操作对应 builder OpMsgReplyBuilder

②  根据message 解析出 OpMsgRequest 数据, OpMsgRequest 来中包含了真正的命令请求 bson 信息。

③  opCtx 初始化操作。

④  通过request.getCommandName() 返回命令信息 ( 如“ find ”、“ update ”等字符串 )

⑤  通过Command::findCommand(command name) CommandMap 这个 map 表中查找是否支持该 command 命令。如果没找到说明不支持,如果找到说明支持。

⑥  调用execCommandDatabase(...) 执行该命令,并获取命令的执行结果。

⑦  根据command 执行结果构造 response 并返回

4.3 命令执行

1. void execCommandDatabase(...) {  
2.     ......  
3.     //获取dbname  
4.     const auto dbname = request.getDatabase().toString();  
5.     ......  
6.     //mab表存放从bson中解析出的elem信息  
7.     StringMap<int> topLevelFields;  
8.     //body elem解析  
9.     for (auto&& element : request.body) {  
10.         //获取bson中的elem信息  
11.         StringData fieldName = element.fieldNameStringData();  
12.         //如果elem信息重复,则异常处理  
13.         ......  
14.     }  
15.     //如果是help命令,则给出help提示  
16.     if (Command::isHelpRequest(helpField)) {  
17.         //给出help提示  
18.         Command::generateHelpResponse(opCtx, replyBuilder, *command);  
19.         return;  
20.     }  
21.     //权限认证检查,检查该命令执行权限  
22.     uassertStatusOK(Command::checkAuthorization(command, opCtx, request));  
23.     ......  
24.   
25.     //该命令执行次数统计  db.serverStatus().metrics.commands可以获取统计信息  
26.     command->incrementCommandsExecuted();  
27.     //真正的命令执行在这里面  
28.     retval = runCommandImpl(opCtx, command, request, replyBuilder, startOperationTime);  
29.     //该命令执行失败次数统计  
30.     if (!retval) {  
31.         command->incrementCommandsFailed();  
32.      }  
33.      ......  
34. }

execCommandDatabase(...) 最终调用RunCommandImpl(...) 进行对应命令的真正处理,该接口核心代码实现如下:

1. bool runCommandImpl(...) {  
2.     //获取命令请求内容body  
3.     BSONObj cmd = request.body;  
4.     //获取请求中的DB库信息  
5.     const std::string db = request.getDatabase().toString();  
6.     //ReadConcern检查  
7.     Status rcStatus = waitForReadConcern(  
8.         opCtx, repl::ReadConcernArgs::get(opCtx), command->allowsAfterClusterTime(cmd));  
9.     //ReadConcern检查不通过,直接异常提示处理  
10.     if (!rcStatus.isOK()) {  
11.          //异常处理  
12.          return;  
13.     }  
14.     if (!command->supportsWriteConcern(cmd)) {  
15.         //命令不支持WriteConcern,但是对应的请求中却带有WriteConcern配置,直接报错不支持  
16.         if (commandSpecifiesWriteConcern(cmd)) {  
17.             //异常处理"Command does not support writeConcern"  
18.             ......  
19.             return result;  
20.         }  
21.     //调用Command::publicRun执行不同命令操作  
22.         result = command->publicRun(opCtx, request, inPlaceReplyBob);  
23.     }  
24.     //提取WriteConcernOptions信息  
25.     auto wcResult = extractWriteConcern(opCtx, cmd, db);  
26.     //提取异常,直接异常处理  
27.     if (!wcResult.isOK()) {  
28.         //异常处理  
29.         ......  
30.         return result;  
31.     }  
32.     ......  
33.     //执行对应的命令Command::publicRun,执行不同命令操作  
34.     result = command->publicRun(opCtx, request, inPlaceReplyBob);  
35.     ......  
36. }

  RunCommandImpl(...) 接口最终调用该接口入参的 command ,执行  command->publicRun (...) 接口,也就是命令模块的公共 publicRun

4.4 总结

Mongod 服务入口首先从 message 中解析出 opCode 操作码, 3.6 版本对应客户端默认操作码为 OP_MSQ ,解析出该操作对应 OpMsgRequest 信息。然后从 message 原始数据中解析出 command 命令字符串后,继续通过全局 Map 表种查找是否支持该命令操作,如果支持则执行该命令;如果不支持,直接异常打印,同时返回。

5. Mongos 实例服务入口核心代码实现

     mongos 服务入口核心代码实现过程和 mongod 服务入口代码实现流程几乎相同, mongos 实例 message 解析、 OP_MSG 操作码处理、 command 命令查找等流程和上一章节 mongod 实例处理过程类似,本章节不在详细分析。 Mongos 实例服务入口处理调用流程如下:

ServiceEntryPointMongos::handleRequest(...)->Strategy::clientCommand(...)-->runCommand(...)->execCommandClient(...)

最后的接口核心代码实现如下:

1. void runCommand(...) {  
2.     ......  
3.     //获取请求命令name  
4.     auto const commandName = request.getCommandName();  
5.     //从全局map表中查找  
6.     auto const command = Command::findCommand(commandName);  
7.     //没有对应的command存在,抛异常说明不支持该命令  
8.     if (!command) {   
9.         ......  
10.         return;  
11.     }   
12.     ......  
13.     //执行命令  
14.     execCommandClient(opCtx, command, request, builder);   
15.     ......  
16. }  
17. 
18. void execCommandClient(...)  
19. {   
20.     ......  
21.     //认证检查,是否有操作该command命令的权限,没有则异常提示  
22.     Status status = Command::checkAuthorization(c, opCtx, request);    
23.     if (!status.isOK()) {  
24.         Command::appendCommandStatus(result, status);  
25.         return;  
26.     }  
27.     //该命令的执行次数自增,代理上面也是要计数的  
28.     c->incrementCommandsExecuted();   
29.     //如果需要command统计,则加1  
30.     if (c->shouldAffectCommandCounter()) {  
31.         globalOpCounters.gotCommand();  
32.     }  
33.     ......  
34.     //有部分命令不支持writeconcern配置,报错  
35.     bool supportsWriteConcern = c->supportsWriteConcern(request.body);  
36.     //不支持writeconcern又带有该参数的请求,直接异常处理"Command does not support writeConcern"  
37.     if (!supportsWriteConcern && !wcResult.getValue().usedDefault) {  
38.         ......  
39.         return;  
40.     }  
41.     //执行本命令对应的公共publicRun接口,Command::publicRun  
42.     ok = c->publicRun(opCtx, request, result);   
43.     ......  
44. }

Tips: mongos mongod 实例服务入口核心代码实现的一点小区别

①  Mongod 实例 opCode 操作码解析、 OpMsg 解析、 command 查找及对应命令调用处理都由 class ServiceEntryPointMongod{...} 类一起完成。

②  mongos 实例则把 opCode 操作码解析交由 class ServiceEntryPointMongos{...} 类实现, OpMsg 解析、 command 查找及对应命令调用处理放到了 clase Strategy{...} 类来处理。

 

6.  总结

  Mongodb 报文解析及组装流程总结

①  一个完整mongodb 报文由通用报文 header 头部 +body 部分组成。

②  Body 部分内容,根据报文头部的 opCode 来决定不同的body 内容。

③  3.6 版本对应客户端请求 opCode 默认为 OP_MSG ,该操作码对应 body 部分由 flagBits + sections + checksum 组成,其中 sections 中存放的是真正的命令请求信息,已 bson 数据格式保存。

④  Header 头部和 body 报文体封装及解析过程由 class Message {...} 类实现

⑤  Body 中对应 command 命令名、库名、表名的解析在 mongodb(version<3.6) 低版本协议中由 class DbMessage {...} 类实现

⑥  Body 中对应 command 命令名、库名、表名的解析在 mongodb(version<3.6) 低版本协议中由 struct OpMsgRequest{...} 结构和 struct OpMsg {...} 类实现

 

  Mongos mongod 实例的服务入口处理流程大同小异,整体处理流程如下:

①  message 解析出 opCode 操作码,根据不同操作码执行对应操作码回调。

②  根据message 解析出 OpMsg request 信息, mongodb 报文的命令信息就存储在该 body 中,该 body bson 格式存储。

③  body 中解析出 command 命令字符串信息 ( 如“ insert ”、“ update ”等 )

④  从全局_commands map 表中查找是否支持该命令,如果支持则执行该命令处理,如果不支持则直接报错提示。

⑤  最终找到对应command 命令后,执行 command 的功能 run 接口。

   图形化总结如下:


说明: 3 章的协议解析及封装过程实际上应该算是网络处理模块范畴,本文为了分析 command 命令处理模块方便,把该部分实现归纳到了命令处理模块,这样方便理解。

 

Tips: 下期继续分享不同command 命令执行细节。

7. 遗留问题

     1 章节中的统计信息,将在 command 模块核心代码分析完毕后揭晓答案,《 mongodb command 命令处理模块源码实现二》中继续分析,敬请关注。

 





来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/69984922/viewspace-2732975/,如需转载,请注明出处,否则将追究法律责任。

请登录后发表评论 登录
全部评论
前滴滴出行专家工程师,OPPO文档数据库mongodb负责人,负责近千万级峰值TPS/数万亿级数据量文档数据库mongodb研发和运维工作,专注于分布式缓存、高性能中间件、数据库等研发,持续分享《MongoDB内核源码设计、性能优化、最佳运维实践》。git账号:y12345yz

注册时间:2020-10-04

  • 博文量
    33
  • 访问量
    25645