ITPub博客

首页 > 区块链 > 区块链 > 【刘文彬】 Debug EOS:nodeos + mongodbplugin

【刘文彬】 Debug EOS:nodeos + mongodbplugin

区块链 作者:圆方圆区块链 时间:2018-12-12 20:34:29 0 删除 编辑


原文链接:醒者呆的博客园,https://www.cnblogs.com/Evsward/p/storage.html

上文书说到区块链的存储方式,并结合了EOSIO进行分析,其中也提到了使用CLion调试EOS的方法。本文将继续深入细致地展开对加载了mongo db plugin的nodeos的调试过程以及心得。

关键字:源码分析,Debug EOS,nodeos,mongo db plugin,CLion,C++,boost::asio::signal_set,queue

本文涉及的环境:clang-6.0, clang++-6.0, GDB Debugger, make 4.1, mongodb-linux-x86_64-3.6.3, boost 1.67.0

调试EOS: nodeos

关于EOS的调试环境的搭建这里不再赘述了,下文开始针对nodeos程序进行调试。

(一)CMakeList.txt

nodeos开始运行前,要先使用项目的总CmakeList.txt配置,这里我配置了boost库的位置,如果你配置了boost的环境变量可以跳过这里。

 set( BOOST_ROOT "/home/evsward/opt/boost")
  • 这个文件中有很多的set语句,这些语句都是开关,或者路径,或者全局变量,是配置各个子CMakeList.txt而用的。

  • include 语句是为runtime引入相关依赖库。

  • add_subdirectory语句设置了子目录程序。

  • install语句是将相关命令安装到指定位置以供runtime后续使用。

总的CMakeList文件介绍完了,下面会执行到nodeos目录下的CMakeList.txt文件:

  • add_executable( nodeos main.cpp )语句设定了nodeos程序执行入口。

  • set, configure_file, include, install 等都是为runtime准备的环境相关的。

  • 重点语句target link libraries,这里定义了链runtime环境需要启动的plugin。(注意记住这个顺序)

(二)static register plugin

我们打开每一个plugin的cpp文件,会发现有一个static的register方法的调用。这里会首先执行按上面plugin定义的顺序中第一个login_plugin,它的static语句如下:

static appbase::abstract_plugin& _login_plugin = app().register_plugin<login_plugin>();

执行此语句时,会先执行app(),这是application单例方法。

(三)application

  • application是nodeos的执行者,上面调用的app函数:

application& application::instance() {
   static application _app;
   return _app;
}
application& app() { return application::instance(); }


application与plugin拥有相同的实现函数,而由于它作为执行者、统筹者的存在,它会安排调用所有plugin,例如set_program_options。
  • 执行app()以后获取到了application的实例,然后调用了register plugin函数,通过模板类(泛型类)携带了login plugin的类型。register_plugin函数是模板函数,定义在application.hpp文件中。

  • application.hpp 中定义了私有的内存变量

    map<string, std::unique_ptr<abstract_plugin>> plugins;
    
  • abstract_plugin是所有plugin的基类,它定义了虚函数,需要继承它的子类去实现。他们与application的关系是:

    abstract_plugin=>plugin(对基类的虚函数进一步使用,由application定义管理)=>各个plugin
    
 template<typename Plugin>
auto& registerplugin() {
    auto existing = findplugin<Plugin>(); // 从plugins寻找该plugin是否已注册。
    if(existing)
       return *existing; // 如果已注册了直接返回plugin的实例
auto plug = new Plugin(); // 创建该未注册plugin的实例
plugins[plug->name()].reset(plug); // 先插入到上面定义的内存变量plugins
plug->register_dependencies();// 注册该plugin的依赖plugins,每个plugin内部都会调用APPBASE_PLUGIN_REQUIRES((chain_plugin))来注册自己依赖的别的plugin。
return *plug; // 返回plugin的实例
}

(四)main.cpp->main()

在编译runtime环境结束以后,进入入口函数main(),

int main(int argc, char** argv)

main函数的参数就是调用命令nodeos的通过--加入的参数,我们可以通过nodeos的Edit Configuration来调整。其中argc是个数,argv是参数的值,是一个数组类型。如下图:



我们接着来看main函数,它的函数体是通过app()对application单例进行的设置,包括版本号、data路径、config路径,然后是对于application实例内部方法的调用:

  • initialize<chain plugin, http plugin, net plugin, producer plugin>

  • startup()

  • exec()

main函数执行了内部函数initialize_logging()还通过ilog打印了日志,输出了版本号以及eosio root路径地址。

由于main函数是入口函数,上面也介绍了它主要是对application实例的使用,以及一些异常处理等,接下来会逐一进行debug跟踪分析。

(五)initialize plugin

这个初始化函数是一个模板函数,模板类参数是plugin基类,在main函数调用该函数时传入了基本的插件依赖(这些是不需要我们在config中配置的,是链启动的基础插件):chain plugin, http plugin, net plugin, producer plugin。下面来看initialize函数在application头文件中的声明:

/**
  * @brief 查看 --plugin(存在于命令行或者config配置文件中)调用这些plugin的 initialize方法。
  * 
  * @tparam Plugin plugin的列表用来初始化,即使在config中没有配置的但被其他plugin所依赖的plugin,都可以统一使用该模板类没有影响。
  * @return true:plugin初始化完成,false:出现异常。
*/
template<typename... Plugin>
bool initialize(int argc, char** argv) {
    return initialize_impl(argc, argv, {find_plugin<Plugin>()...}); // ...是可变参数的语法,上面通过main函数的调用,我们传入了多个plugin。
}


