Skip to content

Commit 37756ce

Browse files
aschmahmannhsanjuangammazero
authored
Bitswap: move providing -> Exchange-layer, providerQueryManager -> routing (#641)
Bitswap: refactor content provider elements This PR performs a rather large and touchy refactor of things related to Content providing and Content discovery previously embedded into Bitswap. The motivations: * Make ProviderQueryManager options configurable * Align and separate coalesced layers: content routing must not be part of bitswap as in the future we will be using different exchanges (bitswap, http) for retrieval and content routing should be above exchange. * Align content routing interfaces with libp2p: to avoid crust, wrappers and user confusion, align Providers and Discovery types to libp2p.ContentRouting. * Reduce duplicated functionality: i.e. code that handles providing in multiple places and fails to take advantage of ProvideMany optimizations. As a result: * ProviderQueryManager is now part of the routing module * A new providing.Exchange has been created * Bitswap initialization params have changed and Bitswap Network doesn't provide anymore (see changelog for more details) Co-authored-by: Hector Sanjuan <[email protected]> Co-authored-by: Andrew Gillis <[email protected]>
1 parent c91cc1d commit 37756ce

File tree

35 files changed

+725
-550
lines changed

35 files changed

+725
-550
lines changed

CHANGELOG.md

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,52 @@ The following emojis are used to highlight certain changes:
1414

1515
## [Unreleased]
1616

17+
- `bitswap`, `routing`, `exchange` ([#641](https://github.com/ipfs/boxo/pull/641)):
18+
- ✨ Bitswap is no longer in charge of providing blocks to the newtork: providing functionality is now handled by a `exchange/providing.Exchange`, meant to be used with `provider.System` so that all provides follow the same rules (multiple parts of the code where handling provides) before.
19+
- 🛠 `bitswap/client/internal/providerquerymanager` has been moved to `routing/providerquerymanager` where it belongs. In order to keep compatibility, Bitswap now receives a `routing.ContentDiscovery` parameter which implements `FindProvidersAsync(...)` and uses it to create a `providerquerymanager` with the default settings as before. Custom settings can be used by using a custom `providerquerymanager` to manually wrap a `ContentDiscovery` object and pass that in as `ContentDiscovery` on initialization while setting `bitswap.WithDefaultProviderQueryManager(false)` (to avoid re-wrapping it again).
20+
- The renovated `providedQueryManager` will trigger lookups until it manages to connect to `MaxProviders`. Before it would lookup at most `MaxInProcessRequests*MaxProviders` and connection failures may have limited the actual number of providers found.
21+
- 🛠 We have aligned our routing-related interfaces with the libp2p [`routing`](https://pkg.go.dev/github.com/libp2p/go-libp2p/core/routing#ContentRouting) ones, including in the `reprovider.System`.
22+
- In order to obtain exactly the same behaviour as before (i.e. particularly ensuring that new blocks are still provided), what was done like:
23+
24+
```go
25+
bswapnet := network.NewFromIpfsHost(host, contentRouter)
26+
bswap := bitswap.New(p.ctx, bswapnet, blockstore)
27+
bserv = blockservice.New(blockstore, bswap)
28+
```
29+
- becomes:
30+
31+
```go
32+
// Create network: no contentRouter anymore
33+
bswapnet := network.NewFromIpfsHost(host)
34+
// Create Bitswap: a new "discovery" parameter, usually the "contentRouter"
35+
// which does both discovery and providing.
36+
bswap := bitswap.New(p.ctx, bswapnet, discovery, blockstore)
37+
// A provider system that handles concurrent provides etc. "contentProvider"
38+
// is usually the "contentRouter" which does both discovery and providing.
39+
// "contentProvider" could be used directly without wrapping, but it is recommended
40+
// to do so to provide more efficiently.
41+
provider := provider.New(datastore, provider.Online(contentProvider)
42+
// A wrapped providing exchange using the previous exchange and the provider.
43+
exch := providing.New(bswap, provider)
44+
45+
// Finally the blockservice
46+
bserv := blockservice.New(blockstore, exch)
47+
...
48+
```
49+
50+
- The above is only necessary if content routing is needed. Otherwise:
51+
52+
```
53+
// Create network: no contentRouter anymore
54+
bswapnet := network.NewFromIpfsHost(host)
55+
// Create Bitswap: a new "discovery" parameter set to nil (disable content discovery)
56+
bswap := bitswap.New(p.ctx, bswapnet, nil, blockstore)
57+
// Finally the blockservice
58+
bserv := blockservice.New(blockstore, exch)
59+
```
60+
61+
62+
1763
### Added
1864

1965
- `routing/http/server`: added built-in Prometheus instrumentation to http delegated `/routing/v1/` endpoints, with custom buckets for response size and duration to match real world data observed at [the `delegated-ipfs.dev` instance](https://docs.ipfs.tech/concepts/public-utilities/#delegated-routing). [#718](https://github.com/ipfs/boxo/pull/718) [#724](https://github.com/ipfs/boxo/pull/724)
@@ -117,6 +163,7 @@ The following emojis are used to highlight certain changes:
117163
- `bitswap/client` fix memory leak in BlockPresenceManager due to unlimited map growth. [#636](https://github.com/ipfs/boxo/pull/636)
118164
- `bitswap/network` fixed race condition when a timeout occurred before hole punching completed while establishing a first-time stream to a peer behind a NAT [#651](https://github.com/ipfs/boxo/pull/651)
119165
- `bitswap`: wantlist overflow handling now cancels existing entries to make room for newer entries. This fix prevents the wantlist from filling up with CIDs that the server does not have. [#629](https://github.com/ipfs/boxo/pull/629)
166+
- 🛠 `bitswap` & `bitswap/server` no longer provide to content routers, instead you can use the `provider` package because it uses a datastore queue and batches calls to ProvideMany.
120167

121168
## [v0.21.0]
122169

bitswap/benchmarks_test.go

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,16 @@ import (
1212
"testing"
1313
"time"
1414

15-
blocks "github.com/ipfs/go-block-format"
16-
"github.com/ipfs/go-test/random"
17-
protocol "github.com/libp2p/go-libp2p/core/protocol"
18-
1915
"github.com/ipfs/boxo/bitswap"
2016
bsnet "github.com/ipfs/boxo/bitswap/network"
2117
testinstance "github.com/ipfs/boxo/bitswap/testinstance"
2218
tn "github.com/ipfs/boxo/bitswap/testnet"
2319
mockrouting "github.com/ipfs/boxo/routing/mock"
20+
blocks "github.com/ipfs/go-block-format"
2421
cid "github.com/ipfs/go-cid"
2522
delay "github.com/ipfs/go-ipfs-delay"
23+
"github.com/ipfs/go-test/random"
24+
protocol "github.com/libp2p/go-libp2p/core/protocol"
2625
)
2726

2827
type fetchFunc func(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid)
@@ -135,24 +134,25 @@ func BenchmarkFetchFromOldBitswap(b *testing.B) {
135134
benchmarkLog = nil
136135
fixedDelay := delay.Fixed(10 * time.Millisecond)
137136
bstoreLatency := time.Duration(0)
137+
router := mockrouting.NewServer()
138138

139139
for _, bch := range mixedBenches {
140140
b.Run(bch.name, func(b *testing.B) {
141141
fetcherCount := bch.fetcherCount
142142
oldSeedCount := bch.oldSeedCount
143143
newSeedCount := bch.nodeCount - (fetcherCount + oldSeedCount)
144144

145-
net := tn.VirtualNetwork(mockrouting.NewServer(), fixedDelay)
145+
net := tn.VirtualNetwork(fixedDelay)
146146

147147
// Simulate an older Bitswap node (old protocol ID) that doesn't
148148
// send DONT_HAVE responses
149149
oldProtocol := []protocol.ID{bsnet.ProtocolBitswapOneOne}
150150
oldNetOpts := []bsnet.NetOpt{bsnet.SupportedProtocols(oldProtocol)}
151151
oldBsOpts := []bitswap.Option{bitswap.SetSendDontHaves(false)}
152-
oldNodeGenerator := testinstance.NewTestInstanceGenerator(net, oldNetOpts, oldBsOpts)
152+
oldNodeGenerator := testinstance.NewTestInstanceGenerator(net, router, oldNetOpts, oldBsOpts)
153153

154154
// Regular new Bitswap node
155-
newNodeGenerator := testinstance.NewTestInstanceGenerator(net, nil, nil)
155+
newNodeGenerator := testinstance.NewTestInstanceGenerator(net, router, nil, nil)
156156
var instances []testinstance.Instance
157157

158158
// Create new nodes (fetchers + seeds)
@@ -294,9 +294,10 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) {
294294
numblks := 1000
295295

296296
for i := 0; i < b.N; i++ {
297-
net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)
297+
net := tn.RateLimitedVirtualNetwork(d, rateLimitGenerator)
298298

299-
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
299+
router := mockrouting.NewServer()
300+
ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil)
300301
defer ig.Close()
301302

302303
instances := ig.Instances(numnodes)
@@ -312,9 +313,9 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) {
312313

313314
func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
314315
for i := 0; i < b.N; i++ {
315-
net := tn.VirtualNetwork(mockrouting.NewServer(), d)
316-
317-
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
316+
net := tn.VirtualNetwork(d)
317+
router := mockrouting.NewServer()
318+
ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil)
318319

319320
instances := ig.Instances(numnodes)
320321
rootBlock := random.BlocksOfSize(1, rootBlockSize)
@@ -327,9 +328,9 @@ func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, b
327328

328329
func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d delay.D, rateLimitGenerator tn.RateLimitGenerator, blockSize int64, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
329330
for i := 0; i < b.N; i++ {
330-
net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)
331-
332-
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
331+
net := tn.RateLimitedVirtualNetwork(d, rateLimitGenerator)
332+
router := mockrouting.NewServer()
333+
ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil)
333334
defer ig.Close()
334335

335336
instances := ig.Instances(numnodes)
@@ -437,7 +438,7 @@ func runDistribution(b *testing.B, instances []testinstance.Instance, blocks []b
437438

438439
func allToAll(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block) {
439440
for _, p := range provs {
440-
if err := p.Blockstore().PutMany(context.Background(), blocks); err != nil {
441+
if err := p.Blockstore.PutMany(context.Background(), blocks); err != nil {
441442
b.Fatal(err)
442443
}
443444
}
@@ -452,10 +453,10 @@ func overlap1(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
452453
bill := provs[0]
453454
jeff := provs[1]
454455

455-
if err := bill.Blockstore().PutMany(context.Background(), blks[:75]); err != nil {
456+
if err := bill.Blockstore.PutMany(context.Background(), blks[:75]); err != nil {
456457
b.Fatal(err)
457458
}
458-
if err := jeff.Blockstore().PutMany(context.Background(), blks[25:]); err != nil {
459+
if err := jeff.Blockstore.PutMany(context.Background(), blks[25:]); err != nil {
459460
b.Fatal(err)
460461
}
461462
}
@@ -473,12 +474,12 @@ func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
473474
even := i%2 == 0
474475
third := i%3 == 0
475476
if third || even {
476-
if err := bill.Blockstore().Put(context.Background(), blk); err != nil {
477+
if err := bill.Blockstore.Put(context.Background(), blk); err != nil {
477478
b.Fatal(err)
478479
}
479480
}
480481
if third || !even {
481-
if err := jeff.Blockstore().Put(context.Background(), blk); err != nil {
482+
if err := jeff.Blockstore.Put(context.Background(), blk); err != nil {
482483
b.Fatal(err)
483484
}
484485
}
@@ -490,7 +491,7 @@ func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
490491
// but we're mostly just testing performance of the sync algorithm
491492
func onePeerPerBlock(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
492493
for _, blk := range blks {
493-
err := provs[rand.Intn(len(provs))].Blockstore().Put(context.Background(), blk)
494+
err := provs[rand.Intn(len(provs))].Blockstore.Put(context.Background(), blk)
494495
if err != nil {
495496
b.Fatal(err)
496497
}

bitswap/bitswap.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66

77
"github.com/ipfs/boxo/bitswap/client"
8-
"github.com/ipfs/boxo/bitswap/internal/defaults"
98
"github.com/ipfs/boxo/bitswap/message"
109
"github.com/ipfs/boxo/bitswap/network"
1110
"github.com/ipfs/boxo/bitswap/server"
@@ -45,9 +44,8 @@ type bitswap interface {
4544
}
4645

4746
var (
48-
_ exchange.SessionExchange = (*Bitswap)(nil)
49-
_ bitswap = (*Bitswap)(nil)
50-
HasBlockBufferSize = defaults.HasBlockBufferSize
47+
_ exchange.SessionExchange = (*Bitswap)(nil)
48+
_ bitswap = (*Bitswap)(nil)
5149
)
5250

5351
type Bitswap struct {
@@ -58,7 +56,7 @@ type Bitswap struct {
5856
net network.BitSwapNetwork
5957
}
6058

61-
func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Bitswap {
59+
func New(ctx context.Context, net network.BitSwapNetwork, providerFinder client.ProviderFinder, bstore blockstore.Blockstore, options ...Option) *Bitswap {
6260
bs := &Bitswap{
6361
net: net,
6462
}
@@ -85,14 +83,10 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc
8583
serverOptions = append(serverOptions, server.WithTracer(tracer))
8684
}
8785

88-
if HasBlockBufferSize != defaults.HasBlockBufferSize {
89-
serverOptions = append(serverOptions, server.HasBlockBufferSize(HasBlockBufferSize))
90-
}
91-
9286
ctx = metrics.CtxSubScope(ctx, "bitswap")
9387

9488
bs.Server = server.New(ctx, net, bstore, serverOptions...)
95-
bs.Client = client.New(ctx, net, bstore, append(clientOptions, client.WithBlockReceivedNotifier(bs.Server))...)
89+
bs.Client = client.New(ctx, net, providerFinder, bstore, append(clientOptions, client.WithBlockReceivedNotifier(bs.Server))...)
9690
net.Start(bs) // use the polyfill receiver to log received errors and trace messages only once
9791

9892
return bs
@@ -115,7 +109,6 @@ type Stat struct {
115109
MessagesReceived uint64
116110
BlocksSent uint64
117111
DataSent uint64
118-
ProvideBufLen int
119112
}
120113

121114
func (bs *Bitswap) Stat() (*Stat, error) {
@@ -138,7 +131,6 @@ func (bs *Bitswap) Stat() (*Stat, error) {
138131
Peers: ss.Peers,
139132
BlocksSent: ss.BlocksSent,
140133
DataSent: ss.DataSent,
141-
ProvideBufLen: ss.ProvideBufLen,
142134
}, nil
143135
}
144136

0 commit comments

Comments
 (0)