diff --git a/.changeset/wise-singers-cry.md b/.changeset/wise-singers-cry.md new file mode 100644 index 00000000000..1d2882c6c57 --- /dev/null +++ b/.changeset/wise-singers-cry.md @@ -0,0 +1,8 @@ +--- +"@graphql-tools/executor-graphql-ws": minor +"@graphql-tools/executor-legacy-ws": minor +"@graphql-tools/executor-http": minor +"@graphql-tools/utils": minor +--- + +Implement Symbol.dispose or Symbol.asyncDispose to make \`Executor\`s \`Disposable\` diff --git a/packages/executors/graphql-ws/src/index.ts b/packages/executors/graphql-ws/src/index.ts index a32b964cc93..a73f38221e0 100644 --- a/packages/executors/graphql-ws/src/index.ts +++ b/packages/executors/graphql-ws/src/index.ts @@ -2,14 +2,17 @@ import { print } from 'graphql'; import { Client, ClientOptions, createClient } from 'graphql-ws'; import WebSocket from 'isomorphic-ws'; import { + DisposableExecutor, ExecutionRequest, - ExecutionResult, - Executor, getOperationASTFromRequest, + memoize1, } from '@graphql-tools/utils'; +const defaultPrintFn = memoize1(print); + interface GraphQLWSExecutorOptions extends ClientOptions { onClient?: (client: Client) => void; + print?: typeof print; } function isClient(client: Client | GraphQLWSExecutorOptions): client is Client { @@ -18,12 +21,16 @@ function isClient(client: Client | GraphQLWSExecutorOptions): client is Client { export function buildGraphQLWSExecutor( clientOptionsOrClient: GraphQLWSExecutorOptions | Client, -): Executor { +): DisposableExecutor { let graphqlWSClient: Client; let executorConnectionParams = {}; + let printFn = defaultPrintFn; if (isClient(clientOptionsOrClient)) { graphqlWSClient = clientOptionsOrClient; } else { + if (clientOptionsOrClient.print) { + printFn = clientOptionsOrClient.print; + } graphqlWSClient = createClient({ ...clientOptionsOrClient, webSocketImpl: WebSocket, @@ -40,14 +47,12 @@ export function buildGraphQLWSExecutor( clientOptionsOrClient.onClient(graphqlWSClient); } } - return function GraphQLWSExecutor< + const executor = function GraphQLWSExecutor< TData, TArgs extends Record, TRoot, TExtensions extends Record, - >( - executionRequest: ExecutionRequest, - ): AsyncIterableIterator> | Promise> { + >(executionRequest: ExecutionRequest) { const { document, variables, @@ -63,7 +68,7 @@ export function buildGraphQLWSExecutor( extensions['connectionParams'], ); } - const query = print(document); + const query = printFn(document); const iterableIterator = graphqlWSClient.iterate({ query, variables, @@ -75,4 +80,9 @@ export function buildGraphQLWSExecutor( } return iterableIterator.next().then(({ value }) => value); }; + const disposableExecutor: DisposableExecutor = executor; + disposableExecutor[Symbol.asyncDispose] = function disposeWS() { + return graphqlWSClient.dispose(); + }; + return disposableExecutor; } diff --git a/packages/executors/graphql-ws/tests/graphql-ws.test.ts b/packages/executors/graphql-ws/tests/graphql-ws.test.ts index 7f4c96082b6..d5e125206ae 100644 --- a/packages/executors/graphql-ws/tests/graphql-ws.test.ts +++ b/packages/executors/graphql-ws/tests/graphql-ws.test.ts @@ -2,11 +2,13 @@ import { createServer, Server } from 'http'; import { AddressInfo } from 'net'; import { parse } from 'graphql'; import { useServer } from 'graphql-ws/lib/use/ws'; +import { Repeater } from 'graphql-yoga'; import { WebSocketServer } from 'ws'; // yarn add ws import { buildGraphQLWSExecutor } from '@graphql-tools/executor-graphql-ws'; import { makeExecutableSchema } from '@graphql-tools/schema'; import { Executor, isAsyncIterable } from '@graphql-tools/utils'; +import { assertAsyncIterable } from '../../../loaders/url/tests/test-utils'; describe('GraphQL WS Executor', () => { let server: Server; @@ -35,10 +37,28 @@ describe('GraphQL WS Executor', () => { }, Subscription: { count: { - subscribe: async function* (_root, { to }) { - for (let i = 0; i < to; i++) { - yield { count: i }; - } + subscribe(_, { to }: { to: number }) { + return new Repeater((push, stop) => { + let i = 0; + let closed = false; + let timeout: NodeJS.Timeout; + const pump = async () => { + if (closed) { + return; + } + await push({ count: i }); + if (i++ < to) { + timeout = setTimeout(pump, 150); + } else { + stop(); + } + }; + stop.then(() => { + closed = true; + clearTimeout(timeout); + }); + pump(); + }); }, }, }, @@ -53,8 +73,9 @@ describe('GraphQL WS Executor', () => { url: `ws://localhost:${(server.address() as AddressInfo).port}/graphql`, }); }); - afterAll(async () => { - await new Promise(resolve => server.close(resolve)); + afterAll(done => { + server.closeAllConnections(); + server.close(done); }); it('should return a promise of an execution result for regular queries', async () => { const result = await executor({ @@ -92,6 +113,25 @@ describe('GraphQL WS Executor', () => { { data: { count: 0 } }, { data: { count: 1 } }, { data: { count: 2 } }, + { data: { count: 3 } }, ]); }); + it('should close connections when disposed', async () => { + const result = await executor({ + document: parse(/* GraphQL */ ` + subscription { + count(to: 4) + } + `), + }); + assertAsyncIterable(result); + for await (const item of result) { + if (item.data?.count === 2) { + await executor[Symbol.asyncDispose](); + } + if (item.data?.count === 3) { + throw new Error('Expected connection to be closed before receiving the third item'); + } + } + }); }); diff --git a/packages/executors/http/src/addCancelToResponseStream.ts b/packages/executors/http/src/addCancelToResponseStream.ts deleted file mode 100644 index ec55c49642e..00000000000 --- a/packages/executors/http/src/addCancelToResponseStream.ts +++ /dev/null @@ -1,12 +0,0 @@ -import { withCancel } from '@graphql-tools/utils'; - -export function addCancelToResponseStream( - resultStream: AsyncIterable, - controller: AbortController, -) { - return withCancel(resultStream, () => { - if (!controller.signal.aborted) { - controller.abort(); - } - }); -} diff --git a/packages/executors/http/src/handleEventStreamResponse.ts b/packages/executors/http/src/handleEventStreamResponse.ts index 8b35cb71b6b..011198216d8 100644 --- a/packages/executors/http/src/handleEventStreamResponse.ts +++ b/packages/executors/http/src/handleEventStreamResponse.ts @@ -1,5 +1,4 @@ import { ExecutionResult, inspect, isAsyncIterable } from '@graphql-tools/utils'; -import { addCancelToResponseStream } from './addCancelToResponseStream.js'; import { handleAsyncIterable } from './handleAsyncIterable.js'; import { handleReadableStream } from './handleReadableStream.js'; @@ -7,20 +6,12 @@ export function isReadableStream(value: any): value is ReadableStream { return value && typeof value.getReader === 'function'; } -export function handleEventStreamResponse( - response: Response, - controller?: AbortController, -): AsyncIterable { +export function handleEventStreamResponse(response: Response): AsyncIterable { // node-fetch returns body as a promise so we need to resolve it const body = response.body; if (body) { if (isAsyncIterable(body)) { - const resultStream = handleAsyncIterable(body); - if (controller) { - return addCancelToResponseStream(resultStream, controller); - } else { - return resultStream; - } + return handleAsyncIterable(body); } if (isReadableStream(body)) { return handleReadableStream(body); diff --git a/packages/executors/http/src/handleMultipartMixedResponse.ts b/packages/executors/http/src/handleMultipartMixedResponse.ts index ab839c81e4d..e4127ab3808 100644 --- a/packages/executors/http/src/handleMultipartMixedResponse.ts +++ b/packages/executors/http/src/handleMultipartMixedResponse.ts @@ -2,7 +2,6 @@ import type { IncomingMessage } from 'http'; import { meros as merosReadableStream } from 'meros/browser'; import { meros as merosIncomingMessage } from 'meros/node'; import { ExecutionResult, mapAsyncIterator, mergeIncrementalResult } from '@graphql-tools/utils'; -import { addCancelToResponseStream } from './addCancelToResponseStream.js'; type Part = | { @@ -18,10 +17,7 @@ function isIncomingMessage(body: any): body is IncomingMessage { return body != null && typeof body === 'object' && 'pipe' in body; } -export async function handleMultipartMixedResponse( - response: Response, - controller?: AbortController, -) { +export async function handleMultipartMixedResponse(response: Response) { const body = response.body; const contentType = response.headers.get('content-type') || ''; let asyncIterator: AsyncIterator | undefined; @@ -60,9 +56,5 @@ export async function handleMultipartMixedResponse( } }); - if (controller) { - return addCancelToResponseStream(resultStream, controller); - } - return resultStream; } diff --git a/packages/executors/http/src/index.ts b/packages/executors/http/src/index.ts index 7650a90fb3c..41ca74c2854 100644 --- a/packages/executors/http/src/index.ts +++ b/packages/executors/http/src/index.ts @@ -1,13 +1,14 @@ import { DocumentNode, GraphQLResolveInfo } from 'graphql'; import { ValueOrPromise } from 'value-or-promise'; import { - AsyncExecutor, createGraphQLError, + DisposableAsyncExecutor, + DisposableExecutor, + DisposableSyncExecutor, ExecutionRequest, ExecutionResult, Executor, getOperationASTFromRequest, - SyncExecutor, } from '@graphql-tools/utils'; import { fetch as defaultFetch } from '@whatwg-node/fetch'; import { createFormDataFromVariables } from './createFormDataFromVariables.js'; @@ -90,27 +91,31 @@ export type HeadersConfig = Record; export function buildHTTPExecutor( options?: Omit & { fetch: SyncFetchFn }, -): SyncExecutor; +): DisposableSyncExecutor; export function buildHTTPExecutor( options?: Omit & { fetch: AsyncFetchFn }, -): AsyncExecutor; +): DisposableAsyncExecutor; export function buildHTTPExecutor( options?: Omit & { fetch: RegularFetchFn }, -): AsyncExecutor; +): DisposableAsyncExecutor; export function buildHTTPExecutor( options?: Omit, -): AsyncExecutor; +): DisposableAsyncExecutor; export function buildHTTPExecutor( options?: HTTPExecutorOptions, ): Executor { const printFn = options?.print ?? defaultPrintFn; + const controller = new AbortController(); const executor = (request: ExecutionRequest) => { + if (controller.signal.aborted) { + throw new Error('Executor was disposed. Aborting execution'); + } const fetchFn = request.extensions?.fetch ?? options?.fetch ?? defaultFetch; - let controller: AbortController | undefined; + let signal = controller.signal; let method = request.extensions?.method || options?.method; const operationAst = getOperationASTFromRequest(request); @@ -149,14 +154,10 @@ export function buildHTTPExecutor( const query = printFn(request.document); - let timeoutId: any; if (options?.timeout) { - controller = new AbortController(); - timeoutId = setTimeout(() => { - if (!controller?.signal.aborted) { - controller?.abort('timeout'); - } - }, options.timeout); + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore AbortSignal.any is not yet in the DOM types + signal = AbortSignal.any([signal, AbortSignal.timeout(options.timeout)]); } const responseDetailsForError: { @@ -177,7 +178,7 @@ export function buildHTTPExecutor( const fetchOptions: RequestInit = { method: 'GET', headers, - signal: controller?.signal, + signal, }; if (options?.credentials != null) { fetchOptions.credentials = options.credentials; @@ -207,7 +208,7 @@ export function buildHTTPExecutor( method: 'POST', body, headers, - signal: controller?.signal, + signal, }; if (options?.credentials != null) { fetchOptions.credentials = options.credentials; @@ -220,9 +221,6 @@ export function buildHTTPExecutor( .then((fetchResult: Response): any => { responseDetailsForError.status = fetchResult.status; responseDetailsForError.statusText = fetchResult.statusText; - if (timeoutId != null) { - clearTimeout(timeoutId); - } // Retry should respect HTTP Errors if (options?.retry != null && !fetchResult.status.toString().startsWith('2')) { @@ -231,9 +229,9 @@ export function buildHTTPExecutor( const contentType = fetchResult.headers.get('content-type'); if (contentType?.includes('text/event-stream')) { - return handleEventStreamResponse(fetchResult, controller); + return handleEventStreamResponse(fetchResult); } else if (contentType?.includes('multipart/mixed')) { - return handleMultipartMixedResponse(fetchResult, controller); + return handleMultipartMixedResponse(fetchResult); } return fetchResult.text(); @@ -317,10 +315,10 @@ export function buildHTTPExecutor( }), ], }; - } else if (e.name === 'AbortError' && controller?.signal?.reason) { + } else if (e.name === 'AbortError' && signal?.reason) { return { errors: [ - createGraphQLError('The operation was aborted. reason: ' + controller.signal.reason, { + createGraphQLError('The operation was aborted. reason: ' + signal.reason, { extensions: { requestBody: { query, @@ -395,7 +393,17 @@ export function buildHTTPExecutor( }; } - return executor; + const disposableExecutor: DisposableExecutor = executor; + + disposableExecutor[Symbol.dispose] = () => { + return controller.abort(new Error('Executor was disposed. Aborting execution')); + }; + + disposableExecutor[Symbol.asyncDispose] = () => { + return controller.abort(new Error('Executor was disposed. Aborting execution')); + }; + + return disposableExecutor; } export { isLiveQueryOperationDefinitionNode }; diff --git a/packages/executors/http/tests/buildHTTPExecutor.test.ts b/packages/executors/http/tests/buildHTTPExecutor.test.ts index a56ee27c8f1..f9779e85e10 100644 --- a/packages/executors/http/tests/buildHTTPExecutor.test.ts +++ b/packages/executors/http/tests/buildHTTPExecutor.test.ts @@ -1,5 +1,6 @@ +import { createServer, Server } from 'http'; import { parse } from 'graphql'; -import { ExecutionResult } from '@graphql-tools/utils'; +import { createGraphQLError, ExecutionResult } from '@graphql-tools/utils'; import { ReadableStream, Request, Response } from '@whatwg-node/fetch'; import { assertAsyncIterable } from '../../../loaders/url/tests/test-utils.js'; import { buildHTTPExecutor } from '../src/index.js'; @@ -208,4 +209,50 @@ describe('buildHTTPExecutor', () => { expect(result.errors).toBeUndefined(); }); + let server: Server; + afterEach(done => { + if (server?.listening) { + server.close(done); + } else { + done(); + } + }); + it('stops existing requests when the executor is disposed', async () => { + // Create a server that never responds + server = createServer(); + await new Promise(resolve => server.listen(0, resolve)); + const executor = buildHTTPExecutor({ + endpoint: `http://localhost:${(server.address() as any).port}`, + }); + const result = executor({ + document: parse(/* GraphQL */ ` + query { + hello + } + `), + }); + executor[Symbol.dispose]?.(); + await expect(result).resolves.toEqual({ + errors: [ + createGraphQLError( + 'The operation was aborted. reason: Error: Executor was disposed. Aborting execution', + ), + ], + }); + }); + it('does not allow new requests when the executor is disposed', async () => { + const executor = buildHTTPExecutor({ + fetch: () => Response.json({ data: { hello: 'world' } }), + }); + executor[Symbol.dispose]?.(); + expect(() => + executor({ + document: parse(/* GraphQL */ ` + query { + hello + } + `), + }), + ).toThrow('Executor was disposed. Aborting execution'); + }); }); diff --git a/packages/executors/http/tests/retry-timeout.test.ts b/packages/executors/http/tests/retry-timeout.test.ts index 3c8f9d3d437..6d480b536e5 100644 --- a/packages/executors/http/tests/retry-timeout.test.ts +++ b/packages/executors/http/tests/retry-timeout.test.ts @@ -5,10 +5,6 @@ import { Response } from '@whatwg-node/fetch'; import { buildHTTPExecutor } from '../src'; describe('Retry & Timeout', () => { - if (process.version.startsWith('v16.')) { - it('skip on node 16', () => {}); - return; - } let server: Server; const sockets = new Set(); afterEach(() => { @@ -24,9 +20,9 @@ describe('Retry & Timeout', () => { async fetch() { if (cnt < 2) { cnt++; - return new Response(undefined, { status: 500 }); + return Response.error(); } - return (Response as any).json({ data: { hello: 'world' } }); + return Response.json({ data: { hello: 'world' } }); }, retry: 3, }); @@ -50,11 +46,11 @@ describe('Retry & Timeout', () => { async fetch() { if (cnt < 2) { cnt++; - return (Response as any).json({ + return Response.json({ errors: [{ message: `error in ${cnt}` }], }); } - return (Response as any).json({ data: { hello: 'world' } }); + return Response.json({ data: { hello: 'world' } }); }, retry: 3, }); @@ -77,7 +73,7 @@ describe('Retry & Timeout', () => { const executor = buildHTTPExecutor({ async fetch() { cnt++; - return (Response as any).json({ + return Response.json({ errors: [{ message: `error in ${cnt}` }], }); }, @@ -118,48 +114,55 @@ describe('Retry & Timeout', () => { `), }); expect(result).toMatchObject({ - errors: [{ message: 'The operation was aborted. reason: timeout' }], + errors: [ + { + message: + 'The operation was aborted. reason: TimeoutError: The operation was aborted due to timeout', + }, + ], }); }); - it('retry & timeout', async () => { - let cnt = 0; - server = new Server((req, res) => { - if (cnt < 2) { - cnt++; - const timeout = setTimeout(() => { - res.end(JSON.stringify({ errors: [{ message: `error in ${cnt}` }] })); - }, 1000); - req.once('close', () => { - clearTimeout(timeout); + if (!process.env['LEAK_TEST']) { + it('retry & timeout', async () => { + let cnt = 0; + server = new Server((req, res) => { + if (cnt < 2) { + cnt++; + const timeout = setTimeout(() => { + res.end(JSON.stringify({ errors: [{ message: `error in ${cnt}` }] })); + }, 1000); + req.once('close', () => { + clearTimeout(timeout); + }); + } else { + res.end(JSON.stringify({ data: { hello: 'world' } })); + } + }); + server.on('connection', socket => { + sockets.add(socket); + socket.once('close', () => { + sockets.delete(socket); }); - } else { - res.end(JSON.stringify({ data: { hello: 'world' } })); - } - }); - server.on('connection', socket => { - sockets.add(socket); - socket.once('close', () => { - sockets.delete(socket); + }); + server.listen(0); + const executor = buildHTTPExecutor({ + endpoint: `http://localhost:${(server.address() as AddressInfo).port}`, + timeout: 500, + retry: 3, + }); + const result = await executor({ + document: parse(/* GraphQL */ ` + query { + hello + } + `), + }); + expect(cnt).toEqual(2); + expect(result).toMatchObject({ + data: { + hello: 'world', + }, }); }); - server.listen(0); - const executor = buildHTTPExecutor({ - endpoint: `http://localhost:${(server.address() as AddressInfo).port}`, - timeout: 500, - retry: 3, - }); - const result = await executor({ - document: parse(/* GraphQL */ ` - query { - hello - } - `), - }); - expect(cnt).toEqual(2); - expect(result).toMatchObject({ - data: { - hello: 'world', - }, - }); - }); + } }); diff --git a/packages/executors/legacy-ws/src/index.ts b/packages/executors/legacy-ws/src/index.ts index 4f0b6ac1cd4..d9d3da92833 100644 --- a/packages/executors/legacy-ws/src/index.ts +++ b/packages/executors/legacy-ws/src/index.ts @@ -1,6 +1,10 @@ import { print } from 'graphql'; import WebSocket from 'isomorphic-ws'; -import { ExecutionRequest, Executor, observableToAsyncIterable } from '@graphql-tools/utils'; +import { + DisposableExecutor, + ExecutionRequest, + observableToAsyncIterable, +} from '@graphql-tools/utils'; export enum LEGACY_WS { CONNECTION_INIT = 'connection_init', @@ -24,7 +28,7 @@ export function buildWSLegacyExecutor( subscriptionsEndpoint: string, WebSocketImpl: typeof WebSocket, options?: LegacyWSExecutorOpts, -): Executor { +): DisposableExecutor { let executorConnectionParams = {}; let websocket: WebSocket | null = null; @@ -83,7 +87,7 @@ export function buildWSLegacyExecutor( } }; - return function legacyExecutor(request: ExecutionRequest) { + const executor: DisposableExecutor = function legacyExecutor(request: ExecutionRequest) { // additional connection params can be supplied through the "connectionParams" field in extensions. // TODO: connection params only from the FIRST operation in lazy mode will be used (detect connectionParams changes and reconnect, too implicit?) if ( @@ -178,4 +182,8 @@ export function buildWSLegacyExecutor( }, }); }; + + executor[Symbol.dispose] = cleanupWebsocket; + + return executor; } diff --git a/packages/utils/src/executor.ts b/packages/utils/src/executor.ts index 3b4b1c85015..3c66647b615 100644 --- a/packages/utils/src/executor.ts +++ b/packages/utils/src/executor.ts @@ -38,3 +38,18 @@ export type Executor, TBaseExtensions = Recor >( request: ExecutionRequest, ) => MaybePromise>>; + +export type DisposableSyncExecutor< + TBaseContext = Record, + TBaseExtensions = Record, +> = SyncExecutor & { [Symbol.dispose]?: () => void }; +export type DisposableAsyncExecutor< + TBaseContext = Record, + TBaseExtensions = Record, +> = AsyncExecutor & { [Symbol.dispose]?: () => void }; +export type DisposableExecutor< + TBaseContext = Record, + TBaseExtensions = Record, +> = + | DisposableSyncExecutor + | DisposableAsyncExecutor;