ITPub博客

首页 > 数据库 > PostgreSQL > PostgreSQL 源码解读(122)- MVCC#7(提交事务-整体流程)

PostgreSQL 源码解读(122)- MVCC#7(提交事务-整体流程)

原创 PostgreSQL 作者:husthxd 时间:2019-01-18 11:06:14 0 删除 编辑

本节介绍了PostgreSQL提交事务的整体处理逻辑,主要解析了函数CommitTransaction的实现逻辑。

一、数据结构

TransactionState
事务状态结构体


/*
 *  transaction states - transaction state from server perspective
 *  事务状态枚举 - 服务器视角的事务状态
 */
typedef enum TransState
{
    TRANS_DEFAULT,              /* idle 空闲 */
    TRANS_START,                /* transaction starting 事务启动 */
    TRANS_INPROGRESS,           /* inside a valid transaction 进行中 */
    TRANS_COMMIT,               /* commit in progress 提交中 */
    TRANS_ABORT,                /* abort in progress 回滚中 */
    TRANS_PREPARE               /* prepare in progress 准备中 */
} TransState;
/*
 *  transaction block states - transaction state of client queries
 *  事务块状态 - 客户端查询的事务状态
 *
 * Note: the subtransaction states are used only for non-topmost
 * transactions; the others appear only in the topmost transaction.
 * 注意:subtransaction只用于非顶层事务;其他字段用于顶层事务.
 */
typedef enum TBlockState
{
    /* not-in-transaction-block states 未进入事务块状态 */
    TBLOCK_DEFAULT,             /* idle 空闲  */
    TBLOCK_STARTED,             /* running single-query transaction 单个查询事务 */
    /* transaction block states 事务块状态 */
    TBLOCK_BEGIN,               /* starting transaction block 开始事务块 */
    TBLOCK_INPROGRESS,          /* live transaction 进行中 */
    TBLOCK_IMPLICIT_INPROGRESS, /* live transaction after implicit BEGIN 隐式事务,进行中 */
    TBLOCK_PARALLEL_INPROGRESS, /* live transaction inside parallel worker 并行worker中的事务,进行中 */
    TBLOCK_END,                 /* COMMIT received 接收到COMMIT */
    TBLOCK_ABORT,               /* failed xact, awaiting ROLLBACK 失败,等待ROLLBACK */
    TBLOCK_ABORT_END,           /* failed xact, ROLLBACK received 失败,已接收ROLLBACK */
    TBLOCK_ABORT_PENDING,       /* live xact, ROLLBACK received 进行中,接收到ROLLBACK */
    TBLOCK_PREPARE,             /* live xact, PREPARE received 进行中,接收到PREPARE */
    /* subtransaction states 子事务状态 */
    TBLOCK_SUBBEGIN,            /* starting a subtransaction 开启 */
    TBLOCK_SUBINPROGRESS,       /* live subtransaction 进行中 */
    TBLOCK_SUBRELEASE,          /* RELEASE received 接收到RELEASE */
    TBLOCK_SUBCOMMIT,           /* COMMIT received while TBLOCK_SUBINPROGRESS 进行中,接收到COMMIT */
    TBLOCK_SUBABORT,            /* failed subxact, awaiting ROLLBACK 失败,等待ROLLBACK */
    TBLOCK_SUBABORT_END,        /* failed subxact, ROLLBACK received 失败,已接收ROLLBACK */
    TBLOCK_SUBABORT_PENDING,    /* live subxact, ROLLBACK received 进行中,接收到ROLLBACK */
    TBLOCK_SUBRESTART,          /* live subxact, ROLLBACK TO received 进行中,接收到ROLLBACK TO */
    TBLOCK_SUBABORT_RESTART     /* failed subxact, ROLLBACK TO received 失败,已接收ROLLBACK TO */
} TBlockState;
/*
 *  transaction state structure
 *  事务状态结构体
 */