实现类initialize_impl的内容较多,不粘贴源码,直接总结一下:

(1)set program options()函数

application.cpp文件中的set_program_options()函数是用来生成初始化的config.ini文件内容以及nodeos命令行--help的输出内容。

该函数首先遍历插件列表,调用每个插件都会实现的plugin基类的虚函数set program options(options description& cli, options description& cfg),例如下面就是mongo db plugin的实现:

void mongo_db_plugin::set_program_options(options_description& cli, options_description& cfg)
{
   cfg.add_options()
         ("mongodb-queue-size,q", bpo::value<uint32_t>()->default_value(256),
         "The target queue size between nodeos and MongoDB plugin thread.")
         ("mongodb-wipe", bpo::bool_switch()->default_value(false),
         "Required with --replay-blockchain, --hard-replay-blockchain, or --delete-all-blocks to wipe mongo db."
         "This option required to prevent accidental wipe of mongo db.")
         ("mongodb-block-start", bpo::value<uint32_t>()->default_value(0),
         "If specified then only abi data pushed to mongodb until specified block is reached.")
         ("mongodb-uri,m", bpo::value<std::string>(),
         "MongoDB URI connection string, see: 
               " If not specified then plugin is disabled. Default database 'EOS' is used if not specified in URI."
               " Example: mongodb://127.0.0.1:27017/EOS")
         ;
}


通过调用mongo db plugin的这个方法,就可以拼凑到config.ini文件中关于mongo db plugin的部分,因为这个插件只有对于config.ini配置文件的配置,没有对于命令行的内容,我们可以去查看chain_plugin的实现,它会同时有配置文件和命令行两个方面的内容设置,源码较长请自行查看。

配置的对象options_description_easy_init是一个灵活的结构。可以表示:一个配置项,一个配置的值;一个配置项,一个配置的值,一个注释或者描述;一个配置项,一个注释或者描述。这些多种组合,我们也可以直接去查看自己的config.ini的每一个配置项去对应。
那么是如何拼凑所有的插件配置内容呢?

application.cpp文件中的set program options()函数的函数体中使用了application的两个类变量来存储这些参数:

  • app options:用于接收来自于命令行和config.ini两个参数来源的参数。

  • cfg options:仅存储来自于config.ini配置文件的参数。

插件遍历结束后,我们已经有了所有插件的config.ini配置内容以及命令行提示配置内容,下面要从宏观角度去配置一些属于application的配置项,config.ini中加入了plugins的配置,通过这个配置我们可以手动指定要启动的插件,例如mongo db plugin就是需要手动配置的。接着,要配置命令行的参数内容,包括help, version, print-default-config, data-dir, config-dir, config, logconf。将他们追加存储到上面那两个类变量中。

到这里,application.cpp文件中的set program options()函数的工作就完成了。

上面提到的_app_options和_cfg_options仍就是傻傻分不清楚,他们的用意到底是什么呢?

简单来理解就是,命令行能够做所有配置文件config.ini中的配置的工作,同时命令行还有专属于自己的help, version, print-default-config, data-dir, config-dir, config, logconf配置。这样就好理解了,config.ini是命令行配置的子集,命令行配置是全集。

(2)app全局参数的检测与合并

我们回到initialize impl,目前我们已经拥有了两套默认配置参数,这里直接使用全集 app options配置,我们先接收来自于命令行的参数,将以它为优先级高的方式与 app_options配置合并:

bpo::variables_map options;
bpo::store(bpo::parse_command_line(argc, argv, my->_app_options), options);


(3)app全局参数配置项生效与响应

拿到合并后的配置对象options,依次针对配置项的内容进行响应:

  • help:直接输出 app options配置项的全部内容。

  • version:输出application实例的类成员_version的值。

  • print-default-config:与 app options无关,重新去每个plugin找配置,然后会基于 cfg options生成一份默认的config配置打印到终端界面。

  • data-dir:是设置data目录的命令保存至application的类成员 data dir,没有响应的输出。

  • config-dir:设置config路径,保存在类成员 config dir中。config和data的路径结构如下:

evsward@evsward-TM1701:~/.local/share/eosio/nodeos$ tree
.
├── config
│   └── config.ini
└── data
├── blocks
│   ├── blocks.index
│   ├── blocks.log
│   └── reversible
│       ├── sharedmemory.bin
│       └── sharedmemory.meta
└── state
    ├── forkdb.dat
    ├── sharedmemory.bin
    └── sharedmemory.meta
5 directories, 8 files- logconf:默认是logging.json,放置在config目录下面,可自定义设置,保存在类成员_logging_conf中。
- config:指定配置文件的名字,默认是config.ini。如果发现在config目录下找到config.ini文件,则按照该文件的配置载入。
bpo::store(bpo::parseconfigfile<char>(configfilename.makepreferred().string().cstr(),
                                           my->cfgoptions, true), options);

得到整合好本地config.ini文本配置的options对象。然后对其参数配置项进行设置。

  • plugin:读取配置文件中的plugin配置(多条),对于每一个plugin,要重新调用各自的initialize方法去按照新的配置初始化。

  • autostart plugins:设置前面的初始化插件chain plugin, http plugin, net plugin, producer_plugin,同样分别调用他们的初始化函数设置新的配置。

