Skip to content

Commit b55366d

Browse files
committed
exchange: create a providing.Exchange which provides on NotifyNewBlocks.
1 parent 6d74a8f commit b55366d

File tree

10 files changed

+135
-39
lines changed

10 files changed

+135
-39
lines changed

bitswap/benchmarks_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ func runDistribution(b *testing.B, instances []testinstance.Instance, blocks []b
438438

439439
func allToAll(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block) {
440440
for _, p := range provs {
441-
if err := p.Blockstore().PutMany(context.Background(), blocks); err != nil {
441+
if err := p.Blockstore.PutMany(context.Background(), blocks); err != nil {
442442
b.Fatal(err)
443443
}
444444
}
@@ -453,10 +453,10 @@ func overlap1(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
453453
bill := provs[0]
454454
jeff := provs[1]
455455

456-
if err := bill.Blockstore().PutMany(context.Background(), blks[:75]); err != nil {
456+
if err := bill.Blockstore.PutMany(context.Background(), blks[:75]); err != nil {
457457
b.Fatal(err)
458458
}
459-
if err := jeff.Blockstore().PutMany(context.Background(), blks[25:]); err != nil {
459+
if err := jeff.Blockstore.PutMany(context.Background(), blks[25:]); err != nil {
460460
b.Fatal(err)
461461
}
462462
}
@@ -474,12 +474,12 @@ func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
474474
even := i%2 == 0
475475
third := i%3 == 0
476476
if third || even {
477-
if err := bill.Blockstore().Put(context.Background(), blk); err != nil {
477+
if err := bill.Blockstore.Put(context.Background(), blk); err != nil {
478478
b.Fatal(err)
479479
}
480480
}
481481
if third || !even {
482-
if err := jeff.Blockstore().Put(context.Background(), blk); err != nil {
482+
if err := jeff.Blockstore.Put(context.Background(), blk); err != nil {
483483
b.Fatal(err)
484484
}
485485
}
@@ -491,7 +491,7 @@ func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
491491
// but we're mostly just testing performance of the sync algorithm
492492
func onePeerPerBlock(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
493493
for _, blk := range blks {
494-
err := provs[rand.Intn(len(provs))].Blockstore().Put(context.Background(), blk)
494+
err := provs[rand.Intn(len(provs))].Blockstore.Put(context.Background(), blk)
495495
if err != nil {
496496
b.Fatal(err)
497497
}

bitswap/bitswap_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func isCI() bool {
3636

3737
func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk blocks.Block) {
3838
t.Helper()
39-
err := inst.Blockstore().Put(ctx, blk)
39+
err := inst.Blockstore.Put(ctx, blk)
4040
if err != nil {
4141
t.Fatal(err)
4242
}
@@ -176,7 +176,7 @@ func TestUnwantedBlockNotAdded(t *testing.T) {
176176

177177
doesNotWantBlock.Exchange.ReceiveMessage(ctx, hasBlock.Identity.ID(), bsMessage)
178178

179-
blockInStore, err := doesNotWantBlock.Blockstore().Has(ctx, block.Cid())
179+
blockInStore, err := doesNotWantBlock.Blockstore.Has(ctx, block.Cid())
180180
if err != nil || blockInStore {
181181
t.Fatal("Unwanted block added to block store")
182182
}

bitswap/client/bitswap_with_sessions_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func getVirtualNetwork() tn.Network {
3131

3232
func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk blocks.Block) {
3333
t.Helper()
34-
err := inst.Blockstore().Put(ctx, blk)
34+
err := inst.Blockstore.Put(ctx, blk)
3535
if err != nil {
3636
t.Fatal(err)
3737
}
@@ -61,7 +61,7 @@ func TestBasicSessions(t *testing.T) {
6161
b := inst[1]
6262

6363
// Add a block to Peer B
64-
if err := b.Blockstore().Put(ctx, block); err != nil {
64+
if err := b.Blockstore.Put(ctx, block); err != nil {
6565
t.Fatal(err)
6666
}
6767

@@ -125,7 +125,7 @@ func TestSessionBetweenPeers(t *testing.T) {
125125

126126
// Add 101 blocks to Peer A
127127
blks := random.BlocksOfSize(101, blockSize)
128-
if err := inst[0].Blockstore().PutMany(ctx, blks); err != nil {
128+
if err := inst[0].Blockstore.PutMany(ctx, blks); err != nil {
129129
t.Fatal(err)
130130
}
131131

@@ -186,7 +186,7 @@ func TestSessionSplitFetch(t *testing.T) {
186186
// Add 10 distinct blocks to each of 10 peers
187187
blks := random.BlocksOfSize(100, blockSize)
188188
for i := 0; i < 10; i++ {
189-
if err := inst[i].Blockstore().PutMany(ctx, blks[i*10:(i+1)*10]); err != nil {
189+
if err := inst[i].Blockstore.PutMany(ctx, blks[i*10:(i+1)*10]); err != nil {
190190
t.Fatal(err)
191191
}
192192
}

bitswap/testinstance/testinstance.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
tn "github.com/ipfs/boxo/bitswap/testnet"
1010
blockstore "github.com/ipfs/boxo/blockstore"
1111
mockrouting "github.com/ipfs/boxo/routing/mock"
12+
"github.com/ipfs/go-datastore"
1213
ds "github.com/ipfs/go-datastore"
1314
delayed "github.com/ipfs/go-datastore/delayed"
1415
ds_sync "github.com/ipfs/go-datastore/sync"
@@ -89,18 +90,14 @@ func ConnectInstances(instances []Instance) {
8990
// Instance is a test instance of bitswap + dependencies for integration testing
9091
type Instance struct {
9192
Identity tnet.Identity
93+
Datastore datastore.Batching
9294
Exchange *bitswap.Bitswap
93-
blockstore blockstore.Blockstore
95+
Blockstore blockstore.Blockstore
9496
Adapter bsnet.BitSwapNetwork
9597
Routing routing.Routing
9698
blockstoreDelay delay.D
9799
}
98100

99-
// Blockstore returns the block store for this test instance
100-
func (i *Instance) Blockstore() blockstore.Blockstore {
101-
return i.blockstore
102-
}
103-
104101
// SetBlockstoreLatency customizes the artificial delay on receiving blocks
105102
// from a blockstore test instance.
106103
func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
@@ -118,20 +115,22 @@ func NewInstance(ctx context.Context, net tn.Network, router routing.Routing, p
118115
adapter := net.Adapter(p, netOptions...)
119116
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
120117

118+
ds := ds_sync.MutexWrap(dstore)
121119
bstore, err := blockstore.CachedBlockstore(ctx,
122-
blockstore.NewBlockstore(ds_sync.MutexWrap(dstore)),
120+
blockstore.NewBlockstore(ds),
123121
blockstore.DefaultCacheOpts())
124122
if err != nil {
125123
panic(err.Error()) // FIXME perhaps change signature and return error.
126124
}
127125

128126
bs := bitswap.New(ctx, adapter, router, bstore, bsOptions...)
129127
return Instance{
128+
Datastore: ds,
130129
Adapter: adapter,
131130
Identity: p,
132131
Exchange: bs,
133132
Routing: router,
134-
blockstore: bstore,
133+
Blockstore: bstore,
135134
blockstoreDelay: bsdelay,
136135
}
137136
}

blockservice/test/mock.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func Mocks(n int, opts ...blockservice.Option) []blockservice.BlockService {
1717

1818
var servs []blockservice.BlockService
1919
for _, i := range instances {
20-
servs = append(servs, blockservice.New(i.Blockstore(),
20+
servs = append(servs, blockservice.New(i.Blockstore,
2121
i.Exchange, append(opts, blockservice.WithProvider(i.Routing))...))
2222
}
2323
return servs

exchange/providing/providing.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Package providing implements an exchange wrapper which
2+
// does content providing for new blocks.
3+
package providing
4+
5+
import (
6+
"context"
7+
8+
"github.com/ipfs/boxo/exchange"
9+
"github.com/ipfs/boxo/provider"
10+
blocks "github.com/ipfs/go-block-format"
11+
)
12+
13+
// Exchange is an exchange wrapper that calls ProvideMany for blocks received
14+
// over NotifyNewBlocks.
15+
type Exchange struct {
16+
exchange.Interface
17+
provider provider.Provider
18+
}
19+
20+
// New creates a new providing Exchange with the given exchange and provider.
21+
func New(base exchange.Interface, provider provider.Provider) *Exchange {
22+
return &Exchange{
23+
Interface: base,
24+
provider: provider,
25+
}
26+
}
27+
28+
// NotifyNewBlocks calls provider.ProvideMany.
29+
func (ex *Exchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
30+
for _, b := range blocks {
31+
if err := ex.provider.Provide(ctx, b.Cid(), true); err != nil {
32+
return err
33+
}
34+
}
35+
return nil
36+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package providing
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
testinstance "github.com/ipfs/boxo/bitswap/testinstance"
8+
tn "github.com/ipfs/boxo/bitswap/testnet"
9+
"github.com/ipfs/boxo/blockservice"
10+
"github.com/ipfs/boxo/provider"
11+
mockrouting "github.com/ipfs/boxo/routing/mock"
12+
delay "github.com/ipfs/go-ipfs-delay"
13+
"github.com/ipfs/go-test/random"
14+
)
15+
16+
func TestExchange(t *testing.T) {
17+
ctx := context.Background()
18+
net := tn.VirtualNetwork(delay.Fixed(0))
19+
routing := mockrouting.NewServer()
20+
sg := testinstance.NewTestInstanceGenerator(net, routing, nil, nil)
21+
i := sg.Next()
22+
provFinder := routing.Client(i.Identity)
23+
prov, err := provider.New(i.Datastore,
24+
provider.Online(provFinder),
25+
)
26+
if err != nil {
27+
t.Fatal(err)
28+
}
29+
provExchange := New(i.Exchange, prov)
30+
// write-through so that we notify when re-adding block
31+
bs := blockservice.New(i.Blockstore, provExchange,
32+
blockservice.WriteThrough())
33+
block := random.BlocksOfSize(1, 10)[0]
34+
// put it on the blockstore of the first instance
35+
err = i.Blockstore.Put(ctx, block)
36+
if err != nil {
37+
t.Fatal()
38+
}
39+
40+
providersChan := provFinder.FindProvidersAsync(ctx, block.Cid(), 1)
41+
_, ok := <-providersChan
42+
if ok {
43+
t.Fatal("there should be no providers yet for block")
44+
}
45+
46+
// Now add it via BlockService. It should trigger NotifyNewBlocks
47+
// on the exchange and thus they should get announced.
48+
err = bs.AddBlock(ctx, block)
49+
if err != nil {
50+
t.Fatal()
51+
}
52+
// Trigger reproviding, otherwise it's not really provided.
53+
err = prov.Reprovide(ctx)
54+
if err != nil {
55+
t.Fatal(err)
56+
}
57+
providersChan = provFinder.FindProvidersAsync(ctx, block.Cid(), 1)
58+
_, ok = <-providersChan
59+
if !ok {
60+
t.Fatal("there should be one provider for the block")
61+
}
62+
}

fetcher/helpers/block_visitor_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,15 +54,15 @@ func TestFetchGraphToBlocks(t *testing.T) {
5454
defer hasBlock.Exchange.Close()
5555

5656
blocks := []blocks.Block{block1, block2, block3, block4}
57-
err := hasBlock.Blockstore().PutMany(bg, blocks)
57+
err := hasBlock.Blockstore.PutMany(bg, blocks)
5858
require.NoError(t, err)
5959
err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...)
6060
require.NoError(t, err)
6161

6262
wantsBlock := peers[1]
6363
defer wantsBlock.Exchange.Close()
6464

65-
wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
65+
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
6666
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
6767
session := fetcherConfig.NewSession(context.Background())
6868
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
@@ -104,7 +104,7 @@ func TestFetchGraphToUniqueBlocks(t *testing.T) {
104104
hasBlock := peers[0]
105105
defer hasBlock.Exchange.Close()
106106

107-
err := hasBlock.Blockstore().PutMany(bg, []blocks.Block{block1, block2, block3})
107+
err := hasBlock.Blockstore.PutMany(bg, []blocks.Block{block1, block2, block3})
108108
require.NoError(t, err)
109109

110110
err = hasBlock.Exchange.NotifyNewBlocks(bg, block1, block2, block3)
@@ -113,7 +113,7 @@ func TestFetchGraphToUniqueBlocks(t *testing.T) {
113113
wantsBlock := peers[1]
114114
defer wantsBlock.Exchange.Close()
115115

116-
wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
116+
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
117117
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
118118
session := fetcherConfig.NewSession(context.Background())
119119
ctx, cancel := context.WithTimeout(context.Background(), time.Second)

fetcher/impl/blockservice/fetcher_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func TestFetchIPLDPrimeNode(t *testing.T) {
4747
hasBlock := peers[0]
4848
defer hasBlock.Exchange.Close()
4949

50-
err := hasBlock.Blockstore().Put(bg, block)
50+
err := hasBlock.Blockstore.Put(bg, block)
5151
require.NoError(t, err)
5252

5353
err = hasBlock.Exchange.NotifyNewBlocks(bg, block)
@@ -56,7 +56,7 @@ func TestFetchIPLDPrimeNode(t *testing.T) {
5656
wantsBlock := peers[1]
5757
defer wantsBlock.Exchange.Close()
5858

59-
wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
59+
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
6060
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
6161
session := fetcherConfig.NewSession(context.Background())
6262

@@ -98,15 +98,15 @@ func TestFetchIPLDGraph(t *testing.T) {
9898
defer hasBlock.Exchange.Close()
9999

100100
blocks := []blocks.Block{block1, block2, block3, block4}
101-
err := hasBlock.Blockstore().PutMany(bg, blocks)
101+
err := hasBlock.Blockstore.PutMany(bg, blocks)
102102
require.NoError(t, err)
103103
err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...)
104104
require.NoError(t, err)
105105

106106
wantsBlock := peers[1]
107107
defer wantsBlock.Exchange.Close()
108108

109-
wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
109+
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
110110
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
111111
session := fetcherConfig.NewSession(context.Background())
112112
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
@@ -155,15 +155,15 @@ func TestFetchIPLDPath(t *testing.T) {
155155
defer hasBlock.Exchange.Close()
156156

157157
blocks := []blocks.Block{block1, block2, block3, block4, block5}
158-
err := hasBlock.Blockstore().PutMany(bg, blocks)
158+
err := hasBlock.Blockstore.PutMany(bg, blocks)
159159
require.NoError(t, err)
160160
err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...)
161161
require.NoError(t, err)
162162

163163
wantsBlock := peers[1]
164164
defer wantsBlock.Exchange.Close()
165165

166-
wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
166+
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
167167
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
168168
session := fetcherConfig.NewSession(context.Background())
169169
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
@@ -219,15 +219,15 @@ func TestHelpers(t *testing.T) {
219219
defer hasBlock.Exchange.Close()
220220

221221
blocks := []blocks.Block{block1, block2, block3, block4}
222-
err := hasBlock.Blockstore().PutMany(bg, blocks)
222+
err := hasBlock.Blockstore.PutMany(bg, blocks)
223223
require.NoError(t, err)
224224
err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...)
225225
require.NoError(t, err)
226226

227227
wantsBlock := peers[1]
228228
defer wantsBlock.Exchange.Close()
229229

230-
wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
230+
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
231231

232232
t.Run("Block retrieves node", func(t *testing.T) {
233233
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
@@ -334,15 +334,15 @@ func TestNodeReification(t *testing.T) {
334334
defer hasBlock.Exchange.Close()
335335

336336
blocks := []blocks.Block{block2, block3, block4}
337-
err := hasBlock.Blockstore().PutMany(bg, blocks)
337+
err := hasBlock.Blockstore.PutMany(bg, blocks)
338338
require.NoError(t, err)
339339
err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...)
340340
require.NoError(t, err)
341341

342342
wantsBlock := peers[1]
343343
defer wantsBlock.Exchange.Close()
344344

345-
wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
345+
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
346346
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
347347
nodeReifier := func(lnkCtx ipld.LinkContext, nd ipld.Node, ls *ipld.LinkSystem) (ipld.Node, error) {
348348
return &selfLoader{Node: nd, ctx: lnkCtx.Ctx, ls: ls}, nil

0 commit comments

Comments
 (0)