typedef struct TransactionStateData
{
    //事务ID
    TransactionId transactionId;    /* my XID, or Invalid if none */
    //子事务ID
    SubTransactionId subTransactionId;  /* my subxact ID */
    //保存点名称
    char       *name;           /* savepoint name, if any */
    //保存点级别
    int         savepointLevel; /* savepoint level */
    //低级别的事务状态
    TransState  state;          /* low-level state */
    //高级别的事务状态
    TBlockState blockState;     /* high-level state */
    //事务嵌套深度
    int         nestingLevel;   /* transaction nesting depth */
    //GUC上下文嵌套深度
    int         gucNestLevel;   /* GUC context nesting depth */
    //事务生命周期上下文
    MemoryContext curTransactionContext;    /* my xact-lifetime context */
    //查询资源
    ResourceOwner curTransactionOwner;  /* my query resources */
    //按XID顺序保存的已提交的子事务ID
    TransactionId *childXids;   /* subcommitted child XIDs, in XID order */
    //childXids数组大小
    int         nChildXids;     /* # of subcommitted child XIDs */
    //分配的childXids数组空间
    int         maxChildXids;   /* allocated size of childXids[] */
    //上一个CurrentUserId
    Oid         prevUser;       /* previous CurrentUserId setting */
    //上一个SecurityRestrictionContext
    int         prevSecContext; /* previous SecurityRestrictionContext */
    //上一事务是否只读?
    bool        prevXactReadOnly;   /* entry-time xact r/o state */
    //是否处于Recovery?
    bool        startedInRecovery;  /* did we start in recovery? */
    //XID是否已保存在WAL Record中?
    bool        didLogXid;      /* has xid been included in WAL record? */
    //Enter/ExitParallelMode计数器
    int         parallelModeLevel;  /* Enter/ExitParallelMode counter */
    //父事务状态
    struct TransactionStateData *parent;    /* back link to parent */
} TransactionStateData;
//结构体指针
typedef TransactionStateData *TransactionState;

二、源码解读

CommitTransaction函数,提交事务,并执行相关的清理操作.


/*
 *  CommitTransaction
 *
 * NB: if you change this routine, better look at PrepareTransaction too!
 * 注意:如果改变了这个过程的逻辑,最好同时处理PrepareTransaction.
 */