(4)plugin initialize

承接上文,所有相关的plugin的执行各自的initialize。这个initialize函数是abstract_plugin的虚函数,而该虚函数被plugin类所复写:

virtual void initialize(const variables_map& options) override {
    if(_state == registered) {//如果注册过
       _state = initialized;
       static_cast<Impl*>(this)->plugin_requires([&](auto& plug){ plug.initialize(options); });// 先执行依赖plugin的initialize方法。
       static_cast<Impl*>(this)->plugin_initialize(options);// 调用自身的plugin_initialize方法实现。
       //ilog( "initializing plugin ${name}", ("name",name()) );
       app().plugin_initialized(*this);// 保存到一个initialized_plugins类成员变量中,用来按顺序记录已经开始启动运行的plugin。
    }
    assert(_state == initialized); /// 如果插件未注册,则不能执行initialize方法。
}


所以在plugin调用initialize函数的时候,是先执行的以上复写的plugin的虚函数。我们这里先设定几个要跟踪的plugin为目标吧,否则plugin太多,望山止步。

目标:主要研究mongo_db_plugin,以及基础plugin(chain_plugin, http_plugin, net_plugin, producer_plugin),路线是研究主plugin,若有额外的依赖plugin,看情况控制研究的深浅程度。

(5)eosio::mongo db plugin::plugin_initialize

前面写set program options()提到了mongo db plugin,这里研究它的plugin_initialize方法。传入的参数是结合了命令行以及本地config文件的合并配置项,按照此配置环境。

void mongodbplugin::plugininitialize(const variablesmap& options)
{
   try {
      if( options.count( "mongodb-uri" )) {//查mongodb-uri的配置,config.ini中有对应的。
         ilog( "initializing mongodbplugin" );
         my->configured = true;//设置标志位:已配置
     if( options.at( "replay-blockchain" ).as<bool>() || options.at( "hard-replay-blockchain" ).as<bool>() || options.at( "delete-all-blocks" ).as<bool>() ) {//捕捉是否有replay-blockchain、hard-replay-blockchain、delete-all-blocks三个动作,有的话要判断是否要擦出mongo历史数据。
        if( options.at( "mongodb-wipe" ).as<bool>()) {//检查擦除项mongodb-wipe的配置
           ilog( "Wiping mongo database on startup" );
           my->wipe_database_on_startup = true;//如果设置擦除,这里设置本地标志位wipe_database_on_startup
        } else if( options.count( "mongodb-block-start" ) == 0 ) {//如果设置是从0开始,检查是否要全部擦除历史数据。
           EOS_ASSERT( false, chain::plugin_config_exception, "--mongodb-wipe required with --replay-blockchain, --hard-replay-blockchain, or --delete-all-blocks"
                             " --mongodb-wipe will remove all EOS collections from mongodb." );
        }
     }
     if( options.count( "abi-serializer-max-time-ms") == 0 ) {//eosio::chain_plugin的配置
        EOS_ASSERT(false, chain::plugin_config_exception, "--abi-serializer-max-time-ms required as default value not appropriate for parsing full blocks");
     }
     my->abi_serializer_max_time = app().get_plugin<chain_plugin>().get_abi_serializer_max_time();
     if( options.count( "mongodb-queue-size" )) {// queue大小
        my->queue_size = options.at( "mongodb-queue-size" ).as<uint32_t>();
     }
     if( options.count( "mongodb-block-start" )) {// mongo对应的开始区块号
        my->start_block_num = options.at( "mongodb-block-start" ).as<uint32_t>();
     }
     if( my->start_block_num == 0 ) {
        my->start_block_reached = true;
     }
     std::string uri_str = options.at( "mongodb-uri" ).as<std::string>();
     ilog( "connecting to ${u}", ("u", uri_str));
     mongocxx::uri uri = mongocxx::uri{uri_str};
     my->db_name = uri.database();
     if( my->db_name.empty())
        my->db_name = "EOS";// 默认起的库名字为EOS,如果在mongodb-uri有配置的话就使用配置的名字。
     my->mongo_conn = mongocxx::client{uri};// 客户端连接到mongod
     // controller中拉取得信号,在init函数中注册信号机制,始终监听链上信号,作出反应。
     chain_plugin* chain_plug = app().find_plugin<chain_plugin>();//检查chain_plugin是否加载,chain_plugin是必要依赖,下面我们要使用chain的数据。
     EOS_ASSERT( chain_plug, chain::missing_chain_plugin_exception, ""  );
     auto& chain = chain_plug->chain();// 获得chain实例
     my->chain_id.emplace( chain.get_chain_id());
     // accepted_block_connection对应了chain的signal,是boost提供的一种信号槽机制,这种connection对象有四个,见当前源码的下方展示。
     my->accepted_block_connection.emplace( chain.accepted_block.connect( [&]( const chain::block_state_ptr& bs ) {// 建立connect,每当chain有accepted_block信号(这些信号是定义在controller.hpp中,稍后会介绍),即调用下面的函数。
        my->accepted_block( bs );// accepted_block认同block信息
     } ));
     my->irreversible_block_connection.emplace(//含义同上
           chain.irreversible_block.connect( [&]( const chain::block_state_ptr& bs ) {
              my->applied_irreversible_block( bs );// applied_irreversible_block,应用不可逆区块
           } ));
     my->accepted_transaction_connection.emplace(//含义同上
           chain.accepted_transaction.connect( [&]( const chain::transaction_metadata_ptr& t ) {
              my->accepted_transaction( t );// accepted_transaction认同交易
           } ));
     my->applied_transaction_connection.emplace(//含义同上
           chain.applied_transaction.connect( [&]( const chain::transaction_trace_ptr& t ) {
              my->applied_transaction( t );// applied_transaction,应用交易
           } ));
     if( my->wipe_database_on_startup ) {
        my->wipe_database();// 擦除mongo历史数据
     }
     my->init();//初始化函数
  } else {
     wlog( "eosio::mongo_db_plugin configured, but no --mongodb-uri specified." );
     wlog( "mongo_db_plugin disabled." );
  }
} FCLOGAND_RETHROW()
}

