When PostgreSQL inserts a record, it fills in the xmin, xmax, and related fields of the tuple header.
The value of xmin is the TransactionId of the current transaction. This is what MVCC requires.
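For reference, the fields in question live in the tuple header. In the 9.1-era sources traced below, HeapTupleFields in src/include/access/htup.h looks roughly like this (quoted from memory, so treat it as a sketch):

typedef struct HeapTupleFields
{
    TransactionId t_xmin;       /* inserting xact ID */
    TransactionId t_xmax;       /* deleting or locking xact ID */

    union
    {
        CommandId   t_cid;      /* inserting or deleting command ID, or both */
        TransactionId t_xvac;   /* old-style VACUUM FULL xact ID */
    }           t_field3;
} HeapTupleFields;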
Let's trace through the code to see how this works:
/*
 * Allocate the next XID for a new transaction or subtransaction.
 *
 * The new XID is also stored into MyProc before returning.
 *
 * Note: when this is called, we are actually already inside a valid
 * transaction, since XIDs are now not allocated until the transaction
 * does something.  So it is safe to do a database lookup if we want to
 * issue a warning about XID wrap.
 */
TransactionId
GetNewTransactionId(bool isSubXact)
{
    TransactionId xid;

    /*
     * During bootstrap initialization, we return the special bootstrap
     * transaction id.
     */
    if (IsBootstrapProcessingMode())
    {
        Assert(!isSubXact);
        MyProc->xid = BootstrapTransactionId;
        return BootstrapTransactionId;
    }

    /* safety check, we should never get this far in a HS slave */
    if (RecoveryInProgress())
        elog(ERROR, "cannot assign TransactionIds during recovery");

    LWLockAcquire(XidGenLock, LW_EXCLUSIVE);

    xid = ShmemVariableCache->nextXid;
    //fprintf(stderr,"In GetNewTransactionId--------1, xid is :%d\n",xid);

    /*----------
     * Check to see if it's safe to assign another XID.  This protects against
     * catastrophic data loss due to XID wraparound.  The basic rules are:
     *
     * If we're past xidVacLimit, start trying to force autovacuum cycles.
     * If we're past xidWarnLimit, start issuing warnings.
     * If we're past xidStopLimit, refuse to execute transactions, unless
     * we are running in a standalone backend (which gives an escape hatch
     * to the DBA who somehow got past the earlier defenses).
     *----------
     */
    if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidVacLimit))
    {
        /*
         * For safety's sake, we release XidGenLock while sending signals,
         * warnings, etc.  This is not so much because we care about
         * preserving concurrency in this situation, as to avoid any
         * possibility of deadlock while doing get_database_name(). First,
         * copy all the shared values we'll need in this path.
         */
        TransactionId xidWarnLimit = ShmemVariableCache->xidWarnLimit;
        TransactionId xidStopLimit = ShmemVariableCache->xidStopLimit;
        TransactionId xidWrapLimit = ShmemVariableCache->xidWrapLimit;
        Oid         oldest_datoid = ShmemVariableCache->oldestXidDB;

        LWLockRelease(XidGenLock);

        /*
         * To avoid swamping the postmaster with signals, we issue the autovac
         * request only once per 64K transaction starts.  This still gives
         * plenty of chances before we get into real trouble.
         */
        if (IsUnderPostmaster && (xid % 65536) == 0)
            SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER);

        if (IsUnderPostmaster &&
            TransactionIdFollowsOrEquals(xid, xidStopLimit))
        {
            char       *oldest_datname = get_database_name(oldest_datoid);

            /* complain even if that DB has disappeared */
            if (oldest_datname)
                ereport(ERROR,
                        (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                         errmsg("database is not accepting commands to avoid wraparound data loss in database \"%s\"",
                                oldest_datname),
                         errhint("Stop the postmaster and use a standalone backend to vacuum that database.\n"
                                 "You might also need to commit or roll back old prepared transactions.")));
            else
                ereport(ERROR,
                        (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                         errmsg("database is not accepting commands to avoid wraparound data loss in database with OID %u",
                                oldest_datoid),
                         errhint("Stop the postmaster and use a standalone backend to vacuum that database.\n"
                                 "You might also need to commit or roll back old prepared transactions.")));
        }
        else if (TransactionIdFollowsOrEquals(xid, xidWarnLimit))
        {
            char       *oldest_datname = get_database_name(oldest_datoid);

            /* complain even if that DB has disappeared */
            if (oldest_datname)
                ereport(WARNING,
                        (errmsg("database \"%s\" must be vacuumed within %u transactions",
                                oldest_datname,
                                xidWrapLimit - xid),
                         errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
                                 "You might also need to commit or roll back old prepared transactions.")));
            else
                ereport(WARNING,
                        (errmsg("database with OID %u must be vacuumed within %u transactions",
                                oldest_datoid,
                                xidWrapLimit - xid),
                         errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
                                 "You might also need to commit or roll back old prepared transactions.")));
        }

        /* Re-acquire lock and start over */
        LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
        xid = ShmemVariableCache->nextXid;
    }

    /*
     * If we are allocating the first XID of a new page of the commit log,
     * zero out that commit-log page before returning. We must do this while
     * holding XidGenLock, else another xact could acquire and commit a later
     * XID before we zero the page.  Fortunately, a page of the commit log
     * holds 32K or more transactions, so we don't have to do this very often.
     *
     * Extend pg_subtrans too.
     */
    ExtendCLOG(xid);
    ExtendSUBTRANS(xid);

    /*
     * Now advance the nextXid counter.  This must not happen until after we
     * have successfully completed ExtendCLOG() --- if that routine fails, we
     * want the next incoming transaction to try it again.  We cannot assign
     * more XIDs until there is CLOG space for them.
     */
    TransactionIdAdvance(ShmemVariableCache->nextXid);

    /*
     * We must store the new XID into the shared ProcArray before releasing
     * XidGenLock.  This ensures that every active XID older than
     * latestCompletedXid is present in the ProcArray, which is essential for
     * correct OldestXmin tracking; see src/backend/access/transam/README.
     *
     * XXX by storing xid into MyProc without acquiring ProcArrayLock, we are
     * relying on fetch/store of an xid to be atomic, else other backends
     * might see a partially-set xid here.  But holding both locks at once
     * would be a nasty concurrency hit.  So for now, assume atomicity.
     *
     * Note that readers of PGPROC xid fields should be careful to fetch the
     * value only once, rather than assume they can read a value multiple
     * times and get the same answer each time.
     *
     * The same comments apply to the subxact xid count and overflow fields.
     *
     * A solution to the atomic-store problem would be to give each PGPROC its
     * own spinlock used only for fetching/storing that PGPROC's xid and
     * related fields.
     *
     * If there's no room to fit a subtransaction XID into PGPROC, set the
     * cache-overflowed flag instead.  This forces readers to look in
     * pg_subtrans to map subtransaction XIDs up to top-level XIDs. There is a
     * race-condition window, in that the new XID will not appear as running
     * until its parent link has been placed into pg_subtrans. However, that
     * will happen before anyone could possibly have a reason to inquire about
     * the status of the XID, so it seems OK.  (Snapshots taken during this
     * window *will* include the parent XID, so they will deliver the correct
     * answer later on when someone does have a reason to inquire.)
     */
    {
        /*
         * Use volatile pointer to prevent code rearrangement; other backends
         * could be examining my subxids info concurrently, and we don't want
         * them to see an invalid intermediate state, such as incrementing
         * nxids before filling the array entry.  Note we are assuming that
         * TransactionId and int fetch/store are atomic.
         */
        volatile PGPROC *myproc = MyProc;

        if (!isSubXact)
            myproc->xid = xid;
        else
        {
            int         nxids = myproc->subxids.nxids;

            if (nxids < PGPROC_MAX_CACHED_SUBXIDS)
            {
                myproc->subxids.xids[nxids] = xid;
                myproc->subxids.nxids = nxids + 1;
            }
            else
                myproc->subxids.overflowed = true;
        }
    }

    LWLockRelease(XidGenLock);

    //fprintf(stderr,"In GetNewTransactionId--------2, xid is :%d\n",xid);
    return xid;
}
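The wraparound checks above depend on circular XID comparison rather than a plain >=. TransactionIdFollowsOrEquals (src/backend/access/transam/transam.c; sketched here from memory) compares two normal XIDs by the sign of their 32-bit difference, so one XID "follows" another if it is fewer than 2^31 steps ahead of it on the XID circle:

bool
TransactionIdFollowsOrEquals(TransactionId id1, TransactionId id2)
{
    int32       diff;

    /* permanent (special) XIDs compare by raw value */
    if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
        return (id1 >= id2);

    /* the signed difference implements modulo-2^32 ordering */
    diff = (int32) (id1 - id2);
    return (diff >= 0);
}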
GetNewTransactionId is called by AssignTransactionId:
/*
 * AssignTransactionId
 *
 * Assigns a new permanent XID to the given TransactionState.
 * We do not assign XIDs to transactions until/unless this is called.
 * Also, any parent TransactionStates that don't yet have XIDs are assigned
 * one; this maintains the invariant that a child transaction has an XID
 * following its parent's.
 */
static void
AssignTransactionId(TransactionState s)
{
    bool        isSubXact = (s->parent != NULL);
    ResourceOwner currentOwner;

    fprintf(stderr,"---------------------In AssignTransactionId\n");

    /* Assert that caller didn't screw up */
    Assert(!TransactionIdIsValid(s->transactionId));
    Assert(s->state == TRANS_INPROGRESS);

    /*
     * Ensure parent(s) have XIDs, so that a child always has an XID later
     * than its parent.  Musn't recurse here, or we might get a stack overflow
     * if we're at the bottom of a huge stack of subtransactions none of which
     * have XIDs yet.
     */
    if (isSubXact && !TransactionIdIsValid(s->parent->transactionId))
    {
        TransactionState p = s->parent;
        TransactionState *parents;
        size_t      parentOffset = 0;

        parents = palloc(sizeof(TransactionState) * s->nestingLevel);
        while (p != NULL && !TransactionIdIsValid(p->transactionId))
        {
            parents[parentOffset++] = p;
            p = p->parent;
        }

        /*
         * This is technically a recursive call, but the recursion will never
         * be more than one layer deep.
         */
        while (parentOffset != 0)
            AssignTransactionId(parents[--parentOffset]);

        pfree(parents);
    }

    /*
     * Generate a new Xid and record it in PG_PROC and pg_subtrans.
     *
     * NB: we must make the subtrans entry BEFORE the Xid appears anywhere in
     * shared storage other than PG_PROC; because if there's no room for it in
     * PG_PROC, the subtrans entry is needed to ensure that other backends see
     * the Xid as "running".  See GetNewTransactionId.
     */
    s->transactionId = GetNewTransactionId(isSubXact);
    fprintf(stderr,"In AssignTransactionId transaction is: %d \n",s->transactionId);

    if (isSubXact)
        SubTransSetParent(s->transactionId, s->parent->transactionId, false);

    /*
     * If it's a top-level transaction, the predicate locking system needs to
     * be told about it too.
     */
    if (!isSubXact)
        RegisterPredicateLockingXid(s->transactionId);

    /*
     * Acquire lock on the transaction XID.  (We assume this cannot block.) We
     * have to ensure that the lock is assigned to the transaction's own
     * ResourceOwner.
     */
    currentOwner = CurrentResourceOwner;
    PG_TRY();
    {
        CurrentResourceOwner = s->curTransactionOwner;
        XactLockTableInsert(s->transactionId);
    }
    PG_CATCH();
    {
        /* Ensure CurrentResourceOwner is restored on error */
        CurrentResourceOwner = currentOwner;
        PG_RE_THROW();
    }
    PG_END_TRY();
    CurrentResourceOwner = currentOwner;

    /*
     * Every PGPROC_MAX_CACHED_SUBXIDS assigned transaction ids within each
     * top-level transaction we issue a WAL record for the assignment. We
     * include the top-level xid and all the subxids that have not yet been
     * reported using XLOG_XACT_ASSIGNMENT records.
     *
     * This is required to limit the amount of shared memory required in a hot
     * standby server to keep track of in-progress XIDs. See notes for
     * RecordKnownAssignedTransactionIds().
     *
     * We don't keep track of the immediate parent of each subxid, only the
     * top-level transaction that each subxact belongs to. This is correct in
     * recovery only because aborted subtransactions are separately WAL
     * logged.
     */
    if (isSubXact && XLogStandbyInfoActive())
    {
        unreportedXids[nUnreportedXids] = s->transactionId;
        nUnreportedXids++;

        /*
         * ensure this test matches similar one in
         * RecoverPreparedTransactions()
         */
        if (nUnreportedXids >= PGPROC_MAX_CACHED_SUBXIDS)
        {
            XLogRecData rdata[2];
            xl_xact_assignment xlrec;

            /*
             * xtop is always set by now because we recurse up transaction
             * stack to the highest unassigned xid and then come back down
             */
            xlrec.xtop = GetTopTransactionId();
            Assert(TransactionIdIsValid(xlrec.xtop));
            xlrec.nsubxacts = nUnreportedXids;

            rdata[0].data = (char *) &xlrec;
            rdata[0].len = MinSizeOfXactAssignment;
            rdata[0].buffer = InvalidBuffer;
            rdata[0].next = &rdata[1];

            rdata[1].data = (char *) unreportedXids;
            rdata[1].len = PGPROC_MAX_CACHED_SUBXIDS * sizeof(TransactionId);
            rdata[1].buffer = InvalidBuffer;
            rdata[1].next = NULL;

            (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ASSIGNMENT, rdata);

            nUnreportedXids = 0;
        }
    }
}
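One detail worth noting above: XactLockTableInsert takes a heavyweight ExclusiveLock on the new XID itself. A sketch of it, from src/backend/storage/lmgr/lmgr.c of the same era (quoted from memory):

void
XactLockTableInsert(TransactionId xid)
{
    LOCKTAG     tag;

    SET_LOCKTAG_TRANSACTION(tag, xid);

    (void) LockAcquire(&tag, ExclusiveLock, false, false);
}

Any backend that must wait for this transaction to finish can then simply queue for a ShareLock on the same XID (XactLockTableWait); the ExclusiveLock is released automatically at commit or abort.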
AssignTransactionId, in turn, is called by GetCurrentTransactionId:
/*
 * GetCurrentTransactionId
 *
 * This will return the XID of the current transaction (main or sub
 * transaction), assigning one if it's not yet set.  Be careful to call this
 * only inside a valid xact.
 */
TransactionId
GetCurrentTransactionId(void)
{
    TransactionState s = CurrentTransactionState;

    if (!TransactionIdIsValid(s->transactionId))
        fprintf(stderr,"transaction id invalid.......\n");
    else
        fprintf(stderr,"transaction id OK!.......\n");

    if (!TransactionIdIsValid(s->transactionId))
        AssignTransactionId(s);
    return s->transactionId;
}
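Note how lazy this is: an XID is assigned only when the transaction first needs one, so read-only transactions never consume an XID at all. The contrast is visible in the companion function from xact.c (sketch from memory):

/*
 * GetCurrentTransactionIdIfAny -- like GetCurrentTransactionId, but
 * returns InvalidTransactionId rather than forcing an XID assignment.
 */
TransactionId
GetCurrentTransactionIdIfAny(void)
{
    return CurrentTransactionState->transactionId;
}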
GetCurrentTransactionId is called by heap_insert:
/*
 * heap_insert - insert tuple into a heap
 *
 * The new tuple is stamped with current transaction ID and the specified
 * command ID.
 *
 * If the HEAP_INSERT_SKIP_WAL option is specified, the new tuple is not
 * logged in WAL, even for a non-temp relation.  Safe usage of this behavior
 * requires that we arrange that all new tuples go into new pages not
 * containing any tuples from other transactions, and that the relation gets
 * fsync'd before commit.  (See also heap_sync() comments)
 *
 * The HEAP_INSERT_SKIP_FSM option is passed directly to
 * RelationGetBufferForTuple, which see for more info.
 *
 * Note that these options will be applied when inserting into the heap's
 * TOAST table, too, if the tuple requires any out-of-line data.
 *
 * The BulkInsertState object (if any; bistate can be NULL for default
 * behavior) is also just passed through to RelationGetBufferForTuple.
 *
 * The return value is the OID assigned to the tuple (either here or by the
 * caller), or InvalidOid if no OID.  The header fields of *tup are updated
 * to match the stored tuple; in particular tup->t_self receives the actual
 * TID where the tuple was stored.  But note that any toasting of fields
 * within the tuple data is NOT reflected into *tup.
 */
Oid
heap_insert(Relation relation, HeapTuple tup, CommandId cid,
            int options, BulkInsertState bistate)
{
    TransactionId xid;
    HeapTuple   heaptup;
    Buffer      buffer;
    bool        all_visible_cleared = false;

    fprintf(stderr,"In heap_insert------------------------------1\n");
    xid = GetCurrentTransactionId();
    fprintf(stderr,"xid is :%d.......\n",(int) xid);

    if (relation->rd_rel->relhasoids)
    {
#ifdef NOT_USED
        /* this is redundant with an Assert in HeapTupleSetOid */
        Assert(tup->t_data->t_infomask & HEAP_HASOID);
#endif

        /*
         * If the object id of this tuple has already been assigned, trust the
         * caller.  There are a couple of ways this can happen.  At initial db
         * creation, the backend program sets oids for tuples. When we define
         * an index, we set the oid.  Finally, in the future, we may allow
         * users to set their own object ids in order to support a persistent
         * object store (objects need to contain pointers to one another).
         */
        if (!OidIsValid(HeapTupleGetOid(tup)))
            HeapTupleSetOid(tup, GetNewOid(relation));
    }
    else
    {
        /* check there is not space for an OID */
        Assert(!(tup->t_data->t_infomask & HEAP_HASOID));
    }

    tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
    tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
    tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
    HeapTupleHeaderSetXmin(tup->t_data, xid);
    HeapTupleHeaderSetCmin(tup->t_data, cid);
    HeapTupleHeaderSetXmax(tup->t_data, 0);     /* for cleanliness */
    tup->t_tableOid = RelationGetRelid(relation);

    /*
     * If the new tuple is too big for storage or contains already toasted
     * out-of-line attributes from some other relation, invoke the toaster.
     *
     * Note: below this point, heaptup is the data we actually intend to store
     * into the relation; tup is the caller's original untoasted data.
     */
    if (relation->rd_rel->relkind != RELKIND_RELATION)
    {
        /* toast table entries should never be recursively toasted */
        Assert(!HeapTupleHasExternal(tup));
        heaptup = tup;
    }
    else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
        heaptup = toast_insert_or_update(relation, tup, NULL, options);
    else
        heaptup = tup;

    /*
     * We're about to do the actual insert -- but check for conflict first,
     * to avoid possibly having to roll back work we've just done.
     *
     * For a heap insert, we only need to check for table-level SSI locks.
     * Our new tuple can't possibly conflict with existing tuple locks, and
     * heap page locks are only consolidated versions of tuple locks; they do
     * not lock "gaps" as index page locks do.  So we don't need to identify
     * a buffer before making the call.
     */
    CheckForSerializableConflictIn(relation, NULL, InvalidBuffer);

    /* Find buffer to insert this tuple into */
    buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
                                       InvalidBuffer, options, bistate);

    /* NO EREPORT(ERROR) from here till changes are logged */
    START_CRIT_SECTION();

    RelationPutHeapTuple(relation, buffer, heaptup);

    if (PageIsAllVisible(BufferGetPage(buffer)))
    {
        all_visible_cleared = true;
        PageClearAllVisible(BufferGetPage(buffer));
    }

    /*
     * XXX Should we set PageSetPrunable on this page ?
     *
     * The inserting transaction may eventually abort thus making this tuple
     * DEAD and hence available for pruning. Though we don't want to optimize
     * for aborts, if no other tuple in this page is UPDATEd/DELETEd, the
     * aborted tuple will never be pruned until next vacuum is triggered.
     *
     * If you do add PageSetPrunable here, add it in heap_xlog_insert too.
     */

    MarkBufferDirty(buffer);

    /* XLOG stuff */
    if (!(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation))
    {
        xl_heap_insert xlrec;
        xl_heap_header xlhdr;
        XLogRecPtr  recptr;
        XLogRecData rdata[3];
        Page        page = BufferGetPage(buffer);
        uint8       info = XLOG_HEAP_INSERT;

        xlrec.all_visible_cleared = all_visible_cleared;
        xlrec.target.node = relation->rd_node;
        xlrec.target.tid = heaptup->t_self;
        rdata[0].data = (char *) &xlrec;
        rdata[0].len = SizeOfHeapInsert;
        rdata[0].buffer = InvalidBuffer;
        rdata[0].next = &(rdata[1]);

        xlhdr.t_infomask2 = heaptup->t_data->t_infomask2;
        xlhdr.t_infomask = heaptup->t_data->t_infomask;
        xlhdr.t_hoff = heaptup->t_data->t_hoff;

        /*
         * note we mark rdata[1] as belonging to buffer; if XLogInsert decides
         * to write the whole page to the xlog, we don't need to store
         * xl_heap_header in the xlog.
         */
        rdata[1].data = (char *) &xlhdr;
        rdata[1].len = SizeOfHeapHeader;
        rdata[1].buffer = buffer;
        rdata[1].buffer_std = true;
        rdata[1].next = &(rdata[2]);

        /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
        rdata[2].data = (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits);
        rdata[2].len = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits);
        rdata[2].buffer = buffer;
        rdata[2].buffer_std = true;
        rdata[2].next = NULL;

        /*
         * If this is the single and first tuple on page, we can reinit the
         * page instead of restoring the whole thing.  Set flag, and hide
         * buffer references from XLogInsert.
         */
        if (ItemPointerGetOffsetNumber(&(heaptup->t_self)) == FirstOffsetNumber &&
            PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
        {
            info |= XLOG_HEAP_INIT_PAGE;
            rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
        }

        recptr = XLogInsert(RM_HEAP_ID, info, rdata);

        PageSetLSN(page, recptr);
        PageSetTLI(page, ThisTimeLineID);
    }

    END_CRIT_SECTION();

    UnlockReleaseBuffer(buffer);

    /* Clear the bit in the visibility map if necessary */
    if (all_visible_cleared)
        visibilitymap_clear(relation,
                            ItemPointerGetBlockNumber(&(heaptup->t_self)));

    /*
     * If tuple is cachable, mark it for invalidation from the caches in case
     * we abort.  Note it is OK to do this after releasing the buffer, because
     * the heaptup data structure is all in local memory, not in the shared
     * buffer.
     */
    CacheInvalidateHeapTuple(relation, heaptup);

    pgstat_count_heap_insert(relation);

    /*
     * If heaptup is a private copy, release it.  Don't forget to copy t_self
     * back to the caller's image, too.
     */
    if (heaptup != tup)
    {
        tup->t_self = heaptup->t_self;
        heap_freetuple(heaptup);
    }

    fprintf(stderr,"In heap_insert------------------------------2\n");
    return HeapTupleGetOid(tup);
}
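HeapTupleHeaderSetXmin, used above to stamp the new tuple, is ultimately just a store into the t_xmin field we saw at the top. In the pre-9.3 headers the macros expand roughly like this (from htup.h and transam.h, quoted from memory):

#define TransactionIdStore(xid, dest) \
    (*(dest) = (xid))

#define HeapTupleHeaderSetXmin(tup, xid) \
( \
    TransactionIdStore((xid), &(tup)->t_choice.t_heap.t_xmin) \
)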
heap_insert, in turn, is called by ExecInsert:
/* ----------------------------------------------------------------
 *      ExecInsert
 *
 *      For INSERT, we have to insert the tuple into the target relation
 *      and insert appropriate tuples into the index relations.
 *
 *      Returns RETURNING result if any, otherwise NULL.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecInsert(TupleTableSlot *slot,
           TupleTableSlot *planSlot,
           EState *estate,
           bool canSetTag)
{
    HeapTuple   tuple;
    ResultRelInfo *resultRelInfo;
    Relation    resultRelationDesc;
    Oid         newId;
    List       *recheckIndexes = NIL;

    /**
    if (slot == NULL)
        fprintf(stderr,"---In ExecInsert...slot is null\n");
    else
        fprintf(stderr,"---In ExecInsert...slot is not null\n");

    fprintf(stderr,"IN ExecInsert-----------------------------100\n");
    if (slot->tts_isempty)
        fprintf(stderr,"slot. tts_isempty!\n");
    else
    {
        fprintf(stderr,"slot, tts not empty!\n");
        HeapTuple htp = slot->tts_tuple;
        if (htp == NULL)
            fprintf(stderr,"htp is NULL\n");
        else
            fprintf(stderr,"htp is NOT NULL\n");
    }
    fprintf(stderr,"--------------------------------------------\n");
    */

    /*
     * get the heap tuple out of the tuple table slot, making sure we have a
     * writable copy
     */
    tuple = ExecMaterializeSlot(slot);

    fprintf(stderr,"--------------------------------------------150\n");
    if (slot->tts_isempty)
        fprintf(stderr,"slot. tts_isempty!\n");
    else
    {
        ///fprintf(stderr,"slot, tts not empty!\n");
        HeapTuple   htp = slot->tts_tuple;
        HeapTupleHeader theader = htp->t_data;

        if (theader == NULL)
            fprintf(stderr,"heap tuple header is NULL\n");
        else
        {
            ///fprintf(stderr,"heap tuple header is NOT NULL\n");
            HeapTupleFields htfds = theader->t_choice.t_heap;
            TransactionId txmin = htfds.t_xmin;
            TransactionId txmax = htfds.t_xmax;

            fprintf(stderr,"t_xmin is :%d ", (int) txmin);
            fprintf(stderr,"t_xmax is :%d \n", (int) txmax);
        }
    }

    /*
     * get information on the (current) result relation
     */
    resultRelInfo = estate->es_result_relation_info;
    resultRelationDesc = resultRelInfo->ri_RelationDesc;

    /*
     * If the result relation has OIDs, force the tuple's OID to zero so that
     * heap_insert will assign a fresh OID.  Usually the OID already will be
     * zero at this point, but there are corner cases where the plan tree can
     * return a tuple extracted literally from some table with the same
     * rowtype.
     *
     * XXX if we ever wanted to allow users to assign their own OIDs to new
     * rows, this'd be the place to do it.  For the moment, we make a point of
     * doing this before calling triggers, so that a user-supplied trigger
     * could hack the OID if desired.
     */
    if (resultRelationDesc->rd_rel->relhasoids)
        HeapTupleSetOid(tuple, InvalidOid);

    /* BEFORE ROW INSERT Triggers */
    if (resultRelInfo->ri_TrigDesc &&
        resultRelInfo->ri_TrigDesc->trig_insert_before_row)
    {
        slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);

        if (slot == NULL)       /* "do nothing" */
            return NULL;

        /* trigger might have changed tuple */
        tuple = ExecMaterializeSlot(slot);
    }

    fprintf(stderr,"--------------------------------------------200\n");
    if (slot->tts_isempty)
        fprintf(stderr,"slot. tts_isempty!\n");
    else
    {
        ///fprintf(stderr,"slot, tts not empty!\n");
        HeapTuple   htp = slot->tts_tuple;
        HeapTupleHeader theader = htp->t_data;

        if (theader == NULL)
            fprintf(stderr,"heap tuple header is NULL\n");
        else
        {
            ///fprintf(stderr,"heap tuple header is NOT NULL\n");
            HeapTupleFields htfds = theader->t_choice.t_heap;
            TransactionId txmin = htfds.t_xmin;
            TransactionId txmax = htfds.t_xmax;

            fprintf(stderr,"t_xmin is :%d ", (int) txmin);
            fprintf(stderr,"t_xmax is :%d \n", (int) txmax);
        }
    }

    /* INSTEAD OF ROW INSERT Triggers */
    if (resultRelInfo->ri_TrigDesc &&
        resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
    {
        ///fprintf(stderr,"x--------------------------1\n");
        slot = ExecIRInsertTriggers(estate, resultRelInfo, slot);

        if (slot == NULL)       /* "do nothing" */
            return NULL;

        /* trigger might have changed tuple */
        tuple = ExecMaterializeSlot(slot);
        newId = InvalidOid;
    }
    else
    {
        fprintf(stderr,"x--------------------------2\n");

        /*
         * Check the constraints of the tuple
         */
        if (resultRelationDesc->rd_att->constr)
            ExecConstraints(resultRelInfo, slot, estate);

        fprintf(stderr,"IN ExecInsert-----------------------------210\n");
        if (slot->tts_isempty)
            fprintf(stderr,"slot. tts_isempty!\n");
        else
        {
            ///fprintf(stderr,"slot, tts not empty!\n");
            HeapTuple   htp = slot->tts_tuple;
            HeapTupleHeader theader = htp->t_data;

            if (theader == NULL)
                fprintf(stderr,"heap tuple header is NULL\n");
            else
            {
                ///fprintf(stderr,"heap tuple header is NOT NULL\n");
                HeapTupleFields htfds = theader->t_choice.t_heap;
                TransactionId txmin = htfds.t_xmin;
                TransactionId txmax = htfds.t_xmax;

                fprintf(stderr,"t_xmin is :%d ", (int) txmin);
                fprintf(stderr,"t_xmax is :%d \n", (int) txmax);

                /**
                if ( htfds == NULL )
                    fprintf(stderr,"t_heap is NULL\n");
                else
                    fprintf(stderr,"t_heap is not NULL\n");
                */
            }
        }

        /*
         * insert the tuple
         *
         * Note: heap_insert returns the tid (location) of the new tuple in
         * the t_self field.
         */
        newId = heap_insert(resultRelationDesc, tuple,
                            estate->es_output_cid, 0, NULL);

        fprintf(stderr,"IN ExecInsert-----------------------------230\n");
        if (slot->tts_isempty)
            fprintf(stderr,"slot. tts_isempty!\n");
        else
        {
            ///fprintf(stderr,"slot, tts not empty!\n");
            HeapTuple   htp = slot->tts_tuple;
            HeapTupleHeader theader = htp->t_data;

            if (theader == NULL)
                fprintf(stderr,"heap tuple header is NULL\n");
            else
            {
                ///fprintf(stderr,"heap tuple header is NOT NULL\n");
                HeapTupleFields htfds = theader->t_choice.t_heap;
                TransactionId txmin = htfds.t_xmin;
                TransactionId txmax = htfds.t_xmax;

                fprintf(stderr,"t_xmin is :%d ", (int) txmin);
                fprintf(stderr,"t_xmax is :%d \n", (int) txmax);

                /**
                if ( htfds == NULL )
                    fprintf(stderr,"t_heap is NULL\n");
                else
                    fprintf(stderr,"t_heap is not NULL\n");
                */
            }
        }

        /*
         * insert index entries for tuple
         */
        if (resultRelInfo->ri_NumIndices > 0)
            recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
                                                   estate);
    }

    fprintf(stderr,"IN ExecInsert-----------------------------250\n");
    if (slot->tts_isempty)
        fprintf(stderr,"slot. tts_isempty!\n");
    else
    {
        ///fprintf(stderr,"slot, tts not empty!\n");
        HeapTuple   htp = slot->tts_tuple;
        HeapTupleHeader theader = htp->t_data;

        if (theader == NULL)
            fprintf(stderr,"heap tuple header is NULL\n");
        else
        {
            ///fprintf(stderr,"heap tuple header is NOT NULL\n");
            HeapTupleFields htfds = theader->t_choice.t_heap;
            TransactionId txmin = htfds.t_xmin;
            TransactionId txmax = htfds.t_xmax;

            fprintf(stderr,"t_xmin is :%d ", (int) txmin);
            fprintf(stderr,"t_xmax is :%d \n", (int) txmax);

            /**
            if ( htfds == NULL )
                fprintf(stderr,"t_heap is NULL\n");
            else
                fprintf(stderr,"t_heap is not NULL\n");
            */
        }
    }

    if (canSetTag)
    {
        (estate->es_processed)++;
        estate->es_lastoid = newId;
        setLastTid(&(tuple->t_self));
    }

    /* AFTER ROW INSERT Triggers */
    ExecARInsertTriggers(estate, resultRelInfo, tuple, recheckIndexes);

    fprintf(stderr,"IN ExecInsert-----------------------------300\n");
    if (slot->tts_isempty)
        fprintf(stderr,"slot. tts_isempty!\n");
    else
    {
        ///fprintf(stderr,"slot, tts not empty!\n");
        HeapTuple   htp = slot->tts_tuple;
        HeapTupleHeader theader = htp->t_data;

        if (theader == NULL)
            fprintf(stderr,"heap tuple header is NULL\n");
        else
        {
            ///fprintf(stderr,"heap tuple header is NOT NULL\n");
            HeapTupleFields htfds = theader->t_choice.t_heap;
            TransactionId txmin = htfds.t_xmin;
            TransactionId txmax = htfds.t_xmax;

            fprintf(stderr,"t_xmin is :%d ", (int) txmin);
            fprintf(stderr,"t_xmax is :%d \n", (int) txmax);

            /**
            if ( htfds == NULL )
                fprintf(stderr,"t_heap is NULL\n");
            else
                fprintf(stderr,"t_heap is not NULL\n");
            */
        }
    }

    list_free(recheckIndexes);

    /* Process RETURNING if present */
    if (resultRelInfo->ri_projectReturning)
        return ExecProcessReturning(resultRelInfo->ri_projectReturning,
                                    slot, planSlot);

    return NULL;
}
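The same xmin/xmax dump is pasted five times in the function above. A small helper would keep the instrumentation readable; here is one possible refactoring (DebugPrintSlotXids is my own hypothetical function, not part of PostgreSQL):

/* Hypothetical debugging helper: dump t_xmin/t_xmax of the tuple in a slot. */
static void
DebugPrintSlotXids(const char *tag, TupleTableSlot *slot)
{
    HeapTupleHeader theader;

    if (slot == NULL || slot->tts_isempty || slot->tts_tuple == NULL)
    {
        fprintf(stderr, "%s: slot is empty\n", tag);
        return;
    }

    theader = slot->tts_tuple->t_data;
    if (theader == NULL)
    {
        fprintf(stderr, "%s: heap tuple header is NULL\n", tag);
        return;
    }

    fprintf(stderr, "%s: t_xmin is :%d t_xmax is :%d\n", tag,
            (int) theader->t_choice.t_heap.t_xmin,
            (int) theader->t_choice.t_heap.t_xmax);
}

Each debug block above would then collapse to a single call such as DebugPrintSlotXids("230", slot).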
Next, we need to look at this structure:
typedef struct VariableCacheData
{
    /*
     * These fields are protected by OidGenLock.
     */
    Oid         nextOid;        /* next OID to assign */
    uint32      oidCount;       /* OIDs available before must do XLOG work */

    /*
     * These fields are protected by XidGenLock.
     */
    TransactionId nextXid;      /* next XID to assign */
    TransactionId oldestXid;    /* cluster-wide minimum datfrozenxid */
    TransactionId xidVacLimit;  /* start forcing autovacuums here */
    TransactionId xidWarnLimit; /* start complaining here */
    TransactionId xidStopLimit; /* refuse to advance nextXid beyond here */
    TransactionId xidWrapLimit; /* where the world ends */
    Oid         oldestXidDB;    /* database with minimum datfrozenxid */

    /*
     * These fields are protected by ProcArrayLock.
     */
    TransactionId latestCompletedXid;   /* newest XID that has committed or
                                         * aborted */
} VariableCacheData;

typedef VariableCacheData *VariableCache;
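Note the per-field locking discipline spelled out in the comments. For example, varsup.c provides ReadNewTransactionId for code that merely wants to peek at nextXid without assigning anything; it takes XidGenLock in shared mode (sketch from memory):

TransactionId
ReadNewTransactionId(void)
{
    TransactionId xid;

    LWLockAcquire(XidGenLock, LW_SHARED);
    xid = ShmemVariableCache->nextXid;
    LWLockRelease(XidGenLock);

    return xid;
}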
We now know that xmin comes from ShmemVariableCache->nextXid.
Moreover, even after the database is restarted, xmin continues to increase from where it left off, so nextXid must be initialized somewhere at startup.
Its source is the checkpoint data, which comes from the database files.
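The checkpoint record carries nextXid explicitly. The CheckPoint struct in src/include/catalog/pg_control.h of this era looks approximately like this (field list from memory; verify against your tree):

typedef struct CheckPoint
{
    XLogRecPtr  redo;           /* REDO start point of the checkpoint */
    TimeLineID  ThisTimeLineID; /* current timeline */
    uint32      nextXidEpoch;   /* higher-order bits of nextXid */
    TransactionId nextXid;      /* next free XID */
    Oid         nextOid;        /* next free OID */
    MultiXactId nextMulti;      /* next free MultiXactId */
    MultiXactOffset nextMultiOffset;    /* next free MultiXact offset */
    TransactionId oldestXid;    /* cluster-wide minimum datfrozenxid */
    Oid         oldestXidDB;    /* database with minimum datfrozenxid */
    pg_time_t   time;           /* time stamp of checkpoint */
    TransactionId oldestActiveXid;      /* oldest XID still running */
} CheckPoint;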
What we know so far about the call chain:
PostmasterMain --> StartupDataBase
StartupDataBase is a macro that expands to StartChildProcess(StartupProcess).
StartupProcess --> StartupXLOG
In StartupXLOG, there is the following code:
    /*
     * Get the last valid checkpoint record.  If the latest one according
     * to pg_control is broken, try the next-to-last one.
     */
    checkPointLoc = ControlFile->checkPoint;
    RedoStartLSN = ControlFile->checkPointCopy.redo;
    record = ReadCheckpointRecord(checkPointLoc, 1);
    ShmemVariableCache->nextXid = checkPoint.nextXid;
    ShmemVariableCache->nextOid = checkPoint.nextOid;
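A clean shutdown checkpoint records the exact nextXid. If the server instead crashed, the checkpoint's value may be stale, and WAL replay pushes it forward: the redo loop in StartupXLOG contains logic approximately like the following (simplified sketch; the exact form varies by version), which guarantees that after recovery nextXid is beyond every XID seen in the log:

    /* inside the main redo loop of StartupXLOG (simplified sketch) */
    if (TransactionIdFollowsOrEquals(record->xl_xid,
                                     ShmemVariableCache->nextXid))
    {
        ShmemVariableCache->nextXid = record->xl_xid;
        TransactionIdAdvance(ShmemVariableCache->nextXid);
    }

This is why xmin keeps increasing across restarts, whether they were clean or not.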
Why do we say that ShmemVariableCache lives in shared memory? The following code sheds light on it:
/*
 * InitShmemAllocation() --- set up shared-memory space allocation.
 *
 * This should be called only in the postmaster or a standalone backend.
 */
void
InitShmemAllocation(void)
{
    PGShmemHeader *shmhdr = ShmemSegHdr;

    fprintf(stderr,"In InitShmemAllocation.....start by process %d\n",getpid());

    Assert(shmhdr != NULL);

    /*
     * Initialize the spinlock used by ShmemAlloc.  We have to do the space
     * allocation the hard way, since obviously ShmemAlloc can't be called
     * yet.
     */
    ShmemLock = (slock_t *) (((char *) shmhdr) + shmhdr->freeoffset);
    shmhdr->freeoffset += MAXALIGN(sizeof(slock_t));
    Assert(shmhdr->freeoffset <= shmhdr->totalsize);

    SpinLockInit(ShmemLock);

    /* ShmemIndex can't be set up yet (need LWLocks first) */
    shmhdr->index = NULL;
    ShmemIndex = (HTAB *) NULL;

    /*
     * Initialize ShmemVariableCache for transaction manager.  (This doesn't
     * really belong here, but not worth moving.)
     */
    ShmemVariableCache = (VariableCache)
        ShmemAlloc(sizeof(*ShmemVariableCache));
    memset(ShmemVariableCache, 0, sizeof(*ShmemVariableCache));

    fprintf(stderr,"When Initiating, nextXid is: %d \n",
            (int) ShmemVariableCache->nextXid);
    fprintf(stderr,"In InitShmemAllocation.....end by process %d\n\n",getpid());
}
ShmemAlloc is the key:
/*
 * ShmemAlloc -- allocate max-aligned chunk from shared memory
 *
 * Assumes ShmemLock and ShmemSegHdr are initialized.
 *
 * Returns: real pointer to memory or NULL if we are out
 *      of space.  Has to return a real pointer in order
 *      to be compatible with malloc().
 */
void *
ShmemAlloc(Size size)
{
    Size        newStart;
    Size        newFree;
    void       *newSpace;

    /* use volatile pointer to prevent code rearrangement */
    volatile PGShmemHeader *shmemseghdr = ShmemSegHdr;

    /*
     * ensure all space is adequately aligned.
     */
    size = MAXALIGN(size);

    Assert(shmemseghdr != NULL);

    SpinLockAcquire(ShmemLock);

    newStart = shmemseghdr->freeoffset;

    /* extra alignment for large requests, since they are probably buffers */
    if (size >= BLCKSZ)
        newStart = BUFFERALIGN(newStart);

    newFree = newStart + size;
    if (newFree <= shmemseghdr->totalsize)
    {
        newSpace = (void *) ((char *) ShmemBase + newStart);
        shmemseghdr->freeoffset = newFree;
    }
    else
        newSpace = NULL;

    SpinLockRelease(ShmemLock);

    if (!newSpace)
        ereport(WARNING,
                (errcode(ERRCODE_OUT_OF_MEMORY),
                 errmsg("out of shared memory")));

    return newSpace;
}
/* shared memory global variables */

static PGShmemHeader *ShmemSegHdr;  /* shared mem segment header */

static void *ShmemBase;         /* start address of shared memory */

static void *ShmemEnd;          /* end+1 address of shared memory */

slock_t    *ShmemLock;          /* spinlock for shared memory and LWLock
                                 * allocation */

static HTAB *ShmemIndex = NULL; /* primary index hashtable for shmem */


/*
 * InitShmemAccess() --- set up basic pointers to shared memory.
 *
 * Note: the argument should be declared "PGShmemHeader *seghdr",
 * but we use void to avoid having to include ipc.h in shmem.h.
 */
void
InitShmemAccess(void *seghdr)
{
    PGShmemHeader *shmhdr = (PGShmemHeader *) seghdr;

    ShmemSegHdr = shmhdr;
    ShmemBase = (void *) shmhdr;
    ShmemEnd = (char *) ShmemBase + shmhdr->totalsize;
}
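Most subsystems do not call ShmemAlloc directly. They go through ShmemInitStruct, which also registers the chunk under a name in the ShmemIndex hash table, so every backend attaching to the segment finds the same structure; ShmemVariableCache is special-cased in InitShmemAllocation precisely because it is needed before that index hash exists. The usual pattern looks like this (MyModuleState and MyModuleShmemInit are hypothetical names for illustration):

typedef struct MyModuleState
{
    int         counter;        /* example payload */
} MyModuleState;

static MyModuleState *MyState = NULL;

void
MyModuleShmemInit(void)
{
    bool        found;

    /* attach to, or create, our named chunk in the shared segment */
    MyState = (MyModuleState *)
        ShmemInitStruct("My Module State", sizeof(MyModuleState), &found);

    if (!found)
        memset(MyState, 0, sizeof(MyModuleState));  /* first creation: initialize */
}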
We now have a picture of what happens when the database system starts. So how is the transaction id incremented while the system is running? If I make AssignTransactionId call GetNewTransactionId twice, the transaction id can be seen to advance by 2 each time. Here is the instrumented function; since the full listing appears earlier, the unchanged parts are elided:
static void
AssignTransactionId(TransactionState s)
{
    fprintf(stderr,"************---------------------In AssignTransactionId..start by process %d\n",getpid());

    /* ... unchanged from the listing above, up to the XID assignment ... */

    s->transactionId = GetNewTransactionId(isSubXact);

    /* added by gaojian: assign a second XID, to watch the counter move */
    fprintf(stderr,"In AssignTransactionId ....1.... transaction is: %d \n",s->transactionId);
    s->transactionId = GetNewTransactionId(isSubXact);
    fprintf(stderr,"In AssignTransactionId ....2.... transaction is: %d \n",s->transactionId);
    /* end of added code */

    /* ... unchanged down to the end of the function ... */

    fprintf(stderr,"---------------------In AssignTransactionId end..by process %d\n\n",getpid());
}
The key is in GetNewTransactionId, which calls TransactionIdAdvance(ShmemVariableCache->nextXid). Here it is again with the tracing enabled (unchanged parts elided; see the full listing at the top):
TransactionId
GetNewTransactionId(bool isSubXact)
{
    TransactionId xid;

    fprintf(stderr,"*********In GetNewTransactionId.....start by process %d\n",getpid());

    /* ... bootstrap and recovery checks as above ... */

    LWLockAcquire(XidGenLock, LW_EXCLUSIVE);

    xid = ShmemVariableCache->nextXid;
    fprintf(stderr,"In GetNewTransactionId--------1, xid is :%d\n",xid);

    /* ... wraparound checks, ExtendCLOG(xid), ExtendSUBTRANS(xid) as above ... */

    /*
     * Now advance the nextXid counter.  This must not happen until after we
     * have successfully completed ExtendCLOG() --- if that routine fails, we
     * want the next incoming transaction to try it again.  We cannot assign
     * more XIDs until there is CLOG space for them.
     */
    TransactionIdAdvance(ShmemVariableCache->nextXid);

    /* ... store xid into MyProc / the subxids cache as above ... */

    LWLockRelease(XidGenLock);

    fprintf(stderr,"****************In GetNewTransactionId...xid is:%d..end by process %d\n\n",xid,getpid());
    return xid;
}
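TransactionIdAdvance itself is a macro in src/include/access/transam.h. As I recall it (a sketch; verify against your tree), it increments the counter and, at wraparound, skips over the reserved special XIDs below FirstNormalTransactionId (InvalidTransactionId, BootstrapTransactionId, FrozenTransactionId):

/* advance a TransactionId variable, wrapping around to FirstNormalTransactionId */
#define TransactionIdAdvance(dest)  \
    do { \
        (dest)++; \
        if ((dest) < FirstNormalTransactionId) \
            (dest) = FirstNormalTransactionId; \
    } while(0)

Each call to GetNewTransactionId therefore normally advances nextXid by exactly 1, which is why the doubled call above makes the observed transaction id jump by 2.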