Skip to content

Commit be85e4a

Browse files
committed
feat(gateway): add support for new Hive CDN mirror and circuit breaker
1 parent e1dc7e1 commit be85e4a

File tree

4 files changed

+172
-120
lines changed

4 files changed

+172
-120
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
"vitest": "3.2.4"
6464
},
6565
"resolutions": {
66+
"@graphql-hive/core": "0.16.0-alpha-20251126142740-d265dc6d6d73bf62ec597dbe41409f466b9d9874",
6667
"@graphql-mesh/types": "0.104.13",
6768
"@graphql-mesh/utils": "0.104.13",
6869
"@graphql-tools/delegate": "workspace:^",

packages/runtime/src/createGatewayRuntime.ts

Lines changed: 162 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,7 @@ import {
55
} from '@envelop/core';
66
import { useDisableIntrospection } from '@envelop/disable-introspection';
77
import { useGenericAuth } from '@envelop/generic-auth';
8-
import {
9-
createSchemaFetcher,
10-
createSupergraphSDLFetcher,
11-
} from '@graphql-hive/core';
8+
import { createCDNArtifactFetcher, joinUrl } from '@graphql-hive/core';
129
import { LegacyLogger } from '@graphql-hive/logger';
1310
import type {
1411
OnDelegationPlanHook,
@@ -60,7 +57,12 @@ import { useCSRFPrevention } from '@graphql-yoga/plugin-csrf-prevention';
6057
import { useDeferStream } from '@graphql-yoga/plugin-defer-stream';
6158
import { usePersistedOperations } from '@graphql-yoga/plugin-persisted-operations';
6259
import { AsyncDisposableStack } from '@whatwg-node/disposablestack';
63-
import { handleMaybePromise, MaybePromise } from '@whatwg-node/promise-helpers';
60+
import {
61+
fakePromise,
62+
handleMaybePromise,
63+
MaybePromise,
64+
unfakePromise,
65+
} from '@whatwg-node/promise-helpers';
6466
import { ServerAdapterPlugin } from '@whatwg-node/server';
6567
import { useCookies } from '@whatwg-node/server-plugin-cookies';
6668
import {
@@ -270,7 +272,7 @@ export function createGatewayRuntime<
270272
clearTimeout(currentTimeout);
271273
}
272274
if (pollingInterval) {
273-
currentTimeout = setTimeout(schemaFetcher, pollingInterval);
275+
currentTimeout = setTimeout(schemaFetcher.fetch, pollingInterval);
274276
}
275277
}
276278
function pausePolling() {
@@ -280,35 +282,54 @@ export function createGatewayRuntime<
280282
}
281283
let lastFetchedSdl: string | undefined;
282284
let initialFetch$: MaybePromise<true>;
283-
let schemaFetcher: () => MaybePromise<true>;
285+
let schemaFetcher: {
286+
fetch: () => MaybePromise<true>;
287+
dispose?: () => void | PromiseLike<void>;
288+
};
284289

285290
if (
286291
config.schema &&
287292
typeof config.schema === 'object' &&
288293
'type' in config.schema
289294
) {
290295
// hive cdn
291-
const { endpoint, key } = config.schema;
292-
const fetcher = createSchemaFetcher({
293-
endpoint,
294-
key,
295-
logger: LegacyLogger.from(
296-
configContext.log.child('[hiveSchemaFetcher] '),
297-
),
296+
const { endpoint, key, circuitBreaker } = config.schema;
297+
const endpoints = (Array.isArray(endpoint) ? endpoint : [endpoint]).map(
298+
(url) =>
299+
url.endsWith('/sdl')
300+
? url
301+
: joinUrl(
302+
url.endsWith('/services')
303+
? url.substring(0, url.length - 8)
304+
: url,
305+
'sdl',
306+
),
307+
);
308+
const fetcher = createCDNArtifactFetcher({
309+
endpoint: endpoints as [string, string],
310+
circuitBreaker,
311+
accessKey: key,
312+
logger: configContext.log.child('[hiveSchemaFetcher] '),
298313
});
299-
schemaFetcher = function fetchSchemaFromCDN() {
300-
pausePolling();
301-
initialFetch$ = handleMaybePromise(fetcher, ({ sdl }) => {
302-
if (lastFetchedSdl == null || lastFetchedSdl !== sdl) {
303-
unifiedGraph = buildSchema(sdl, {
304-
assumeValid: true,
305-
assumeValidSDL: true,
306-
});
307-
}
308-
continuePolling();
309-
return true;
310-
});
311-
return initialFetch$;
314+
schemaFetcher = {
315+
fetch: function fetchSchemaFromCDN() {
316+
pausePolling();
317+
initialFetch$ = handleMaybePromise(
318+
fetcher.fetch,
319+
({ contents }): true => {
320+
if (lastFetchedSdl == null || lastFetchedSdl !== contents) {
321+
unifiedGraph = buildSchema(contents, {
322+
assumeValid: true,
323+
assumeValidSDL: true,
324+
});
325+
}
326+
continuePolling();
327+
return true;
328+
},
329+
);
330+
return initialFetch$;
331+
},
332+
dispose: () => fetcher.dispose(),
312333
};
313334
} else if (config.schema) {
314335
// local or remote
@@ -318,60 +339,67 @@ export function createGatewayRuntime<
318339
delete config.pollingInterval;
319340
}
320341

321-
schemaFetcher = function fetchSchema() {
322-
pausePolling();
323-
initialFetch$ = handleMaybePromise(
324-
() =>
325-
handleUnifiedGraphConfig(
326-
// @ts-expect-error TODO: what's up with type narrowing
327-
config.schema,
328-
configContext,
329-
),
330-
(schema) => {
331-
if (isSchema(schema)) {
332-
unifiedGraph = schema;
333-
} else if (isDocumentNode(schema)) {
334-
unifiedGraph = buildASTSchema(schema, {
335-
assumeValid: true,
336-
assumeValidSDL: true,
337-
});
338-
} else {
339-
unifiedGraph = buildSchema(schema, {
340-
noLocation: true,
341-
assumeValid: true,
342-
assumeValidSDL: true,
343-
});
344-
}
345-
continuePolling();
346-
return true;
347-
},
348-
);
349-
return initialFetch$;
342+
schemaFetcher = {
343+
fetch: function fetchSchema() {
344+
pausePolling();
345+
initialFetch$ = handleMaybePromise(
346+
() =>
347+
handleUnifiedGraphConfig(
348+
// @ts-expect-error TODO: what's up with type narrowing
349+
config.schema,
350+
configContext,
351+
),
352+
(schema) => {
353+
if (isSchema(schema)) {
354+
unifiedGraph = schema;
355+
} else if (isDocumentNode(schema)) {
356+
unifiedGraph = buildASTSchema(schema, {
357+
assumeValid: true,
358+
assumeValidSDL: true,
359+
});
360+
} else {
361+
unifiedGraph = buildSchema(schema, {
362+
noLocation: true,
363+
assumeValid: true,
364+
assumeValidSDL: true,
365+
});
366+
}
367+
continuePolling();
368+
return true;
369+
},
370+
);
371+
return initialFetch$;
372+
},
350373
};
351374
} else {
352375
// introspect endpoint
353-
schemaFetcher = function fetchSchemaWithExecutor() {
354-
pausePolling();
355-
return handleMaybePromise(
356-
() =>
357-
schemaFromExecutor(proxyExecutor, configContext, {
358-
assumeValid: true,
359-
}),
360-
(schema) => {
361-
unifiedGraph = schema;
362-
continuePolling();
363-
return true;
364-
},
365-
);
376+
schemaFetcher = {
377+
fetch: function fetchSchemaWithExecutor() {
378+
pausePolling();
379+
return handleMaybePromise(
380+
() =>
381+
schemaFromExecutor(proxyExecutor, configContext, {
382+
assumeValid: true,
383+
}),
384+
(schema) => {
385+
unifiedGraph = schema;
386+
continuePolling();
387+
return true;
388+
},
389+
);
390+
},
366391
};
367392
}
368393

369-
const instrumentedFetcher = schemaFetcher;
370-
schemaFetcher = (...args) =>
371-
getInstrumented(null).asyncFn(
372-
instrumentation?.schema,
373-
instrumentedFetcher,
374-
)(...args);
394+
const instrumentedFetcher = schemaFetcher.fetch;
395+
schemaFetcher = {
396+
...schemaFetcher,
397+
fetch: (...args) =>
398+
getInstrumented(null).asyncFn(
399+
instrumentation?.schema,
400+
instrumentedFetcher,
401+
)(...args),
402+
};
375403

376404
getSchema = () => {
377405
if (unifiedGraph != null) {
@@ -383,22 +411,22 @@ export function createGatewayRuntime<
383411
() => unifiedGraph,
384412
);
385413
}
386-
return handleMaybePromise(schemaFetcher, () => unifiedGraph);
414+
return handleMaybePromise(schemaFetcher.fetch, () => unifiedGraph);
387415
};
388416
const shouldSkipValidation =
389417
'skipValidation' in config ? config.skipValidation : false;
390-
const executorPlugin: GatewayPlugin = {
418+
unifiedGraphPlugin = {
391419
onValidate({ params, setResult }) {
392420
if (shouldSkipValidation || !params.schema) {
393421
setResult([]);
394422
}
395423
},
396-
onDispose() {
424+
async onDispose() {
397425
pausePolling();
398-
return transportExecutorStack.disposeAsync();
426+
await transportExecutorStack.disposeAsync();
427+
return schemaFetcher.dispose?.();
399428
},
400429
};
401-
unifiedGraphPlugin = executorPlugin;
402430
readinessChecker = () =>
403431
handleMaybePromise(
404432
() =>
@@ -412,7 +440,7 @@ export function createGatewayRuntime<
412440
schemaInvalidator = () => {
413441
// @ts-expect-error TODO: this is illegal but somehow we want it
414442
unifiedGraph = undefined;
415-
initialFetch$ = schemaFetcher();
443+
initialFetch$ = schemaFetcher.fetch();
416444
};
417445
} else if ('subgraph' in config) {
418446
const subgraphInConfig = config.subgraph;
@@ -643,40 +671,51 @@ export function createGatewayRuntime<
643671
},
644672
};
645673
} /** 'supergraph' in config */ else {
646-
let unifiedGraphFetcher: (
647-
transportCtx: TransportContext,
648-
) => MaybePromise<UnifiedGraphSchema>;
674+
let unifiedGraphFetcher: {
675+
fetch: (
676+
transportCtx: TransportContext,
677+
) => MaybePromise<UnifiedGraphSchema>;
678+
dispose?: () => void | PromiseLike<void>;
679+
};
680+
649681
if (typeof config.supergraph === 'object' && 'type' in config.supergraph) {
650682
if (config.supergraph.type === 'hive') {
651683
// hive cdn
652-
const { endpoint, key } = config.supergraph;
653-
const fetcher = createSupergraphSDLFetcher({
654-
endpoint,
655-
key,
656-
logger: LegacyLogger.from(
657-
configContext.log.child('[hiveSupergraphFetcher] '),
658-
),
684+
const { endpoint, key, circuitBreaker } = config.supergraph;
685+
const endpoints = (Array.isArray(endpoint) ? endpoint : [endpoint]).map(
686+
(url) => (url.endsWith('/supergraph') ? url : `${url}/supergraph`),
687+
);
659688

689+
const fetcher = createCDNArtifactFetcher({
690+
endpoint: endpoints as [string, string],
691+
accessKey: key,
692+
logger: configContext.log.child('[hiveSupergraphFetcher] '),
660693
// @ts-expect-error - MeshFetch is not compatible with `typeof fetch`
661-
fetchImplementation: configContext.fetch,
662-
694+
fetch: configContext.fetch,
695+
circuitBreaker,
663696
name: 'hive-gateway',
664697
version: globalThis.__VERSION__,
665698
});
666-
unifiedGraphFetcher = () =>
667-
fetcher().then(({ supergraphSdl }) => supergraphSdl);
699+
unifiedGraphFetcher = {
700+
fetch: () => fetcher.fetch().then(({ contents }) => contents),
701+
dispose: () => fetcher.dispose(),
702+
};
668703
} else if (config.supergraph.type === 'graphos') {
669704
const graphosFetcherContainer = createGraphOSFetcher({
670705
graphosOpts: config.supergraph,
671706
configContext,
672707
pollingInterval: config.pollingInterval,
673708
});
674-
unifiedGraphFetcher = graphosFetcherContainer.unifiedGraphFetcher;
709+
unifiedGraphFetcher = {
710+
fetch: graphosFetcherContainer.unifiedGraphFetcher,
711+
};
675712
} else {
676-
unifiedGraphFetcher = () => {
677-
throw new Error(
678-
`Unknown supergraph configuration: ${config.supergraph}`,
679-
);
713+
unifiedGraphFetcher = {
714+
fetch: () => {
715+
throw new Error(
716+
`Unknown supergraph configuration: ${config.supergraph}`,
717+
);
718+
},
680719
};
681720
}
682721
} else {
@@ -691,24 +730,29 @@ export function createGatewayRuntime<
691730
);
692731
}
693732

694-
unifiedGraphFetcher = () =>
695-
handleUnifiedGraphConfig(
696-
// @ts-expect-error TODO: what's up with type narrowing
697-
config.supergraph,
698-
configContext,
699-
);
733+
unifiedGraphFetcher = {
734+
fetch: () =>
735+
handleUnifiedGraphConfig(
736+
// @ts-expect-error TODO: what's up with type narrowing
737+
config.supergraph,
738+
configContext,
739+
),
740+
};
700741
}
701742

702-
const instrumentedGraphFetcher = unifiedGraphFetcher;
703-
unifiedGraphFetcher = (...args) =>
704-
getInstrumented(null).asyncFn(
705-
instrumentation?.schema,
706-
instrumentedGraphFetcher,
707-
)(...args);
743+
const instrumentedGraphFetcher = unifiedGraphFetcher.fetch;
744+
unifiedGraphFetcher = {
745+
...unifiedGraphFetcher,
746+
fetch: (...args) =>
747+
getInstrumented(null).asyncFn(
748+
instrumentation?.schema,
749+
instrumentedGraphFetcher,
750+
)(...args),
751+
};
708752

709753
const unifiedGraphManager = new UnifiedGraphManager<GatewayContext>({
710754
handleUnifiedGraph: config.unifiedGraphHandler,
711-
getUnifiedGraph: unifiedGraphFetcher,
755+
getUnifiedGraph: unifiedGraphFetcher.fetch,
712756
onUnifiedGraphChange(newUnifiedGraph: GraphQLSchema) {
713757
unifiedGraph = newUnifiedGraph;
714758
replaceSchema(newUnifiedGraph);
@@ -758,7 +802,11 @@ export function createGatewayRuntime<
758802
getExecutor = () => unifiedGraphManager.getExecutor();
759803
unifiedGraphPlugin = {
760804
onDispose() {
761-
return dispose(unifiedGraphManager);
805+
return unfakePromise(
806+
fakePromise(undefined)
807+
.then(() => dispose(unifiedGraphManager))
808+
.then(() => unifiedGraphFetcher.dispose?.()),
809+
);
762810
},
763811
};
764812
}

0 commit comments

Comments
 (0)