四个connection对象的声明如下:

fc::optional<boost::signals2::scoped_connection> accepted_block_connection; fc::optional<boost::signals2::scoped_connection> irreversible_block_connection; fc::optional<boost::signals2::scoped_connection> accepted_transaction_connection; fc::optional<boost::signals2::scoped_connection> applied_transaction_connection;


queue

这段代码中涉及到四个函数分别是accepted block,applied irreversible block,accepted transaction,applied transaction,他们都对应着对queue的操作,mongo db plugin impl类成员定义了一下几种queue:

std::deque<chain::transaction_metadata_ptr> transaction_metadata_queue; std::deque<chain::transaction_metadata_ptr> transaction_metadata_process_queue; std::deque<chain::transaction_trace_ptr> transaction_trace_queue; std::deque<chain::transaction_trace_ptr> transaction_trace_process_queue; std::deque<chain::block_state_ptr> block_state_queue; std::deque<chain::block_state_ptr> block_state_process_queue; std::deque<chain::block_state_ptr> irreversible_block_state_queue; std::deque<chain::block_state_ptr> irreversible_block_state_process_queue;

queue是mongo db plugin自己定义的:

/**
 * 模板类Queue,可以匹配以上我们定义的多个queue类型。
 * 模板类Entry,可以匹配block_state_ptr以及transaction_trace_ptr作为被存储实体类型。
 */
template<typename Queue, typename Entry>
void queue(boost::mutex& mtx, boost::condition_variable& condition, Queue& queue, const Entry& e, size_t queue_size) {
   int sleep_time = 100;//默认线程睡眠时间
   size_t last_queue_size = 0;
   boost::mutex::scoped_lock lock(mtx);//mutex锁机制
   if (queue.size() > queue_size) {//如果超过了我们设定的queue大小,则采取如下措施。
      lock.unlock();//先解锁
      condition.notify_one();// 见下文对condition的介绍
      if (last_queue_size < queue.size()) {//说明queue的增加速度大于我们程序消费处理的速度
         sleep_time += 100;//增加睡眠时间
      } else {
         sleep_time -= 100;//说明queue的增加速度小于我们消费的速度,就要减少睡眠时间,尽快更新last_queue_size的值。
         if (sleep_time < 0) sleep_time = 100;
      }
      last_queue_size = queue.size();
      boost::this_thread::sleep_for(boost::chrono::milliseconds(sleep_time));//线程睡眠,睡眠的时间按照上面的机制定夺。
      lock.lock();//上锁
   }
   queue.emplace_back(e);//生效部分:插入到队列中去。
   lock.unlock();//解锁
   condition.notify_one();
}


mongo db plugin impl::wipe database()

真正执行擦除mongo历史数据的函数,这个动作是由我们配置mongodb-wipe参数来指定。擦除的函数体如下:

void mongodbpluginimpl::wipedatabase() {
   ilog("mongo db wipe_database");
// 定义的六张mongo的表类型,通过客户端连接获取到六张表的权限。
   auto blockstates = mongoconn[db_name][block_states_col];
   auto blocks = mongo_conn[db_name][blocks_col];
   auto trans = mongo_conn[db_name][trans_col];
   auto transtraces = mongoconn[db_name][trans_traces_col];
   auto actions = mongo_conn[db_name][actions_col];
   accounts = mongo_conn[db_name][accounts_col];
// 分别删除,执行drop动作。
   blockstates.drop();
   blocks.drop();
   trans.drop();
   transtraces.drop();
   actions.drop();
   accounts.drop();
}

mongo db plugin_impl::init()

源码较多不粘贴,上面wipe_database函数,我们删除了六张表,在init函数中,我们要对应的建立这六张表。表名初始化:

const std::string mongo_db_plugin_impl::block_states_col = "block_states";
const std::string mongo_db_plugin_impl::blocks_col = "blocks";
const std::string mongo_db_plugin_impl::trans_col = "transactions";
const std::string mongo_db_plugin_impl::trans_traces_col = "transaction_traces";
const std::string mongo_db_plugin_impl::actions_col = "actions";
const std::string mongo_db_plugin_impl::accounts_col = "accounts";


这就是刘张表对应的名字。这六张表在初始化建立时是一个整体操作,也就是说互为依赖关系,accounts表先创建,通过

accounts = mongo_conn[db_name][accounts_col];

即可创建成功accounts表,其他表亦然,后面不赘述。表数据是由make_document进行组装的。首先我们向accounts表插入一条数据,结构是name为eosio,createAt是当前时间。

  • chain::config::system account name ).to_string()

  • std::chrono::duration cast<std::chrono::milliseconds>(std::chrono::microseconds{fc::time point::now().time since epoch().count()});

通过insert_one方法将该条数据插入accounts表中。

