Skip to content

Commit

Permalink
get latest changes from graphql-js
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacovCR committed Jun 20, 2024
1 parent 06a5da9 commit 8f272eb
Show file tree
Hide file tree
Showing 5 changed files with 326 additions and 189 deletions.
280 changes: 127 additions & 153 deletions packages/executor/src/execution/IncrementalGraph.ts
Original file line number Diff line number Diff line change
@@ -1,129 +1,64 @@
import type { GraphQLError } from 'graphql';
import { isPromise } from '@graphql-tools/utils';
import { BoxedPromiseOrValue } from './BoxedPromiseOrValue.js';
import { invariant } from './invariant.js';
import { promiseWithResolvers } from './promiseWithResolvers.js';
import type {
DeferredFragmentRecord,
DeferredGroupedFieldSetRecord,
DeferredGroupedFieldSetResult,
IncrementalDataRecord,
IncrementalDataRecordResult,
ReconcilableDeferredGroupedFieldSetResult,
StreamItemRecord,
StreamRecord,
SubsequentResultRecord,
} from './types.js';
import { isDeferredGroupedFieldSetRecord } from './types.js';

interface DeferredFragmentNode {
deferredFragmentRecord: DeferredFragmentRecord;
deferredGroupedFieldSetRecords: Set<DeferredGroupedFieldSetRecord>;
reconcilableResults: Set<ReconcilableDeferredGroupedFieldSetResult>;
children: Array<DeferredFragmentNode>;
}

function isDeferredFragmentNode(
node: DeferredFragmentNode | undefined,
): node is DeferredFragmentNode {
return node !== undefined;
}

function isStreamNode(
record: SubsequentResultNode | IncrementalDataRecord,
): record is StreamRecord {
return 'streamItemQueue' in record;
}

type SubsequentResultNode = DeferredFragmentNode | StreamRecord;
import { isDeferredFragmentRecord, isDeferredGroupedFieldSetRecord } from './types.js';

