PostgreSQL Source Code Walkthrough (117) - MVCC#2 (Getting a Snapshot #2)

Original | PostgreSQL | Author: husthxd | Date: 2019-01-11 15:14:37

This section describes how PostgreSQL obtains a transaction snapshot, focusing on the GetTransactionSnapshot->GetSnapshotData call path.

I. Data structures

Global/static variables


/*
 * Currently registered Snapshots.  Ordered in a heap by xmin, so that we can
 * quickly find the one with lowest xmin, to advance our MyPgXact->xmin.
 */
static int xmin_cmp(const pairingheap_node *a, const pairingheap_node *b,
 void *arg);
static pairingheap RegisteredSnapshots = {&xmin_cmp, NULL, NULL};
/* first GetTransactionSnapshot call in a transaction? */
bool        FirstSnapshotSet = false;
/*
 * Remember the serializable transaction snapshot, if any.  We cannot trust
 * FirstSnapshotSet in combination with IsolationUsesXactSnapshot(), because
 * GUC may be reset before us, changing the value of IsolationUsesXactSnapshot.
 */
static Snapshot FirstXactSnapshot = NULL;
/*
 * CurrentSnapshot points to the only snapshot taken in transaction-snapshot
 * mode, and to the latest one taken in a read-committed transaction.
 * SecondarySnapshot is a snapshot that's always up-to-date as of the current
 * instant, even in transaction-snapshot mode.  It should only be used for
 * special-purpose code (say, RI checking.)  CatalogSnapshot points to an
 * MVCC snapshot intended to be used for catalog scans; we must invalidate it
 * whenever a system catalog change occurs.
 *
 * These SnapshotData structs are static to simplify memory allocation
 * (see the hack in GetSnapshotData to avoid repeated malloc/free).
 */
static SnapshotData CurrentSnapshotData = {HeapTupleSatisfiesMVCC};
static SnapshotData SecondarySnapshotData = {HeapTupleSatisfiesMVCC};
SnapshotData CatalogSnapshotData = {HeapTupleSatisfiesMVCC};
/* Pointers to valid snapshots */
static Snapshot CurrentSnapshot = NULL;
static Snapshot SecondarySnapshot = NULL;
static Snapshot CatalogSnapshot = NULL;
static Snapshot HistoricSnapshot = NULL;
/*
 * These are updated by GetSnapshotData.  We initialize them this way
 * for the convenience of TransactionIdIsInProgress: even in bootstrap
 * mode, we don't want it to say that BootstrapTransactionId is in progress.
 *
 * RecentGlobalXmin and RecentGlobalDataXmin are initialized to
 * InvalidTransactionId, to ensure that no one tries to use a stale
 * value. Readers should ensure that it has been set to something else
 * before using it.
 */
TransactionId TransactionXmin = FirstNormalTransactionId;
TransactionId RecentXmin = FirstNormalTransactionId;
TransactionId RecentGlobalXmin = InvalidTransactionId;
TransactionId RecentGlobalDataXmin = InvalidTransactionId;
/* (table, ctid) => (cmin, cmax) mapping during timetravel */
static HTAB *tuplecid_data = NULL;

MyPgXact
Transaction information for the current backend.


/*
 * Flags for PGXACT->vacuumFlags
 *
 * Note: If you modify these flags, you need to modify PROCARRAY_XXX flags
 * in src/include/storage/procarray.h.
 *
 * PROC_RESERVED may later be assigned for use in vacuumFlags, but its value is
 * used for PROCARRAY_SLOTS_XMIN in procarray.h, so GetOldestXmin won't be able
 * to match and ignore processes with this flag set.
 */
#define     PROC_IS_AUTOVACUUM  0x01    /* is it an autovac worker? */
#define     PROC_IN_VACUUM      0x02    /* currently running lazy vacuum */
#define     PROC_IN_ANALYZE     0x04    /* currently running analyze */
#define     PROC_VACUUM_FOR_WRAPAROUND  0x08    /* set by autovac only */
#define     PROC_IN_LOGICAL_DECODING    0x10    /* currently doing logical
                                                 * decoding outside xact */
#define     PROC_RESERVED               0x20    /* reserved for procarray */
/* flags reset at EOXact */
// mask of the flags that are cleared at end of transaction
#define     PROC_VACUUM_STATE_MASK \
    (PROC_IN_VACUUM | PROC_IN_ANALYZE | PROC_VACUUM_FOR_WRAPAROUND)
/*
 * Prior to PostgreSQL 9.2, the fields below were stored as part of the
 * PGPROC.  However, benchmarking revealed that packing these particular
 * members into a separate array as tightly as possible sped up GetSnapshotData
 * considerably on systems with many CPU cores, by reducing the number of
 * cache lines needing to be fetched.  Thus, think very carefully before adding
 * anything else here.
 */
typedef struct PGXACT
{
    // XID of the current top-level transaction (not a subtransaction)
    // as an optimization, read-only transactions are not assigned an XID (xid = 0)
    TransactionId xid;          /* id of top-level transaction currently being
                                 * executed by this proc, if running and XID
                                 * is assigned; else InvalidTransactionId */
    TransactionId xmin;         /* minimal running XID as it was when we were
                                 * starting our xact, excluding LAZY VACUUM:
                                 * vacuum must not remove tuples deleted by
                                 * xid >= xmin ! */
    uint8       vacuumFlags;    /* vacuum-related flags, see above */
    bool        overflowed;
    bool        delayChkpt;     /* true if this proc delays checkpoint start;
                                 * previously called InCommit */
    uint8       nxids;
} PGXACT;
extern PGDLLIMPORT struct PGXACT *MyPgXact;
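The vacuumFlags bits are plain bit masks, so both the "skip this backend" test used later in GetSnapshotData and the end-of-transaction reset reduce to bitwise operations. The following is a minimal sketch, not PostgreSQL code: the flag values are copied from the listing above, while the two helper functions are hypothetical names introduced only for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Flag values copied from the listing above. */
#define PROC_IS_AUTOVACUUM          0x01
#define PROC_IN_VACUUM              0x02
#define PROC_IN_ANALYZE             0x04
#define PROC_VACUUM_FOR_WRAPAROUND  0x08
#define PROC_IN_LOGICAL_DECODING    0x10
#define PROC_VACUUM_STATE_MASK \
    (PROC_IN_VACUUM | PROC_IN_ANALYZE | PROC_VACUUM_FOR_WRAPAROUND)

/* Hypothetical helper: would the snapshot scan skip a backend with these flags? */
bool
skipped_by_snapshot_scan(uint8_t vacuumFlags)
{
    return (vacuumFlags & (PROC_IN_LOGICAL_DECODING | PROC_IN_VACUUM)) != 0;
}

/* Hypothetical helper: clear the bits that are reset at end of transaction. */
uint8_t
reset_flags_at_eoxact(uint8_t vacuumFlags)
{
    return (uint8_t) (vacuumFlags & ~PROC_VACUUM_STATE_MASK);
}
```

Note that PROC_IS_AUTOVACUUM and PROC_IN_LOGICAL_DECODING are outside the mask, so they survive the reset in this sketch.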

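Snapshot
A pointer to a SnapshotData struct. SnapshotData can represent every possible kind of snapshot.
There are several different kinds of snapshots:
1. Normal MVCC snapshots
2. MVCC snapshots taken during recovery (in Hot-Standby mode)
3. Historic MVCC snapshots used during logical decoding
4. Snapshots passed to HeapTupleSatisfiesDirty()
5. Snapshots passed to HeapTupleSatisfiesNonVacuumable()
6. Snapshots used for SatisfiesAny, Toast and Self, where no members are accessed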


// pointer to a SnapshotData struct
typedef struct SnapshotData *Snapshot;
// the invalid snapshot
#define InvalidSnapshot     ((Snapshot) NULL)
/*
 * We use SnapshotData structures to represent both "regular" (MVCC)
 * snapshots and "special" snapshots that have non-MVCC semantics.
 * The specific semantics of a snapshot are encoded by the "satisfies"
 * function.
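 */
// the tuple test function
typedef bool (*SnapshotSatisfiesFunc) (HeapTuple htup,
                                       Snapshot snapshot, Buffer buffer);
// Commonly seen visibility routines:
// HeapTupleSatisfiesMVCC: tests whether a tuple is visible to a given MVCC snapshot
// HeapTupleSatisfiesUpdate: tests whether a tuple can be updated (concurrent updates of the same tuple)
// HeapTupleSatisfiesDirty: visibility test that also sees the effects of uncommitted ("dirty") transactions
// HeapTupleSatisfiesSelf: tests whether a tuple is valid "for itself"
// HeapTupleSatisfiesToast: visibility test for TOAST tuples
// HeapTupleSatisfiesVacuum: tests whether a tuple can be removed by VACUUM
// HeapTupleSatisfiesAny: every tuple is visible
// HeapTupleSatisfiesHistoricMVCC: used for catalog tables during logical decoding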
/*
 * Struct representing all kind of possible snapshots.
 *
 * There are several different kinds of snapshots:
 * * Normal MVCC snapshots
 * * MVCC snapshots taken during recovery (in Hot-Standby mode)
 * * Historic MVCC snapshots used during logical decoding
 * * snapshots passed to HeapTupleSatisfiesDirty()
 * * snapshots passed to HeapTupleSatisfiesNonVacuumable()
 * * snapshots used for SatisfiesAny, Toast, Self where no members are
 *   accessed.
 *
 * TODO: It's probably a good idea to split this struct using a NodeTag
 * similar to how parser and executor nodes are handled, with one type for
 * each different kind of snapshot to avoid overloading the meaning of
 * individual fields.
 */
typedef struct SnapshotData
{
    // function that tests whether a tuple is visible
    SnapshotSatisfiesFunc satisfies;    /* tuple test function */
    /*
     * The remaining fields are used only for MVCC snapshots, and are normally
     * just zeroes in special snapshots.  (But xmin and xmax are used
     * specially by HeapTupleSatisfiesDirty, and xmin is used specially by
     * HeapTupleSatisfiesNonVacuumable.)
     *
     * An MVCC snapshot can never see the effects of XIDs >= xmax. It can see
     * the effects of all older XIDs except those listed in the snapshot. xmin
     * is stored as an optimization to avoid needing to search the XID arrays
     * for most tuples.
     */
    // XIDs < xmin are no longer running (their effects are visible if they committed)
    TransactionId xmin;         /* all XID < xmin are visible to me */
    // XIDs in [xmax, ∞) are invisible
    TransactionId xmax;         /* all XID >= xmax are invisible to me */
    /*
     * For normal MVCC snapshot this contains the all xact IDs that are in
     * progress, unless the snapshot was taken during recovery in which case
     * it's empty. For historic MVCC snapshots, the meaning is inverted, i.e.
     * it contains *committed* transactions between xmin and xmax.
     *
     * note: all ids in xip[] satisfy xmin <= xip[i] < xmax
     */
    TransactionId *xip;
    uint32      xcnt;           /* # of xact ids in xip[] */
    /*
     * For non-historic MVCC snapshots, this contains subxact IDs that are in
     * progress (and other transactions that are in progress if taken during
     * recovery). For historic snapshot it contains *all* xids assigned to the
     * replayed transaction, including the toplevel xid.
     *
     * note: all ids in subxip[] are >= xmin, but we don't bother filtering
     * out any that are >= xmax
     */
    TransactionId *subxip;
    int32       subxcnt;        /* # of xact ids in subxip[] */
    bool        suboverflowed;  /* has the subxip array overflowed? */
    // was the snapshot taken during recovery?
    bool        takenDuringRecovery;    /* recovery-shaped snapshot? */
    bool        copied;         /* false if it's a static snapshot */
    CommandId   curcid;         /* in my xact, CID < curcid are visible */
    /*
     * An extra return value for HeapTupleSatisfiesDirty, not used in MVCC
     * snapshots.
     */
    uint32      speculativeToken;
    /*
     * Book-keeping information, used by the snapshot manager
     */
    uint32      active_count;   /* refcount on ActiveSnapshot stack */
    uint32      regd_count;     /* refcount on RegisteredSnapshots */
    pairingheap_node ph_node;   /* link in the RegisteredSnapshots heap */
    TimestampTz whenTaken;      /* timestamp when snapshot was taken */
    XLogRecPtr  lsn;            /* position in the WAL stream when taken */
} SnapshotData;
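The xmin / xmax / xip fields combine into a three-way test: anything older than xmin is finished, anything at or beyond xmax counts as running, and only the range in between requires a search of xip[]. The following is a simplified sketch of that test, loosely modelled on what XidInMVCCSnapshot() does for a normal snapshot. It ignores subtransactions, suboverflow and recovery snapshots, and the MiniSnapshot type and both function names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Reduced stand-in for SnapshotData: only the fields this check needs. */
typedef struct MiniSnapshot
{
    TransactionId xmin;         /* all XIDs < xmin are finished */
    TransactionId xmax;         /* all XIDs >= xmax count as running */
    const TransactionId *xip;   /* in-progress XIDs within [xmin, xmax) */
    uint32_t    xcnt;           /* number of entries in xip[] */
} MiniSnapshot;

/* Modulo-2^32 "a is older than b"; meaningful for normal XIDs only. */
bool
xid_precedes(TransactionId a, TransactionId b)
{
    return (int32_t) (a - b) < 0;
}

/* True if the snapshot treats xid as still in progress. */
bool
xid_in_snapshot(TransactionId xid, const MiniSnapshot *snap)
{
    assert(snap != NULL);
    if (xid_precedes(xid, snap->xmin))
        return false;           /* finished before the snapshot was taken */
    if (!xid_precedes(xid, snap->xmax))
        return true;            /* started after the snapshot: still "running" */
    for (uint32_t i = 0; i < snap->xcnt; i++)
    {
        if (snap->xip[i] == xid)
            return true;        /* listed as in progress */
    }
    return false;
}
```

A false result only means the transaction had finished when the snapshot was taken; whether its changes are visible still depends on whether it committed, which this sketch does not model.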

ShmemVariableCache
VariableCache is a data structure in shared memory that tracks OID and XID assignment state.
ShmemVariableCache is a pointer to that VariableCacheData struct.


/*
 * VariableCache is a data structure in shared memory that is used to track
 * OID and XID assignment state.  For largely historical reasons, there is
 * just one struct with different fields that are protected by different
 * LWLocks.
 *
 * Note: xidWrapLimit and oldestXidDB are not "active" values, but are
 * used just to generate useful messages when xidWarnLimit or xidStopLimit
 * are exceeded.
 */
typedef struct VariableCacheData
{
    /*
     * These fields are protected by OidGenLock.
     */
    Oid         nextOid;        /* next OID to assign */
    uint32      oidCount;       /* OIDs available before must do XLOG work */
    /*
     * These fields are protected by XidGenLock.
     */
    TransactionId nextXid;      /* next XID to assign */
    TransactionId oldestXid;    /* cluster-wide minimum datfrozenxid */
    TransactionId xidVacLimit;  /* start forcing autovacuums here */
    TransactionId xidWarnLimit; /* start complaining here */
    TransactionId xidStopLimit; /* refuse to advance nextXid beyond here */
    // the "end of the world" XID, where wraparound would occur
    TransactionId xidWrapLimit; /* where the world ends */
    Oid         oldestXidDB;    /* database with minimum datfrozenxid */
    /*
     * These fields are protected by CommitTsLock
     */
    TransactionId oldestCommitTsXid;
    TransactionId newestCommitTsXid;
    /*
     * These fields are protected by ProcArrayLock.
     */
    TransactionId latestCompletedXid;   /* newest XID that has committed or
                                         * aborted */
    /*
     * These fields are protected by CLogTruncationLock
     */
    TransactionId oldestClogXid;    /* oldest it's safe to look up in clog */
} VariableCacheData;
// pointer type for the struct
typedef VariableCacheData *VariableCache;
/* pointer to "variable cache" in shared memory (set up by shmem.c) */
VariableCache ShmemVariableCache = NULL;

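II. Source code walkthrough

GetSnapshotData returns the snapshot information.
Its core task is to build xmin : xmax : xip_list. In outline:
1. Compute xmax = ShmemVariableCache->latestCompletedXid + 1.
2. Walk the global procArray and build the snapshot:
2.1 Fetch each backend's transaction information, pgxact.
2.2 Read the backend's transaction ID (pgxact->xid) and keep the smallest normal XID below xmax as xmin (unassigned XIDs, i.e. 0, are skipped).
2.3 Store the xid in the snapshot's xip array (the calling backend's own xid is not stored).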
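The steps above can be sketched as a small standalone function. This is an illustration under simplifying assumptions, not the real implementation: backends are reduced to a flat array of XIDs, comparisons are plain integer comparisons (XID wraparound is ignored), and there is no locking, no subtransaction handling and no recovery branch. SketchSnapshot, build_snapshot and the fixed capacity of 16 are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;
#define FirstNormalTransactionId  ((TransactionId) 3)
#define SKETCH_MAX_PROCS          16    /* hypothetical capacity of the sketch */

/* Hypothetical reduced result: just xmin : xmax : xip_list. */
typedef struct SketchSnapshot
{
    TransactionId xmin;
    TransactionId xmax;
    TransactionId xip[SKETCH_MAX_PROCS];
    uint32_t    xcnt;
} SketchSnapshot;

/*
 * xids[i] is the XID of backend i, or 0 if it has none assigned.
 * my_index is the slot of the backend taking the snapshot.
 */
void
build_snapshot(SketchSnapshot *snap, const TransactionId *xids,
               size_t nprocs, size_t my_index, TransactionId latest_completed)
{
    TransactionId xmax = latest_completed + 1;  /* step 1 */
    TransactionId xmin = xmax;                  /* start xmin at xmax */

    assert(snap != NULL && xids != NULL && nprocs <= SKETCH_MAX_PROCS);
    snap->xcnt = 0;
    for (size_t i = 0; i < nprocs; i++)         /* step 2 */
    {
        TransactionId xid = xids[i];            /* step 2.1 */

        /* skip backends without a normal XID, and XIDs at or beyond xmax */
        if (xid < FirstNormalTransactionId || xid >= xmax)
            continue;
        if (xid < xmin)                         /* step 2.2 */
            xmin = xid;
        if (i == my_index)                      /* own XID counts for xmin only */
            continue;
        snap->xip[snap->xcnt++] = xid;          /* step 2.3 */
    }
    snap->xmin = xmin;
    snap->xmax = xmax;
}
```

Starting xmin at xmax means an idle system yields xmin == xmax with an empty xip list.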


/*
 * GetSnapshotData -- returns information about running transactions.
 *
 * The returned snapshot includes xmin (lowest still-running xact ID),
 * xmax (highest completed xact ID + 1), and a list of running xact IDs
 * in the range xmin <= xid < xmax.  It is used as follows:
 *      All xact IDs < xmin are considered finished.
 *      All xact IDs >= xmax are considered still running.
 *      For an xact ID xmin <= xid < xmax, consult list to see whether
 *      it is considered running or not.
 * This ensures that the set of transactions seen as "running" by the
 * current xact will not change after it takes the snapshot.
 *
 * All running top-level XIDs are included in the snapshot, except for lazy
 * VACUUM processes.  We also try to include running subtransaction XIDs,
 * but since PGPROC has only a limited cache area for subxact XIDs, full
 * information may not be available.  If we find any overflowed subxid arrays,
 * we have to mark the snapshot's subxid data as overflowed, and extra work
 * *may* need to be done to determine what's running (see XidInMVCCSnapshot()
 * in tqual.c).
 *
 * We also update the following backend-global variables:
 *      TransactionXmin: the oldest xmin of any snapshot in use in the
 *          current transaction (this is the same as MyPgXact->xmin).
 *      RecentXmin: the xmin computed for the most recent snapshot.  XIDs
 *          older than this are known not running any more.
 *      RecentGlobalXmin: the global xmin (oldest TransactionXmin across all
 *          running transactions, except those running LAZY VACUUM).  This is
 *          the same computation done by
 *          GetOldestXmin(NULL, PROCARRAY_FLAGS_VACUUM).
 *      RecentGlobalDataXmin: the global xmin for non-catalog tables
 *          >= RecentGlobalXmin
 *
 * Note: this function should probably not be called with an argument that's
 * not statically allocated (see xip allocation below).
 */
Snapshot
GetSnapshotData(Snapshot snapshot)
{
    ProcArrayStruct *arrayP = procArray;// the process array
    TransactionId xmin;//xmin
    TransactionId xmax;//xmax
    TransactionId globalxmin;// global xmin
    int         index;
    int         count = 0;
    int         subcount = 0;
    bool        suboverflowed = false;
    TransactionId replication_slot_xmin = InvalidTransactionId;
    TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
    Assert(snapshot != NULL);
    /*
     * Allocating space for maxProcs xids is usually overkill; numProcs would
     * be sufficient.  But it seems better to do the malloc while not holding
     * the lock, so we can't look at numProcs.  Likewise, we allocate much
     * more subxip storage than is probably needed.
     *
     * This does open a possibility for avoiding repeated malloc/free: since
     * maxProcs does not change at runtime, we can simply reuse the previous
     * xip arrays if any.  (This relies on the fact that all callers pass
     * static SnapshotData structs.)
     */
    if (snapshot->xip == NULL)
    {
        /*
         * First call for this snapshot. Snapshot is same size whether or not
         * we are in recovery, see later comments.
         */
        snapshot->xip = (TransactionId *)
            malloc(GetMaxSnapshotXidCount() * sizeof(TransactionId));
        if (snapshot->xip == NULL)
            ereport(ERROR,
                    (errcode(ERRCODE_OUT_OF_MEMORY),
                     errmsg("out of memory")));
        Assert(snapshot->subxip == NULL);
        snapshot->subxip = (TransactionId *)
            malloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId));
        if (snapshot->subxip == NULL)
            ereport(ERROR,
                    (errcode(ERRCODE_OUT_OF_MEMORY),
                     errmsg("out of memory")));
    }
    /*
     * It is sufficient to get shared lock on ProcArrayLock, even if we are
     * going to set MyPgXact->xmin.
     */
    LWLockAcquire(ProcArrayLock, LW_SHARED);
    /* xmax is always latestCompletedXid + 1 */
    xmax = ShmemVariableCache->latestCompletedXid;
    Assert(TransactionIdIsNormal(xmax));
    TransactionIdAdvance(xmax);// + 1
    /* initialize xmin calculation with xmax */
    globalxmin = xmin = xmax;
    // are we in recovery?
    snapshot->takenDuringRecovery = RecoveryInProgress();
    if (!snapshot->takenDuringRecovery)
    {
        // no: normal operation
        int        *pgprocnos = arrayP->pgprocnos;// indexes into allPgXact[] for the active backends
        int         numProcs;
        /*
         * Spin over procArray checking xid, xmin, and subxids.  The goal is
         * to gather all active xids, find the lowest xmin, and try to record
         * subxids.
         */
        numProcs = arrayP->numProcs;
        for (index = 0; index < numProcs; index++)// walk the procArray
        {
            int         pgprocno = pgprocnos[index];// index into allPgXact[]
            PGXACT     *pgxact = &allPgXact[pgprocno];// this backend's PGXACT
            TransactionId xid;// transaction ID
            /*
             * Skip over backends doing logical decoding which manages xmin
             * separately (check below) and ones running LAZY VACUUM.
             */
            if (pgxact->vacuumFlags &
                (PROC_IN_LOGICAL_DECODING | PROC_IN_VACUUM))
                continue;
            /* Update globalxmin to be the smallest valid xmin */
            xid = UINT32_ACCESS_ONCE(pgxact->xmin);// this backend's xmin
            if (TransactionIdIsNormal(xid) &&
                NormalTransactionIdPrecedes(xid, globalxmin))
                globalxmin = xid;
            /* Fetch xid just once - see GetNewTransactionId */
            xid = UINT32_ACCESS_ONCE(pgxact->xid);
            /*
             * If the transaction has no XID assigned, we can skip it; it
             * won't have sub-XIDs either.  If the XID is >= xmax, we can also
             * skip it; such transactions will be treated as running anyway
             * (and any sub-XIDs will also be >= xmax).
             */
            if (!TransactionIdIsNormal(xid)
                || !NormalTransactionIdPrecedes(xid, xmax))
                continue;
            /*
             * We don't include our own XIDs (if any) in the snapshot, but we
             * must include them in xmin.
             */
            if (NormalTransactionIdPrecedes(xid, xmin))
                // xid is older than xmin, so lower xmin to xid
                xmin = xid;
            if (pgxact == MyPgXact)
                continue;// skip our own transaction
            /* Add XID to snapshot. */
            snapshot->xip[count++] = xid;
            /*
             * Save subtransaction XIDs if possible (if we've already
             * overflowed, there's no point).  Note that the subxact XIDs must
             * be later than their parent, so no need to check them against
             * xmin.  We could filter against xmax, but it seems better not to
             * do that much work while holding the ProcArrayLock.
             *
             * The other backend can add more subxids concurrently, but cannot
             * remove any.  Hence it's important to fetch nxids just once.
             * Should be safe to use memcpy, though.  (We needn't worry about
             * missing any xids added concurrently, because they must postdate
             * xmax.)
             *
             * Again, our own XIDs are not included in the snapshot.
             */
            if (!suboverflowed)
            {
                if (pgxact->overflowed)
                    suboverflowed = true;
                else
                {
                    int         nxids = pgxact->nxids;
                    if (nxids > 0)
                    {
                        PGPROC     *proc = &allProcs[pgprocno];
                        pg_read_barrier();  /* pairs with GetNewTransactionId */
                        memcpy(snapshot->subxip + subcount,
                               (void *) proc->subxids.xids,
                               nxids * sizeof(TransactionId));
                        subcount += nxids;
                    }
                }
            }
        }
    }
    else
    {
        /*
         * We're in hot standby, so get XIDs from KnownAssignedXids.
         *
         * We store all xids directly into subxip[]. Here's why:
         *
         * In recovery we don't know which xids are top-level and which are
         * subxacts, a design choice that greatly simplifies xid processing.
         *
         * It seems like we would want to try to put xids into xip[] only, but
         * that is fairly small. We would either need to make that bigger or
         * to increase the rate at which we WAL-log xid assignment; neither is
         * an appealing choice.
         *
         * We could try to store xids into xip[] first and then into subxip[]
         * if there are too many xids. That only works if the snapshot doesn't
         * overflow because we do not search subxip[] in that case. A simpler
         * way is to just store all xids in the subxact array because this is
         * by far the bigger array. We just leave the xip array empty.
         *
         * Either way we need to change the way XidInMVCCSnapshot() works
         * depending upon when the snapshot was taken, or change normal
         * snapshot processing so it matches.
         *
         * Note: It is possible for recovery to end before we finish taking
         * the snapshot, and for newly assigned transaction ids to be added to
         * the ProcArray.  xmax cannot change while we hold ProcArrayLock, so
         * those newly added transaction ids would be filtered away, so we
         * need not be concerned about them.
         */
        subcount = KnownAssignedXidsGetAndSetXmin(snapshot->subxip, &xmin,
                                                  xmax);
        if (TransactionIdPrecedesOrEquals(xmin, procArray->lastOverflowedXid))
            suboverflowed = true;
    }
    /*
     * Fetch into local variable while ProcArrayLock is held - the
     * LWLockRelease below is a barrier, ensuring this happens inside the
     * lock.
     */
    replication_slot_xmin = procArray->replication_slot_xmin;
    replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
    if (!TransactionIdIsValid(MyPgXact->xmin))
        MyPgXact->xmin = TransactionXmin = xmin;
    LWLockRelease(ProcArrayLock);
    /*
     * Update globalxmin to include actual process xids.  This is a slightly
     * different way of computing it than GetOldestXmin uses, but should give
     * the same result.
     */
    if (TransactionIdPrecedes(xmin, globalxmin))
        globalxmin = xmin;
    /* Update global variables too */
    RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
    if (!TransactionIdIsNormal(RecentGlobalXmin))
        RecentGlobalXmin = FirstNormalTransactionId;
    /* Check whether there's a replication slot requiring an older xmin. */
    if (TransactionIdIsValid(replication_slot_xmin) &&
        NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin))
        RecentGlobalXmin = replication_slot_xmin;
    /* Non-catalog tables can be vacuumed if older than this xid */
    RecentGlobalDataXmin = RecentGlobalXmin;
    /*
     * Check whether there's a replication slot requiring an older catalog
     * xmin.
     */
    if (TransactionIdIsNormal(replication_slot_catalog_xmin) &&
        NormalTransactionIdPrecedes(replication_slot_catalog_xmin, RecentGlobalXmin))
        RecentGlobalXmin = replication_slot_catalog_xmin;
    RecentXmin = xmin;
    snapshot->xmin = xmin;
    snapshot->xmax = xmax;
    snapshot->xcnt = count;
    snapshot->subxcnt = subcount;
    snapshot->suboverflowed = suboverflowed;
    // current command ID
    snapshot->curcid = GetCurrentCommandId(false);
    /*
     * This is a new snapshot, so set both refcounts are zero, and mark it as
     * not copied in persistent memory.
     */
    snapshot->active_count = 0;
    snapshot->regd_count = 0;
    snapshot->copied = false;
    if (old_snapshot_threshold < 0)
    {
        /*
         * If not using "snapshot too old" feature, fill related fields with
         * dummy values that don't require any locking.
         */
        snapshot->lsn = InvalidXLogRecPtr;
        snapshot->whenTaken = 0;
    }
    else
    {
        /*
         * Capture the current time and WAL stream location in case this
         * snapshot becomes old enough to need to fall back on the special
         * "old snapshot" logic.
         */
        snapshot->lsn = GetXLogInsertRecPtr();
        snapshot->whenTaken = GetSnapshotCurrentTimestamp();
        MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin);
    }
    // return the snapshot
    return snapshot;
}

III. Tracing with gdb

Running a simple query is enough to trigger the snapshot logic.


16:35:08 (xdb@[local]:5432)testdb=# begin;
BEGIN
16:35:13 (xdb@[local]:5432)testdb=#* select 1;

Start gdb and set a breakpoint


(gdb) b GetSnapshotData
Breakpoint 1 at 0x89aef3: file procarray.c, line 1519.
(gdb) c
Continuing.
Breakpoint 1, GetSnapshotData (snapshot=0xf9be60 <CurrentSnapshotData>) at procarray.c:1519
1519        ProcArrayStruct *arrayP = procArray;
(gdb)

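The input argument snapshot is in fact the static variable CurrentSnapshotData. Because that struct is reused across calls, the field values printed below appear to be left over from an earlier call (xip is already allocated).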


(gdb) p *snapshot
$1 = {satisfies = 0xa9310d <HeapTupleSatisfiesMVCC>, xmin = 2354, xmax = 2358, xip = 0x24c7e40, xcnt = 1, 
  subxip = 0x251dfa0, subxcnt = 0, suboverflowed = false, takenDuringRecovery = false, copied = false, curcid = 0, 
  speculativeToken = 0, active_count = 0, regd_count = 0, ph_node = {first_child = 0x0, next_sibling = 0x0, 
    prev_or_parent = 0x0}, whenTaken = 0, lsn = 0}

Inspect the information in shared memory (ShmemVariableCache).
nextXid = 2358, so the next transaction ID to be assigned is 2358; latestCompletedXid = 2357.


(gdb) p *ShmemVariableCache
$2 = {nextOid = 42605, oidCount = 8183, nextXid = 2358, oldestXid = 561, xidVacLimit = 200000561, 
  xidWarnLimit = 2136484208, xidStopLimit = 2146484208, xidWrapLimit = 2147484208, oldestXidDB = 16400, 
  oldestCommitTsXid = 0, newestCommitTsXid = 0, latestCompletedXid = 2357, oldestClogXid = 561}
(gdb)

Fetch the global process array procArray and assign it to arrayP.
Initialize the related local variables.


(gdb) n
1524        int         count = 0;
(gdb) n
1525        int         subcount = 0;
(gdb) 
1526        bool        suboverflowed = false;
(gdb) 
1527        volatile TransactionId replication_slot_xmin = InvalidTransactionId;
(gdb) 
1528        volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
(gdb) 
1530        Assert(snapshot != NULL);
(gdb) 
1543        if (snapshot->xip == NULL)
(gdb)

Inspect the process array and the indexes into the allPgXact[] array (the arrayP->pgprocnos array).
allPgXact is defined as: static PGXACT *allPgXact;


(gdb) p *arrayP
$3 = {numProcs = 5, maxProcs = 112, maxKnownAssignedXids = 7280, numKnownAssignedXids = 0, tailKnownAssignedXids = 0, 
  headKnownAssignedXids = 0, known_assigned_xids_lck = 0 '\000', lastOverflowedXid = 0, replication_slot_xmin = 0, 
  replication_slot_catalog_xmin = 0, pgprocnos = 0x7f8765d9a3a8}
(gdb) p arrayP->pgprocnos[0]
$4 = 97
(gdb) p arrayP->pgprocnos[1]
$5 = 98
(gdb) p arrayP->pgprocnos[2]
$6 = 99
(gdb) p arrayP->pgprocnos[3]
$7 = 103
(gdb) p arrayP->pgprocnos[4]
$9 = 111

Acquire the lock (ProcArrayLock in shared mode) before reading or updating the related information.


(gdb) 
1568        LWLockAcquire(ProcArrayLock, LW_SHARED);

Compute xmax: latestCompletedXid (2357) advanced by one, giving 2358.


(gdb) n
1571        xmax = ShmemVariableCache->latestCompletedXid;
(gdb) 
1572        Assert(TransactionIdIsNormal(xmax));
(gdb) p xmax
$10 = 2357
(gdb) n
1573        TransactionIdAdvance(xmax);
(gdb) 
1576        globalxmin = xmin = xmax;
(gdb) 
1578        snapshot->takenDuringRecovery = RecoveryInProgress();
(gdb) p xmax
$11 = 2358
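
The xmax step can be looked at in isolation. The following is a minimal sketch rather than the PostgreSQL source: `xid_advance` and `snapshot_xmax` are hypothetical names standing in for the TransactionIdAdvance macro and the traced lines 1571-1573, assuming XIDs are 32-bit unsigned values and that values below 3 are reserved special XIDs.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Reserved XIDs: 0 = invalid, 1 = bootstrap, 2 = frozen. Normal XIDs start at 3. */
#define FIRST_NORMAL_XID ((TransactionId) 3)

/* Hypothetical helper: the XID following xid, skipping reserved values on wraparound. */
static TransactionId
xid_advance(TransactionId xid)
{
    xid++;                      /* unsigned arithmetic wraps to 0 after 0xFFFFFFFF */
    if (xid < FIRST_NORMAL_XID)
        xid = FIRST_NORMAL_XID;
    return xid;
}

/* Hypothetical helper: xmax is the XID right after the latest completed one. */
static TransactionId
snapshot_xmax(TransactionId latest_completed_xid)
{
    /* Mirrors the Assert(TransactionIdIsNormal(xmax)) seen in the trace. */
    assert(latest_completed_xid >= FIRST_NORMAL_XID);
    return xid_advance(latest_completed_xid);
}
```

With the value from the trace, `snapshot_xmax(2357)` yields 2358, which matches `$11`. The wraparound branch only matters when the counter reaches the end of the 32-bit range.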

Check whether the server is in recovery. It is not, so execution enters the corresponding (non-recovery) branch.


(gdb) n
1580        if (!snapshot->takenDuringRecovery)
(gdb) p snapshot->takenDuringRecovery
$13 = false
(gdb) n
1582            int        *pgprocnos = arrayP->pgprocnos;
(gdb)

Fetch the number of processes and the array of PGXACT indexes, ready for the traversal.


(gdb) n
1590            numProcs = arrayP->numProcs;
(gdb) 
1591            for (index = 0; index < numProcs; index++)
(gdb) 
(gdb) p *pgprocnos
$14 = 97
(gdb) p numProcs
$15 = 5
(gdb)

Fetch the pgxact entry.


(gdb) n
1593                int         pgprocno = pgprocnos[index];
(gdb) 
1594                volatile PGXACT *pgxact = &allPgXact[pgprocno];
(gdb) 
1601                if (pgxact->vacuumFlags & PROC_IN_LOGICAL_DECODING)
(gdb) 
1605                if (pgxact->vacuumFlags & PROC_IN_VACUUM)
(gdb) 
1609                xid = pgxact->xmin; /* fetch just once */
(gdb) p *pgxact
$16 = {xid = 0, xmin = 0, vacuumFlags = 0 '\000', overflowed = false, delayChkpt = false, nxids = 0 '\000'}
(gdb)

The xid is not a normal transaction ID (it is 0), so move on to the next pgxact.


(gdb) n
1610                if (TransactionIdIsNormal(xid) &&
(gdb) 
1615                xid = pgxact->xid;
(gdb) 
1623                if (!TransactionIdIsNormal(xid)
(gdb) p xid
$17 = 0
(gdb) n
1625                    continue;
(gdb)
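
Why an xid of 0 is skipped can be shown with a small sketch. The helper names below are hypothetical stand-ins for the TransactionIdIsNormal and TransactionIdIsValid macros; the only facts relied on are that XIDs 0, 1 and 2 are reserved and that normal XIDs start at 3.

```c
#include <assert.h>   /* for sanity checks by callers of these helpers */
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define INVALID_XID      ((TransactionId) 0)  /* "no transaction" */
#define BOOTSTRAP_XID    ((TransactionId) 1)
#define FROZEN_XID       ((TransactionId) 2)
#define FIRST_NORMAL_XID ((TransactionId) 3)

/* Hypothetical helper: true for any XID that is not the invalid marker. */
static bool
xid_is_valid(TransactionId xid)
{
    return xid != INVALID_XID;
}

/* Hypothetical helper: true only for XIDs assigned to ordinary transactions. */
static bool
xid_is_normal(TransactionId xid)
{
    return xid >= FIRST_NORMAL_XID;
}
```

An idle backend slot such as `$16` has xid = 0, so `xid_is_normal(0)` is false and the loop takes the `continue` at line 1625.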

The next entry has xid = 2355, a normal transaction ID.


(gdb) 
1591            for (index = 0; index < numProcs; index++)
(gdb) 
1593                int         pgprocno = pgprocnos[index];
(gdb) 
1594                volatile PGXACT *pgxact = &allPgXact[pgprocno];
(gdb) 
1601                if (pgxact->vacuumFlags & PROC_IN_LOGICAL_DECODING)
(gdb) p *pgxact
$18 = {xid = 2355, xmin = 0, vacuumFlags = 0 '\000', overflowed = false, delayChkpt = false, nxids = 0 '\000'}
(gdb)

Process this entry: 2355 precedes the current xmin (2358), so xmin is lowered to 2355.


(gdb) n
1605                if (pgxact->vacuumFlags & PROC_IN_VACUUM)
(gdb) 
1609                xid = pgxact->xmin; /* fetch just once */
(gdb) 
1610                if (TransactionIdIsNormal(xid) &&
(gdb) 
1615                xid = pgxact->xid;
(gdb) 
1623                if (!TransactionIdIsNormal(xid)
(gdb) 
1624                    || !NormalTransactionIdPrecedes(xid, xmax))
(gdb) 
1631                if (NormalTransactionIdPrecedes(xid, xmin))
(gdb) p xid
$19 = 2355
(gdb) p xmin
$20 = 2358
(gdb) n
1632                    xmin = xid;
(gdb) 
1633                if (pgxact == MyPgXact)
(gdb)
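
The comparison used at lines 1624 and 1631 is a circular one, not a plain `<`. A minimal sketch of the idea, assuming both arguments are normal XIDs (`xid_precedes` is a hypothetical name for what NormalTransactionIdPrecedes does):

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/*
 * Hypothetical helper: does a logically precede b?
 * The unsigned difference is reinterpreted as signed, so the answer stays
 * correct across XID wraparound as long as the two values are less than
 * 2^31 apart. The unsigned-to-signed conversion is implementation-defined
 * in C but behaves as two's complement on common compilers.
 */
static bool
xid_precedes(TransactionId a, TransactionId b)
{
    int32_t diff = (int32_t) (a - b);

    return diff < 0;
}
```

For the traced values, `xid_precedes(2355, 2358)` is true, which is why line 1632 lowers xmin to 2355. A value just below the 32-bit limit also precedes a small value such as 5, because the counter is treated as a circle.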

This entry is the current backend's own PGXACT (pgxact == MyPgXact), so continue with the next one.


(gdb) 
1633                if (pgxact == MyPgXact)
(gdb) p pgxact
$21 = (volatile PGXACT *) 0x7f8765d9a218
(gdb) p MyPgXact
$22 = (struct PGXACT *) 0x7f8765d9a218
(gdb) n
1634                    continue;
(gdb)

The next entry has xid 2354.


...
(gdb) p *pgxact
$23 = {xid = 2354, xmin = 0, vacuumFlags = 0 '\000', overflowed = false, delayChkpt = false, nxids = 0 '\000'}
(gdb)

xmin is lowered to 2354.


1631                if (NormalTransactionIdPrecedes(xid, xmin))
(gdb) 
1632                    xmin = xid;
(gdb) 
1633                if (pgxact == MyPgXact)
(gdb) p xmin
$24 = 2354
(gdb)

Write the xid into the snapshot's xip array.


1637                snapshot->xip[count++] = xid;
(gdb) 
1654                if (!suboverflowed)
(gdb) 
(gdb) p count
$25 = 1

Continue looping until all 5 pgxact entries have been visited.


1591            for (index = 0; index < numProcs; index++)
(gdb) 
1715        replication_slot_xmin = procArray->replication_slot_xmin;
(gdb)
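
The whole loop can be replayed on a toy model. This is a simplified sketch under stated assumptions, not the real GetSnapshotData: `ProcEntry`, `MiniSnapshot` and `build_snapshot` are hypothetical names, subtransactions, vacuum flags, recovery mode and locking are all left out, and the xip array has a small fixed size.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define FIRST_NORMAL_XID ((TransactionId) 3)
#define MAX_XIP 8                       /* toy capacity, assumption of this sketch */

typedef struct { TransactionId xid; } ProcEntry;    /* stand-in for PGXACT */

typedef struct
{
    TransactionId xmin;                 /* lowest XID still running */
    TransactionId xmax;                 /* first XID not yet assigned/completed */
    TransactionId xip[MAX_XIP];         /* other backends' running XIDs */
    int           xcnt;
} MiniSnapshot;

static bool
xid_precedes(TransactionId a, TransactionId b)
{
    return (int32_t) (a - b) < 0;
}

/* Hypothetical helper: scan all entries once and fill the snapshot. */
static void
build_snapshot(const ProcEntry *procs, int nprocs, int my_index,
               TransactionId latest_completed, MiniSnapshot *snap)
{
    TransactionId xmax = latest_completed + 1;
    TransactionId xmin;
    int           count = 0;

    assert(nprocs <= MAX_XIP);
    if (xmax < FIRST_NORMAL_XID)
        xmax = FIRST_NORMAL_XID;
    xmin = xmax;

    for (int i = 0; i < nprocs; i++)
    {
        TransactionId xid = procs[i].xid;

        /* Skip idle slots and XIDs at or beyond xmax. */
        if (xid < FIRST_NORMAL_XID || !xid_precedes(xid, xmax))
            continue;
        if (xid_precedes(xid, xmin))
            xmin = xid;
        if (i == my_index)
            continue;               /* our own XID lowers xmin but is not listed */
        snap->xip[count++] = xid;
    }
    snap->xmin = xmin;
    snap->xmax = xmax;
    snap->xcnt = count;
}
```

Feeding it the entries seen in the trace (0, 2355 for the current backend, 2354) plus two entries assumed to be idle, with latestCompletedXid = 2357, gives xmin = 2354, xmax = 2358 and a single xip entry 2354, the same values gdb prints for the finished snapshot.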

No replication slot xmin is set (the value is 0).


(gdb) 
1715        replication_slot_xmin = procArray->replication_slot_xmin;
(gdb) p procArray->replication_slot_xmin
$28 = 0
(gdb) n
1716        replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
(gdb) 
1718        if (!TransactionIdIsValid(MyPgXact->xmin))

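Update this backend's own transaction information: MyPgXact->xmin is currently 0 (invalid), so it is set to the xmin just computed.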


(gdb) n
1719            MyPgXact->xmin = TransactionXmin = xmin;
(gdb) p MyPgXact->xmin
$29 = 0
(gdb) n

Release the lock.


1721        LWLockRelease(ProcArrayLock);
(gdb) 
1728        if (TransactionIdPrecedes(xmin, globalxmin))
(gdb)

Adjust globalxmin: xmin (2354) precedes globalxmin (2358), so globalxmin is lowered to xmin.


(gdb) p xmin
$30 = 2354
(gdb) p globalxmin
$31 = 2358
(gdb) n
1729            globalxmin = xmin;
(gdb)

Update the other global variables (RecentGlobalXmin, RecentGlobalDataXmin).


(gdb) 
1732        RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
(gdb) p RecentGlobalXmin
$32 = 2354
(gdb) p vacuum_defer_cleanup_age
$33 = 0
(gdb) n
1733        if (!TransactionIdIsNormal(RecentGlobalXmin))
(gdb) 
1737        if (TransactionIdIsValid(replication_slot_xmin) &&
(gdb) 
1742        RecentGlobalDataXmin = RecentGlobalXmin;
(gdb) p RecentGlobalXmin
$34 = 2354
(gdb) n
1748        if (TransactionIdIsNormal(replication_slot_catalog_xmin) &&
(gdb)
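
The sequence at lines 1732-1748 can be summarized as a pure function. This is a sketch based on the statements visible in the trace; `GlobalXmins` and `compute_global_xmins` are hypothetical names, and the real code writes to the global variables RecentGlobalXmin and RecentGlobalDataXmin instead of returning a struct.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define FIRST_NORMAL_XID ((TransactionId) 3)

typedef struct
{
    TransactionId recent_global_xmin;       /* horizon that includes catalog needs */
    TransactionId recent_global_data_xmin;  /* horizon for non-catalog tables */
} GlobalXmins;

static bool xid_is_normal(TransactionId xid) { return xid >= FIRST_NORMAL_XID; }
static bool xid_precedes(TransactionId a, TransactionId b) { return (int32_t) (a - b) < 0; }

/* Hypothetical helper mirroring the traced statements in order. */
static GlobalXmins
compute_global_xmins(TransactionId globalxmin, uint32_t defer_cleanup_age,
                     TransactionId slot_xmin, TransactionId slot_catalog_xmin)
{
    GlobalXmins g;

    /* Hold the horizon back by vacuum_defer_cleanup_age, clamping to a normal XID. */
    g.recent_global_xmin = globalxmin - defer_cleanup_age;
    if (!xid_is_normal(g.recent_global_xmin))
        g.recent_global_xmin = FIRST_NORMAL_XID;

    /* A replication slot may require an even older xmin. */
    if (slot_xmin != 0 && xid_precedes(slot_xmin, g.recent_global_xmin))
        g.recent_global_xmin = slot_xmin;

    g.recent_global_data_xmin = g.recent_global_xmin;

    /* A slot's catalog xmin only pulls back the catalog horizon. */
    if (xid_is_normal(slot_catalog_xmin) &&
        xid_precedes(slot_catalog_xmin, g.recent_global_xmin))
        g.recent_global_xmin = slot_catalog_xmin;

    return g;
}
```

In the traced session there are no replication slots and vacuum_defer_cleanup_age is 0, so both results equal globalxmin (2354). The slot values used in the checks below are made-up inputs for illustration only.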

Fill in the fields of the snapshot structure.


(gdb) 
1752        RecentXmin = xmin;
(gdb) 
1754        snapshot->xmin = xmin;
(gdb) 
1755        snapshot->xmax = xmax;
(gdb) 
1756        snapshot->xcnt = count;
(gdb) 
1757        snapshot->subxcnt = subcount;
(gdb) 
1758        snapshot->suboverflowed = suboverflowed;
(gdb) 
1760        snapshot->curcid = GetCurrentCommandId(false);
(gdb) 
1766        snapshot->active_count = 0;
(gdb) 
1767        snapshot->regd_count = 0;
(gdb) 
1768        snapshot->copied = false;
(gdb) 
1770        if (old_snapshot_threshold < 0)
(gdb) 
1776            snapshot->lsn = InvalidXLogRecPtr;
(gdb) 
1777            snapshot->whenTaken = 0;
(gdb) 
1791        return snapshot;
(gdb)

Return the snapshot.


(gdb) p snapshot
$35 = (Snapshot) 0xf9be60 <CurrentSnapshotData>
(gdb) p *snapshot
$36 = {satisfies = 0xa9310d <HeapTupleSatisfiesMVCC>, xmin = 2354, xmax = 2358, xip = 0x24c7e40, xcnt = 1, 
  subxip = 0x251dfa0, subxcnt = 0, suboverflowed = false, takenDuringRecovery = false, copied = false, curcid = 0, 
  speculativeToken = 0, active_count = 0, regd_count = 0, ph_node = {first_child = 0x0, next_sibling = 0x0, 
    prev_or_parent = 0x0}, whenTaken = 0, lsn = 0}
(gdb)

Note: the snapshot->satisfies function pointer was already set to HeapTupleSatisfiesMVCC when this global variable was initialized.
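
To make the meaning of xmin, xmax and xip concrete, here is a rough sketch of how a snapshot such as `$36` classifies an arbitrary XID. It is an illustration under simplifying assumptions, not the HeapTupleSatisfiesMVCC implementation: `MiniSnapshot` and `xid_running_per_snapshot` are hypothetical names, subtransactions are ignored, and the real visibility check also consults commit status and treats the current transaction's own XIDs separately.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define MAX_XIP 8                       /* toy capacity, assumption of this sketch */

typedef struct
{
    TransactionId xmin;
    TransactionId xmax;
    TransactionId xip[MAX_XIP];
    int           xcnt;
} MiniSnapshot;

static bool
xid_precedes(TransactionId a, TransactionId b)
{
    return (int32_t) (a - b) < 0;
}

/* Hypothetical helper: was xid still running when the snapshot was taken? */
static bool
xid_running_per_snapshot(const MiniSnapshot *snap, TransactionId xid)
{
    assert(snap->xcnt <= MAX_XIP);

    if (xid_precedes(xid, snap->xmin))
        return false;                   /* finished before the snapshot */
    if (!xid_precedes(xid, snap->xmax))
        return true;                    /* started after the snapshot */
    for (int i = 0; i < snap->xcnt; i++)
        if (snap->xip[i] == xid)
            return true;                /* listed as in progress */
    return false;
}
```

For the snapshot in the trace (xmin = 2354, xmax = 2358, xip = {2354}), XID 2353 counts as finished, 2354 as still running, 2356 as finished, and 2358 as not yet started from this snapshot's point of view.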

DONE!

4. References

PG Source Code

From "ITPUB Blog", link: http://blog.itpub.net/6906/viewspace-2375566/. If you repost this article, please cite the source; otherwise legal liability will be pursued.