接下来通过create index方法对五张表建立索引,注意transaction traces是没有索引的,init操作时不涉及transaction traces表。

auto blocks = mongoconn[db_name][blocks_col]; // Blocks
blocks.createindex( bsoncxx::fromjson( R"xxx({ "blocknum" : 1 })xxx" ));
blocks.createindex( bsoncxx::fromjson( R"xxx({ "blockid" : 1 })xxx" ));// block建立了两个索引
auto blockstats = mongoconn[db_name][block_states_col];
blockstats.createindex( bsoncxx::fromjson( R"xxx({ "blocknum" : 1 })xxx" ));
blockstats.createindex( bsoncxx::fromjson( R"xxx({ "blockid" : 1 })xxx" ));// block_stats建立了两个索引
// accounts indexes
accounts.createindex( bsoncxx::fromjson( R"xxx({ "name" : 1 })xxx" ));
// transactions indexes
auto trans = mongo_conn[db_name][trans_col]; // Transactions
trans.createindex( bsoncxx::fromjson( R"xxx({ "trx_id" : 1 })xxx" ));
auto actions = mongo_conn[db_name][actions_col];
actions.createindex( bsoncxx::fromjson( R"xxx({ "trx_id" : 1 })xxx" ));

初始化准备就完成了,接下来要建立线程监听出块消息,同步到mongo数据库中来。

ilog("starting db plugin thread");
consumethread = boost::thread([this] { consumeblocks(); });
startup = false;// 结束,调用析构函数,关闭mongodbplugin:设定标志位done = true;

(6)mongo db plugin impl::consume blocks()

上面init函数执行到最后时,开启了一个线程,执行的是consume blocks()函数,如字面含义这是消费区块的函数。这个函数是一个无限循环,保持线程的存活,监听queue的数据随时消费同步到mongo数据库中去,而queue的数据的是由上面plugin initialize函数中的connect信号槽机制连接chain的出块信号往queue里面插入/同步链上数据。

condition

无线循环第一部分就是对condition.wait(lock)的操作,condition在上面queue的源码中有一个notify_one()的调用,实际上就是与wait相互应的操作。

boost::mutex::scoped_lock lock(mtx);
while ( transaction_metadata_queue.empty() &&
         transaction_trace_queue.empty() &&
         block_state_queue.empty() &&
         irreversible_block_state_queue.empty() &&
         !done ) {
    condition.wait(lock);
}


消费区块占用了一个线程,这个线程在上面四个queue是空的时候并且done也没有完成是flase的时候,该线程会通过condition来阻塞线程,参数是mutex的一个锁。

condition.notify one()会重新唤起这个阻塞的线程,而在mongo db plugin中,condition.notify one()出现了3次:

  • queue模板类型,有了新的数据插入的时候。

  • 当queue模板类型的队列超过设置值的时候,要主动唤起consume_block开启消费线程加速消费(上面介绍queue的时候也谈到了队列大小超限时会增加queue插入的睡眠等待时间,这两方面相当于针对中间队列对两边进行开源节流,从而控制了队列的大小)

  • ~mongo db plugin_impl()析构函数中

mongodbpluginimpl::~mongodbpluginimpl() {
   if (!startup) {//标志位,上面init函数结尾有这个值的赋值。
      try {
         ilog( "mongodbplugin shutdown in process please be patient this can take a few minutes" );
         done = true;//设定标志位done,consumeblock()会使用到。
         condition.notifyone();// 唤醒consume_thread线程继续消费掉queue中残余数据。
     consume_thread.join();// 挂起主线程,等待consume_thread线程执行完毕再唤起主线程。
  } catch( std::exception& e ) {
     elog( "Exception on mongo_db_plugin shutdown of consume thread: ${e}", ("e", e.what()));
  }
}
}

process_queue准备

我们要将链上的数据同步至mongo,是通过上面判断是否为空的那四个queue来做,为了增加消费效率,进入consume block函数以后,要先将数据move导入到一个process queue中来慢慢处理,相当于一个中转站。

size_t transaction_metadata_size = transaction_metadata_queue.size();
if (transaction_metadata_size > 0) {
    transaction_metadata_process_queue = move(transaction_metadata_queue);
    transaction_metadata_queue.clear();
}
size_t transaction_trace_size = transaction_trace_queue.size();
if (transaction_trace_size > 0) {
    transaction_trace_process_queue = move(transaction_trace_queue);
    transaction_trace_queue.clear();
}
size_t block_state_size = block_state_queue.size();
if (block_state_size > 0) {
    block_state_process_queue = move(block_state_queue);
    block_state_queue.clear();
}
size_t irreversible_block_size = irreversible_block_state_queue.size();
if (irreversible_block_size > 0) {
    irreversible_block_state_process_queue = move(irreversible_block_state_queue);
    irreversible_block_state_queue.clear();
}


队列大小报警器

接下来是一个针对四个源队列的大小进行一个监控,当任意超过限额的75%时,会触发报警,打印到控制台。

分发到具体执行函数消费队列

接下来,就是将上面的四个中转的process queue的数据分别分发到不同的函数(对应下面四个 process函数)中去消费处理。最后每个中转队列处理一条,就pop出去一条,都处理结束以后,会再次判断四个源队列的大小是否为空,都消费完了,同时也得有析构函数的done标志位为true,才会中断consume thread线程的consume block()的无线循环。