/**
* @internal
*/
export class IncrementalGraph {
private _pending: Set<SubsequentResultNode>;
private _deferredFragmentNodes: Map<DeferredFragmentRecord, DeferredFragmentNode>;
private _rootNodes: Set<SubsequentResultRecord>;

private _newPending: Set<SubsequentResultNode>;
private _newIncrementalDataRecords: Set<IncrementalDataRecord>;
private _completedQueue: Array<IncrementalDataRecordResult>;
private _nextQueue: Array<
(iterable: IteratorResult<Iterable<IncrementalDataRecordResult>>) => void
>;

constructor() {
this._pending = new Set();
this._deferredFragmentNodes = new Map();
this._newIncrementalDataRecords = new Set();
this._newPending = new Set();
this._rootNodes = new Set();
this._completedQueue = [];
this._nextQueue = [];
}

addIncrementalDataRecords(incrementalDataRecords: ReadonlyArray<IncrementalDataRecord>): void {
for (const incrementalDataRecord of incrementalDataRecords) {
if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) {
this._addDeferredGroupedFieldSetRecord(incrementalDataRecord);
} else {
this._addStreamRecord(incrementalDataRecord);
}
}
getNewPending(
incrementalDataRecords: ReadonlyArray<IncrementalDataRecord>,
): ReadonlyArray<SubsequentResultRecord> {
const initialResultChildren = new Set<SubsequentResultRecord>();
this._addIncrementalDataRecords(incrementalDataRecords, undefined, initialResultChildren);
return this._promoteNonEmptyToRoot(initialResultChildren);
}

addCompletedReconcilableDeferredGroupedFieldSet(
reconcilableResult: ReconcilableDeferredGroupedFieldSetResult,
): void {
const deferredFragmentNodes: Array<DeferredFragmentNode> =
reconcilableResult.deferredGroupedFieldSetRecord.deferredFragmentRecords
.map(deferredFragmentRecord => this._deferredFragmentNodes.get(deferredFragmentRecord))
.filter<DeferredFragmentNode>(isDeferredFragmentNode);
for (const deferredFragmentNode of deferredFragmentNodes) {
deferredFragmentNode.deferredGroupedFieldSetRecords.delete(
for (const deferredFragmentRecord of reconcilableResult.deferredGroupedFieldSetRecord
.deferredFragmentRecords) {
deferredFragmentRecord.deferredGroupedFieldSetRecords.delete(
reconcilableResult.deferredGroupedFieldSetRecord,
);
deferredFragmentNode.reconcilableResults.add(reconcilableResult);
}
}

getNewPending(): ReadonlyArray<SubsequentResultRecord> {
const newPending: Array<SubsequentResultRecord> = [];
for (const node of this._newPending) {
if (isStreamNode(node)) {
this._pending.add(node);
newPending.push(node);
this._newIncrementalDataRecords.add(node);
} else if (node.deferredGroupedFieldSetRecords.size > 0) {
for (const deferredGroupedFieldSetNode of node.deferredGroupedFieldSetRecords) {
this._newIncrementalDataRecords.add(deferredGroupedFieldSetNode);
}
this._pending.add(node);
newPending.push(node.deferredFragmentRecord);
} else {
for (const child of node.children) {
this._newPending.add(child);
}
}
deferredFragmentRecord.reconcilableResults.add(reconcilableResult);
}
this._newPending.clear();

for (const incrementalDataRecord of this._newIncrementalDataRecords) {
if (isStreamNode(incrementalDataRecord)) {
this._onStreamItems(incrementalDataRecord, incrementalDataRecord.streamItemQueue);
} else {
const deferredGroupedFieldSetResult = incrementalDataRecord.result;
const result =
deferredGroupedFieldSetResult instanceof BoxedPromiseOrValue
? deferredGroupedFieldSetResult.value
: deferredGroupedFieldSetResult().value;

if (isPromise(result)) {
result.then(resolved => this._enqueue(resolved));
} else {
this._enqueue(result);
}
}
const incrementalDataRecords = reconcilableResult.incrementalDataRecords;
if (incrementalDataRecords !== undefined) {
this._addIncrementalDataRecords(
incrementalDataRecords,
reconcilableResult.deferredGroupedFieldSetRecord.deferredFragmentRecords,
);
}
this._newIncrementalDataRecords.clear();

return newPending;
}

completedIncrementalData() {
Expand Down Expand Up @@ -154,120 +89,159 @@ export class IncrementalGraph {
}

hasNext(): boolean {
return this._pending.size > 0;
return this._rootNodes.size > 0;
}

completeDeferredFragment(
deferredFragmentRecord: DeferredFragmentRecord,
): Array<ReconcilableDeferredGroupedFieldSetResult> | undefined {
const deferredFragmentNode = this._deferredFragmentNodes.get(deferredFragmentRecord);
// TODO: add test case?
/* c8 ignore next 3 */
if (deferredFragmentNode === undefined) {
return undefined;
}
if (deferredFragmentNode.deferredGroupedFieldSetRecords.size > 0) {
completeDeferredFragment(deferredFragmentRecord: DeferredFragmentRecord):
| {
newPending: ReadonlyArray<SubsequentResultRecord>;
reconcilableResults: ReadonlyArray<ReconcilableDeferredGroupedFieldSetResult>;
}
| undefined {
if (
!this._rootNodes.has(deferredFragmentRecord) ||
deferredFragmentRecord.deferredGroupedFieldSetRecords.size > 0
) {
return;
}
const reconcilableResults = Array.from(deferredFragmentNode.reconcilableResults);
const reconcilableResults = Array.from(deferredFragmentRecord.reconcilableResults);
this._removePending(deferredFragmentRecord);
for (const reconcilableResult of reconcilableResults) {
for (const otherDeferredFragmentRecord of reconcilableResult.deferredGroupedFieldSetRecord
.deferredFragmentRecords) {
const otherDeferredFragmentNode = this._deferredFragmentNodes.get(
otherDeferredFragmentRecord,
);
if (otherDeferredFragmentNode === undefined) {
continue;
}
otherDeferredFragmentNode.reconcilableResults.delete(reconcilableResult);
otherDeferredFragmentRecord.reconcilableResults.delete(reconcilableResult);
}
}
this._removePending(deferredFragmentNode);
for (const child of deferredFragmentNode.children) {
this._newPending.add(child);
}
return reconcilableResults;
const newPending = this._promoteNonEmptyToRoot(deferredFragmentRecord.children);
return { newPending, reconcilableResults };
}

removeDeferredFragment(deferredFragmentRecord: DeferredFragmentRecord): boolean {
const deferredFragmentNode = this._deferredFragmentNodes.get(deferredFragmentRecord);
if (deferredFragmentNode === undefined) {
if (!this._rootNodes.has(deferredFragmentRecord)) {
return false;
}
this._removePending(deferredFragmentNode);
this._deferredFragmentNodes.delete(deferredFragmentRecord);
// TODO: add test case for an erroring deferred fragment with child defers
/* c8 ignore next 3 */
for (const child of deferredFragmentNode.children) {
this.removeDeferredFragment(child.deferredFragmentRecord);
}
this._removePending(deferredFragmentRecord);
return true;
}

removeStream(streamRecord: StreamRecord): void {
this._removePending(streamRecord);
}

private _removePending(subsequentResultNode: SubsequentResultNode): void {
this._pending.delete(subsequentResultNode);
if (this._pending.size === 0) {
private _removePending(subsequentResultRecord: SubsequentResultRecord): void {
this._rootNodes.delete(subsequentResultRecord);
if (this._rootNodes.size === 0) {
for (const resolve of this._nextQueue) {
resolve({ value: undefined, done: true });
}
}
}

private _addDeferredGroupedFieldSetRecord(
deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord,
private _addIncrementalDataRecords(
incrementalDataRecords: ReadonlyArray<IncrementalDataRecord>,
parents: ReadonlyArray<DeferredFragmentRecord> | undefined,
initialResultChildren?: Set<SubsequentResultRecord> | undefined,
): void {
for (const deferredFragmentRecord of deferredGroupedFieldSetRecord.deferredFragmentRecords) {
const deferredFragmentNode = this._addDeferredFragmentNode(deferredFragmentRecord);
if (this._pending.has(deferredFragmentNode)) {
this._newIncrementalDataRecords.add(deferredGroupedFieldSetRecord);
for (const incrementalDataRecord of incrementalDataRecords) {
if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) {
for (const deferredFragmentRecord of incrementalDataRecord.deferredFragmentRecords) {
this._addDeferredFragment(deferredFragmentRecord, initialResultChildren);
deferredFragmentRecord.deferredGroupedFieldSetRecords.add(incrementalDataRecord);
}
if (this._hasPendingFragment(incrementalDataRecord)) {
this._onDeferredGroupedFieldSet(incrementalDataRecord);
}
} else if (parents === undefined) {
invariant(initialResultChildren !== undefined);
initialResultChildren.add(incrementalDataRecord);
} else {
for (const parent of parents) {
this._addDeferredFragment(parent, initialResultChildren);
parent.children.add(incrementalDataRecord);
}
}
deferredFragmentNode.deferredGroupedFieldSetRecords.add(deferredGroupedFieldSetRecord);
}
}

private _addStreamRecord(streamRecord: StreamRecord): void {
this._newPending.add(streamRecord);
private _promoteNonEmptyToRoot(
maybeEmptyNewPending: Set<SubsequentResultRecord>,
): ReadonlyArray<SubsequentResultRecord> {
const newPending: Array<SubsequentResultRecord> = [];
for (const subsequentResultRecord of maybeEmptyNewPending) {
if (isDeferredFragmentRecord(subsequentResultRecord)) {
if (subsequentResultRecord.deferredGroupedFieldSetRecords.size > 0) {
subsequentResultRecord.setAsPending();
for (const deferredGroupedFieldSetRecord of subsequentResultRecord.deferredGroupedFieldSetRecords) {
if (!this._hasPendingFragment(deferredGroupedFieldSetRecord)) {
this._onDeferredGroupedFieldSet(deferredGroupedFieldSetRecord);
}
}
this._rootNodes.add(subsequentResultRecord);
newPending.push(subsequentResultRecord);
continue;
}
for (const child of subsequentResultRecord.children) {
maybeEmptyNewPending.add(child);
}
} else {
this._rootNodes.add(subsequentResultRecord);
newPending.push(subsequentResultRecord);

this._onStreamItems(subsequentResultRecord);
}
}
return newPending;
}

private _hasPendingFragment(
deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord,
): boolean {
return deferredGroupedFieldSetRecord.deferredFragmentRecords.some(deferredFragmentRecord =>
this._rootNodes.has(deferredFragmentRecord),
);
}

private _addDeferredFragmentNode(
private _addDeferredFragment(
deferredFragmentRecord: DeferredFragmentRecord,
): DeferredFragmentNode {
let deferredFragmentNode = this._deferredFragmentNodes.get(deferredFragmentRecord);
if (deferredFragmentNode !== undefined) {
return deferredFragmentNode;
subsequentResultRecords: Set<SubsequentResultRecord> | undefined,
): void {
if (this._rootNodes.has(deferredFragmentRecord)) {
return;
}
deferredFragmentNode = {
deferredFragmentRecord,
deferredGroupedFieldSetRecords: new Set(),
reconcilableResults: new Set(),
children: [],
};
this._deferredFragmentNodes.set(deferredFragmentRecord, deferredFragmentNode);
const parent = deferredFragmentRecord.parent;
if (parent === undefined) {
this._newPending.add(deferredFragmentNode);
return deferredFragmentNode;
invariant(subsequentResultRecords !== undefined);
subsequentResultRecords.add(deferredFragmentRecord);
return;
}
parent.children.add(deferredFragmentRecord);
this._addDeferredFragment(parent, subsequentResultRecords);
}

private _onDeferredGroupedFieldSet(
deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord,
): void {
const result = (
deferredGroupedFieldSetRecord.result as BoxedPromiseOrValue<DeferredGroupedFieldSetResult>
).value;
if (isPromise(result)) {
result.then(resolved => this._enqueue(resolved));
} else {
this._enqueue(result);
}
const parentNode = this._addDeferredFragmentNode(parent);
parentNode.children.push(deferredFragmentNode);
return deferredFragmentNode;
}

private async _onStreamItems(
streamRecord: StreamRecord,
streamItemQueue: Array<StreamItemRecord>,
): Promise<void> {
private async _onStreamItems(streamRecord: StreamRecord): Promise<void> {
let items: Array<unknown> = [];
let errors: Array<GraphQLError> = [];
let incrementalDataRecords: Array<IncrementalDataRecord> = [];
const streamItemQueue = streamRecord.streamItemQueue;
let streamItemRecord: StreamItemRecord | undefined;
while ((streamItemRecord = streamItemQueue.shift()) !== undefined) {
let result =
typeof streamItemRecord === 'function' ? streamItemRecord().value : streamItemRecord.value;
streamItemRecord instanceof BoxedPromiseOrValue
? streamItemRecord.value
: streamItemRecord().value;
if (isPromise(result)) {
if (items.length > 0) {
this._enqueue({
Expand Down
Loading

0 comments on commit 8f272eb

Please sign in to comment.