Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/ipfs/kubo/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,9 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
// This should never happen, but better safe than sorry
log.Fatal("Private network does not work with Routing.Type=auto. Update your config to Routing.Type=dht (or none, and do manual peering)")
}
if cfg.Provider.Strategy.WithDefault("") != "" && cfg.Reprovider.Strategy.IsDefault() {
log.Fatal("Invalid config. Remove unused Provider.Strategy and set Reprovider.Strategy instead. Documentation: https://github.com/ipfs/kubo/blob/master/docs/config.md#reproviderstrategy")
}

printLibp2pPorts(node)

Expand Down
4 changes: 2 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ func TestCheckKey(t *testing.T) {
t.Fatal("Foo.Bar isn't a valid key in the config")
}

err = CheckKey("Provider.Strategy")
err = CheckKey("Reprovider.Strategy")
if err != nil {
t.Fatalf("%s: %s", err, "Provider.Strategy is a valid key in the config")
t.Fatalf("%s: %s", err, "Reprovider.Strategy is a valid key in the config")
}

err = CheckKey("Provider.Foo")
Expand Down
9 changes: 8 additions & 1 deletion config/provider.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
package config

const (
DefaultProviderWorkerCount = 64
)

// Provider configuration describes how NEW CIDs are announced the moment they are created.
// For periodical reprovide configuration, see Reprovider.*
type Provider struct {
Strategy string // Which keys to announce
Strategy *OptionalString `json:",omitempty"` // Unused, you are likely looking for Reprovider.Strategy instead
WorkerCount *OptionalInteger `json:",omitempty"` // Number of concurrent provides allowed, 0 means unlimited
}
2 changes: 2 additions & 0 deletions config/reprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ const (
DefaultReproviderStrategy = "all"
)