1. mongo db plugin impl:: process accepted transaction() 执行接收交易, 需要start block reached标识位为true。源码较长不粘贴,语言介绍一下,该函数的主要工作是获得mongo的连接以及库表对象,同时解析传入的const chain::transaction metadata ptr& t 对象,该对象的路线是:

chain=>signal=>mongo_db_plugin connect signal=>queue=>process_queue=>遍历出一条数据即是t

获得这个对象以后,也准备好了mongo数据库的连接库表对象,剩下的工作就是从t解析导入mongo库表了。

mongo作为列存储的nosql文件数据库,这里只接收document类型

这里创建了一个它的对象act_doc,解析过程:

  • 链数据对象的解析

const auto trx_id = t->id;
const auto trx_id_str = trx_id.str();
const auto& trx = t->trx;
const chain::transaction_header& trx_header = trx;


  • mongo数据库存储结构的定义,值数据的传入,通过process_action函数进行处理,

act_doc.append( kvp( "action_num", b_int32{act_num} ), kvp( "trx_id", trx_id_str ));
act_doc.append( kvp( "cfa", b_bool{cfa} ));
act_doc.append( kvp( "account", act.account.to_string()));
act_doc.append( kvp( "name", act.name.to_string()));
act_doc.append( kvp( "authorization", [&act]( bsoncxx::builder::basic::sub_array subarr ) {
    for( const auto& auth : act.authorization ) {
        subarr.append( [&auth]( bsoncxx::builder::basic::sub_document subdoc ) {
          subdoc.append( kvp( "actor", auth.actor.to_string()),
                 kvp( "permission", auth.permission.to_string()));
        } );
    }
} ));


process action函数处理的是action数据的匹配,而如果action涉及到新账户的创建,这部分要在process action函数中继续通过update account函数进行处理。update account函数只会过滤由system合约执行的newaccount动作,system合约默认是由chain::config::system account name(就是eosio)来创建的。所以过滤后的action的结构如下:

|field | value

|account | eosio

|name |newaccount



然后会同步在mongo的accounts表中添加一条记录,要有当时的添加时间createdAt。添加之前,要根据这个用户名去mongo中查找,通过函数find account,如果查找到了则update,未查到就insert。

auto find_account(mongocxx::collection& accounts, const account_name& name) {
    using bsoncxx::builder::basic::make_document;
    using bsoncxx::builder::basic::kvp;
    return accounts.find_one( make_document( kvp( "name", name.to_string())));
}

接着,是transaction表的数据插入,这个工作是对trans doc文本类型变量的设置:

  • trx_id设置

  • irreversible设置

  • transaction_header设置

  • signing_keys设置

  • actions设置:遍历源trx的actions,每一项去调用上面定义的process action函数执行action数据的处理发到action array变量中,赋给actions。

  • context free actions,与action的处理过程差不多。

  • transaction_extensions设置

  • signatures

  • context free data

  • createdAt

整合完毕,将trans doc插入到transaction表中去。整个 process accepted transaction执行完毕,其中涉及到了transaction, action, accounts三张表的内容的增加与修改。

2. mongo db plugin impl:: process applied transaction 执行应用交易,需要start block reached标识位为true。这个函数是对mongo中transaction traces表的操作。同样的,是通过一个文本类型变量trans traces doc操作。这个函数的参数传入是transaction trace ptr类型的对象t(对应的上面 process accepted transaction接收的是transaction metadata ptr类型的)

abi_serializer::to_variant, 转化成abi格式的json数据。
abi_serializer::from_variant, 通过abi格式的json数据转换出来对应的对象数据。

3. mongo db plugin impl:: process accepted block

这里先要从process_accepted_block函数进入,上面的下划线_开头的函数都是又没有下划线的相同名字的函数调用的,只是他们除了调用以外都是一些异常的处理,日志的输出工作。而process_accepted_block函数有了简单的逻辑,就是根据标志位start_block_reached作出了处理。前面我们介绍plugin_initialize函数的时候,通过配置文件的配置项"mongodb-block-start",我们设定了全局变量start_block_num作为标志位。这里面就是对于这个参数值的一个判断,如果达到了这个设定的起始区块,则设定全局变量标志位start_block_reached为true,那么就可以进入到_process_accepted_block函数进行处理了。

这个函数是 接收区块处理 。传入的参数为 block state ptr 类型的对象bs。它的路线与上面介绍过的其他函数的参数t是相同的,只是类结构不同,存的数据不同。该函数涉及到mongo的两张表,一个是block_states,另一个是blocks。我们分别来研究。

  • block state doc

 mongocxx::options::update updateopts{};
   updateopts.upsert( true );// upsert模式为true,代表update操作如果未找到对象则新增一条数据。
auto blockstates = mongoconn[db_name][block_states_col];
   auto blockstatedoc = bsoncxx::builder::basic::document{};
   // 数据结构映射
   blockstatedoc.append(kvp( "blocknum", bint32{staticcast<int32t>(blocknum)} ),
                          kvp( "blockid", blockidstr ),
                          kvp( "validated", bbool{bs->validated} ),
                          kvp( "incurrentchain", bbool{bs->incurrentchain} ));
