Skip to content

Commit

Permalink
feat(executors): implement disposable (#6323)
Browse files Browse the repository at this point in the history
* feat(executors): implement disposable

* Ah typo

* Go
  • Loading branch information
ardatan committed Jul 5, 2024
1 parent 8f6a514 commit cacf20f
Show file tree
Hide file tree
Showing 11 changed files with 232 additions and 122 deletions.
8 changes: 8 additions & 0 deletions .changeset/wise-singers-cry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@graphql-tools/executor-graphql-ws": minor
"@graphql-tools/executor-legacy-ws": minor
"@graphql-tools/executor-http": minor
"@graphql-tools/utils": minor
---

Implement Symbol.dispose or Symbol.asyncDispose to make \`Executor\`s \`Disposable\`
26 changes: 18 additions & 8 deletions packages/executors/graphql-ws/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ import { print } from 'graphql';
import { Client, ClientOptions, createClient } from 'graphql-ws';
import WebSocket from 'isomorphic-ws';
import {
DisposableExecutor,
ExecutionRequest,
ExecutionResult,
Executor,
getOperationASTFromRequest,
memoize1,
} from '@graphql-tools/utils';

const defaultPrintFn = memoize1(print);

interface GraphQLWSExecutorOptions extends ClientOptions {
onClient?: (client: Client) => void;
print?: typeof print;
}

function isClient(client: Client | GraphQLWSExecutorOptions): client is Client {
Expand All @@ -18,12 +21,16 @@ function isClient(client: Client | GraphQLWSExecutorOptions): client is Client {

export function buildGraphQLWSExecutor(
clientOptionsOrClient: GraphQLWSExecutorOptions | Client,
): Executor {
): DisposableExecutor {
let graphqlWSClient: Client;
let executorConnectionParams = {};
let printFn = defaultPrintFn;
if (isClient(clientOptionsOrClient)) {
graphqlWSClient = clientOptionsOrClient;
} else {
if (clientOptionsOrClient.print) {
printFn = clientOptionsOrClient.print;
}
graphqlWSClient = createClient({
...clientOptionsOrClient,
webSocketImpl: WebSocket,
Expand All @@ -40,14 +47,12 @@ export function buildGraphQLWSExecutor(
clientOptionsOrClient.onClient(graphqlWSClient);
}
}
return function GraphQLWSExecutor<
const executor = function GraphQLWSExecutor<
TData,
TArgs extends Record<string, any>,
TRoot,
TExtensions extends Record<string, any>,
>(
executionRequest: ExecutionRequest<TArgs, any, TRoot, TExtensions>,
): AsyncIterableIterator<ExecutionResult<TData>> | Promise<ExecutionResult<TData>> {
>(executionRequest: ExecutionRequest<TArgs, any, TRoot, TExtensions>) {
const {
document,
variables,
Expand All @@ -63,7 +68,7 @@ export function buildGraphQLWSExecutor(
extensions['connectionParams'],
);
}
const query = print(document);
const query = printFn(document);
const iterableIterator = graphqlWSClient.iterate<TData, TExtensions>({
query,
variables,
Expand All @@ -75,4 +80,9 @@ export function buildGraphQLWSExecutor(
}
return iterableIterator.next().then(({ value }) => value);
};
const disposableExecutor: DisposableExecutor = executor;
disposableExecutor[Symbol.asyncDispose] = function disposeWS() {
return graphqlWSClient.dispose();
};
return disposableExecutor;
}
52 changes: 46 additions & 6 deletions packages/executors/graphql-ws/tests/graphql-ws.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ import { createServer, Server } from 'http';
import { AddressInfo } from 'net';
import { parse } from 'graphql';
import { useServer } from 'graphql-ws/lib/use/ws';
import { Repeater } from 'graphql-yoga';
import { WebSocketServer } from 'ws'; // yarn add ws

import { buildGraphQLWSExecutor } from '@graphql-tools/executor-graphql-ws';
import { makeExecutableSchema } from '@graphql-tools/schema';
import { Executor, isAsyncIterable } from '@graphql-tools/utils';
import { assertAsyncIterable } from '../../../loaders/url/tests/test-utils';

describe('GraphQL WS Executor', () => {
let server: Server;
Expand Down Expand Up @@ -35,10 +37,28 @@ describe('GraphQL WS Executor', () => {
},
Subscription: {
count: {
subscribe: async function* (_root, { to }) {
for (let i = 0; i < to; i++) {
yield { count: i };
}
subscribe(_, { to }: { to: number }) {
return new Repeater((push, stop) => {
let i = 0;
let closed = false;
let timeout: NodeJS.Timeout;
const pump = async () => {
if (closed) {
return;
}
await push({ count: i });
if (i++ < to) {
timeout = setTimeout(pump, 150);
} else {
stop();
}
};
stop.then(() => {
closed = true;
clearTimeout(timeout);
});
pump();
});
},
},
},
Expand All @@ -53,8 +73,9 @@ describe('GraphQL WS Executor', () => {
url: `ws://localhost:${(server.address() as AddressInfo).port}/graphql`,
});
});
afterAll(async () => {
await new Promise(resolve => server.close(resolve));
afterAll(done => {
server.closeAllConnections();
server.close(done);
});
it('should return a promise of an execution result for regular queries', async () => {
const result = await executor({
Expand Down Expand Up @@ -92,6 +113,25 @@ describe('GraphQL WS Executor', () => {
{ data: { count: 0 } },
{ data: { count: 1 } },
{ data: { count: 2 } },
{ data: { count: 3 } },
]);
});
it('should close connections when disposed', async () => {
const result = await executor({
document: parse(/* GraphQL */ `
subscription {
count(to: 4)
}
`),
});
assertAsyncIterable(result);
for await (const item of result) {
if (item.data?.count === 2) {
await executor[Symbol.asyncDispose]();
}
if (item.data?.count === 3) {
throw new Error('Expected connection to be closed before receiving the third item');
}
}
});
});
12 changes: 0 additions & 12 deletions packages/executors/http/src/addCancelToResponseStream.ts

This file was deleted.

13 changes: 2 additions & 11 deletions packages/executors/http/src/handleEventStreamResponse.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,17 @@
import { ExecutionResult, inspect, isAsyncIterable } from '@graphql-tools/utils';
import { addCancelToResponseStream } from './addCancelToResponseStream.js';
import { handleAsyncIterable } from './handleAsyncIterable.js';
import { handleReadableStream } from './handleReadableStream.js';

export function isReadableStream(value: any): value is ReadableStream {
return value && typeof value.getReader === 'function';
}

export function handleEventStreamResponse(
response: Response,
controller?: AbortController,
): AsyncIterable<ExecutionResult> {
export function handleEventStreamResponse(response: Response): AsyncIterable<ExecutionResult> {
// node-fetch returns body as a promise so we need to resolve it
const body = response.body;
if (body) {
if (isAsyncIterable<Uint8Array | string>(body)) {
const resultStream = handleAsyncIterable(body);
if (controller) {
return addCancelToResponseStream(resultStream, controller);
} else {
return resultStream;
}
return handleAsyncIterable(body);
}
if (isReadableStream(body)) {
return handleReadableStream(body);
Expand Down
10 changes: 1 addition & 9 deletions packages/executors/http/src/handleMultipartMixedResponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import type { IncomingMessage } from 'http';
import { meros as merosReadableStream } from 'meros/browser';
import { meros as merosIncomingMessage } from 'meros/node';
import { ExecutionResult, mapAsyncIterator, mergeIncrementalResult } from '@graphql-tools/utils';
import { addCancelToResponseStream } from './addCancelToResponseStream.js';

type Part =
| {
Expand All @@ -18,10 +17,7 @@ function isIncomingMessage(body: any): body is IncomingMessage {
return body != null && typeof body === 'object' && 'pipe' in body;
}

export async function handleMultipartMixedResponse(
response: Response,
controller?: AbortController,
) {
export async function handleMultipartMixedResponse(response: Response) {
const body = response.body;
const contentType = response.headers.get('content-type') || '';
let asyncIterator: AsyncIterator<Part> | undefined;
Expand Down Expand Up @@ -60,9 +56,5 @@ export async function handleMultipartMixedResponse(
}
});

if (controller) {
return addCancelToResponseStream(resultStream, controller);
}

return resultStream;
}
56 changes: 32 additions & 24 deletions packages/executors/http/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { DocumentNode, GraphQLResolveInfo } from 'graphql';
import { ValueOrPromise } from 'value-or-promise';
import {
AsyncExecutor,
createGraphQLError,
DisposableAsyncExecutor,
DisposableExecutor,
DisposableSyncExecutor,
ExecutionRequest,
ExecutionResult,
Executor,
getOperationASTFromRequest,
SyncExecutor,
} from '@graphql-tools/utils';
import { fetch as defaultFetch } from '@whatwg-node/fetch';
import { createFormDataFromVariables } from './createFormDataFromVariables.js';
Expand Down Expand Up @@ -90,27 +91,31 @@ export type HeadersConfig = Record<string, string>;

export function buildHTTPExecutor(
options?: Omit<HTTPExecutorOptions, 'fetch'> & { fetch: SyncFetchFn },
): SyncExecutor<any, HTTPExecutorOptions>;
): DisposableSyncExecutor<any, HTTPExecutorOptions>;

export function buildHTTPExecutor(
options?: Omit<HTTPExecutorOptions, 'fetch'> & { fetch: AsyncFetchFn },
): AsyncExecutor<any, HTTPExecutorOptions>;
): DisposableAsyncExecutor<any, HTTPExecutorOptions>;

export function buildHTTPExecutor(
options?: Omit<HTTPExecutorOptions, 'fetch'> & { fetch: RegularFetchFn },
): AsyncExecutor<any, HTTPExecutorOptions>;
): DisposableAsyncExecutor<any, HTTPExecutorOptions>;

export function buildHTTPExecutor(
options?: Omit<HTTPExecutorOptions, 'fetch'>,
): AsyncExecutor<any, HTTPExecutorOptions>;
): DisposableAsyncExecutor<any, HTTPExecutorOptions>;

export function buildHTTPExecutor(
options?: HTTPExecutorOptions,
): Executor<any, HTTPExecutorOptions> {
const printFn = options?.print ?? defaultPrintFn;
const controller = new AbortController();
const executor = (request: ExecutionRequest<any, any, any, HTTPExecutorOptions>) => {
if (controller.signal.aborted) {
throw new Error('Executor was disposed. Aborting execution');
}
const fetchFn = request.extensions?.fetch ?? options?.fetch ?? defaultFetch;
let controller: AbortController | undefined;
let signal = controller.signal;
let method = request.extensions?.method || options?.method;

const operationAst = getOperationASTFromRequest(request);
Expand Down Expand Up @@ -149,14 +154,10 @@ export function buildHTTPExecutor(

const query = printFn(request.document);

let timeoutId: any;
if (options?.timeout) {
controller = new AbortController();
timeoutId = setTimeout(() => {
if (!controller?.signal.aborted) {
controller?.abort('timeout');
}
}, options.timeout);
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore AbortSignal.any is not yet in the DOM types
signal = AbortSignal.any([signal, AbortSignal.timeout(options.timeout)]);
}

const responseDetailsForError: {
Expand All @@ -177,7 +178,7 @@ export function buildHTTPExecutor(
const fetchOptions: RequestInit = {
method: 'GET',
headers,
signal: controller?.signal,
signal,
};
if (options?.credentials != null) {
fetchOptions.credentials = options.credentials;
Expand Down Expand Up @@ -207,7 +208,7 @@ export function buildHTTPExecutor(
method: 'POST',
body,
headers,
signal: controller?.signal,
signal,
};
if (options?.credentials != null) {
fetchOptions.credentials = options.credentials;
Expand All @@ -220,9 +221,6 @@ export function buildHTTPExecutor(
.then((fetchResult: Response): any => {
responseDetailsForError.status = fetchResult.status;
responseDetailsForError.statusText = fetchResult.statusText;
if (timeoutId != null) {
clearTimeout(timeoutId);
}

// Retry should respect HTTP Errors
if (options?.retry != null && !fetchResult.status.toString().startsWith('2')) {
Expand All @@ -231,9 +229,9 @@ export function buildHTTPExecutor(

const contentType = fetchResult.headers.get('content-type');
if (contentType?.includes('text/event-stream')) {
return handleEventStreamResponse(fetchResult, controller);
return handleEventStreamResponse(fetchResult);
} else if (contentType?.includes('multipart/mixed')) {
return handleMultipartMixedResponse(fetchResult, controller);
return handleMultipartMixedResponse(fetchResult);
}

return fetchResult.text();
Expand Down Expand Up @@ -317,10 +315,10 @@ export function buildHTTPExecutor(
}),
],
};
} else if (e.name === 'AbortError' && controller?.signal?.reason) {
} else if (e.name === 'AbortError' && signal?.reason) {
return {
errors: [
createGraphQLError('The operation was aborted. reason: ' + controller.signal.reason, {
createGraphQLError('The operation was aborted. reason: ' + signal.reason, {
extensions: {
requestBody: {
query,
Expand Down Expand Up @@ -395,7 +393,17 @@ export function buildHTTPExecutor(
};
}

return executor;
const disposableExecutor: DisposableExecutor = executor;

disposableExecutor[Symbol.dispose] = () => {
return controller.abort(new Error('Executor was disposed. Aborting execution'));
};

disposableExecutor[Symbol.asyncDispose] = () => {
return controller.abort(new Error('Executor was disposed. Aborting execution'));
};

return disposableExecutor;
}

export { isLiveQueryOperationDefinitionNode };
Loading

0 comments on commit cacf20f

Please sign in to comment.