// Reprovider configuration describes how CID from local datastore are periodically re-announced to routing systems.
// For provide behavior of ad-hoc or newly created CIDs and their first-time announcement, see Provider.*
type Reprovider struct {
Interval *OptionalDuration `json:",omitempty"` // Time period to reprovide locally stored objects to the network
Strategy *OptionalString `json:",omitempty"` // Which keys to announce
Expand Down
1 change: 1 addition & 0 deletions core/commands/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func TestCommands(t *testing.T) {
"/stats/bw",
"/stats/dht",
"/stats/provide",
"/stats/reprovide",
"/stats/repo",
"/swarm",
"/swarm/addrs",
Expand Down
11 changes: 6 additions & 5 deletions core/commands/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ for your IPFS node.`,
},

Subcommands: map[string]*cmds.Command{
"bw": statBwCmd,
"repo": repoStatCmd,
"bitswap": bitswapStatCmd,
"dht": statDhtCmd,
"provide": statProvideCmd,
"bw": statBwCmd,
"repo": repoStatCmd,
"bitswap": bitswapStatCmd,
"dht": statDhtCmd,
"provide": statProvideCmd,
"reprovide": statReprovideCmd,
},
}

Expand Down
49 changes: 7 additions & 42 deletions core/commands/stat_provide.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: renaming field names here is a breaking change to existing command.

It will break compatibility with older RPC clients, libraries, and will also break user's scripts and automations.

Mind adding a note about this to the Changelog?

Since we have two queues, and they behave differently, I wonder if we should have ipfs stat provide and ipfs stat reprovide? (e.g. if we make breaking change anyway, any reason to not rename current provide to reprovide?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I marked ipfs stats provide as deprecated, and renamed it to ipfs stats reprovide.

ipfs stats provide shows the same stats as ipfs stats reprovide but with the old names (only information related to reprovides, because we don't track provides anymore), so it shouldn't break existing scripts.

Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,20 @@ import (
"fmt"
"io"
"text/tabwriter"
"time"

humanize "github.com/dustin/go-humanize"
"github.com/ipfs/boxo/provider"
cmds "github.com/ipfs/go-ipfs-cmds"
"github.com/ipfs/kubo/core/commands/cmdenv"
"github.com/libp2p/go-libp2p-kad-dht/fullrt"
"golang.org/x/exp/constraints"
)

type reprovideStats struct {
provider.ReproviderStats
fullRT bool
}

var statProvideCmd = &cmds.Command{
Status: cmds.Deprecated,
Helptext: cmds.HelpText{
Tagline: "Returns statistics about the node's (re)provider system.",
Tagline: "Deprecated command, use 'ipfs stats reprovide' instead.",
ShortDescription: `
Returns statistics about the content the node is advertising.

This interface is not stable and may change from release to release.
'ipfs stats provide' is deprecated because provide and reprovide operations
are now distinct. This command may be replaced by provide only stats in the
future.
`,
},
Arguments: []cmds.Argument{},
Expand Down Expand Up @@ -57,8 +49,8 @@ This interface is not stable and may change from release to release.
wtr := tabwriter.NewWriter(w, 1, 2, 1, ' ', 0)
defer wtr.Flush()

fmt.Fprintf(wtr, "TotalReprovides:\t%s\n", humanNumber(s.TotalReprovides))
fmt.Fprintf(wtr, "AvgReprovideDuration:\t%s\n", humanDuration(s.AvgReprovideDuration))
fmt.Fprintf(wtr, "TotalProvides:\t%s\n", humanNumber(s.TotalReprovides))
fmt.Fprintf(wtr, "AvgProvideDuration:\t%s\n", humanDuration(s.AvgReprovideDuration))
Comment on lines +52 to +53
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the printed names correct? Seems like they should match the variable names, even if that causes a breaking change.

Copy link
Member

@lidel lidel Apr 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, we would be making a breaking change – see #10779 (comment)

Safer to keep stats provide as-is now and mark it as deprecated + add new stats reprovide which has new labels.

We will remove / change stats provide in future release.

fmt.Fprintf(wtr, "LastReprovideDuration:\t%s\n", humanDuration(s.LastReprovideDuration))
if !s.LastRun.IsZero() {
fmt.Fprintf(wtr, "LastRun:\t%s\n", humanTime(s.LastRun))
Expand All @@ -71,30 +63,3 @@ This interface is not stable and may change from release to release.
},
Type: reprovideStats{},
}

func humanDuration(val time.Duration) string {
return val.Truncate(time.Microsecond).String()
}

func humanTime(val time.Time) string {
return val.Format("2006-01-02 15:04:05")
}

func humanNumber[T constraints.Float | constraints.Integer](n T) string {
nf := float64(n)
str := humanSI(nf, 0)
fullStr := humanFull(nf, 0)
if str != fullStr {
return fmt.Sprintf("%s\t(%s)", str, fullStr)
}
return str
}

func humanSI(val float64, decimals int) string {
v, unit := humanize.ComputeSI(val)
return fmt.Sprintf("%s%s", humanFull(v, decimals), unit)
}

func humanFull(val float64, decimals int) string {
return humanize.CommafWithDigits(val, decimals)
}
104 changes: 104 additions & 0 deletions core/commands/stat_reprovide.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package commands

import (
"fmt"
"io"
"text/tabwriter"
"time"

humanize "github.com/dustin/go-humanize"
"github.com/ipfs/boxo/provider"
cmds "github.com/ipfs/go-ipfs-cmds"
"github.com/ipfs/kubo/core/commands/cmdenv"
"github.com/libp2p/go-libp2p-kad-dht/fullrt"
"golang.org/x/exp/constraints"
)

type reprovideStats struct {
provider.ReproviderStats
fullRT bool
}

var statReprovideCmd = &cmds.Command{
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "Returns statistics about the node's reprovider system.",
ShortDescription: `
Returns statistics about the content the node is reproviding every
Reprovider.Interval according to Reprovider.Strategy:
https://github.com/ipfs/kubo/blob/master/docs/config.md#reprovider

This interface is not stable and may change from release to release.

`,
},
Arguments: []cmds.Argument{},
Options: []cmds.Option{},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
nd, err := cmdenv.GetNode(env)
if err != nil {
return err
}

if !nd.IsOnline {
return ErrNotOnline
}

stats, err := nd.Provider.Stat()
if err != nil {
return err
}
_, fullRT := nd.DHTClient.(*fullrt.FullRT)

if err := res.Emit(reprovideStats{stats, fullRT}); err != nil {
return err
}

return nil
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, s reprovideStats) error {
wtr := tabwriter.NewWriter(w, 1, 2, 1, ' ', 0)
defer wtr.Flush()

fmt.Fprintf(wtr, "TotalReprovides:\t%s\n", humanNumber(s.TotalReprovides))
fmt.Fprintf(wtr, "AvgReprovideDuration:\t%s\n", humanDuration(s.AvgReprovideDuration))
fmt.Fprintf(wtr, "LastReprovideDuration:\t%s\n", humanDuration(s.LastReprovideDuration))
if !s.LastRun.IsZero() {
fmt.Fprintf(wtr, "LastReprovide:\t%s\n", humanTime(s.LastRun))
if s.fullRT {
fmt.Fprintf(wtr, "NextReprovide:\t%s\n", humanTime(s.LastRun.Add(s.ReprovideInterval)))
}
}
return nil
}),
},
Type: reprovideStats{},
}

func humanDuration(val time.Duration) string {
return val.Truncate(time.Microsecond).String()
}

func humanTime(val time.Time) string {
return val.Format("2006-01-02 15:04:05")
}

func humanNumber[T constraints.Float | constraints.Integer](n T) string {
nf := float64(n)
str := humanSI(nf, 0)
fullStr := humanFull(nf, 0)
if str != fullStr {
return fmt.Sprintf("%s\t(%s)", str, fullStr)
}
return str
}

func humanSI(val float64, decimals int) string {
v, unit := humanize.ComputeSI(val)
return fmt.Sprintf("%s%s", humanFull(v, decimals), unit)
}

func humanFull(val float64, decimals int) string {
return humanize.CommafWithDigits(val, decimals)
}
1 change: 1 addition & 0 deletions core/node/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ func Online(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.Part
cfg.Reprovider.Strategy.WithDefault(config.DefaultReproviderStrategy),
cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval),
cfg.Routing.AcceleratedDHTClient.WithDefault(config.DefaultAcceleratedDHTClient),
int(cfg.Provider.WorkerCount.WithDefault(config.DefaultProviderWorkerCount)),
),
)
}
Expand Down
8 changes: 4 additions & 4 deletions core/node/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (
// and in 'ipfs stats provide' report.
const sampledBatchSize = 1000

func ProviderSys(reprovideInterval time.Duration, acceleratedDHTClient bool) fx.Option {
func ProviderSys(reprovideInterval time.Duration, acceleratedDHTClient bool, provideWorkerCount int) fx.Option {
return fx.Provide(func(lc fx.Lifecycle, cr irouting.ProvideManyRouter, keyProvider provider.KeyChanFunc, repo repo.Repo, bs blockstore.Blockstore) (provider.System, error) {
opts := []provider.Option{
provider.Online(cr),
provider.ReproviderInterval(reprovideInterval),
provider.KeyProvider(keyProvider),
provider.ProvideWorkerCount(provideWorkerCount),
}
if !acceleratedDHTClient && reprovideInterval > 0 {
// The estimation kinda suck if you are running with accelerated DHT client,
Expand Down Expand Up @@ -131,7 +132,7 @@ https://github.com/ipfs/kubo/blob/master/docs/config.md#routingaccelerateddhtcli
// ONLINE/OFFLINE

// OnlineProviders groups units managing provider routing records online
func OnlineProviders(useStrategicProviding bool, reprovideStrategy string, reprovideInterval time.Duration, acceleratedDHTClient bool) fx.Option {
func OnlineProviders(useStrategicProviding bool, reprovideStrategy string, reprovideInterval time.Duration, acceleratedDHTClient bool, provideWorkerCount int) fx.Option {
if useStrategicProviding {
return OfflineProviders()
}
Expand All @@ -146,7 +147,7 @@ func OnlineProviders(useStrategicProviding bool, reprovideStrategy string, repro

return fx.Options(
keyProvider,
ProviderSys(reprovideInterval, acceleratedDHTClient),
ProviderSys(reprovideInterval, acceleratedDHTClient, provideWorkerCount),
)
}

Expand All @@ -169,7 +170,6 @@ func mfsProvider(mfsRoot *mfs.Root, fetcher fetcher.Factory) provider.KeyChanFun
kcf := provider.NewDAGProvider(rootNode.Cid(), fetcher)
return kcf(ctx)
}

}

func mfsRootProvider(mfsRoot *mfs.Root) provider.KeyChanFunc {
Expand Down
32 changes: 32 additions & 0 deletions docs/changelogs/v0.35.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ This release was brought to you by the [Shipyard](http://ipshipyard.com/) team.
- [New `ipfs add` Options](#new-ipfs-add-options)
- [Persistent `Import.*` Configuration](#persistent-import-configuration)
- [Updated Configuration Profiles](#updated-configuration-profiles)
- [Optimized, dedicated queue for providing fresh CIDs](#optimized-dedicated-queue-for-providing-fresh-cids)
- [Deprecated `ipfs stats provider`](#deprecated-ipfs-stats-provider)
- [📦️ Important dependency updates](#-important-dependency-updates)
- [📝 Changelog](#-changelog)
- [👨‍👩‍👧‍👦 Contributors](#-contributors)
Expand Down Expand Up @@ -81,6 +83,36 @@ The release updated configuration [profiles](https://github.com/ipfs/kubo/blob/m
> [!TIP]
> Apply one of CIDv1 test [profiles](https://github.com/ipfs/kubo/blob/master/docs/config.md#profiles) with `ipfs config profile apply test-cid-v1[-wide]`.

#### Optimized, dedicated queue for providing fresh CIDs

From `kubo` [`v0.33.0`](https://github.com/ipfs/kubo/releases/tag/v0.33.0),
Bitswap stopped advertising newly added and received blocks to the DHT. Since
then `boxo/provider` is responsible for the provide and reprovide logic. Prior
to `v0.35.0`, provides and reprovides were handled together in batches, leading
to delays in initial advertisements (provides).

Provides and Reprovides now have separate queues, allowing for immediate
provide of new CIDs and optimised batching of reprovides.

This change introduces a new configuration option for limiting the number of
concurrent provide operations:
[`Provider.WorkerCount`](https://github.com/ipfs/kubo/blob/master/docs/config.md#providerworkercount).

> [!TIP]
> Users who need to provide large volumes of content immediately should consider removing the cap on concurrent provide operations and also set `Routing.AcceleratedDHTClient` to `true`.

##### Deprecated `ipfs stats provider`

Since the `ipfs stats provider` command was displaying statistics for both
provides and reprovides, this command isn't relevant anymore after separating
the two queues.

The successor command is `ipfs stats reprovide`, showing the same statistics,
but for reprovides only.

> [!NOTE]
> `ipfs stats provider` still works, but is marked as deprecated and will be removed in a future release. Be mindful that the command provides only statistics about reprovides (similar to `ipfs stats reprovide`) and not the new provide queue (this will be fixed as a part of wider refactor planned for a future release).

#### 📦️ Important dependency updates

- update `boxo` to [v0.30.0](https://github.com/ipfs/boxo/releases/tag/v0.30.0)
Expand Down
Loading
Loading