auto json = fc::json::tostring( bhs );
   try {
      const auto& value = bsoncxx::fromjson( json );
      blockstatedoc.append( kvp( "blockheaderstate", value ));// 追加blockheaderstate的值
   } catch( bsoncxx::exception& ) {
      try {
         json = fc::pruneinvalidutf8(json);
         const auto& value = bsoncxx::fromjson( json );
         blockstatedoc.append( kvp( "blockheaderstate", value ));
         blockstatedoc.append( kvp( "non-utf8-purged", bbool{true}));
      } catch( bsoncxx::exception& e ) {
         elog( "Unable to convert blockheaderstate JSON to MongoDB JSON: ${e}", ("e", e.what()));
         elog( "  JSON: ${j}", ("j", json));
      }
   }
   blockstatedoc.append(kvp( "createdAt", b_date{now} ));// 追加createdAt的值
try {
      // updateone, 没有查询到相关数据则直接新增一条
      if( !blockstates.updateone( makedocument( kvp( "blockid", blockidstr )),
                                    makedocument( kvp( "$set", blockstatedoc.view())), updateopts )) {
         EOSASSERT( false, chain::mongodbinsertfail, "Failed to insert blockstate ${bid}", ("bid", blockid));
      }
   } catch(...) {
      handlemongoexception("blockstates insert: " + json, LINE);
   }
  • block_doc

 auto blocks = mongo_conn[db_name][blocks_col];
   auto blockdoc = bsoncxx::builder::basic::document{};
   // 数据结构映射
   blockdoc.append(kvp( "blocknum", bint32{staticcast<int32t>(blocknum)} ),
                    kvp( "blockid", blockidstr ),
                    kvp( "irreversible", b_bool{false} ));
auto v = tovariantwithabi( *bs->block, accounts, abiserializermaxtime );// 转化为abi格式的数据存储。
   json = fc::json::tostring( v );
   try {
      const auto& value = bsoncxx::fromjson( json );
      blockdoc.append( kvp( "block", value ));// 追加block的值,为json
   } catch( bsoncxx::exception& ) {
      try {
         json = fc::pruneinvalidutf8(json);
         const auto& value = bsoncxx::fromjson( json );
         blockdoc.append( kvp( "block", value ));
         blockdoc.append( kvp( "non-utf8-purged", bbool{true}));
      } catch( bsoncxx::exception& e ) {
         elog( "Unable to convert block JSON to MongoDB JSON: ${e}", ("e", e.what()));
         elog( "  JSON: ${j}", ("j", json));
      }
   }
   blockdoc.append(kvp( "createdAt", b_date{now} ));// 追加createdAt的值
try {
      // updateone, 没有查询到相关数据则直接新增一条
      if( !blocks.updateone( makedocument( kvp( "blockid", blockidstr )),
                              makedocument( kvp( "$set", blockdoc.view())), updateopts )) {
         EOSASSERT( false, chain::mongodbinsertfail, "Failed to insert block ${bid}", ("bid", blockid));
      }
   } catch(...) {
      handlemongoexception("blocks insert: " + json, LINE);
   }

4. mongo db plugin impl:: process irreversible block 执行不可逆区块,,需要start block reached标识位为true。涉及mongo的两张表:blocks表和transactions表。

// 创世块区块号为1,没有信号到accepted_block处理。
if (block_num < 2) return;


传入的参数,思想与上面的几个函数设计是相同的,它的类型与上面的 process accepted block函数相同,是block state ptr类型的对象bs。从bs中获取到区块,首先会通过find block去mongo中查询,如果有的话就不再处理。

  • blocks 数据映射更新插入。由于它是在 process accepted block函数的后面执行,所以语句update opts.upsert( true );在这里的update_one也是有效的。

bulk: 是一系列操作的集合。

mongocxx::options::bulk_write bulk_opts;
bulk_opts.ordered(false);// false说明可以并行,所有操作互不影响。若为true,则顺序执行,一旦遇错直接中止,后面的操作不会被执行到。
auto bulk = trans.create_bulk_write(bulk_opts);//所有的操作针对的是trans对象,对应的mongo表为transactions。
auto updatedoc = makedocument( kvp( "$set", makedocument( kvp( "irreversible", bbool{true} ),
  kvp( "validated", bbool{bs->validated} ),
  kvp( "incurrentchain", bbool{bs->incurrentchain} ),
  kvp( "updatedAt", b_date{now}))));
blocks.updateone( makedocument( kvp( "id", irblock->view()["id"].getoid())), update_doc.view());
transactions
transactons是一个数组,一个block可以包含很多条transaction,因此这里要有个循环来处理。
对于transaction在mongo中的存储历史,也有对应的find_transaction去mongo中查询,如果有的话就不再处理。
auto updatedoc = makedocument( kvp( "$set", makedocument( kvp( "irreversible", bbool{true} ),
  kvp( "blockid", blockidstr),
  kvp( "blocknum", bint32{staticcast<int32t>(blocknum)} ),
  kvp( "updatedAt", b_date{now}))));
mongocxx::model::updateone updateop{ 
makedocument(kvp("id", ir_trans->view()["id"].getoid())), update_doc.view()};

最后通过在transaction循环中设定一个标志位transactions in block来确定transaction遍历结束。

mongo db plugin总结

我们是通过nodeos命令的initialize函数跟踪到mongo db plugin的,关于mongo db plugin的一切,可以总结为顺序:

1. set_program_option,设置配置参数
2. plugin_initialize,初始化使plugin配置参数生效,准备mongo连接,queue机制,信号槽监听chain出块action。
3. init,mongo库表初始化,建立索引,定义了consume_thread线程用来消费queue区块数据。initialize周期结束。
4. consume_block,线程触发与等待策略,process_queue消费中转策略,根据四种数据结构(即上文反复提到的那四个结构)分发消费函数。

