diff --git a/cmd/ipfs/kubo/daemon.go b/cmd/ipfs/kubo/daemon.go index 7ee5953070c..30e805186c1 100644 --- a/cmd/ipfs/kubo/daemon.go +++ b/cmd/ipfs/kubo/daemon.go @@ -485,6 +485,9 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment // This should never happen, but better safe than sorry log.Fatal("Private network does not work with Routing.Type=auto. Update your config to Routing.Type=dht (or none, and do manual peering)") } + if cfg.Provider.Strategy.WithDefault("") != "" && cfg.Reprovider.Strategy.IsDefault() { + log.Fatal("Invalid config. Remove unused Provider.Strategy and set Reprovider.Strategy instead. Documentation: https://github.com/ipfs/kubo/blob/master/docs/config.md#reproviderstrategy") + } printLibp2pPorts(node) diff --git a/config/config_test.go b/config/config_test.go index d4f38f08662..16573504370 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -134,9 +134,9 @@ func TestCheckKey(t *testing.T) { t.Fatal("Foo.Bar isn't a valid key in the config") } - err = CheckKey("Provider.Strategy") + err = CheckKey("Reprovider.Strategy") if err != nil { - t.Fatalf("%s: %s", err, "Provider.Strategy is a valid key in the config") + t.Fatalf("%s: %s", err, "Reprovider.Strategy is a valid key in the config") } err = CheckKey("Provider.Foo") diff --git a/config/provider.go b/config/provider.go index f2b5afe05b4..a1c44859889 100644 --- a/config/provider.go +++ b/config/provider.go @@ -1,5 +1,12 @@ package config +const ( + DefaultProviderWorkerCount = 64 +) + +// Provider configuration describes how NEW CIDs are announced the moment they are created. +// For periodical reprovide configuration, see Reprovider.* type Provider struct { - Strategy string // Which keys to announce + Strategy *OptionalString `json:",omitempty"` // Unused, you are likely looking for Reprovider.Strategy instead + WorkerCount *OptionalInteger `json:",omitempty"` // Number of concurrent provides allowed, 0 means unlimited } diff --git a/config/reprovider.go b/config/reprovider.go index dae9ae6dee9..3e8a5b476ca 100644 --- a/config/reprovider.go +++ b/config/reprovider.go @@ -7,6 +7,8 @@ const ( DefaultReproviderStrategy = "all" ) +// Reprovider configuration describes how CID from local datastore are periodically re-announced to routing systems. +// For provide behavior of ad-hoc or newly created CIDs and their first-time announcement, see Provider.* type Reprovider struct { Interval *OptionalDuration `json:",omitempty"` // Time period to reprovide locally stored objects to the network Strategy *OptionalString `json:",omitempty"` // Which keys to announce diff --git a/core/commands/commands_test.go b/core/commands/commands_test.go index 2dda639f545..d8b4c408380 100644 --- a/core/commands/commands_test.go +++ b/core/commands/commands_test.go @@ -184,6 +184,7 @@ func TestCommands(t *testing.T) { "/stats/bw", "/stats/dht", "/stats/provide", + "/stats/reprovide", "/stats/repo", "/swarm", "/swarm/addrs", diff --git a/core/commands/stat.go b/core/commands/stat.go index 2632d6aa257..2b4485a9513 100644 --- a/core/commands/stat.go +++ b/core/commands/stat.go @@ -27,11 +27,12 @@ for your IPFS node.`, }, Subcommands: map[string]*cmds.Command{ - "bw": statBwCmd, - "repo": repoStatCmd, - "bitswap": bitswapStatCmd, - "dht": statDhtCmd, - "provide": statProvideCmd, + "bw": statBwCmd, + "repo": repoStatCmd, + "bitswap": bitswapStatCmd, + "dht": statDhtCmd, + "provide": statProvideCmd, + "reprovide": statReprovideCmd, }, } diff --git a/core/commands/stat_provide.go b/core/commands/stat_provide.go index 7a54a79e4f1..ef06d8e2828 100644 --- a/core/commands/stat_provide.go +++ b/core/commands/stat_provide.go @@ -4,28 +4,20 @@ import ( "fmt" "io" "text/tabwriter" - "time" - humanize "github.com/dustin/go-humanize" - "github.com/ipfs/boxo/provider" cmds "github.com/ipfs/go-ipfs-cmds" "github.com/ipfs/kubo/core/commands/cmdenv" "github.com/libp2p/go-libp2p-kad-dht/fullrt" - "golang.org/x/exp/constraints" ) -type reprovideStats struct { - provider.ReproviderStats - fullRT bool -} - var statProvideCmd = &cmds.Command{ + Status: cmds.Deprecated, Helptext: cmds.HelpText{ - Tagline: "Returns statistics about the node's (re)provider system.", + Tagline: "Deprecated command, use 'ipfs stats reprovide' instead.", ShortDescription: ` -Returns statistics about the content the node is advertising. - -This interface is not stable and may change from release to release. +'ipfs stats provide' is deprecated because provide and reprovide operations +are now distinct. This command may be replaced by provide only stats in the +future. `, }, Arguments: []cmds.Argument{}, @@ -57,8 +49,8 @@ This interface is not stable and may change from release to release. wtr := tabwriter.NewWriter(w, 1, 2, 1, ' ', 0) defer wtr.Flush() - fmt.Fprintf(wtr, "TotalReprovides:\t%s\n", humanNumber(s.TotalReprovides)) - fmt.Fprintf(wtr, "AvgReprovideDuration:\t%s\n", humanDuration(s.AvgReprovideDuration)) + fmt.Fprintf(wtr, "TotalProvides:\t%s\n", humanNumber(s.TotalReprovides)) + fmt.Fprintf(wtr, "AvgProvideDuration:\t%s\n", humanDuration(s.AvgReprovideDuration)) fmt.Fprintf(wtr, "LastReprovideDuration:\t%s\n", humanDuration(s.LastReprovideDuration)) if !s.LastRun.IsZero() { fmt.Fprintf(wtr, "LastRun:\t%s\n", humanTime(s.LastRun)) @@ -71,30 +63,3 @@ This interface is not stable and may change from release to release. }, Type: reprovideStats{}, } - -func humanDuration(val time.Duration) string { - return val.Truncate(time.Microsecond).String() -} - -func humanTime(val time.Time) string { - return val.Format("2006-01-02 15:04:05") -} - -func humanNumber[T constraints.Float | constraints.Integer](n T) string { - nf := float64(n) - str := humanSI(nf, 0) - fullStr := humanFull(nf, 0) - if str != fullStr { - return fmt.Sprintf("%s\t(%s)", str, fullStr) - } - return str -} - -func humanSI(val float64, decimals int) string { - v, unit := humanize.ComputeSI(val) - return fmt.Sprintf("%s%s", humanFull(v, decimals), unit) -} - -func humanFull(val float64, decimals int) string { - return humanize.CommafWithDigits(val, decimals) -} diff --git a/core/commands/stat_reprovide.go b/core/commands/stat_reprovide.go new file mode 100644 index 00000000000..10dbc727d38 --- /dev/null +++ b/core/commands/stat_reprovide.go @@ -0,0 +1,104 @@ +package commands + +import ( + "fmt" + "io" + "text/tabwriter" + "time" + + humanize "github.com/dustin/go-humanize" + "github.com/ipfs/boxo/provider" + cmds "github.com/ipfs/go-ipfs-cmds" + "github.com/ipfs/kubo/core/commands/cmdenv" + "github.com/libp2p/go-libp2p-kad-dht/fullrt" + "golang.org/x/exp/constraints" +) + +type reprovideStats struct { + provider.ReproviderStats + fullRT bool +} + +var statReprovideCmd = &cmds.Command{ + Status: cmds.Experimental, + Helptext: cmds.HelpText{ + Tagline: "Returns statistics about the node's reprovider system.", + ShortDescription: ` +Returns statistics about the content the node is reproviding every +Reprovider.Interval according to Reprovider.Strategy: +https://github.com/ipfs/kubo/blob/master/docs/config.md#reprovider + +This interface is not stable and may change from release to release. + +`, + }, + Arguments: []cmds.Argument{}, + Options: []cmds.Option{}, + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + nd, err := cmdenv.GetNode(env) + if err != nil { + return err + } + + if !nd.IsOnline { + return ErrNotOnline + } + + stats, err := nd.Provider.Stat() + if err != nil { + return err + } + _, fullRT := nd.DHTClient.(*fullrt.FullRT) + + if err := res.Emit(reprovideStats{stats, fullRT}); err != nil { + return err + } + + return nil + }, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, s reprovideStats) error { + wtr := tabwriter.NewWriter(w, 1, 2, 1, ' ', 0) + defer wtr.Flush() + + fmt.Fprintf(wtr, "TotalReprovides:\t%s\n", humanNumber(s.TotalReprovides)) + fmt.Fprintf(wtr, "AvgReprovideDuration:\t%s\n", humanDuration(s.AvgReprovideDuration)) + fmt.Fprintf(wtr, "LastReprovideDuration:\t%s\n", humanDuration(s.LastReprovideDuration)) + if !s.LastRun.IsZero() { + fmt.Fprintf(wtr, "LastReprovide:\t%s\n", humanTime(s.LastRun)) + if s.fullRT { + fmt.Fprintf(wtr, "NextReprovide:\t%s\n", humanTime(s.LastRun.Add(s.ReprovideInterval))) + } + } + return nil + }), + }, + Type: reprovideStats{}, +} + +func humanDuration(val time.Duration) string { + return val.Truncate(time.Microsecond).String() +} + +func humanTime(val time.Time) string { + return val.Format("2006-01-02 15:04:05") +} + +func humanNumber[T constraints.Float | constraints.Integer](n T) string { + nf := float64(n) + str := humanSI(nf, 0) + fullStr := humanFull(nf, 0) + if str != fullStr { + return fmt.Sprintf("%s\t(%s)", str, fullStr) + } + return str +} + +func humanSI(val float64, decimals int) string { + v, unit := humanize.ComputeSI(val) + return fmt.Sprintf("%s%s", humanFull(v, decimals), unit) +} + +func humanFull(val float64, decimals int) string { + return humanize.CommafWithDigits(val, decimals) +} diff --git a/core/node/groups.go b/core/node/groups.go index 4a471f17038..0e28444be95 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -359,6 +359,7 @@ func Online(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.Part cfg.Reprovider.Strategy.WithDefault(config.DefaultReproviderStrategy), cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval), cfg.Routing.AcceleratedDHTClient.WithDefault(config.DefaultAcceleratedDHTClient), + int(cfg.Provider.WorkerCount.WithDefault(config.DefaultProviderWorkerCount)), ), ) } diff --git a/core/node/provider.go b/core/node/provider.go index 4638aad4dc5..d0081eb0aec 100644 --- a/core/node/provider.go +++ b/core/node/provider.go @@ -21,12 +21,13 @@ import ( // and in 'ipfs stats provide' report. const sampledBatchSize = 1000 -func ProviderSys(reprovideInterval time.Duration, acceleratedDHTClient bool) fx.Option { +func ProviderSys(reprovideInterval time.Duration, acceleratedDHTClient bool, provideWorkerCount int) fx.Option { return fx.Provide(func(lc fx.Lifecycle, cr irouting.ProvideManyRouter, keyProvider provider.KeyChanFunc, repo repo.Repo, bs blockstore.Blockstore) (provider.System, error) { opts := []provider.Option{ provider.Online(cr), provider.ReproviderInterval(reprovideInterval), provider.KeyProvider(keyProvider), + provider.ProvideWorkerCount(provideWorkerCount), } if !acceleratedDHTClient && reprovideInterval > 0 { // The estimation kinda suck if you are running with accelerated DHT client, @@ -131,7 +132,7 @@ https://github.com/ipfs/kubo/blob/master/docs/config.md#routingaccelerateddhtcli // ONLINE/OFFLINE // OnlineProviders groups units managing provider routing records online -func OnlineProviders(useStrategicProviding bool, reprovideStrategy string, reprovideInterval time.Duration, acceleratedDHTClient bool) fx.Option { +func OnlineProviders(useStrategicProviding bool, reprovideStrategy string, reprovideInterval time.Duration, acceleratedDHTClient bool, provideWorkerCount int) fx.Option { if useStrategicProviding { return OfflineProviders() } @@ -146,7 +147,7 @@ func OnlineProviders(useStrategicProviding bool, reprovideStrategy string, repro return fx.Options( keyProvider, - ProviderSys(reprovideInterval, acceleratedDHTClient), + ProviderSys(reprovideInterval, acceleratedDHTClient, provideWorkerCount), ) } @@ -169,7 +170,6 @@ func mfsProvider(mfsRoot *mfs.Root, fetcher fetcher.Factory) provider.KeyChanFun kcf := provider.NewDAGProvider(rootNode.Cid(), fetcher) return kcf(ctx) } - } func mfsRootProvider(mfsRoot *mfs.Root) provider.KeyChanFunc { diff --git a/docs/changelogs/v0.35.md b/docs/changelogs/v0.35.md index d383fafad26..e20d4759cb0 100644 --- a/docs/changelogs/v0.35.md +++ b/docs/changelogs/v0.35.md @@ -17,6 +17,8 @@ This release was brought to you by the [Shipyard](http://ipshipyard.com/) team. - [New `ipfs add` Options](#new-ipfs-add-options) - [Persistent `Import.*` Configuration](#persistent-import-configuration) - [Updated Configuration Profiles](#updated-configuration-profiles) + - [Optimized, dedicated queue for providing fresh CIDs](#optimized-dedicated-queue-for-providing-fresh-cids) + - [Deprecated `ipfs stats provider`](#deprecated-ipfs-stats-provider) - [๐Ÿ“ฆ๏ธ Important dependency updates](#-important-dependency-updates) - [๐Ÿ“ Changelog](#-changelog) - [๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ Contributors](#-contributors) @@ -81,6 +83,36 @@ The release updated configuration [profiles](https://github.com/ipfs/kubo/blob/m > [!TIP] > Apply one of CIDv1 test [profiles](https://github.com/ipfs/kubo/blob/master/docs/config.md#profiles) with `ipfs config profile apply test-cid-v1[-wide]`. +#### Optimized, dedicated queue for providing fresh CIDs + +From `kubo` [`v0.33.0`](https://github.com/ipfs/kubo/releases/tag/v0.33.0), +Bitswap stopped advertising newly added and received blocks to the DHT. Since +then `boxo/provider` is responsible for the provide and reprovide logic. Prior +to `v0.35.0`, provides and reprovides were handled together in batches, leading +to delays in initial advertisements (provides). + +Provides and Reprovides now have separate queues, allowing for immediate +provide of new CIDs and optimised batching of reprovides. + +This change introduces a new configuration option for limiting the number of +concurrent provide operations: +[`Provider.WorkerCount`](https://github.com/ipfs/kubo/blob/master/docs/config.md#providerworkercount). + +> [!TIP] +> Users who need to provide large volumes of content immediately should consider removing the cap on concurrent provide operations and also set `Routing.AcceleratedDHTClient` to `true`. + +##### Deprecated `ipfs stats provider` + +Since the `ipfs stats provider` command was displaying statistics for both +provides and reprovides, this command isn't relevant anymore after separating +the two queues. + +The successor command is `ipfs stats reprovide`, showing the same statistics, +but for reprovides only. + +> [!NOTE] +> `ipfs stats provider` still works, but is marked as deprecated and will be removed in a future release. Be mindful that the command provides only statistics about reprovides (similar to `ipfs stats reprovide`) and not the new provide queue (this will be fixed as a part of wider refactor planned for a future release). + #### ๐Ÿ“ฆ๏ธ Important dependency updates - update `boxo` to [v0.30.0](https://github.com/ipfs/boxo/releases/tag/v0.30.0) diff --git a/docs/config.md b/docs/config.md index 3724487d96b..d812f82714d 100644 --- a/docs/config.md +++ b/docs/config.md @@ -105,6 +105,9 @@ config file at runtime. - [`Pinning.RemoteServices: Policies.MFS.Enabled`](#pinningremoteservices-policiesmfsenabled) - [`Pinning.RemoteServices: Policies.MFS.PinName`](#pinningremoteservices-policiesmfspinname) - [`Pinning.RemoteServices: Policies.MFS.RepinInterval`](#pinningremoteservices-policiesmfsrepininterval) + - [`Provider`](#provider) + - [`Provider.Strategy`](#providerstrategy) + - [`Provider.WorkerCount`](#providerworkercount) - [`Pubsub`](#pubsub) - [`Pubsub.Enabled`](#pubsubenabled) - [`Pubsub.Router`](#pubsubrouter) @@ -207,6 +210,7 @@ config file at runtime. - [`announce-on` profile](#announce-on-profile) - [`legacy-cid-v0` profile](#legacy-cid-v0-profile) - [`test-cid-v1` profile](#test-cid-v1-profile) + - [`test-cid-v1-wide` profile](#test-cid-v1-wide-profile) - [Types](#types) - [`flag`](#flag) - [`priority`](#priority) @@ -1404,6 +1408,39 @@ Default: `"5m"` Type: `duration` +## `Provider` + +Configuration applied to the initial one-time announcement of fresh CIDs +created with `ipfs add`, `ipfs files`, `ipfs dag import`, `ipfs block|dag put` +commands. + +For periodical DHT reprovide settings, see [`Reprovide.*`](#reprovider). + +### `Provider.Strategy` + +Legacy, not used at the moment, see [`Reprovider.Strategy`](#reproviderstrategy) instead. + +### `Provider.WorkerCount` + +Sets the maximum number of _concurrent_ DHT provide operations. DHT reprovides +operations do **not** count against that limit. A value of `0` allows an +unlimited number of provide workers. + +If the [accelerated DHT client](#routingaccelerateddhtclient) is enabled, each +provide operation opens ~20 connections in parallel. With the standard DHT +client (accelerated disabled), each provide opens between 20 and 60 +connections, with at most 10 active at once. Provides complete more quickly +when using the accelerated client. Be mindful of how many simultaneous +connections this setting can generate. + +For nodes without strict connection limits that need to provide large volumes +of content immediately, we recommend enabling the `Routing.AcceleratedDHTClient` and +setting `Provider.WorkerCount` to `0` (unlimited). + +Default: `64` + +Type: `integer` (non-negative; `0` means unlimited number of workers) + ## `Pubsub` **DEPRECATED**: See [#9717](https://github.com/ipfs/kubo/issues/9717) diff --git a/test/sharness/t0002-docker-image.sh b/test/sharness/t0002-docker-image.sh index 8812c277a74..81bb8d4493f 100755 --- a/test/sharness/t0002-docker-image.sh +++ b/test/sharness/t0002-docker-image.sh @@ -36,7 +36,7 @@ test_expect_success "docker image build succeeds" ' ' test_expect_success "write init scripts" ' - echo "ipfs config Provider.Strategy Bar" > 001.sh && + echo "ipfs config Mounts.IPFS Bar" > 001.sh && echo "ipfs config Pubsub.Router Qux" > 002.sh && chmod +x 002.sh ' @@ -65,7 +65,7 @@ test_expect_success "check that init scripts were run correctly and in the corre test_expect_success "check that init script configs were applied" ' echo Bar > expected && - docker exec "$DOC_ID" ipfs config Provider.Strategy > actual && + docker exec "$DOC_ID" ipfs config Mounts.IPFS > actual && test_cmp actual expected && echo Qux > expected && docker exec "$DOC_ID" ipfs config Pubsub.Router > actual && diff --git a/test/sharness/t0070-user-config.sh b/test/sharness/t0070-user-config.sh index 1dc4c0369a0..5a8180c7317 100755 --- a/test/sharness/t0070-user-config.sh +++ b/test/sharness/t0070-user-config.sh @@ -11,12 +11,12 @@ test_description="Test user-provided config values" test_init_ipfs test_expect_success "bootstrap doesn't overwrite user-provided config keys (top-level)" ' - ipfs config Provider.Strategy >previous && - ipfs config Provider.Strategy foo && + ipfs config Identity.PeerID >previous && + ipfs config Identity.PeerID foo && ipfs bootstrap rm --all && echo "foo" >expected && - ipfs config Provider.Strategy >actual && - ipfs config Provider.Strategy $(cat previous) && + ipfs config Identity.PeerID >actual && + ipfs config Identity.PeerID $(cat previous) && test_cmp expected actual '