diff --git a/packages/executor/src/execution/IncrementalGraph.ts b/packages/executor/src/execution/IncrementalGraph.ts index eb7de40ab32..0a4dc5a5632 100644 --- a/packages/executor/src/execution/IncrementalGraph.ts +++ b/packages/executor/src/execution/IncrementalGraph.ts @@ -1,10 +1,12 @@ import type { GraphQLError } from 'graphql'; import { isPromise } from '@graphql-tools/utils'; import { BoxedPromiseOrValue } from './BoxedPromiseOrValue.js'; +import { invariant } from './invariant.js'; import { promiseWithResolvers } from './promiseWithResolvers.js'; import type { DeferredFragmentRecord, DeferredGroupedFieldSetRecord, + DeferredGroupedFieldSetResult, IncrementalDataRecord, IncrementalDataRecordResult, ReconcilableDeferredGroupedFieldSetResult, @@ -12,118 +14,51 @@ import type { StreamRecord, SubsequentResultRecord, } from './types.js'; -import { isDeferredGroupedFieldSetRecord } from './types.js'; - -interface DeferredFragmentNode { - deferredFragmentRecord: DeferredFragmentRecord; - deferredGroupedFieldSetRecords: Set; - reconcilableResults: Set; - children: Array; -} - -function isDeferredFragmentNode( - node: DeferredFragmentNode | undefined, -): node is DeferredFragmentNode { - return node !== undefined; -} - -function isStreamNode( - record: SubsequentResultNode | IncrementalDataRecord, -): record is StreamRecord { - return 'streamItemQueue' in record; -} - -type SubsequentResultNode = DeferredFragmentNode | StreamRecord; +import { isDeferredFragmentRecord, isDeferredGroupedFieldSetRecord } from './types.js'; /** * @internal */ export class IncrementalGraph { - private _pending: Set; - private _deferredFragmentNodes: Map; + private _rootNodes: Set; - private _newPending: Set; - private _newIncrementalDataRecords: Set; private _completedQueue: Array; private _nextQueue: Array< (iterable: IteratorResult>) => void >; constructor() { - this._pending = new Set(); - this._deferredFragmentNodes = new Map(); - this._newIncrementalDataRecords = new Set(); - this._newPending = new Set(); + this._rootNodes = new Set(); this._completedQueue = []; this._nextQueue = []; } - addIncrementalDataRecords(incrementalDataRecords: ReadonlyArray): void { - for (const incrementalDataRecord of incrementalDataRecords) { - if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) { - this._addDeferredGroupedFieldSetRecord(incrementalDataRecord); - } else { - this._addStreamRecord(incrementalDataRecord); - } - } + getNewPending( + incrementalDataRecords: ReadonlyArray, + ): ReadonlyArray { + const initialResultChildren = new Set(); + this._addIncrementalDataRecords(incrementalDataRecords, undefined, initialResultChildren); + return this._promoteNonEmptyToRoot(initialResultChildren); } addCompletedReconcilableDeferredGroupedFieldSet( reconcilableResult: ReconcilableDeferredGroupedFieldSetResult, ): void { - const deferredFragmentNodes: Array = - reconcilableResult.deferredGroupedFieldSetRecord.deferredFragmentRecords - .map(deferredFragmentRecord => this._deferredFragmentNodes.get(deferredFragmentRecord)) - .filter(isDeferredFragmentNode); - for (const deferredFragmentNode of deferredFragmentNodes) { - deferredFragmentNode.deferredGroupedFieldSetRecords.delete( + for (const deferredFragmentRecord of reconcilableResult.deferredGroupedFieldSetRecord + .deferredFragmentRecords) { + deferredFragmentRecord.deferredGroupedFieldSetRecords.delete( reconcilableResult.deferredGroupedFieldSetRecord, ); - deferredFragmentNode.reconcilableResults.add(reconcilableResult); - } - } - - getNewPending(): ReadonlyArray { - const newPending: Array = []; - for (const node of this._newPending) { - if (isStreamNode(node)) { - this._pending.add(node); - newPending.push(node); - this._newIncrementalDataRecords.add(node); - } else if (node.deferredGroupedFieldSetRecords.size > 0) { - for (const deferredGroupedFieldSetNode of node.deferredGroupedFieldSetRecords) { - this._newIncrementalDataRecords.add(deferredGroupedFieldSetNode); - } - this._pending.add(node); - newPending.push(node.deferredFragmentRecord); - } else { - for (const child of node.children) { - this._newPending.add(child); - } - } + deferredFragmentRecord.reconcilableResults.add(reconcilableResult); } - this._newPending.clear(); - for (const incrementalDataRecord of this._newIncrementalDataRecords) { - if (isStreamNode(incrementalDataRecord)) { - this._onStreamItems(incrementalDataRecord, incrementalDataRecord.streamItemQueue); - } else { - const deferredGroupedFieldSetResult = incrementalDataRecord.result; - const result = - deferredGroupedFieldSetResult instanceof BoxedPromiseOrValue - ? deferredGroupedFieldSetResult.value - : deferredGroupedFieldSetResult().value; - - if (isPromise(result)) { - result.then(resolved => this._enqueue(resolved)); - } else { - this._enqueue(result); - } - } + const incrementalDataRecords = reconcilableResult.incrementalDataRecords; + if (incrementalDataRecords !== undefined) { + this._addIncrementalDataRecords( + incrementalDataRecords, + reconcilableResult.deferredGroupedFieldSetRecord.deferredFragmentRecords, + ); } - this._newIncrementalDataRecords.clear(); - - return newPending; } completedIncrementalData() { @@ -154,53 +89,38 @@ export class IncrementalGraph { } hasNext(): boolean { - return this._pending.size > 0; + return this._rootNodes.size > 0; } - completeDeferredFragment( - deferredFragmentRecord: DeferredFragmentRecord, - ): Array | undefined { - const deferredFragmentNode = this._deferredFragmentNodes.get(deferredFragmentRecord); - // TODO: add test case? - /* c8 ignore next 3 */ - if (deferredFragmentNode === undefined) { - return undefined; - } - if (deferredFragmentNode.deferredGroupedFieldSetRecords.size > 0) { + completeDeferredFragment(deferredFragmentRecord: DeferredFragmentRecord): + | { + newPending: ReadonlyArray; + reconcilableResults: ReadonlyArray; + } + | undefined { + if ( + !this._rootNodes.has(deferredFragmentRecord) || + deferredFragmentRecord.deferredGroupedFieldSetRecords.size > 0 + ) { return; } - const reconcilableResults = Array.from(deferredFragmentNode.reconcilableResults); + const reconcilableResults = Array.from(deferredFragmentRecord.reconcilableResults); + this._removePending(deferredFragmentRecord); for (const reconcilableResult of reconcilableResults) { for (const otherDeferredFragmentRecord of reconcilableResult.deferredGroupedFieldSetRecord .deferredFragmentRecords) { - const otherDeferredFragmentNode = this._deferredFragmentNodes.get( - otherDeferredFragmentRecord, - ); - if (otherDeferredFragmentNode === undefined) { - continue; - } - otherDeferredFragmentNode.reconcilableResults.delete(reconcilableResult); + otherDeferredFragmentRecord.reconcilableResults.delete(reconcilableResult); } } - this._removePending(deferredFragmentNode); - for (const child of deferredFragmentNode.children) { - this._newPending.add(child); - } - return reconcilableResults; + const newPending = this._promoteNonEmptyToRoot(deferredFragmentRecord.children); + return { newPending, reconcilableResults }; } removeDeferredFragment(deferredFragmentRecord: DeferredFragmentRecord): boolean { - const deferredFragmentNode = this._deferredFragmentNodes.get(deferredFragmentRecord); - if (deferredFragmentNode === undefined) { + if (!this._rootNodes.has(deferredFragmentRecord)) { return false; } - this._removePending(deferredFragmentNode); - this._deferredFragmentNodes.delete(deferredFragmentRecord); - // TODO: add test case for an erroring deferred fragment with child defers - /* c8 ignore next 3 */ - for (const child of deferredFragmentNode.children) { - this.removeDeferredFragment(child.deferredFragmentRecord); - } + this._removePending(deferredFragmentRecord); return true; } @@ -208,66 +128,120 @@ export class IncrementalGraph { this._removePending(streamRecord); } - private _removePending(subsequentResultNode: SubsequentResultNode): void { - this._pending.delete(subsequentResultNode); - if (this._pending.size === 0) { + private _removePending(subsequentResultRecord: SubsequentResultRecord): void { + this._rootNodes.delete(subsequentResultRecord); + if (this._rootNodes.size === 0) { for (const resolve of this._nextQueue) { resolve({ value: undefined, done: true }); } } } - private _addDeferredGroupedFieldSetRecord( - deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, + private _addIncrementalDataRecords( + incrementalDataRecords: ReadonlyArray, + parents: ReadonlyArray | undefined, + initialResultChildren?: Set | undefined, ): void { - for (const deferredFragmentRecord of deferredGroupedFieldSetRecord.deferredFragmentRecords) { - const deferredFragmentNode = this._addDeferredFragmentNode(deferredFragmentRecord); - if (this._pending.has(deferredFragmentNode)) { - this._newIncrementalDataRecords.add(deferredGroupedFieldSetRecord); + for (const incrementalDataRecord of incrementalDataRecords) { + if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) { + for (const deferredFragmentRecord of incrementalDataRecord.deferredFragmentRecords) { + this._addDeferredFragment(deferredFragmentRecord, initialResultChildren); + deferredFragmentRecord.deferredGroupedFieldSetRecords.add(incrementalDataRecord); + } + if (this._hasPendingFragment(incrementalDataRecord)) { + this._onDeferredGroupedFieldSet(incrementalDataRecord); + } + } else if (parents === undefined) { + invariant(initialResultChildren !== undefined); + initialResultChildren.add(incrementalDataRecord); + } else { + for (const parent of parents) { + this._addDeferredFragment(parent, initialResultChildren); + parent.children.add(incrementalDataRecord); + } } - deferredFragmentNode.deferredGroupedFieldSetRecords.add(deferredGroupedFieldSetRecord); } } - private _addStreamRecord(streamRecord: StreamRecord): void { - this._newPending.add(streamRecord); + private _promoteNonEmptyToRoot( + maybeEmptyNewPending: Set, + ): ReadonlyArray { + const newPending: Array = []; + for (const subsequentResultRecord of maybeEmptyNewPending) { + if (isDeferredFragmentRecord(subsequentResultRecord)) { + if (subsequentResultRecord.deferredGroupedFieldSetRecords.size > 0) { + subsequentResultRecord.setAsPending(); + for (const deferredGroupedFieldSetRecord of subsequentResultRecord.deferredGroupedFieldSetRecords) { + if (!this._hasPendingFragment(deferredGroupedFieldSetRecord)) { + this._onDeferredGroupedFieldSet(deferredGroupedFieldSetRecord); + } + } + this._rootNodes.add(subsequentResultRecord); + newPending.push(subsequentResultRecord); + continue; + } + for (const child of subsequentResultRecord.children) { + maybeEmptyNewPending.add(child); + } + } else { + this._rootNodes.add(subsequentResultRecord); + newPending.push(subsequentResultRecord); + + this._onStreamItems(subsequentResultRecord); + } + } + return newPending; + } + + private _hasPendingFragment( + deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, + ): boolean { + return deferredGroupedFieldSetRecord.deferredFragmentRecords.some(deferredFragmentRecord => + this._rootNodes.has(deferredFragmentRecord), + ); } - private _addDeferredFragmentNode( + private _addDeferredFragment( deferredFragmentRecord: DeferredFragmentRecord, - ): DeferredFragmentNode { - let deferredFragmentNode = this._deferredFragmentNodes.get(deferredFragmentRecord); - if (deferredFragmentNode !== undefined) { - return deferredFragmentNode; + subsequentResultRecords: Set | undefined, + ): void { + if (this._rootNodes.has(deferredFragmentRecord)) { + return; } - deferredFragmentNode = { - deferredFragmentRecord, - deferredGroupedFieldSetRecords: new Set(), - reconcilableResults: new Set(), - children: [], - }; - this._deferredFragmentNodes.set(deferredFragmentRecord, deferredFragmentNode); const parent = deferredFragmentRecord.parent; if (parent === undefined) { - this._newPending.add(deferredFragmentNode); - return deferredFragmentNode; + invariant(subsequentResultRecords !== undefined); + subsequentResultRecords.add(deferredFragmentRecord); + return; + } + parent.children.add(deferredFragmentRecord); + this._addDeferredFragment(parent, subsequentResultRecords); + } + + private _onDeferredGroupedFieldSet( + deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, + ): void { + const result = ( + deferredGroupedFieldSetRecord.result as BoxedPromiseOrValue + ).value; + if (isPromise(result)) { + result.then(resolved => this._enqueue(resolved)); + } else { + this._enqueue(result); } - const parentNode = this._addDeferredFragmentNode(parent); - parentNode.children.push(deferredFragmentNode); - return deferredFragmentNode; } - private async _onStreamItems( - streamRecord: StreamRecord, - streamItemQueue: Array, - ): Promise { + private async _onStreamItems(streamRecord: StreamRecord): Promise { let items: Array = []; let errors: Array = []; let incrementalDataRecords: Array = []; + const streamItemQueue = streamRecord.streamItemQueue; let streamItemRecord: StreamItemRecord | undefined; while ((streamItemRecord = streamItemQueue.shift()) !== undefined) { let result = - typeof streamItemRecord === 'function' ? streamItemRecord().value : streamItemRecord.value; + streamItemRecord instanceof BoxedPromiseOrValue + ? streamItemRecord.value + : streamItemRecord().value; if (isPromise(result)) { if (items.length > 0) { this._enqueue({ diff --git a/packages/executor/src/execution/IncrementalPublisher.ts b/packages/executor/src/execution/IncrementalPublisher.ts index 8b879426593..02e63a2e414 100644 --- a/packages/executor/src/execution/IncrementalPublisher.ts +++ b/packages/executor/src/execution/IncrementalPublisher.ts @@ -68,8 +68,7 @@ class IncrementalPublisher { errors: ReadonlyArray | undefined, incrementalDataRecords: ReadonlyArray, ): IncrementalExecutionResults { - this._incrementalGraph.addIncrementalDataRecords(incrementalDataRecords); - const newPending = this._incrementalGraph.getNewPending(); + const newPending = this._incrementalGraph.getNewPending(incrementalDataRecords); const pending = this._pendingSourcesToResults(newPending); @@ -215,8 +214,6 @@ class IncrementalPublisher { } else { this._handleCompletedStreamItems(completedIncrementalData, context); } - const newPending = this._incrementalGraph.getNewPending(); - context.pending.push(...this._pendingSourcesToResults(newPending)); } private _handleCompletedDeferredGroupedFieldSet( @@ -236,7 +233,6 @@ class IncrementalPublisher { id, errors: deferredGroupedFieldSetResult.errors, }); - this._incrementalGraph.removeDeferredFragment(deferredFragmentRecord); } return; } @@ -245,27 +241,17 @@ class IncrementalPublisher { deferredGroupedFieldSetResult, ); - const incrementalDataRecords = deferredGroupedFieldSetResult.incrementalDataRecords; - if (incrementalDataRecords !== undefined) { - this._incrementalGraph.addIncrementalDataRecords(incrementalDataRecords); - } - for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredGroupedFieldSetRecord .deferredFragmentRecords) { - const id = deferredFragmentRecord.id; - // TODO: add test case for this. - // Presumably, this can occur if an error causes a fragment to be completed early, - // while an asynchronous deferred grouped field set result is enqueued. - /* c8 ignore next 3 */ - if (id === undefined) { - continue; - } - const reconcilableResults = - this._incrementalGraph.completeDeferredFragment(deferredFragmentRecord); - if (reconcilableResults === undefined) { + const completion = this._incrementalGraph.completeDeferredFragment(deferredFragmentRecord); + if (completion === undefined) { continue; } + const id = deferredFragmentRecord.id; + invariant(id !== undefined); const incremental = context.incremental; + const { newPending, reconcilableResults } = completion; + context.pending.push(...this._pendingSourcesToResults(newPending)); for (const reconcilableResult of reconcilableResults) { const { bestId, subPath } = this._getBestIdAndSubPath( id, @@ -321,8 +307,10 @@ class IncrementalPublisher { context.incremental.push(incrementalEntry); - if (streamItemsResult.incrementalDataRecords !== undefined) { - this._incrementalGraph.addIncrementalDataRecords(streamItemsResult.incrementalDataRecords); + const incrementalDataRecords = streamItemsResult.incrementalDataRecords; + if (incrementalDataRecords !== undefined) { + const newPending = this._incrementalGraph.getNewPending(incrementalDataRecords); + context.pending.push(...this._pendingSourcesToResults(newPending)); } } } diff --git a/packages/executor/src/execution/__tests__/defer-test.ts b/packages/executor/src/execution/__tests__/defer-test.ts index dce6241fec6..104e0c42ba4 100644 --- a/packages/executor/src/execution/__tests__/defer-test.ts +++ b/packages/executor/src/execution/__tests__/defer-test.ts @@ -11,6 +11,7 @@ import { import { expectJSON } from '../../__testUtils__/expectJSON.js'; import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js'; import { execute } from '../execute.js'; +import { promiseWithResolvers } from '../promiseWithResolvers.js'; import type { InitialIncrementalExecutionResult, SubsequentIncrementalExecutionResult, @@ -849,6 +850,135 @@ describe('Execute: defer directive', () => { ]); }); + it('Initiates all deferred grouped field sets immediately if and only if they have been released as pending', async () => { + const document = parse(` + query { + ... @defer { + a { + ... @defer { + b { + c { d } + } + } + } + } + ... @defer { + a { + someField + ... @defer { + b { + e { f } + } + } + } + } + } + `); + + const { promise: slowFieldPromise, resolve: resolveSlowField } = promiseWithResolvers(); + let cResolverCalled = false; + let eResolverCalled = false; + const executeResult = execute({ + schema, + document, + rootValue: { + a: { + someField: slowFieldPromise, + b: { + c: () => { + cResolverCalled = true; + return { d: 'd' }; + }, + e: () => { + eResolverCalled = true; + return { f: 'f' }; + }, + }, + }, + }, + enableEarlyExecution: false, + }); + + expect('initialResult' in executeResult).toBeTruthy(); + + // @ts-expect-error once we assert that initialResult is in executeResult then it should work fine + const result1 = executeResult.initialResult; + expectJSON(result1).toDeepEqual({ + data: {}, + pending: [ + { id: '0', path: [] }, + { id: '1', path: [] }, + ], + hasNext: true, + }); + + // @ts-expect-error once we assert that initialResult is in executeResult then it should work fine + const iterator = executeResult.subsequentResults[Symbol.asyncIterator](); + + expect(cResolverCalled).toBe(false); + expect(eResolverCalled).toBe(false); + + const result2 = await iterator.next(); + expectJSON(result2).toDeepEqual({ + value: { + pending: [{ id: '2', path: ['a'] }], + incremental: [ + { + data: { a: {} }, + id: '0', + }, + { + data: { b: {} }, + id: '2', + }, + { + data: { c: { d: 'd' } }, + id: '2', + subPath: ['b'], + }, + ], + completed: [{ id: '0' }, { id: '2' }], + hasNext: true, + }, + done: false, + }); + + expect(cResolverCalled).toBe(true); + expect(eResolverCalled).toBe(false); + + resolveSlowField('someField'); + + const result3 = await iterator.next(); + expectJSON(result3).toDeepEqual({ + value: { + pending: [{ id: '3', path: ['a'] }], + incremental: [ + { + data: { someField: 'someField' }, + id: '1', + subPath: ['a'], + }, + { + data: { e: { f: 'f' } }, + id: '3', + subPath: ['b'], + }, + ], + completed: [{ id: '1' }, { id: '3' }], + hasNext: false, + }, + done: false, + }); + + expect(eResolverCalled).toBe(true); + + const result4 = await iterator.next(); + expectJSON(result4).toDeepEqual({ + value: undefined, + done: true, + }); + }); + it('Can deduplicate multiple defers on the same object', async () => { const document = parse(` query { diff --git a/packages/executor/src/execution/execute.ts b/packages/executor/src/execution/execute.ts index 773c00a8cc1..6fe459d5ba9 100644 --- a/packages/executor/src/execution/execute.ts +++ b/packages/executor/src/execution/execute.ts @@ -1585,11 +1585,7 @@ function addNewDeferredFragments( : deferredFragmentRecordFromDeferUsage(parentDeferUsage, newDeferMap); // Instantiate the new record. - const deferredFragmentRecord: DeferredFragmentRecord = { - path, - label: newDeferUsage.label, - parent, - }; + const deferredFragmentRecord = new DeferredFragmentRecord(path, newDeferUsage.label, parent); // Update the map. newDeferMap.set(newDeferUsage, deferredFragmentRecord); @@ -2024,13 +2020,24 @@ function executeDeferredGroupedFieldSets( deferMap, ); - const shouldDeferThisDeferUsageSet = shouldDefer(parentDeferUsages, deferUsageSet); - - deferredGroupedFieldSetRecord.result = shouldDeferThisDeferUsageSet - ? exeContext.enableEarlyExecution - ? new BoxedPromiseOrValue(Promise.resolve().then(executor)) - : () => new BoxedPromiseOrValue(executor()) - : new BoxedPromiseOrValue(executor()); + if (exeContext.enableEarlyExecution) { + deferredGroupedFieldSetRecord.result = new BoxedPromiseOrValue( + shouldDefer(parentDeferUsages, deferUsageSet) + ? Promise.resolve().then(executor) + : executor(), + ); + } else { + deferredGroupedFieldSetRecord.result = () => new BoxedPromiseOrValue(executor()); + const resolveThunk = () => { + const maybeThunk = deferredGroupedFieldSetRecord.result; + if (!(maybeThunk instanceof BoxedPromiseOrValue)) { + deferredGroupedFieldSetRecord.result = maybeThunk(); + } + }; + for (const deferredFragmentRecord of deferredFragmentRecords) { + deferredFragmentRecord.onPending(resolveThunk); + } + } newDeferredGroupedFieldSetRecords.push(deferredGroupedFieldSetRecord); } diff --git a/packages/executor/src/execution/types.ts b/packages/executor/src/execution/types.ts index 81f43cc5383..c65a4805b4f 100644 --- a/packages/executor/src/execution/types.ts +++ b/packages/executor/src/execution/types.ts @@ -202,11 +202,49 @@ export interface DeferredGroupedFieldSetRecord { export type SubsequentResultRecord = DeferredFragmentRecord | StreamRecord; -export interface DeferredFragmentRecord { +/** @internal */ +export class DeferredFragmentRecord { path: Path | undefined; label: string | undefined; id?: string | undefined; parent: DeferredFragmentRecord | undefined; + deferredGroupedFieldSetRecords: Set; + reconcilableResults: Set; + children: Set; + pending: boolean; + fns: Array<() => void>; + + constructor( + path: Path | undefined, + label: string | undefined, + parent: DeferredFragmentRecord | undefined, + ) { + this.path = path; + this.label = label; + this.parent = parent; + this.deferredGroupedFieldSetRecords = new Set(); + this.reconcilableResults = new Set(); + this.children = new Set(); + this.pending = false; + this.fns = []; + } + + onPending(fn: () => void): void { + this.fns.push(fn); + } + + setAsPending(): void { + this.pending = true; + for (const fn of this.fns) { + fn(); + } + } +} + +export function isDeferredFragmentRecord( + subsequentResultRecord: SubsequentResultRecord, +): subsequentResultRecord is DeferredFragmentRecord { + return subsequentResultRecord instanceof DeferredFragmentRecord; } export interface StreamItemResult {