|table | function insert | function update

|-|-|-

|transactions |accepted trx |irreversible block(bulk)

|actions | accepted trx(bulk) |block states

| accepted block |blocks | accepted block

| irreversible block |transaction traces

|applied trx |accounts |accepted trx

比较特殊的一个表是accounts,它可以过滤actions内容,找到newaccount的action并保存账户到表里。这给我们以启发,我们可以自己定义新的表来过滤自己需要的action,例如我们自己写的智能合约。

(六)initialize_logging()

日志系统初始化。

void initialize_logging()
{
   auto config_path = app().get_logging_conf();
   if(fc::exists(config_path))
     fc::configure_logging(config_path); //故意不去捕捉异常
   for(auto iter : fc::get_appender_map())
     iter.second->initialize(app().get_io_service());
   // 重复以上代码逻辑,利用boost::asio::signal\_set机制,async\_wait。
   logging_conf_loop();
}


(七)startup()

void application::startup() {
   try {
      for (auto plugin : initialized_plugins)//遍历所有已初始化的插件,执行他们的startup函数。
         plugin->startup();
   } catch(...) {
      shutdown();//如有异常,则调用shutdown函数,清空容器,释放资源。
      throw;
   }
}


这里仍旧以mongo db plugin为例,它的startup()是空。而对于其他plugin而言,startup都有很多工作要做,例如producer plugin和chain plugin都非常重要,此外涉及到重要的控制器部分controller也需要仔细研究。由于本文篇幅过长,我们重点仍旧围绕mongo db plugin来介绍整个nodeos启动的生命周期。

(八)exec()

main入口函数执行到最后一个步骤:exec函数。

`
void application::exec() {
   std::sharedptr<boost::asio::signalset> sigintset(new boost::asio::signalset(*ioserv, SIGINT));
   sigintset->async_wait([sigintset,this](const boost::system::errorcode& err, int num) {
     quit();
     sigint_set->cancel();
   });
std::sharedptr<boost::asio::signalset> sigtermset(new boost::asio::signalset(*ioserv, SIGTERM));
   sigtermset->async_wait([sigtermset,this](const boost::system::errorcode& err, int num) {
     quit();
     sigterm_set->cancel();
   });
std::sharedptr<boost::asio::signalset> sigpipeset(new boost::asio::signalset(*ioserv, SIGPIPE));
   sigpipeset->async_wait([sigpipeset,this](const boost::system::errorcode& err, int num) {
     quit();
     sigpipe_set->cancel();
   });
ioserv->run();// 与上面initializelogging的getioservice()获取到的io_serv是同一个对象
shutdown(); /// 同步推出
}`

这个函数与initialize logging的循环中涉及到相同的信号机制boost::asio::signal set。

boost::asio::signal_set

boost库的信号量技术 。它要使用到boost::asio::io_service,这也是上面提到多次的。信号量对象的初始化可参照前文一段代码,如下:

std::shared_ptr<boost::asio::signal_set> sigint_set(new boost::asio::signal_set(*io_serv, SIGINT));

共享指针这里不谈了,感兴趣的同学请转到这里。它的构造函数是传入了一个boost::asio::io_service以及一个信号number SIGINT。这个SIGINT的声明为:

`

define SIGINT      2   / Interrupt (ANSI).   /

`

这个构造函数实现了向信号量集合中添加了一个信号2。

接着,我要通过async wait来使用信号量。可以贴上上面initialize logging函数的logging conf loop函数。

void logging_conf_loop()
{
   std::shared_ptr<boost::asio::signal_set> sighup_set(new boost::asio::signal_set(app().get_io_service(), SIGHUP));
   sighup_set->async_wait([sighup_set](const boost::system::error_code& err, int /*num*/) {
      if(!err)
      {
         ilog("Received HUP.  Reloading logging configuration.");
         auto config_path = app().get_logging_conf();
         if(fc::exists(config_path))
            ::detail::configure_logging(config_path);
         for(auto iter : fc::get_appender_map())
            iter.second->initialize(app().get_io_service());
         logging_conf_loop();
      }
   });
}


可以直接通过sighup set->async wait的方式来使用。它的声明定义是: void (boost::system::error_code, int)) 会在所监听的信号触发时调用函数体。当发生错误的时候,退出logging conf loop函数的递归调用。

总结

写到这里,我们的nodeos的命令就启动成功了,由于篇幅限制,我们没有仔细去研究所有依赖的plugin,以及controller的逻辑。本文重点研究了mongo db plugin的源码实现,通过该插件,我们全面分析了nodeos命令启动的所有流程。而对于mongo db plugin插件本身的学习,我们也明白了链数据是如何同步到mongo里面的。接下来,我会继续深入分析其他相关插件的初始化流程以及启动流程,还有controller的逻辑细节,以及出块逻辑等等。

参考资料

EOSIO/eos


相关文章和视频推荐

【许晓笛】EOS 数据库与持久化 API —— 实战

圆方圆学院汇集大批区块链名师,打造精品的区块链技术课程。 在各大平台都长期有优质免费公开课,欢迎报名收看。

公开课地址:

来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/31522172/viewspace-2285066/,如需转载,请注明出处,否则将追究法律责任。

请登录后发表评论 登录
全部评论

注册时间:2018-11-09

  • 博文量
    61
  • 访问量
    27235