static void
CommitTransaction(void)
{
    TransactionState s = CurrentTransactionState;
    TransactionId latestXid;
    bool        is_parallel_worker;
    is_parallel_worker = (s->blockState == TBLOCK_PARALLEL_INPROGRESS);
    /* Enforce parallel mode restrictions during parallel worker commit. */
    //如为并行worker,强制进入并行模式
    if (is_parallel_worker)
        EnterParallelMode();
    ShowTransactionState("CommitTransaction");
    /*
     * check the current transaction state
     * 检查当前事务状态
     */
    if (s->state != TRANS_INPROGRESS)
        elog(WARNING, "CommitTransaction while in %s state",
             TransStateAsString(s->state));
    Assert(s->parent == NULL);
    /*
     * Do pre-commit processing that involves calling user-defined code, such
     * as triggers.  Since closing cursors could queue trigger actions,
     * triggers could open cursors, etc, we have to keep looping until there's
     * nothing left to do.
     * 执行涉及调用用户定义代码(如触发器)的预提交处理。
     * 因为关闭游标可能会执行触发器,触发器可能打开游标,等等,
     *   所以我们必须一直循环,直到没有什么可做的。
     */
    for (;;)
    {
        /*
         * Fire all currently pending deferred triggers.
         * 触发所有当前活动的触发器
         */
        AfterTriggerFireDeferred();
        /*
         * Close open portals (converting holdable ones into static portals).
         * If there weren't any, we are done ... otherwise loop back to check
         * if they queued deferred triggers.  Lather, rinse, repeat.
         * 关闭打开的portals(将可持有门户转换为静态门户).
         * 如果已不存在,则说明已完成.
         *   否则一直循环检查触发器队列.
         */
        if (!PreCommit_Portals(false))
            break;
    }
    CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT
                      : XACT_EVENT_PRE_COMMIT);
    /*
     * The remaining actions cannot call any user-defined code, so it's safe
     * to start shutting down within-transaction services.  But note that most
     * of this stuff could still throw an error, which would switch us into
     * the transaction-abort path.
     * 其余的操作不能调用任何用户定义的代码,因此可以安全地开始关闭事务内的服务。
     * 但是请注意,大多数这些动作仍然会抛出错误,这将把执行流程切换到事务中止执行路径上。
     */
    /* If we might have parallel workers, clean them up now. */
    //存在并行worker,清除之
    if (IsInParallelMode())
        AtEOXact_Parallel(true);
    /* Shut down the deferred-trigger manager */
    //关闭延迟触发器管理器
    AfterTriggerEndXact(true);
    /*
     * Let ON COMMIT management do its thing (must happen after closing
     * cursors, to avoid dangling-reference problems)
     * 让ON COMMIT管理器执行这个事情.
     * (必须在关闭游标后发生,以避免挂起引用问题)
     */
    PreCommit_on_commit_actions();
    /* close large objects before lower-level cleanup */
    //在低级别的清理请,关闭大对象
    AtEOXact_LargeObject(true);
    /*
     * Mark serializable transaction as complete for predicate locking
     * purposes.  This should be done as late as we can put it and still allow
     * errors to be raised for failure patterns found at commit.
     * 将可序列化事务标记为谓词锁定完成。
     * 这应该尽可能迟地完成,并且仍然允许在提交时发现的失败从而引发错误。
     */
    PreCommit_CheckForSerializationFailure();
    /*
     * Insert notifications sent by NOTIFY commands into the queue.  This
     * should be late in the pre-commit sequence to minimize time spent
     * holding the notify-insertion lock.
     * 将NOTIFY命令发送的通知插入到队列中。
     * 这应该在预提交序列的末尾,以最小化持有通知插入锁的时间。
     */
    PreCommit_Notify();
    /* Prevent cancel/die interrupt while cleaning up */
    //在清理时禁用中断
    HOLD_INTERRUPTS();
    /* Commit updates to the relation map --- do this as late as possible */
    //提交更新到relation map -- 尽可能晚的执行该动作
    AtEOXact_RelationMap(true, is_parallel_worker);
    /*
     * set the current transaction state information appropriately during
     * commit processing
     * 在commit过程中设置当前事务状态信息.
     */
    s->state = TRANS_COMMIT;
    s->parallelModeLevel = 0;
    if (!is_parallel_worker)
    {
        /*
         * We need to mark our XIDs as committed in pg_xact.  This is where we
         * durably commit.
         * 我们需要在pg_xact中将xid标记为已提交。
         */
        latestXid = RecordTransactionCommit();
    }
    else
    {
        /*
         * We must not mark our XID committed; the parallel master is
         * responsible for that.
         * 并行worker,不需要标记XID的提交标记,并行管理器处理此事情.
         */
        latestXid = InvalidTransactionId;
        /*
         * Make sure the master will know about any WAL we wrote before it
         * commits.
         * 确保master在提交之前知道worker写入的WAL。
         */
        ParallelWorkerReportLastRecEnd(XactLastRecEnd);
    }
    TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid);
    /*
     * Let others know about no transaction in progress by me. Note that this
     * must be done _before_ releasing locks we hold and _after_
     * RecordTransactionCommit.
     * 通知其他进程知道该进程没有进行中的事务。
     * 注意,这必须在释放持有的锁之前执行,并在RecordTransactionCommit之后执行。
     */
    ProcArrayEndTransaction(MyProc, latestXid);
    /*
     * This is all post-commit cleanup.  Note that if an error is raised here,
     * it's too late to abort the transaction.  This should be just
     * noncritical resource releasing.
     * 这些都是提交后清理。
     * 请注意,如果这里出现错误,则终止事务就太迟了.
     * 这应该是非关键的资源释放。
     *
     * The ordering of operations is not entirely random.  The idea is:
     * release resources visible to other backends (eg, files, buffer pins);
     * then release locks; then release backend-local resources. We want to
     * release locks at the point where any backend waiting for us will see
     * our transaction as being fully cleaned up.
     * 操作的顺序并不是完全随机的。
     * 其思想是:释放对其他后台进程可见的资源(如文件、buffer pins);
     *   然后释放锁;然后释放后端本地资源。
     * 我们希望在所有等待我们的后台进程看到我们的事务被完全清理的时候才释放锁。
     *
     * Resources that can be associated with individual queries are handled by
     * the ResourceOwner mechanism.  The other calls here are for backend-wide
     * state.
     * 与单个查询关联的资源由ResourceOwner机制处理。
     * 这里的其他调用是针对后台进程范围的状态的。
     */
    CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_COMMIT
                      : XACT_EVENT_COMMIT);
    ResourceOwnerRelease(TopTransactionResourceOwner,
                         RESOURCE_RELEASE_BEFORE_LOCKS,
                         true, true);
    /* Check we've released all buffer pins */
    //检查已释放所有的buffer pins
    AtEOXact_Buffers(true);
    /* Clean up the relation cache */
    //清理关系缓存
    AtEOXact_RelationCache(true);
    /*
     * Make catalog changes visible to all backends.  This has to happen after
     * relcache references are dropped (see comments for
     * AtEOXact_RelationCache), but before locks are released (if anyone is
     * waiting for lock on a relation we've modified, we want them to know
     * about the catalog change before they start using the relation).
     * 使目录更改对所有后台进程可见。
     * 这必须发生在relcache引用被删除之后(参见AtEOXact_RelationCache注释),
     *   但是在锁被释放之前(如果有人在等待我们修改的关系的锁,我们希望他们在开始使用关系之前知道目录的更改)。
     */
    AtEOXact_Inval(true);
    AtEOXact_MultiXact();
    ResourceOwnerRelease(TopTransactionResourceOwner,
                         RESOURCE_RELEASE_LOCKS,
                         true, true);
    ResourceOwnerRelease(TopTransactionResourceOwner,
                         RESOURCE_RELEASE_AFTER_LOCKS,
                         true, true);
    /*
     * Likewise, dropping of files deleted during the transaction is best done
     * after releasing relcache and buffer pins.  (This is not strictly
     * necessary during commit, since such pins should have been released
     * already, but this ordering is definitely critical during abort.)  Since
     * this may take many seconds, also delay until after releasing locks.
     * Other backends will observe the attendant catalog changes and not
     * attempt to access affected files.
     * 同样,在事务期间删除的文件的清理最好在释放relcache和buffer pin之后进行。
     * (这在提交过程中并不是必须的,因为这样的pins应该已经被释放了,
     *  但是该顺序在中止过程中绝对是至关重要的。)
     * 因为这可能需要较长的时间,所以也要延迟到释放锁之后。
     * 其他后台进程将监控相关的catalog更改,不尝试访问受影响的文件。
     */
    smgrDoPendingDeletes(true);
    AtCommit_Notify();
    AtEOXact_GUC(true, 1);
    AtEOXact_SPI(true);
    AtEOXact_Enum();
    AtEOXact_on_commit_actions(true);
    AtEOXact_Namespace(true, is_parallel_worker);
    AtEOXact_SMgr();
    AtEOXact_Files(true);
    AtEOXact_ComboCid();
    AtEOXact_HashTables(true);
    AtEOXact_PgStat(true);
    AtEOXact_Snapshot(true, false);
    AtEOXact_ApplyLauncher(true);
    pgstat_report_xact_timestamp(0);
    CurrentResourceOwner = NULL;
    ResourceOwnerDelete(TopTransactionResourceOwner);
    s->curTransactionOwner = NULL;
    CurTransactionResourceOwner = NULL;
    TopTransactionResourceOwner = NULL;
    AtCommit_Memory();
    s->transactionId = InvalidTransactionId;
    s->subTransactionId = InvalidSubTransactionId;
    s->nestingLevel = 0;
    s->gucNestLevel = 0;
    s->childXids = NULL;
    s->nChildXids = 0;
    s->maxChildXids = 0;
    XactTopTransactionId = InvalidTransactionId;
    nParallelCurrentXids = 0;
    /*
     * done with commit processing, set current transaction state back to
     * default
     * 完成提交处理后,将当前事务状态设置为default
     */
    s->state = TRANS_DEFAULT;
    RESUME_INTERRUPTS();
}

