ITPub Blog


PostgreSQL Source Code Walkthrough (189) - Query #105 (Aggregate Functions #10 - agg_retrieve_hash_table)

Original | PostgreSQL | Author: husthxd | Posted: 2019-05-14 15:09:06

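This installment continues the walkthrough of the aggregate function implementation. It covers the projection-related logic in agg_retrieve_hash_table, namely the functions prepare_projection_slot, finalize_aggregates and project_aggregates.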

1. Data Structures

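AggState
The run-time state of an Agg node while aggregates are being executed. It contains AggStatePerAgg and the other per-aggregate, per-transition and per-group structures.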


/* ---------------------
 *    AggState information
 *
 *    ss.ss_ScanTupleSlot refers to output of underlying plan.
 *    (ss = ScanState, ps = PlanState)
 *
 *    Note: ss.ps.ps_ExprContext contains ecxt_aggvalues and
 *    ecxt_aggnulls arrays, which hold the computed agg values for the current
 *    input group during evaluation of an Agg node's output tuple(s).  We
 *    create a second ExprContext, tmpcontext, in which to evaluate input
 *    expressions and run the aggregate transition functions.
 * ---------------------
 */
/* these structs are private in nodeAgg.c: */
typedef struct AggStatePerAggData *AggStatePerAgg;
typedef struct AggStatePerTransData *AggStatePerTrans;
typedef struct AggStatePerGroupData *AggStatePerGroup;
typedef struct AggStatePerPhaseData *AggStatePerPhase;
typedef struct AggStatePerHashData *AggStatePerHash;
typedef struct AggState
{
    ScanState    ss;                /* its first field is NodeTag */
    List       *aggs;            /* all Aggref nodes in targetlist & quals */
    int            numaggs;        /* length of list (could be zero!) */
    int            numtrans;        /* number of pertrans items */
    AggStrategy aggstrategy;    /* strategy mode */
    AggSplit    aggsplit;        /* agg-splitting mode, see nodes.h */
    AggStatePerPhase phase;        /* pointer to current phase data */
    int            numphases;        /* number of phases (including phase 0) */
    int            current_phase;    /* current phase number */
    AggStatePerAgg peragg;        /* per-Aggref information */
    AggStatePerTrans pertrans;    /* per-Trans state information */
    ExprContext *hashcontext;    /* econtexts for long-lived data (hashtable) */
    ExprContext **aggcontexts;    /* econtexts for long-lived data (per GS) */
    ExprContext *tmpcontext;    /* econtext for input expressions */
#define FIELDNO_AGGSTATE_CURAGGCONTEXT 14
    ExprContext *curaggcontext; /* currently active aggcontext */
    AggStatePerAgg curperagg;    /* currently active aggregate, if any */
#define FIELDNO_AGGSTATE_CURPERTRANS 16
    AggStatePerTrans curpertrans;    /* currently active trans state, if any */
    bool        input_done;        /* indicates end of input */
    bool        agg_done;        /* indicates completion of Agg scan */
    int            projected_set;    /* The last projected grouping set */
#define FIELDNO_AGGSTATE_CURRENT_SET 20
    int            current_set;    /* The current grouping set being evaluated */
    Bitmapset  *grouped_cols;    /* grouped cols in current projection */
    List       *all_grouped_cols;    /* list of all grouped cols in DESC order */
    /* These fields are for grouping set phase data */
    int            maxsets;        /* The max number of sets in any phase */
    AggStatePerPhase phases;    /* array of all phases */
    Tuplesortstate *sort_in;    /* sorted input to phases > 1 */
    Tuplesortstate *sort_out;    /* input is copied here for next phase */
    TupleTableSlot *sort_slot;    /* slot for sort results */
    /* these fields are used in AGG_PLAIN and AGG_SORTED modes: */
    AggStatePerGroup *pergroups;    /* grouping set indexed array of per-group
                                     * pointers */
    HeapTuple    grp_firstTuple; /* copy of first tuple of current group */
    /* these fields are used in AGG_HASHED and AGG_MIXED modes: */
    bool        table_filled;    /* hash table filled yet? */
    //number of hash tables, i.e. the length of the perhash array
    int            num_hashes;
    AggStatePerHash perhash;    /* array of per-hashtable data */
    AggStatePerGroup *hash_pergroup;    /* grouping set indexed array of
                                         * per-group pointers */
    /* support for evaluation of agg input expressions: */
#define FIELDNO_AGGSTATE_ALL_PERGROUPS 34
    AggStatePerGroup *all_pergroups;    /* array of first ->pergroups, then
                                         * ->hash_pergroup */
    ProjectionInfo *combinedproj;    /* projection machinery */
} AggState;
/* Primitive options supported by nodeAgg.c: */
#define AGGSPLITOP_COMBINE        0x01    /* substitute combinefn for transfn */
#define AGGSPLITOP_SKIPFINAL    0x02    /* skip finalfn, return state as-is */
#define AGGSPLITOP_SERIALIZE    0x04    /* apply serializefn to output */
#define AGGSPLITOP_DESERIALIZE    0x08    /* apply deserializefn to input */
/* Supported operating modes (i.e., useful combinations of these options): */
typedef enum AggSplit
{
    /* Basic, non-split aggregation: */
    AGGSPLIT_SIMPLE = 0,
    /* Initial phase of partial aggregation, with serialization: */
    AGGSPLIT_INITIAL_SERIAL = AGGSPLITOP_SKIPFINAL | AGGSPLITOP_SERIALIZE,
    /* Final phase of partial aggregation, with deserialization: */
    AGGSPLIT_FINAL_DESERIAL = AGGSPLITOP_COMBINE | AGGSPLITOP_DESERIALIZE
} AggSplit;
/* Test whether an AggSplit value selects each primitive option: */
#define DO_AGGSPLIT_COMBINE(as)        (((as) & AGGSPLITOP_COMBINE) != 0)
#define DO_AGGSPLIT_SKIPFINAL(as)    (((as) & AGGSPLITOP_SKIPFINAL) != 0)
#define DO_AGGSPLIT_SERIALIZE(as)    (((as) & AGGSPLITOP_SERIALIZE) != 0)
#define DO_AGGSPLIT_DESERIALIZE(as) (((as) & AGGSPLITOP_DESERIALIZE) != 0)
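To make the bit arithmetic concrete, here is a standalone sketch that copies the AGGSPLITOP_* values, the AggSplit enum and the DO_AGGSPLIT_* macros quoted above, plus a hypothetical helper, describe_aggsplit (not part of PostgreSQL), that reports which primitive options a mode selects. It shows, for example, that AGGSPLIT_INITIAL_SERIAL sets the SKIPFINAL bit, which is the test finalize_aggregates uses to choose finalize_partialaggregate over finalize_aggregate.

```c
#include <assert.h>
#include <stdio.h>

/* Bit values copied from the AGGSPLITOP_* definitions quoted above. */
#define AGGSPLITOP_COMBINE     0x01 /* substitute combinefn for transfn */
#define AGGSPLITOP_SKIPFINAL   0x02 /* skip finalfn, return state as-is */
#define AGGSPLITOP_SERIALIZE   0x04 /* apply serializefn to output */
#define AGGSPLITOP_DESERIALIZE 0x08 /* apply deserializefn to input */

typedef enum AggSplit
{
    AGGSPLIT_SIMPLE = 0,
    AGGSPLIT_INITIAL_SERIAL = AGGSPLITOP_SKIPFINAL | AGGSPLITOP_SERIALIZE,
    AGGSPLIT_FINAL_DESERIAL = AGGSPLITOP_COMBINE | AGGSPLITOP_DESERIALIZE
} AggSplit;

#define DO_AGGSPLIT_COMBINE(as)     (((as) & AGGSPLITOP_COMBINE) != 0)
#define DO_AGGSPLIT_SKIPFINAL(as)   (((as) & AGGSPLITOP_SKIPFINAL) != 0)
#define DO_AGGSPLIT_SERIALIZE(as)   (((as) & AGGSPLITOP_SERIALIZE) != 0)
#define DO_AGGSPLIT_DESERIALIZE(as) (((as) & AGGSPLITOP_DESERIALIZE) != 0)

/* Hypothetical helper (not in PostgreSQL): print the options a mode selects. */
static void describe_aggsplit(const char *name, AggSplit as)
{
    /* Only the four primitive option bits are defined. */
    assert(((int) as & ~0x0F) == 0);
    printf("%-24s combine=%d skipfinal=%d serialize=%d deserialize=%d\n",
           name,
           DO_AGGSPLIT_COMBINE(as), DO_AGGSPLIT_SKIPFINAL(as),
           DO_AGGSPLIT_SERIALIZE(as), DO_AGGSPLIT_DESERIALIZE(as));
}
```

Calling describe_aggsplit("AGGSPLIT_FINAL_DESERIAL", AGGSPLIT_FINAL_DESERIAL) reports combine=1 and deserialize=1 with the other two flags 0: the final phase combines deserialized partial states and then runs the final function normally.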

2. Source Code Walkthrough

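prepare_projection_slot
prepare_projection_slot prepares for finalization and projection based on the given representative tuple slot and grouping set. It records the current grouping set's column bitmap in aggstate->grouped_cols and forces to NULL (through the slot's tts_isnull array) every column that should read as NULL in the current grouping set.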


/*
 * Prepare to finalize and project based on the specified representative tuple
 * slot and grouping set.
 *
 * In the specified tuple slot, force to null all attributes that should be
 * read as null in the context of the current grouping set.  Also stash the
 * current group bitmap where GroupingExpr can get at it.
 *
 * This relies on three conditions:
 *
 * 1) Nothing is ever going to try and extract the whole tuple from this slot,
 * only reference it in evaluations, which will only access individual
 * attributes.
 *
 * 2) No system columns are going to need to be nulled. (If a system column is
 * referenced in a group clause, it is actually projected in the outer plan
 * tlist.)
 *
 * 3) Within a given phase, we never need to recover the value of an attribute
 * once it has been set to null.
 *
 * Poking into the slot this way is a bit ugly, but the consensus is that the
 * alternative was worse.
 */
static void
prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
{
    if (aggstate->phase->grouped_cols)
    {
        Bitmapset  *grouped_cols = aggstate->phase->grouped_cols[currentSet];
        aggstate->grouped_cols = grouped_cols;
        if (slot->tts_isempty)
        {
            /*
             * Force all values to be NULL if working on an empty input tuple
             * (i.e. an empty grouping set for which no input rows were
             * supplied).
             */
            ExecStoreAllNullTuple(slot);
        }
        else if (aggstate->all_grouped_cols)
        {
            ListCell   *lc;
            /* all_grouped_cols is arranged in desc order */
            slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));
            foreach(lc, aggstate->all_grouped_cols)
            {
                int            attnum = lfirst_int(lc);
                if (!bms_is_member(attnum, grouped_cols))
                    slot->tts_isnull[attnum - 1] = true;
            }
        }
    }
}
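The nulling logic can be modeled outside the executor. The following is a simplified, hypothetical model: ToySlot stands in for TupleTableSlot, a uint64_t bitmask stands in for the Bitmapset (bit n set means attribute number n is grouped), and all_grouped_cols is a plain array in descending order. It omits slot_getsomeattrs because the toy slot's values are already materialized; in the real function, the first (largest) element of all_grouped_cols tells slot_getsomeattrs how many leading attributes must be deformed before their isnull flags are overwritten. Also note that the real function only does this work when the phase has grouped_cols, i.e. when grouping sets are in use.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define TOY_MAX_ATTS 8

/* Hypothetical, simplified stand-in for TupleTableSlot. */
typedef struct ToySlot
{
    int  natts;
    long values[TOY_MAX_ATTS];
    bool isnull[TOY_MAX_ATTS];
    bool isempty;               /* models tts_isempty */
} ToySlot;

/* Bitmapset stand-in: bit n set <=> attribute number n (1-based) is grouped. */
static bool toy_bms_is_member(int attnum, uint64_t set)
{
    return ((set >> attnum) & 1u) != 0;
}

/*
 * Same shape as prepare_projection_slot: an empty input slot becomes all
 * NULLs; otherwise every column that is grouped in some grouping set but not
 * in the current one (grouped_cols) is forced to NULL.
 */
static void toy_prepare_projection_slot(ToySlot *slot, uint64_t grouped_cols,
                                        const int *all_grouped_cols,
                                        int n_all_grouped)
{
    if (slot->isempty)
    {
        /* like ExecStoreAllNullTuple: all columns NULL, slot now non-empty */
        for (int i = 0; i < slot->natts; i++)
            slot->isnull[i] = true;
        slot->isempty = false;
        return;
    }

    for (int k = 0; k < n_all_grouped; k++)
    {
        int attnum = all_grouped_cols[k];

        assert(attnum >= 1 && attnum <= slot->natts);
        if (!toy_bms_is_member(attnum, grouped_cols))
            slot->isnull[attnum - 1] = true;
    }
}
```

For GROUPING SETS ((a, b), (a)) over columns a, b, c (attribute numbers 1 to 3), all_grouped_cols is {2, 1}. While the (a) set is projected, grouped_cols contains only attribute 1, so b is forced to NULL, while a and the non-grouping column c keep their values.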

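finalize_aggregates
finalize_aggregates computes the final values of all aggregates for one group. The per-aggregate work is done by finalize_aggregate (or by finalize_partialaggregate when the final function is skipped), which will be covered in the next installment.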


/*
 * Compute the final value of all aggregates for one group.
 *
 * This function handles only one grouping set at a time, which the caller must
 * have selected.  It's also the caller's responsibility to adjust the supplied
 * pergroup parameter to point to the current set's transvalues.
 *
 * Results are stored in the output econtext aggvalues/aggnulls.
 */
static void
finalize_aggregates(AggState *aggstate,
                    AggStatePerAgg peraggs,
                    AggStatePerGroup pergroup)
{
    ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
    Datum       *aggvalues = econtext->ecxt_aggvalues;
    bool       *aggnulls = econtext->ecxt_aggnulls;
    int            aggno;
    int            transno;
    /*
     * If there were any DISTINCT and/or ORDER BY aggregates, sort their
     * inputs and run the transition functions.
     */
    //loop over all transition states
    for (transno = 0; transno < aggstate->numtrans; transno++)
    {
        //per-transition state
        AggStatePerTrans pertrans = &aggstate->pertrans[transno];
        //this group's value for that transition state
        AggStatePerGroup pergroupstate;
        pergroupstate = &pergroup[transno];
        if (pertrans->numSortCols > 0)
        {
            //--- the aggregate has DISTINCT and/or ORDER BY
            //sanity check: hashed strategies never need sorted input here
            Assert(aggstate->aggstrategy != AGG_HASHED &&
                   aggstate->aggstrategy != AGG_MIXED);
            if (pertrans->numInputs == 1)
                //single input column
                process_ordered_aggregate_single(aggstate,
                                                 pertrans,
                                                 pergroupstate);
            else
                //multiple input columns
                process_ordered_aggregate_multi(aggstate,
                                                pertrans,
                                                pergroupstate);
        }
    }
    /*
     * Run the final functions.
     */
    //loop over all aggregates
    for (aggno = 0; aggno < aggstate->numaggs; aggno++)
    {
        //per-aggregate data
        AggStatePerAgg peragg = &peraggs[aggno];
        int            transno = peragg->transno;
        AggStatePerGroup pergroupstate;
        //the transition state this aggregate reads
        pergroupstate = &pergroup[transno];
        if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
            //partial aggregation (e.g. parallel query): skip the final function
            finalize_partialaggregate(aggstate, peragg, pergroupstate,
                                      &aggvalues[aggno], &aggnulls[aggno]);
        else
            //apply the final function via finalize_aggregate
            finalize_aggregate(aggstate, peragg, pergroupstate,
                               &aggvalues[aggno], &aggnulls[aggno]);
    }
}
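The two-level indexing in the second loop (aggregate aggno reads pergroup[peragg->transno] and writes aggvalues[aggno]/aggnulls[aggno]) matters because aggregates with identical transition functions and inputs can share one transition state, so numtrans may be smaller than numaggs. The sketch below is a hypothetical model of that loop only: SUM(x) and AVG(x) share a (sum, count) state and each applies its own final step. It does not model the DISTINCT/ORDER BY sorting loop or the SKIPFINAL branch, and the ToyPerAgg/ToyPerGroup types are illustrative, not PostgreSQL's.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical transition state shared by SUM(x) and AVG(x). */
typedef struct ToyPerGroup
{
    double sum;
    long   count;
} ToyPerGroup;

typedef enum ToyFinalKind { TOY_FINAL_SUM, TOY_FINAL_AVG } ToyFinalKind;

/* Hypothetical per-aggregate data: which state it reads, which final step. */
typedef struct ToyPerAgg
{
    int          transno;
    ToyFinalKind kind;
} ToyPerAgg;

/*
 * Models only the "run the final functions" loop of finalize_aggregates:
 * aggregate aggno reads pergroup[peragg->transno] and writes its result to
 * aggvalues[aggno] / aggnulls[aggno].
 */
static void toy_finalize_aggregates(const ToyPerAgg *peraggs, int numaggs,
                                    const ToyPerGroup *pergroup, int numtrans,
                                    double *aggvalues, bool *aggnulls)
{
    for (int aggno = 0; aggno < numaggs; aggno++)
    {
        const ToyPerAgg *peragg = &peraggs[aggno];
        const ToyPerGroup *pergroupstate;

        assert(peragg->transno >= 0 && peragg->transno < numtrans);
        pergroupstate = &pergroup[peragg->transno];

        /* SQL semantics: SUM and AVG over zero input rows return NULL. */
        if (pergroupstate->count == 0)
        {
            aggvalues[aggno] = 0.0;
            aggnulls[aggno] = true;
            continue;
        }

        aggnulls[aggno] = false;
        if (peragg->kind == TOY_FINAL_SUM)
            aggvalues[aggno] = pergroupstate->sum;
        else
            aggvalues[aggno] = pergroupstate->sum / (double) pergroupstate->count;
    }
}
```

With one shared state {sum = 12, count = 4}, the SUM aggregate yields 12 and the AVG aggregate yields 3, even though only one transition state was ever updated.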

project_aggregates
project_aggregates projects the result row of one group, whose aggregate values have already been computed by finalize_aggregates.


/*
 * Project the result of a group (whose aggs have already been calculated by
 * finalize_aggregates). Returns the result slot, or NULL if no row is
 * projected (suppressed by qual).
 */
static TupleTableSlot *
project_aggregates(AggState *aggstate)
{
    ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
    /*
     * Check the qual (HAVING clause); if the group does not match, ignore it.
     */
    if (ExecQual(aggstate->ss.ps.qual, econtext))
    {
        /*
         * Form and return projection tuple using the aggregate results and
         * the representative input tuple.
         */
        return ExecProject(aggstate->ss.ps.ps_ProjInfo);
    }
    else
        InstrCountFiltered1(aggstate, 1);
    return NULL;
}
#define InstrCountFiltered1(node, delta) \
    do { \
        if (((PlanState *)(node))->instrument) \
            ((PlanState *)(node))->instrument->nfiltered1 += (delta); \
    } while(0)
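The contract of project_aggregates (return the projected slot, or return NULL and bump nfiltered1 when HAVING rejects the group) implies a caller that keeps advancing to the next group until some row survives; nfiltered1 is the counter EXPLAIN ANALYZE reports as "Rows Removed by Filter". Below is a hypothetical, self-contained model of that loop with a HAVING count(*) > min_count qual. ToyGroup, ToyInstr and the toy_* functions are illustrative names, and the NULL check on instr mimics InstrCountFiltered1, which only touches the counter when instrumentation is enabled.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical finalized group: group key and its COUNT(*) value. */
typedef struct ToyGroup
{
    int  key;
    long count;
} ToyGroup;

/* Hypothetical stand-in for the node's instrumentation counters. */
typedef struct ToyInstr
{
    double nfiltered1;
} ToyInstr;

/* HAVING count(*) > min_count, evaluated on the already-finalized values. */
static bool toy_having(const ToyGroup *g, long min_count)
{
    return g->count > min_count;
}

/* Same contract as project_aggregates: the row if HAVING passes, else NULL. */
static const ToyGroup *toy_project(const ToyGroup *g, long min_count,
                                   ToyInstr *instr)
{
    if (toy_having(g, min_count))
        return g;
    if (instr != NULL)          /* like InstrCountFiltered1 */
        instr->nfiltered1 += 1;
    return NULL;
}

/* Caller loop: keep scanning groups until one survives HAVING. */
static const ToyGroup *toy_next_row(const ToyGroup *groups, size_t n,
                                    size_t *pos, long min_count,
                                    ToyInstr *instr)
{
    assert(pos != NULL);
    while (*pos < n)
    {
        const ToyGroup *row = toy_project(&groups[(*pos)++], min_count, instr);

        if (row != NULL)
            return row;
    }
    return NULL;                /* no more groups: end of the Agg scan */
}
```

With groups {key 1: 5 rows, key 2: 1 row, key 3: 7 rows} and min_count = 2, successive calls return key 1, then key 3 (after filtering key 2 and counting it in nfiltered1), then NULL.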

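ExecProject
ExecProject projects a tuple based on the projection info and stores it in the result slot that was passed to ExecBuildProjectionInfo() (the source comment below calls it ExecBuildProjectInfo()).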


/*
 * ExecProject
 *
 * Projects a tuple based on projection info and stores it in the slot passed
 * to ExecBuildProjectInfo().
 *
 * Note: the result is always a virtual tuple; therefore it may reference
 * the contents of the exprContext's scan tuples and/or temporary results
 * constructed in the exprContext.  If the caller wishes the result to be
 * valid longer than that data will be valid, he must call ExecMaterializeSlot
 * on the result slot.
 */
#ifndef FRONTEND
static inline TupleTableSlot *
ExecProject(ProjectionInfo *projInfo)
{
    ExprContext *econtext = projInfo->pi_exprContext;
    ExprState  *state = &projInfo->pi_state;
    TupleTableSlot *slot = state->resultslot;
    bool        isnull;
    /*
     * Clear any former contents of the result slot.  This makes it safe for
     * us to use the slot's Datum/isnull arrays as workspace.
     */
    ExecClearTuple(slot);
    /* Run the expression, discarding scalar result from the last column. */
    (void) ExecEvalExprSwitchContext(state, econtext, &isnull);
    /*
     * Successfully formed a result row.  Mark the result slot as containing a
     * valid virtual tuple (inlined version of ExecStoreVirtualTuple()).
     */
    slot->tts_isempty = false;
    slot->tts_nvalid = slot->tts_tupleDescriptor->natts;
    return slot;
}
#endif
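ExecProject never forms a physical tuple: as the comments above say, the slot's Datum/isnull arrays serve as workspace, the expression fills them column by column, and the slot is then marked as holding a valid virtual tuple. The sketch below models just that sequence (clear, evaluate every target into the slot's arrays, set nvalid to natts) with a hypothetical ToyResultSlot and hypothetical target functions. It is not PostgreSQL's expression interpreter, and because it copies plain long values it sidesteps the lifetime issue that ExecMaterializeSlot addresses in the real code.

```c
#include <assert.h>
#include <stdbool.h>

#define TOY_MAX_COLS 4

/* Hypothetical simplified result slot: Datum/isnull arrays plus state. */
typedef struct ToyResultSlot
{
    int  natts;
    int  nvalid;                /* models tts_nvalid */
    bool isempty;               /* models tts_isempty */
    long values[TOY_MAX_COLS];
    bool isnull[TOY_MAX_COLS];
} ToyResultSlot;

/* One target-list entry: computes one output column from the input row. */
typedef long (*ToyTargetFn)(const long *input, bool *isnull);

/* Example targets (hypothetical): "SELECT a, a + b". */
static long toy_target_first(const long *input, bool *isnull)
{
    *isnull = false;
    return input[0];
}

static long toy_target_sum(const long *input, bool *isnull)
{
    *isnull = false;
    return input[0] + input[1];
}

/* Like ExecClearTuple: forget any former contents. */
static void toy_clear(ToyResultSlot *slot)
{
    slot->isempty = true;
    slot->nvalid = 0;
}

static ToyResultSlot *toy_exec_project(ToyResultSlot *slot,
                                       const ToyTargetFn *tlist,
                                       const long *input)
{
    assert(slot->natts <= TOY_MAX_COLS);
    toy_clear(slot);
    /* Evaluate each target straight into the slot's arrays (the workspace). */
    for (int i = 0; i < slot->natts; i++)
        slot->values[i] = tlist[i](input, &slot->isnull[i]);
    /* Inlined "store virtual tuple": mark all columns valid, no heap tuple. */
    slot->isempty = false;
    slot->nvalid = slot->natts;
    return slot;
}
```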

3. Tracing and Analysis

N/A

4. References

PostgreSQL Source Code Walkthrough (178) - Query #95 (Aggregate Functions #1): Related Data Structures

From the ITPUB Blog, link: http://blog.itpub.net/6906/viewspace-2644304/. Please credit the source when reposting; otherwise legal liability may be pursued.
