-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Amp-powered subgraphs #6218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
isum
wants to merge
40
commits into
master
Choose a base branch
from
ion/amp-powered-subgraphs
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Amp-powered subgraphs #6218
Changes from 20 commits
Commits
Show all changes
40 commits
Select commit
Hold shift + click to select a range
501a914
feat(graph): add Nozzle Flight service client
isum 941f4ec
feat(graph): add Nozzle stream aggregator
isum 08d7a21
feat(graph): add Nozzle data decoder
isum 0dd263b
feat(graph): add SQL query parser, resolver and validator
isum 6429c19
feat(graph): use a new identifier type in Nozzle related modules
isum 0e8314b
feat(graph): add Nozzle Subgraph schema generation
isum 01d2596
feat(graph): add Nozzle Subgraph manifest
isum e7e11f8
feat(graph): add reorg handling to the Nozzle FlightClient
isum be604db
feat(graph, core): extend SubgraphInstanceManager trait
isum 3615301
feat(core, graph, node): allow multiple subgraph instance managers
isum efb7c59
fix(graph): update deterministic error patterns in Nozzle Flight client
isum 5a2c3af
feat(graph): add Nozzle related ENV variables
isum e5b7898
fix(graph): make block range filter return a new query
isum 71829d4
feat(graph): add decoding utilities
isum b0d0bcd
fix(graph): use decoding utilities in the stream aggregator
isum 6a0930e
feat(graph): add more details to Nozzle data sources
isum 19cf6dd
feat(core, graph, node): add Nozzle subgraph deployment
isum 9a661b2
feat(graph): add a dedicated Nozzle manifest resolver
isum f7cc3ba
feat(node): add shutdown token
isum 4c747b9
feat(core, graph): add Nozzle subgraph runner
isum 2df58ab
chore(all): rename Nozzle to Amp
isum 82cf29a
fix(graph): produce consistent query hashes for logging
isum 762e27f
fix(core, graph): simplify SQL query requirements
isum 4cadade
chore(graph): fix typos
isum 4d74833
fix(graph): use nozzle-resume header name
isum 5a8688c
fix(graph): extend common column aliases
isum 3e71ed3
fix(core, graph): use named streams in the stream aggregator
isum a02db82
fix(core, graph): simplify working with identifiers
isum 74c9357
fix(graph): validate query output column names
isum cd8f962
fix(graph): support all versions of the Amp server
isum c3fcb3b
fix(graph): extend the list of common column aliases
isum 2c68f4f
test(graph): add decoder unit-tests
isum c57a959
feat(core, graph): add Amp subgraph metrics
isum eb2bf43
fix(graph): allow more complex dataset and table names
isum e4d71e8
fix(graph): remove CTE name requirements
isum 459028c
fix(graph, node): add option to authenticate Flight service requests
isum b7c720b
fix(graph): update temporary predefined list of source context tables
isum 3ebfc27
docs: add docs for Amp-powered subgraphs
isum 353f6e2
chore(core): reuse existing metric names
isum 2418aa1
fix(core, graph): minor adjustments after rebase
isum File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,8 +1,5 @@ | ||
| pub mod nozzle_subgraph; | ||
| pub mod polling_monitor; | ||
|
|
||
| mod subgraph; | ||
|
|
||
| pub use crate::subgraph::{ | ||
| SubgraphAssignmentProvider, SubgraphInstanceManager, SubgraphRegistrar, SubgraphRunner, | ||
| SubgraphTriggerProcessor, | ||
| }; | ||
| pub mod subgraph; | ||
| pub mod subgraph_provider; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,171 @@ | ||
| use std::sync::Arc; | ||
|
|
||
| use alloy::primitives::BlockNumber; | ||
| use anyhow::Context; | ||
| use async_trait::async_trait; | ||
| use graph::{ | ||
| components::{ | ||
| link_resolver::{LinkResolver, LinkResolverContext}, | ||
| metrics::MetricsRegistry, | ||
| store::{DeploymentLocator, SubgraphStore}, | ||
| subgraph::SubgraphInstanceManager, | ||
| }, | ||
| env::EnvVars, | ||
| log::factory::LoggerFactory, | ||
| nozzle, | ||
| prelude::CheapClone, | ||
| }; | ||
| use slog::{debug, error}; | ||
| use tokio_util::sync::CancellationToken; | ||
|
|
||
| use super::{runner, Metrics, Monitor}; | ||
|
|
||
| /// Manages Nozzle subgraph runner futures. | ||
| /// | ||
| /// Creates and schedules Nozzle subgraph runner futures for execution on demand. | ||
| /// Also handles stopping previously started Nozzle subgraph runners. | ||
| pub struct Manager<SS, NC> { | ||
| logger_factory: LoggerFactory, | ||
| metrics_registry: Arc<MetricsRegistry>, | ||
| env_vars: Arc<EnvVars>, | ||
| monitor: Monitor, | ||
| subgraph_store: Arc<SS>, | ||
| link_resolver: Arc<dyn LinkResolver>, | ||
| nozzle_client: Arc<NC>, | ||
| } | ||
|
|
||
| impl<SS, NC> Manager<SS, NC> | ||
| where | ||
| SS: SubgraphStore, | ||
| NC: nozzle::Client, | ||
| { | ||
| /// Creates a new Nozzle subgraph manager. | ||
| pub fn new( | ||
| logger_factory: &LoggerFactory, | ||
| metrics_registry: Arc<MetricsRegistry>, | ||
| env_vars: Arc<EnvVars>, | ||
| cancel_token: &CancellationToken, | ||
| subgraph_store: Arc<SS>, | ||
| link_resolver: Arc<dyn LinkResolver>, | ||
| nozzle_client: Arc<NC>, | ||
| ) -> Self { | ||
| let logger = logger_factory.component_logger("NozzleSubgraphManager", None); | ||
| let logger_factory = logger_factory.with_parent(logger); | ||
|
|
||
| let monitor = Monitor::new(&logger_factory, cancel_token); | ||
|
|
||
| Self { | ||
| logger_factory, | ||
| metrics_registry, | ||
| env_vars, | ||
| monitor, | ||
| subgraph_store, | ||
| link_resolver, | ||
| nozzle_client, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #[async_trait] | ||
| impl<SS, NC> SubgraphInstanceManager for Manager<SS, NC> | ||
| where | ||
| SS: SubgraphStore, | ||
| NC: nozzle::Client + Send + Sync + 'static, | ||
| { | ||
| async fn start_subgraph( | ||
| self: Arc<Self>, | ||
| deployment: DeploymentLocator, | ||
| stop_block: Option<i32>, | ||
| ) { | ||
| let manager = self.cheap_clone(); | ||
|
|
||
| self.monitor.start( | ||
| deployment.cheap_clone(), | ||
| Box::new(move |cancel_token| { | ||
| Box::pin(async move { | ||
| let logger = manager.logger_factory.subgraph_logger(&deployment); | ||
|
|
||
| let store = manager | ||
| .subgraph_store | ||
| .cheap_clone() | ||
| .writable(logger.cheap_clone(), deployment.id, Vec::new().into()) | ||
| .await | ||
| .context("failed to create writable store")?; | ||
|
|
||
| let metrics = Metrics::new( | ||
| &logger, | ||
| manager.metrics_registry.cheap_clone(), | ||
| store.cheap_clone(), | ||
| deployment.hash.cheap_clone(), | ||
| ); | ||
|
|
||
| let link_resolver = manager | ||
| .link_resolver | ||
| .for_manifest(&deployment.hash.to_string()) | ||
| .context("failed to create link resolver")?; | ||
|
|
||
| let manifest_bytes = link_resolver | ||
| .cat( | ||
| &LinkResolverContext::new(&deployment.hash, &logger), | ||
| &deployment.hash.to_ipfs_link(), | ||
| ) | ||
| .await | ||
| .context("failed to load subgraph manifest")?; | ||
|
|
||
| let raw_manifest = serde_yaml::from_slice(&manifest_bytes) | ||
| .context("failed to parse subgraph manifest")?; | ||
|
|
||
| let mut manifest = nozzle::Manifest::resolve::<graph_chain_ethereum::Chain, _>( | ||
| &logger, | ||
| manager.link_resolver.cheap_clone(), | ||
| manager.nozzle_client.cheap_clone(), | ||
| manager.env_vars.max_spec_version.cheap_clone(), | ||
| deployment.hash.cheap_clone(), | ||
| raw_manifest, | ||
| ) | ||
| .await?; | ||
|
|
||
| if let Some(stop_block) = stop_block { | ||
| for data_source in manifest.data_sources.iter_mut() { | ||
| data_source.source.end_block = stop_block as BlockNumber; | ||
| } | ||
| } | ||
|
|
||
| store | ||
| .start_subgraph_deployment(&logger) | ||
| .await | ||
| .context("failed to start subgraph deployment")?; | ||
|
|
||
| let runner_context = runner::Context::new( | ||
| &logger, | ||
| &manager.env_vars.nozzle, | ||
| manager.nozzle_client.cheap_clone(), | ||
| store, | ||
| deployment.hash.cheap_clone(), | ||
| manifest, | ||
| metrics, | ||
| ); | ||
|
|
||
| let runner_result = runner::new_runner(runner_context)(cancel_token).await; | ||
|
|
||
| match manager.subgraph_store.stop_subgraph(&deployment).await { | ||
| Ok(()) => { | ||
| debug!(logger, "Subgraph writer stopped"); | ||
| } | ||
| Err(e) => { | ||
| error!(logger, "Failed to stop subgraph writer"; | ||
| "e" => ?e | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| runner_result | ||
| }) | ||
| }), | ||
| ); | ||
| } | ||
|
|
||
| async fn stop_subgraph(&self, deployment: DeploymentLocator) { | ||
| self.monitor.stop(deployment); | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| use std::sync::Arc; | ||
|
|
||
| use graph::{ | ||
| cheap_clone::CheapClone, | ||
| components::{ | ||
| metrics::{stopwatch::StopwatchMetrics, MetricsRegistry}, | ||
| store::WritableStore, | ||
| }, | ||
| data::subgraph::DeploymentHash, | ||
| }; | ||
| use slog::Logger; | ||
|
|
||
| /// Contains deployment specific metrics. | ||
| pub(super) struct Metrics { | ||
| pub(super) stopwatch: StopwatchMetrics, | ||
| } | ||
|
|
||
| impl Metrics { | ||
| /// Creates new deployment specific metrics. | ||
| pub(super) fn new( | ||
| logger: &Logger, | ||
| metrics_registry: Arc<MetricsRegistry>, | ||
| store: Arc<dyn WritableStore>, | ||
| deployment: DeploymentHash, | ||
| ) -> Self { | ||
| let stopwatch = StopwatchMetrics::new( | ||
| logger.cheap_clone(), | ||
| deployment, | ||
| "nozzle-process", | ||
| metrics_registry, | ||
| store.shard().to_string(), | ||
| ); | ||
|
|
||
| Self { stopwatch } | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| mod manager; | ||
| mod metrics; | ||
| mod monitor; | ||
| mod runner; | ||
|
|
||
| use self::{metrics::Metrics, monitor::Monitor}; | ||
|
|
||
| pub use self::manager::Manager; |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just make that
process, it's easier on existing dashboards and it's clear from the manifest what kind of subgraph it is