三、跟踪分析

插入数据,执行commit


10:57:56 (xdb@[local]:5432)testdb=# begin;
BEGIN
10:57:59 (xdb@[local]:5432)testdb=#* insert into t_session1 values(1);
INSERT 0 1
10:58:01 (xdb@[local]:5432)testdb=#* commit;

启动gdb,设置断点


(gdb) b CommitTransaction
Breakpoint 1 at 0x5482ae: file xact.c, line 1969.
(gdb) c
Continuing.
Breakpoint 1, CommitTransaction () at xact.c:1969
1969        TransactionState s = CurrentTransactionState;
(gdb)

查看调用栈


(gdb) bt
#0  CommitTransaction () at xact.c:1969
#1  0x0000000000549078 in CommitTransactionCommand () at xact.c:2831
#2  0x00000000008c8ea9 in finish_xact_command () at postgres.c:2523
#3  0x00000000008c6b5d in exec_simple_query (query_string=0x2c97ec8 "commit;") at postgres.c:1170
#4  0x00000000008cae70 in PostgresMain (argc=1, argv=0x2cc3dc8, dbname=0x2cc3c30 "testdb", username=0x2c94ba8 "xdb")
    at postgres.c:4182
#5  0x000000000082642b in BackendRun (port=0x2cb9c00) at postmaster.c:4361
#6  0x0000000000825b8f in BackendStartup (port=0x2cb9c00) at postmaster.c:4033
#7  0x0000000000821f1c in ServerLoop () at postmaster.c:1706
#8  0x00000000008217b4 in PostmasterMain (argc=1, argv=0x2c92b60) at postmaster.c:1379
#9  0x00000000007488ef in main (argc=1, argv=0x2c92b60) at main.c:228
(gdb)

