前滴滴出行技术专家,现任OPPO 文档数据库 mongodb 负责人,负责 oppo 千万级峰值 TPS/ 十万亿级数据量文档数据库 mongodb 内核研发及运维工作,一直专注于分布式缓存、高性能服务端、数据库、中间件等相关研发。后续持续分享《 MongoDB 内核源码设计、性能优化、最佳运维实践》, Github 账号地址 : https://github.com/y123456yz
<<transport_layer 网络传输层模块源码实现 >> 中分享了 mongodb 内核底层网络 IO 处理相关实现,包括套接字初始化、一个完整 mongodb 报文的读取、获取到 DB 数据发送给客户端等。 Mongodb 支持多种增、删、改、查、聚合处理、 cluster 处理等操作,每个操作在内核实现中对应一个 command ,每个 command 有不同的功能, mongodb 内核如何进行 command 源码处理将是本文分析的重点
此外,mongodb 提供了 mongostat 工具来监控当前集群的各种操作统计。 Mongostat 监控统计如下图所示:
其中,insert 、 delete 、 update 、 query 这四项统计比较好理解,分别对应增、删、改、查。但是, comand 、 getmore 不是很好理解, command 代表什么统计? getMore 代表什么统计?,这两项相对比较难理解。
此外,通过本文字分析,我们将搞明白这六项统计的具体含义,同时弄清这六项统计由那些操作进行计数。
Command 命令处理模块分为: mongos 操作命令、 mongod 操作命令、 mongodb 集群内部命令,具体定义如下 :
① mongos 操作命令,客户端可以通过 mongos 访问集群相关的命令。
② mongod 操作命令:客户端可以通过 mongod 复制集和 cfg server 访问集群的相关命令。
③ mongodb 集群内部命令: mongos 、 mongod 、 mongo-cfg 集群实例之间交互的命令。
Command 命令处理模块核心代码实现如下:
《command 命令处理模块源码实现》相关文章重点分析命令处理模块核心代码实现,也就是上面截图中的命令处理源码文件实现。
<<transport_layer 网络传输层模块源码实现三 >> 一文中,我们对 service_state_machine 状态机调度子模块进行了分析,该模块中的 dealTask 任务进行 mongodb 内部业务逻辑处理,其核心实现如下:
1. //dealTask处理 2. void ServiceStateMachine::_processMessage(ThreadGuard guard) { 3. ...... 4. //command处理、DB访问后的数据通过dbresponse返回 5. DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage); 6. ...... 7. }
上面的_sep 对应 mongod 或者 mongos 实例的服务入口实现,该 _seq 成员分别在如下代码中初始化为 ServiceEntryPointMongod 和 ServiceEntryPointMongod 类实现。 SSM 状态机的 _seq 成员初始化赋值核心代码实现如下:
1. //mongos实例启动初始化 2. static ExitCode runMongosServer() { 3. ...... 4. //mongos实例对应sep为ServiceEntryPointMongos 5. auto sep = stdx::make_unique<ServiceEntryPointMongos>(getGlobalServiceContext()); 6. getGlobalServiceContext()->setServiceEntryPoint(std::move(sep)); 7. ...... 8. } 9. 10. //mongod实例启动初始化 11. ExitCode _initAndListen(int listenPort) { 12. ...... 13. //mongod实例对应sep为ServiceEntryPointMongod 14. serviceContext->setServiceEntryPoint( 15. stdx::make_unique<ServiceEntryPointMongod>(serviceContext)); 16. ...... 17. } 18. 19. //SSM状态机初始化 20. ServiceStateMachine::ServiceStateMachine(...) 21. : _state{State::Created}, 22. //mongod和mongos实例的服务入口通过这里赋值给_seq成员变量 23. _sep{svcContext->getServiceEntryPoint()}, 24. ...... }
通过上面的几个核心接口实现,把mongos 和 mongod 两个实例的服务入口与状态机 SSM( ServiceStateMachine ) 联系起来,最终和下面的 command 命令处理模块关联。
dealTask 进行一次 mongodb 请求的内部逻辑处理,该处理由 _sep->handleRequest( ) 接口实现。由于 mongos 和 mongod 服务入口分别由 ServiceEntryPointMongos 和 ServiceEntryPointMongo d 两个类实现,因此 dealTask 也就演变为如下接口处理:
① mongos 实例: ServiceEntryPointMongos :: handleRequest (...)
② Mongod 实例 : : ServiceEntryPointMongo d:: handleRequest (...)
这两个接口入参都是OperationContext 和 Message ,分别对应操作上下文、请求原始数据内容。下文会分析 Message 解析实现、 OperationContext 服务上下文实现将在后续章节分析。
Mongod 和 mongos 实例服务入口类都继承自网络传输模块中的 ServiceEntryPointImpl 类,如下图所示:
Tips: mongos 和 mongod 服务入口类为何要继承网络传输模块服务入口类?
原因是一个请求对应一个链接session ,该 session 对应的请求又和 SSM 状态机唯一对应。所有客户端请求对应的 SSM 状态机信息全部保存再 ServiceEntryPointImpl._sessions 成员中,而 command 命令处理模块为 SSM 状态机任务中的 dealTask 任务,通过该继承关系, ServiceEntryPointMongod 和 ServiceEntryPointMongos 子类也就可以和状态机及任务处理关联起来,同时也可以获取当前请求对应的 session 链接信息。
在《transport_layer 网络传输层模块源码实现二》中的数据收发子模块完成了一个完整 mongodb 报文的接收,一个 mongodb 报文由 Header 头部 +opCode 包体组成,如下图所示:
上图中各个字段说明如下表:
opCode 取值比较多,早期版本中 OP_INSERT 、 OP_DELETE 、 OP_UPDATE 、 OP_QUERY 分别针对增删改查请求, Mongodb 从 3.6 版本开始默认使用 OP_MSG 操作作为默认 opCode ,是一种可扩展的消息格式,旨在包含其他操作码的功能,新版本读写请求协议都对应该操作码。本文以 OP_MSG 操作码对应协议为例进行分析,其他操作码协议分析过程类似, OP_MSG 请求协议格式如下:
1. OP_MSG { 2. //mongodb报文头部 3. MsgHeader header; 4. //位图,用于标识报文是否需要校验 是否需要应答等 5. uint32 flagBits; // message flags 6. //报文内容,例如find write等命令内容通过bson格式存在于该结构中 7. Sections[] sections; // data sections 8. //报文CRC校验 9. optional<uint32> checksum; // optional CRC-32C checksum }
OP_MSG 各个字段说明如下表:
一个完整OP_MSG 请求格式如下:
除了通用头部header 外,客户端命令请求实际上都保存于 sections 字段中,该字段存放的是请求的原始 bson 格式数据。 BSON 是由 10gen 开发的一个数据格式,目前主要用于 MongoDB 中,是 MongoDB 的数据存储格式。 BSON 基于 JSON 格式,选择 JSON 进行改造的原因主要是 JSON 的通用性及 JSON 的 schemaless 的特性。 BSON 相比JSON 具有以下特性 :
① Lightweight ( 更轻量级 )
② Traversable ( 易操作 )
③ Efficient ( 高效性能 )
本文重点不是分析bson 协议格式, bson 协议实现细节将在后续章节分享。 bson 协议更多设计细节详见: http://bsonspec.org/
总结:一个完整mongodb 报文由 header+body 组成,其中 header 长度固定为 16 字节, body 长度等于 messageLength -16 。 Header 部分协议解析由 message.cpp 和 message.h 两源码文件实现, body 部分对应的 OP_MSG 类请求解析由 op_msg.cpp 和 op_msg.h 两源码文件实现。
Header 头部解析由 src/mongo/util/net 目录下 message.cpp和message.h 两文件完成,该类主要完成通用header 头部和 body 部分的解析、封装。因此报文头部核心代码分为以下两类:
① 报文头部内容解析及封装(MSGHEADER 命名空间实现 )
② 头部和body 内容解析及封装 (MsgData 命名空间实现 )
mongodb 报文头部解析由 namespace MSGHEADER {...} 实现,该类主要成员及接口实现如下:
1. namespace MSGHEADER { 2. //header头部各个字段信息 3. struct Layout { 4. //整个message长度,包括header长度和body长度 5. int32_t messageLength; 6. //requestID 该请求id信息 7. int32_t requestID; 8. //getResponseToMsgId解析 9. int32_t responseTo; 10. //操作类型:OP_UPDATE、OP_INSERT、OP_QUERY、OP_DELETE、OP_MSG等 11. int32_t opCode; 12. }; 13. 14. //ConstView实现header头部数据解析 15. class ConstView { 16. public: 17. ...... 18. //初始化构造 19. ConstView(const char* data) : _data(data) {} 20. //获取_data地址 21. const char* view2ptr() const { 22. return data().view(); 23. } 24. //TransportLayerASIO::ASIOSourceTicket::_headerCallback调用 25. //解析header头部的messageLength字段 26. int32_t getMessageLength() const { 27. return data().read<LittleEndian<int32_t>>(offsetof(Layout, messageLength)); 28. } 29. //解析header头部的requestID字段 30. int32_t getRequestMsgId() const { 31. return data().read<LittleEndian<int32_t>>(offsetof(Layout, requestID)); 32. } 33. //解析header头部的getResponseToMsgId字段 34. int32_t getResponseToMsgId() const { 35. return data().read<LittleEndian<int32_t>>(offsetof(Layout, responseTo)); 36. } 37. //解析header头部的opCode字段 38. int32_t getOpCode() const { 39. return data().read<LittleEndian<int32_t>>(offsetof(Layout, opCode)); 40. } 41. 42. protected: 43. //mongodb报文数据起始地址 44. const view_type& data() const { 45. return _data; 46. } 47. private: 48. //数据部分 49. view_type _data; 50. }; 51. 52. //View填充header头部数据 53. class View : public ConstView { 54. public: 55. ...... 56. //构造初始化 57. View(char* data) : ConstView(data) {} 58. //header起始地址 59. char* view2ptr() { 60. return data().view(); 61. } 62. //以下四个接口进行header填充 63. //填充header头部messageLength字段 64. void setMessageLength(int32_t value) { 65. data().write(tagLittleEndian(value), offsetof(Layout, messageLength)); 66. } 67. //填充header头部requestID字段 68. void setRequestMsgId(int32_t value) { 69. data().write(tagLittleEndian(value), offsetof(Layout, requestID)); 70. } 71. //填充header头部responseTo字段 72. void setResponseToMsgId(int32_t value) { 73. data().write(tagLittleEndian(value), offsetof(Layout, responseTo)); 74. } 75. //填充header头部opCode字段 76. void setOpCode(int32_t value) { 77. data().write(tagLittleEndian(value), offsetof(Layout, opCode)); 78. } 79. private: 80. //指向header起始地址 81. view_type data() const { 82. return const_cast<char*>(ConstView::view2ptr()); 83. } 84. }; 85. }
从上面的header 头部解析、填充的实现类可以看出, header 头部解析由 MSGHEADER::ConstView 实现; header 头部填充由 MSGHEADER::View 完成。实际上代码实现上,通过 offsetof 来进行移位,从而快速定位到头部对应字段。
Namespace MSGHEADER{...} 命名空间只负责 header 头部的处理, namespace MsgData{...} 命名空间相对 MSGHEADER 命名空间更加完善,除了处理头部解析封装外,还负责 body 数据起始地址维护、 body 数据封装、数据长度检查等。 MsgData 命名空间核心代码实现如下:
1. namespace MsgData { 2. struct Layout { 3. //数据填充组成:header部分 4. MSGHEADER::Layout header; 5. //数据填充组成: body部分,body先用data占位置 6. char data[4]; 7. }; 8. 9. //解析header字段信息及body其实地址信息 10. class ConstView { 11. public: 12. //初始化构造 13. ConstView(const char* storage) : _storage(storage) {} 14. //获取数据起始地址 15. const char* view2ptr() const { 16. return storage().view(); 17. } 18. 19. //以下四个接口间接执行前面的MSGHEADER中的头部字段解析 20. //填充header头部messageLength字段 21. int32_t getLen() const { 22. return header().getMessageLength(); 23. } 24. //填充header头部requestID字段 25. int32_t getId() const { 26. return header().getRequestMsgId(); 27. } 28. //填充header头部responseTo字段 29. int32_t getResponseToMsgId() const { 30. return header().getResponseToMsgId(); 31. } 32. //获取网络数据报文中的opCode字段 33. NetworkOp getNetworkOp() const { 34. return NetworkOp(header().getOpCode()); 35. } 36. //指向body起始地址 37. const char* data() const { 38. return storage().view(offsetof(Layout, data)); 39. } 40. //messageLength长度检查,opcode检查 41. bool valid() const { 42. if (getLen() <= 0 || getLen() > (4 * BSONObjMaxInternalSize)) 43. return false; 44. if (getNetworkOp() < 0 || getNetworkOp() > 30000) 45. return false; 46. return true; 47. } 48. ...... 49. protected: 50. //获取_storage 51. const ConstDataView& storage() const { 52. return _storage; 53. } 54. //指向header起始地址 55. MSGHEADER::ConstView header() const { 56. return storage().view(offsetof(Layout, header)); 57. } 58. private: 59. //mongodb报文存储在这里 60. ConstDataView _storage; 61. }; 62. 63. //填充数据,包括Header和body 64. class View : public ConstView { 65. public: 66. //构造初始化 67. View(char* storage) : ConstView(storage) {} 68. ...... 69. //获取报文起始地址 70. char* view2ptr() { 71. return storage().view(); 72. } 73. 74. //以下四个接口间接执行前面的MSGHEADER中的头部字段构造 75. //以下四个接口完成msg header赋值 76. //填充header头部messageLength字段 77. void setLen(int value) { 78. return header().setMessageLength(value); 79. } 80. //填充header头部messageLength字段 81. void setId(int32_t value) { 82. return header().setRequestMsgId(value); 83. } 84. //填充header头部messageLength字段 85. void setResponseToMsgId(int32_t value) { 86. return header().setResponseToMsgId(value); 87. } 88. //填充header头部messageLength字段 89. void setOperation(int value) { 90. return header().setOpCode(value); 91. } 92. 93. using ConstView::data; 94. //指向data 95. char* data() { 96. return storage().view(offsetof(Layout, data)); 97. } 98. private: 99. //也就是报文起始地址 100. DataView storage() const { 101. return const_cast<char*>(ConstView::view2ptr()); 102. } 103. //指向header头部 104. MSGHEADER::View header() const { 105. return storage().view(offsetof(Layout, header)); 106. } 107. }; 108. 109. ...... 110. //Value为前面的Layout,减4是因为有4字节填充data,所以这个就是header长度 111. const int MsgDataHeaderSize = sizeof(Value) - 4; 112. 113. //除去头部后的数据部分长度 114. inline int ConstView::dataLen() const { 115. return getLen() - MsgDataHeaderSize; 116. } 117. } // namespace MsgData
和MSGHEADER 命名空间相比, MsgData 这个 namespace 命名空间接口实现和前面的 MSGHEADER 命名空间实现大同小异。 MsgData 不仅仅处理 header 头部的解析组装,还负责 body 部分数据头部指针指向、头部长度检查、 opCode 检查、数据填充等。其中, MsgData 命名空间中 header 头部的解析构造底层依赖 MSGHEADER 实现。
在《transport_layer 网络传输层模块源码实现二》中,从底层 ASIO 库接收到的 mongodb 报文是存放在 Message 结构中存储,最终存放在ServiceStateMachine._inMessage 成员中。
在前面第2 章我们知道 mongod 和 mongso 实例的服务入口接口 handleRequest (...) 中都带有 Message 入参,也就是接收到的 Message 数据通过该接口处理。Message 类主要接口实现如下:
1. //DbMessage._msg成员为该类型 2. class Message { 3. public: 4. //message初始化 5. explicit Message(SharedBuffer data) : _buf(std::move(data)) {} 6. //头部header数据 7. MsgData::View header() const { 8. verify(!empty()); 9. return _buf.get(); 10. } 11. //获取网络数据报文中的op字段 12. NetworkOp operation() const { 13. return header().getNetworkOp(); 14. } 15. //_buf释放为空 16. bool empty() const { 17. return !_buf; 18. } 19. //获取报文总长度messageLength 20. int size() const { 21. if (_buf) { 22. return MsgData::ConstView(_buf.get()).getLen(); 23. } 24. return 0; 25. } 26. //body长度 27. int dataSize() const { 28. return size() - sizeof(MSGHEADER::Value); 29. } 30. //buf重置 31. void reset() { 32. _buf = {}; 33. } 34. // use to set first buffer if empty 35. //_buf直接使用buf空间 36. void setData(SharedBuffer buf) { 37. verify(empty()); 38. _buf = std::move(buf); 39. } 40. //把msgtxt拷贝到_buf中 41. void setData(int operation, const char* msgtxt) { 42. setData(operation, msgtxt, strlen(msgtxt) + 1); 43. } 44. //根据operation和msgdata构造一个完整mongodb报文 45. void setData(int operation, const char* msgdata, size_t len) { 46. verify(empty()); 47. size_t dataLen = len + sizeof(MsgData::Value) - 4; 48. _buf = SharedBuffer::allocate(dataLen); 49. MsgData::View d = _buf.get(); 50. if (len) 51. memcpy(d.data(), msgdata, len); 52. d.setLen(dataLen); 53. d.setOperation(operation); 54. } 55. ...... 56. //获取_buf对应指针 57. const char* buf() const { 58. return _buf.get(); 59. } 60. 61. private: 62. //存放接收数据的buf 63. SharedBuffer _buf; 64. };
Message 是操作 mongodb 收发报文最直接的实现类,该类主要完成一个完整 mongodb 报文封装。有关 mongodb 报文头后面的 body 更多的解析实现在 DbMessage 类中完成, DbMessage 类包含 Message 类成员 _msg 。实际上, Message 报文信息在 handleRequest (...) 实例服务入口中赋值给 DbMessage._msg ,报文后续的 body 处理继续由 DbMessage 类相关接口完成处理。 DbMessage 和 Message 类关系如下 :
1. class DbMessage { 2. ...... 3. //包含Message成员变量 4. const Message& _msg; 5. //mongodb报文起始地址 6. const char* _nsStart; 7. //报文结束地址 8. const char* _theEnd; 9. } 10. 11. DbMessage::DbMessage(const Message& msg) : _msg(msg), 12. _nsStart(NULL), _mark(NULL), _nsLen(0) { 13. //一个mongodb报文(header+body)数据的结束地址 14. _theEnd = _msg.singleData().data() + _msg.singleData().dataLen(); 15. //报文起始地址 [_nextjsobj, _theEnd ]之间的数据就是一个完整mongodb报文 16. _nextjsobj = _msg.singleData().data(); 17. ...... 18. }
DbMessage . _msg 成员为 DbMessage 类型, DbMessage 的 _nsStart 和 _theEnd 成员分别记录完整mongodb 报文的起始地址和结束地址,通过这两个指针就可以获取一个完整 mongodb 报文的全部内容,包括 header 和 body 。
注意: DbMessage 是早期mongodb 版本 (version<3.6) 中用于报文 body 解析封装的类,这些类针对 opCode=[dbUpdate, dbDelete] 这个区间的操作。在 mongodb 新版本 (version>=3.6) 中, body 解析及封装由 op_msg.h 和 op_msg.cpp 代码文件中的 clase OpMsgRequest{} 完成处理。
Mongodb 从 3.6 版本开始默认使用 OP_MSG 操作作为默认 opCode ,是一种可扩展的消息格式,旨在包含其他操作码的功能,新版本读写请求协议都对应该操作码。 OP_MSG 对应 mongodb 报文 body 解析封装处理由 OpMsg 类相关接口完成, OpMsg::parse(Message) 从 Message 中解析出报文 body 内容,其核心代码实现如下:
1. struct OpMsg { 2. ...... 3. //msg解析赋值见OpMsg::parse 4. //各种命令(insert update find等)都存放在该body中 5. BSONObj body; 6. //sequences用法暂时没看懂,感觉没什么用?先跳过 7. std::vector<DocumentSequence> sequences; //赋值见OpMsg::parse 8. } 1. //从message中解析出OpMsg信息 2. OpMsg OpMsg::parse(const Message& message) try { 3. //message不能为空,并且opCode必须为dbMsg 4. invariant(!message.empty()); 5. invariant(message.operation() == dbMsg); 6. //获取flagBits 7. const uint32_t flags = OpMsg::flags(message); 8. //flagBits有效性检查,bit 0-15中只能对第0和第1位操作 9. uassert(ErrorCodes::IllegalOpMsgFlag, 10. str::stream() << "Message contains illegal flags value: Ob" 11. << std::bitset<32>(flags).to_string(), 12. !containsUnknownRequiredFlags(flags)); 13. 14. //校验码默认4字节 15. constexpr int kCrc32Size = 4; 16. //判断该mongo报文body内容是否启用了校验功能 17. const bool haveChecksum = flags & kChecksumPresent; 18. //如果有启用校验功能,则报文末尾4字节为校验码 19. const int checksumSize = haveChecksum ? kCrc32Size : 0; 20. //sections字段内容 21. BufReader sectionsBuf(message.singleData().data() + sizeof(flags), 22. message.dataSize() - sizeof(flags) - checksumSize); 23. 24. //默认先设置位false 25. bool haveBody = false; 26. OpMsg msg; 27. //解析sections对应命令请求数据 28. while (!sectionsBuf.atEof()) { 29. //BufReader::read读取kind内容,一个字节 30. const auto sectionKind = sectionsBuf.read<Section>(); 31. //kind为0对应命令请求body内容,内容通过bson报错 32. switch (sectionKind) { 33. //sections第一个字节是0说明是body 34. case Section::kBody: { 35. //默认只能有一个body 36. uassert(40430, "Multiple body sections in message", !haveBody); 37. haveBody = true; 38. //命令请求的bson信息保存在这里 39. msg.body = sectionsBuf.read<Validated<BSONObj>>(); 40. break; 41. } 42. 43. //DocSequence暂时没看明白,用到的地方很少,跳过,后续等 44. //该系列文章主流功能分析完成后,从头再回首分析 45. case Section::kDocSequence: { 46. ...... 47. } 48. } 49. } 50. //OP_MSG必须有body内容 51. uassert(40587, "OP_MSG messages must have a body", haveBody); 52. //body和sequence去重判断 53. for (const auto& docSeq : msg.sequences) { 54. ...... 55. } 56. return msg; 57. }
OpMsg 类被 OpMsgRequest 类继承, OpMsgRequest 类中核心接口就是解析出 OpMsg.body 中的库信息和表信息, OpMsgRequest 类代码实现如下:
1. //协议解析得时候会用到,见runCommands 2. struct OpMsgRequest : public OpMsg { 3. ...... 4. //构造初始化 5. explicit OpMsgRequest(OpMsg&& generic) : OpMsg(std::move(generic)) {} 6. //opMsgRequestFromAnyProtocol->OpMsgRequest::parse 7. //从message中解析出OpMsg所需成员信息 8. static OpMsgRequest parse(const Message& message) { 9. //OpMsg::parse 10. return OpMsgRequest(OpMsg::parse(message)); 11. } 12. //根据db body extraFields填充OpMsgRequest 13. static OpMsgRequest fromDBAndBody(... { 14. OpMsgRequest request; 15. request.body = ([&] { 16. //填充request.body 17. ...... 18. }()); 19. return request; 20. } 21. //从body中获取db name 22. StringData getDatabase() const { 23. if (auto elem = body["$db"]) 24. return elem.checkAndGetStringData(); 25. uasserted(40571, "OP_MSG requests require a $db argument"); 26. } 27. //find insert 等命令信息 body中的第一个elem就是command 名 28. StringData getCommandName() const { 29. return body.firstElementFieldName(); 30. } 31. };
OpMsgRequest 通过 OpMsg::parse(message) 解析出OpMsg 信息,从而获取到 body 内容, GetCommandName() 接口和 getDatabase() 则分别从 body 中获取库 DB 信息、命令名信息。通过该类相关接口,命令名 (find 、 write 、 update 等 ) 和 DB 库都获取到了。
OpMsg 模块除了 OP_MSG 相关报文解析外,还负责 OP_MSG 报文组装填充,该模块接口功能大全如下表:
Mongod 实例服务入口类 ServiceEntryPointMongod 继承 ServiceEntryPointImpl 类, mongod 实例的报文解析处理、命令解析、命令执行都由该类负责处理。 ServiceEntryPointMongod 核心接口可以细分为: opCode 解析及回调处理、命令解析及查找、命令执行三个子模块。
OpCode 操作码解析及其回调处理由 ServiceEntryPointMongod::handleRequest (...) 接口实现,核心代码实现如下 :
1. //mongod服务对于客户端请求的处理 2. //通过状态机SSM模块的如下接口调用:ServiceStateMachine::_processMessage 3. DbResponse ServiceEntryPointMongod::handleRequest(OperationContext* opCtx, const Message& m) { 4. //获取opCode,3.6版本对应客户端默认使用OP_MSG 5. NetworkOp op = m.operation(); 6. ...... 7. //根据message构造DbMessage 8. DbMessage dbmsg(m); 9. //根据操作上下文获取对应的client 10. Client& c = *opCtx->getClient(); 11. ...... 12. //获取库.表信息,注意只有dbUpdate<opCode<dbDelete的opCode请求才通过dbmsg直接获取库和表信息 13. const char* ns = dbmsg.messageShouldHaveNs() ? dbmsg.getns() : NULL; 14. const NamespaceString nsString = ns ? NamespaceString(ns) : NamespaceString(); 15. .... 16. //CurOp::debug 初始化opDebug,慢日志相关记录 17. OpDebug& debug = currentOp.debug(); 18. //慢日志阀值 19. long long logThresholdMs = serverGlobalParams.slowMS; 20. //时mongodb将记录这次慢操作,1为只记录慢操作,即操作时间大于了设置的配置,2表示记录所有操作 21. bool shouldLogOpDebug = shouldLog(logger::LogSeverity::Debug(1)); 22. DbResponse dbresponse; 23. if (op == dbMsg || op == dbCommand || (op == dbQuery && isCommand)) { 24. //新版本op=dbMsg,因此走这里 25. //从DB获取数据,获取到的数据通过dbresponse返回 26. dbresponse = runCommands(opCtx, m); 27. } else if (op == dbQuery) { 28. ...... 29. //早期mongodb版本查询走这里 30. dbresponse = receivedQuery(opCtx, nsString, c, m); 31. } else if (op == dbGetMore) { 32. //早期mongodb版本查询走这里 33. dbresponse = receivedGetMore(opCtx, m, currentOp, &shouldLogOpDebug); 34. } else { 35. ...... 36. //早期版本增 删 改走这里处理 37. if (op == dbInsert) { 38. receivedInsert(opCtx, nsString, m); //插入操作入口 新版本CmdInsert::runImpl 39. } else if (op == dbUpdate) { 40. receivedUpdate(opCtx, nsString, m); //更新操作入口 41. } else if (op == dbDelete) { 42. receivedDelete(opCtx, nsString, m); //删除操作入口 43. } 44. } 45. //获取runCommands执行时间,也就是内部处理时间 46. debug.executionTimeMicros = durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses()); 47. ...... 48. //慢日志记录 49. if (shouldLogOpDebug || (shouldSample && debug.executionTimeMicros > logThresholdMs * 1000LL)) { 50. Locker::LockerInfo lockerInfo; 51. //OperationContext::lockState LockerImpl<>::getLockerInfo 52. opCtx->lockState()->getLockerInfo(&lockerInfo); 53. 54. //OpDebug::report 记录慢日志到日志文件 55. log() << debug.report(&c, currentOp, lockerInfo.stats); 56. } 57. //各种统计信息 58. recordCurOpMetrics(opCtx); 59. }
Mongod 的 handleRequest() 接口主要完成以下工作:
① 从Message 中获取 OpCode ,早期版本每个命令又对应取值,例如增删改查早期版本分别对应: dbInsert 、 dbDelete 、 dbUpdate 、 dbQuery ; Mongodb 3.6 开始,默认请求对应 OpCode 都是 OP_MSG ,本文默认只分析 OpCode=OP_MSG 相关的处理。
② 获取本操作对应的Client 客户端信息。
③ 如果是早期版本,通过Message 构造 DbMessage ,同时解析出库 . 表信息。
④ 根据不同OpCode 执行对应回调操作, OP_MSG 对应操作为 runCommands(...) ,获取的数据通过 dbresponse 返回。
⑤ 获取到db 层返回的数据后,进行慢日志判断,如果 db 层数据访问超过阀值,记录慢日志。
⑥ 设置debug 的各种统计信息。
从上面的分析可以看出,接口最后调用runCommands(...) ,该接口核心代码实现如下所示:
1. //message解析出对应command执行 2. DbResponse runCommands(OperationContext* opCtx, const Message& message) { 3. //获取message对应的ReplyBuilder,3.6默认对应OpMsgReplyBuilder 4. //应答数据通过该类构造 5. auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(message)); 6. [&] { 7. OpMsgRequest request; 8. try { // Parse. 9. //协议解析 根据message获取对应OpMsgRequest 10. request = rpc::opMsgRequestFromAnyProtocol(message); 11. } 12. } 13. try { // Execute. 14. //opCtx初始化 15. curOpCommandSetup(opCtx, request); 16. //command初始化为Null 17. Command* c = nullptr; 18. //OpMsgRequest::getCommandName查找 19. if (!(c = Command::findCommand(request.getCommandName()))) { 20. //没有找到相应的command的后续异常处理 21. ...... 22. } 23. //执行command命令,获取到的数据通过replyBuilder.get()返回 24. execCommandDatabase(opCtx, c, request, replyBuilder.get()); 25. } 26. //OpMsgReplyBuilder::done对数据进行序列化操作 27. auto response = replyBuilder->done(); 28. //responseLength赋值 29. CurOp::get(opCtx)->debug().responseLength = response.header().dataLen(); 30. // 返回 31. return DbResponse{std::move(response)}; 32. }
RunCommands(...) 接口从 message 中解析出 OpMsg 信息,然后获取该 OpMsg 对应的 command 命令信息,最后执行该命令对应的后续处理操作。主要功能说明如下:
① 获取该OpCode 对应 replyBuilder , OP_MSG 操作对应 builder 为 OpMsgReplyBuilder 。
② 根据message 解析出 OpMsgRequest 数据, OpMsgRequest 来中包含了真正的命令请求 bson 信息。
③ opCtx 初始化操作。
④ 通过request.getCommandName() 返回命令信息 ( 如“ find ”、“ update ”等字符串 ) 。
⑤ 通过Command::findCommand(command name) 从 CommandMap 这个 map 表中查找是否支持该 command 命令。如果没找到说明不支持,如果找到说明支持。
⑥ 调用execCommandDatabase(...) 执行该命令,并获取命令的执行结果。
⑦ 根据command 执行结果构造 response 并返回
1. void execCommandDatabase(...) { 2. ...... 3. //获取dbname 4. const auto dbname = request.getDatabase().toString(); 5. ...... 6. //mab表存放从bson中解析出的elem信息 7. StringMap<int> topLevelFields; 8. //body elem解析 9. for (auto&& element : request.body) { 10. //获取bson中的elem信息 11. StringData fieldName = element.fieldNameStringData(); 12. //如果elem信息重复,则异常处理 13. ...... 14. } 15. //如果是help命令,则给出help提示 16. if (Command::isHelpRequest(helpField)) { 17. //给出help提示 18. Command::generateHelpResponse(opCtx, replyBuilder, *command); 19. return; 20. } 21. //权限认证检查,检查该命令执行权限 22. uassertStatusOK(Command::checkAuthorization(command, opCtx, request)); 23. ...... 24. 25. //该命令执行次数统计 db.serverStatus().metrics.commands可以获取统计信息 26. command->incrementCommandsExecuted(); 27. //真正的命令执行在这里面 28. retval = runCommandImpl(opCtx, command, request, replyBuilder, startOperationTime); 29. //该命令执行失败次数统计 30. if (!retval) { 31. command->incrementCommandsFailed(); 32. } 33. ...... 34. }
execCommandDatabase(...) 最终调用RunCommandImpl(...) 进行对应命令的真正处理,该接口核心代码实现如下:
1. bool runCommandImpl(...) { 2. //获取命令请求内容body 3. BSONObj cmd = request.body; 4. //获取请求中的DB库信息 5. const std::string db = request.getDatabase().toString(); 6. //ReadConcern检查 7. Status rcStatus = waitForReadConcern( 8. opCtx, repl::ReadConcernArgs::get(opCtx), command->allowsAfterClusterTime(cmd)); 9. //ReadConcern检查不通过,直接异常提示处理 10. if (!rcStatus.isOK()) { 11. //异常处理 12. return; 13. } 14. if (!command->supportsWriteConcern(cmd)) { 15. //命令不支持WriteConcern,但是对应的请求中却带有WriteConcern配置,直接报错不支持 16. if (commandSpecifiesWriteConcern(cmd)) { 17. //异常处理"Command does not support writeConcern" 18. ...... 19. return result; 20. } 21. //调用Command::publicRun执行不同命令操作 22. result = command->publicRun(opCtx, request, inPlaceReplyBob); 23. } 24. //提取WriteConcernOptions信息 25. auto wcResult = extractWriteConcern(opCtx, cmd, db); 26. //提取异常,直接异常处理 27. if (!wcResult.isOK()) { 28. //异常处理 29. ...... 30. return result; 31. } 32. ...... 33. //执行对应的命令Command::publicRun,执行不同命令操作 34. result = command->publicRun(opCtx, request, inPlaceReplyBob); 35. ...... 36. }
RunCommandImpl(...) 接口最终调用该接口入参的 command ,执行 command->publicRun (...) 接口,也就是命令模块的公共 publicRun 。
Mongod 服务入口首先从 message 中解析出 opCode 操作码, 3.6 版本对应客户端默认操作码为 OP_MSQ ,解析出该操作对应 OpMsgRequest 信息。然后从 message 原始数据中解析出 command 命令字符串后,继续通过全局 Map 表种查找是否支持该命令操作,如果支持则执行该命令;如果不支持,直接异常打印,同时返回。
mongos 服务入口核心代码实现过程和 mongod 服务入口代码实现流程几乎相同, mongos 实例 message 解析、 OP_MSG 操作码处理、 command 命令查找等流程和上一章节 mongod 实例处理过程类似,本章节不在详细分析。 Mongos 实例服务入口处理调用流程如下:
ServiceEntryPointMongos::handleRequest(...)->Strategy::clientCommand(...)-->runCommand(...)->execCommandClient(...)
最后的接口核心代码实现如下:
1. void runCommand(...) { 2. ...... 3. //获取请求命令name 4. auto const commandName = request.getCommandName(); 5. //从全局map表中查找 6. auto const command = Command::findCommand(commandName); 7. //没有对应的command存在,抛异常说明不支持该命令 8. if (!command) { 9. ...... 10. return; 11. } 12. ...... 13. //执行命令 14. execCommandClient(opCtx, command, request, builder); 15. ...... 16. } 17. 18. void execCommandClient(...) 19. { 20. ...... 21. //认证检查,是否有操作该command命令的权限,没有则异常提示 22. Status status = Command::checkAuthorization(c, opCtx, request); 23. if (!status.isOK()) { 24. Command::appendCommandStatus(result, status); 25. return; 26. } 27. //该命令的执行次数自增,代理上面也是要计数的 28. c->incrementCommandsExecuted(); 29. //如果需要command统计,则加1 30. if (c->shouldAffectCommandCounter()) { 31. globalOpCounters.gotCommand(); 32. } 33. ...... 34. //有部分命令不支持writeconcern配置,报错 35. bool supportsWriteConcern = c->supportsWriteConcern(request.body); 36. //不支持writeconcern又带有该参数的请求,直接异常处理"Command does not support writeConcern" 37. if (!supportsWriteConcern && !wcResult.getValue().usedDefault) { 38. ...... 39. return; 40. } 41. //执行本命令对应的公共publicRun接口,Command::publicRun 42. ok = c->publicRun(opCtx, request, result); 43. ...... 44. }
l Tips: mongos 和 mongod 实例服务入口核心代码实现的一点小区别
① Mongod 实例 opCode 操作码解析、 OpMsg 解析、 command 查找及对应命令调用处理都由 class ServiceEntryPointMongod{...} 类一起完成。
② mongos 实例则把 opCode 操作码解析交由 class ServiceEntryPointMongos{...} 类实现, OpMsg 解析、 command 查找及对应命令调用处理放到了 clase Strategy{...} 类来处理。
Mongodb 报文解析及组装流程总结
① 一个完整mongodb 报文由通用报文 header 头部 +body 部分组成。
② Body 部分内容,根据报文头部的 opCode 来决定不同的body 内容。
③ 3.6 版本对应客户端请求 opCode 默认为 OP_MSG ,该操作码对应 body 部分由 flagBits + sections + checksum 组成,其中 sections 中存放的是真正的命令请求信息,已 bson 数据格式保存。
④ Header 头部和 body 报文体封装及解析过程由 class Message {...} 类实现
⑤ Body 中对应 command 命令名、库名、表名的解析在 mongodb(version<3.6) 低版本协议中由 class DbMessage {...} 类实现
⑥ Body 中对应 command 命令名、库名、表名的解析在 mongodb(version<3.6) 低版本协议中由 struct OpMsgRequest{...} 结构和 struct OpMsg {...} 类实现
Mongos 和 mongod 实例的服务入口处理流程大同小异,整体处理流程如下:
① 从message 解析出 opCode 操作码,根据不同操作码执行对应操作码回调。
② 根据message 解析出 OpMsg request 信息, mongodb 报文的命令信息就存储在该 body 中,该 body 已 bson 格式存储。
③ 从body 中解析出 command 命令字符串信息 ( 如“ insert ”、“ update ”等 ) 。
④ 从全局_commands map 表中查找是否支持该命令,如果支持则执行该命令处理,如果不支持则直接报错提示。
⑤ 最终找到对应command 命令后,执行 command 的功能 run 接口。
图形化总结如下:
说明: 第3 章的协议解析及封装过程实际上应该算是网络处理模块范畴,本文为了分析 command 命令处理模块方便,把该部分实现归纳到了命令处理模块,这样方便理解。
Tips: 下期继续分享不同command 命令执行细节。
第1 章节中的统计信息,将在 command 模块核心代码分析完毕后揭晓答案,《 mongodb command 命令处理模块源码实现二》中继续分析,敬请关注。
来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/69984922/viewspace-2732975/,如需转载,请注明出处,否则将追究法律责任。