当前事务信息


(gdb) p *s
$1 = {transactionId = 2410, subTransactionId = 1, name = 0x0, savepointLevel = 0, state = TRANS_INPROGRESS, 
  blockState = TBLOCK_END, nestingLevel = 1, gucNestLevel = 1, curTransactionContext = 0x2d3cfa0, 
  curTransactionOwner = 0x2cc5868, childXids = 0x0, nChildXids = 0, maxChildXids = 0, prevUser = 10, prevSecContext = 0, 
  prevXactReadOnly = false, startedInRecovery = false, didLogXid = true, parallelModeLevel = 0, parent = 0x0}
(gdb)

执行相关判断,执行预处理等


(gdb) n
1976        if (is_parallel_worker)
(gdb) 
1979        ShowTransactionState("CommitTransaction");
(gdb) 
1984        if (s->state != TRANS_INPROGRESS)
(gdb) 
1987        Assert(s->parent == NULL);
(gdb) 
2000            AfterTriggerFireDeferred();
(gdb) 
2007            if (!PreCommit_Portals(false))
(gdb) 
2008                break;
(gdb) 
2011        CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT
(gdb)

继续执行预处理


(gdb) n
2022        if (IsInParallelMode())
(gdb) 
2026        AfterTriggerEndXact(true);
(gdb) 
2032        PreCommit_on_commit_actions();
(gdb) 
2035        AtEOXact_LargeObject(true);
(gdb) 
(gdb) 
2042        PreCommit_CheckForSerializationFailure();
(gdb) 
2049        PreCommit_Notify();
(gdb) 
2052        HOLD_INTERRUPTS();
(gdb) 
2055        AtEOXact_RelationMap(true);
(gdb)

修改事务状态


2061        s->state = TRANS_COMMIT;
(gdb) 
2062        s->parallelModeLevel = 0;
(gdb)

执行实际的提交事务操作


(gdb) 
2064        if (!is_parallel_worker)
(gdb) 
2070            latestXid = RecordTransactionCommit();
(gdb) 
2087        TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid);
(gdb)

通知其他进程知道该进程没有进行中的事务。


(gdb) 
2094        ProcArrayEndTransaction(MyProc, latestXid);
(gdb) 
2112        CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_COMMIT
(gdb)

释放资源


(gdb) 
2115        ResourceOwnerRelease(TopTransactionResourceOwner,
(gdb) 
2120        AtEOXact_Buffers(true);
(gdb) 
2123        AtEOXact_RelationCache(true);
(gdb) 
2132        AtEOXact_Inval(true);
(gdb) 
2134        AtEOXact_MultiXact();
(gdb) 
2136        ResourceOwnerRelease(TopTransactionResourceOwner,
(gdb) 
2139        ResourceOwnerRelease(TopTransactionResourceOwner,

执行清理操作


(gdb) 
2152        smgrDoPendingDeletes(true);
(gdb) 
2154        AtCommit_Notify();
(gdb) 
2155        AtEOXact_GUC(true, 1);
(gdb) 
2156        AtEOXact_SPI(true);
(gdb) 
2157        AtEOXact_on_commit_actions(true);
(gdb) 
2158        AtEOXact_Namespace(true, is_parallel_worker);
(gdb) 
2159        AtEOXact_SMgr();
(gdb) 
2160        AtEOXact_Files(true);
(gdb) 
2161        AtEOXact_ComboCid();
(gdb) 
2162        AtEOXact_HashTables(true);
(gdb) 
2163        AtEOXact_PgStat(true);
(gdb) 
2164        AtEOXact_Snapshot(true, false);
(gdb) 
2165        AtEOXact_ApplyLauncher(true);
(gdb) 
2166        pgstat_report_xact_timestamp(0);
(gdb) 
2168        CurrentResourceOwner = NULL;
(gdb) 
2169        ResourceOwnerDelete(TopTransactionResourceOwner);
(gdb) 
2170        s->curTransactionOwner = NULL;
(gdb)

重置事务状态


(gdb) 
2171        CurTransactionResourceOwner = NULL;
(gdb) 
2172        TopTransactionResourceOwner = NULL;
(gdb) 
2174        AtCommit_Memory();
(gdb) 
2176        s->transactionId = InvalidTransactionId;
(gdb) 
2177        s->subTransactionId = InvalidSubTransactionId;
(gdb) 
2178        s->nestingLevel = 0;
(gdb) 
2179        s->gucNestLevel = 0;
(gdb) 
2180        s->childXids = NULL;
(gdb) 
2181        s->nChildXids = 0;
(gdb) 
2182        s->maxChildXids = 0;
(gdb) 
2184        XactTopTransactionId = InvalidTransactionId;
(gdb) 
2185        nParallelCurrentXids = 0;
(gdb) 
2191        s->state = TRANS_DEFAULT;
(gdb) 
2193        RESUME_INTERRUPTS();
(gdb) 
2194    }
(gdb)

重置后的事务状态


(gdb) p *s
$2 = {transactionId = 0, subTransactionId = 0, name = 0x0, savepointLevel = 0, state = TRANS_DEFAULT, 
  blockState = TBLOCK_END, nestingLevel = 0, gucNestLevel = 0, curTransactionContext = 0x0, curTransactionOwner = 0x0, 
  childXids = 0x0, nChildXids = 0, maxChildXids = 0, prevUser = 10, prevSecContext = 0, prevXactReadOnly = false, 
  startedInRecovery = false, didLogXid = true, parallelModeLevel = 0, parent = 0x0}
(gdb)

执行完毕


(gdb) n
CommitTransactionCommand () at xact.c:2832
2832                s->blockState = TBLOCK_DEFAULT;
(gdb)

DONE!

四、参考资料

PG Source Code

来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/6906/viewspace-2564120/,如需转载,请注明出处,否则将追究法律责任。

请登录后发表评论 登录
全部评论
长期从事政务、金融等行业产品研发和架构设计工作,对Oracle、PostgreSQL以及大数据等相关技术有深入研究。现就职于广州云图数据技术有限公司,系统架构师。

注册时间:2007-12-28

  • 博文量
    1169
  • 访问量
    3634699