Skip to content

Commit da1e2d2

Browse files
committed
exchange: create a providing.Exchange which provides on NotifyNewBlocks.
1 parent 6d74a8f commit da1e2d2

File tree

7 files changed

+123
-27
lines changed

7 files changed

+123
-27
lines changed

bitswap/testinstance/testinstance.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
tn "github.com/ipfs/boxo/bitswap/testnet"
1010
blockstore "github.com/ipfs/boxo/blockstore"
1111
mockrouting "github.com/ipfs/boxo/routing/mock"
12+
"github.com/ipfs/go-datastore"
1213
ds "github.com/ipfs/go-datastore"
1314
delayed "github.com/ipfs/go-datastore/delayed"
1415
ds_sync "github.com/ipfs/go-datastore/sync"
@@ -89,18 +90,14 @@ func ConnectInstances(instances []Instance) {
8990
// Instance is a test instance of bitswap + dependencies for integration testing
9091
type Instance struct {
9192
Identity tnet.Identity
93+
Datastore datastore.Batching
9294
Exchange *bitswap.Bitswap
93-
blockstore blockstore.Blockstore
95+
Blockstore blockstore.Blockstore
9496
Adapter bsnet.BitSwapNetwork
9597
Routing routing.Routing
9698
blockstoreDelay delay.D
9799
}
98100

99-
// Blockstore returns the block store for this test instance
100-
func (i *Instance) Blockstore() blockstore.Blockstore {
101-
return i.blockstore
102-
}
103-
104101
// SetBlockstoreLatency customizes the artificial delay on receiving blocks
105102
// from a blockstore test instance.
106103
func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
@@ -118,20 +115,22 @@ func NewInstance(ctx context.Context, net tn.Network, router routing.Routing, p
118115
adapter := net.Adapter(p, netOptions...)
119116
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
120117

118+
ds := ds_sync.MutexWrap(dstore)
121119
bstore, err := blockstore.CachedBlockstore(ctx,
122-
blockstore.NewBlockstore(ds_sync.MutexWrap(dstore)),
120+
blockstore.NewBlockstore(ds),
123121
blockstore.DefaultCacheOpts())
124122
if err != nil {
125123
panic(err.Error()) // FIXME perhaps change signature and return error.
126124
}
127125

128126
bs := bitswap.New(ctx, adapter, router, bstore, bsOptions...)
129127
return Instance{
128+
Datastore: ds,
130129
Adapter: adapter,
131130
Identity: p,
132131
Exchange: bs,
133132
Routing: router,
134-
blockstore: bstore,
133+
Blockstore: bstore,
135134
blockstoreDelay: bsdelay,
136135
}
137136
}

blockservice/test/mock.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func Mocks(n int, opts ...blockservice.Option) []blockservice.BlockService {
1717

1818
var servs []blockservice.BlockService
1919
for _, i := range instances {
20-
servs = append(servs, blockservice.New(i.Blockstore(),
20+
servs = append(servs, blockservice.New(i.Blockstore,
2121
i.Exchange, append(opts, blockservice.WithProvider(i.Routing))...))
2222
}
2323
return servs

exchange/providing/providing.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Package providing implements an exchange wrapper which
2+
// does content providing for new blocks.
3+
package providing
4+
5+
import (
6+
"context"
7+
8+
"github.com/ipfs/boxo/exchange"
9+
"github.com/ipfs/boxo/provider"
10+
blocks "github.com/ipfs/go-block-format"
11+
)
12+
13+
// Exchange is an exchange wrapper that calls ProvideMany for blocks received
14+
// over NotifyNewBlocks.
15+
type Exchange struct {
16+
exchange.Interface
17+
provider provider.Provider
18+
}
19+
20+
// New creates a new providing Exchange with the given exchange and provider.
21+
func New(base exchange.Interface, provider provider.Provider) *Exchange {
22+
return &Exchange{
23+
Interface: base,
24+
provider: provider,
25+
}
26+
}
27+
28+
// NotifyNewBlocks calls provider.ProvideMany.
29+
func (ex *Exchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
30+
for _, b := range blocks {
31+
if err := ex.provider.Provide(ctx, b.Cid(), true); err != nil {
32+
return err
33+
}
34+
}
35+
return nil
36+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package providing
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
testinstance "github.com/ipfs/boxo/bitswap/testinstance"
8+
tn "github.com/ipfs/boxo/bitswap/testnet"
9+
"github.com/ipfs/boxo/blockservice"
10+
"github.com/ipfs/boxo/provider"
11+
mockrouting "github.com/ipfs/boxo/routing/mock"
12+
delay "github.com/ipfs/go-ipfs-delay"
13+
"github.com/ipfs/go-test/random"
14+
)
15+
16+
func TestExchange(t *testing.T) {
17+
ctx := context.Background()
18+
net := tn.VirtualNetwork(delay.Fixed(0))
19+
routing := mockrouting.NewServer()
20+
sg := testinstance.NewTestInstanceGenerator(net, routing, nil, nil)
21+
i := sg.Next()
22+
provFinder := routing.Client(i.Identity)
23+
prov, err := provider.New(i.Datastore,
24+
provider.Online(provFinder),
25+
)
26+
if err != nil {
27+
t.Fatal(err)
28+
}
29+
provExchange := New(i.Exchange, prov)
30+
// write-through so that we notify when re-adding block
31+
bs := blockservice.New(i.Blockstore, provExchange,
32+
blockservice.WriteThrough())
33+
block := random.BlocksOfSize(1, 10)[0]
34+
// put it on the blockstore of the first instance
35+
err = i.Blockstore.Put(ctx, block)
36+
if err != nil {
37+
t.Fatal()
38+
}
39+
40+
providersChan := provFinder.FindProvidersAsync(ctx, block.Cid(), 1)
41+
_, ok := <-providersChan
42+
if ok {
43+
t.Fatal("there should be no providers yet for block")
44+
}
45+
46+
// Now add it via BlockService. It should trigger NotifyNewBlocks
47+
// on the exchange and thus they should get announced.
48+
err = bs.AddBlock(ctx, block)
49+
if err != nil {
50+
t.Fatal()
51+
}
52+
// Trigger reproviding, otherwise it's not really provided.
53+
err = prov.Reprovide(ctx)
54+
if err != nil {
55+
t.Fatal(err)
56+
}
57+
providersChan = provFinder.FindProvidersAsync(ctx, block.Cid(), 1)
58+
_, ok = <-providersChan
59+
if !ok {
60+
t.Fatal("there should be one provider for the block")
61+
}
62+
}

fetcher/helpers/block_visitor_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,15 +54,15 @@ func TestFetchGraphToBlocks(t *testing.T) {
5454
defer hasBlock.Exchange.Close()
5555

5656
blocks := []blocks.Block{block1, block2, block3, block4}
57-
err := hasBlock.Blockstore().PutMany(bg, blocks)
57+
err := hasBlock.Blockstore.PutMany(bg, blocks)
5858
require.NoError(t, err)
5959
err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...)
6060
require.NoError(t, err)
6161

6262
wantsBlock := peers[1]
6363
defer wantsBlock.Exchange.Close()
6464

65-
wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
65+
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
6666
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
6767
session := fetcherConfig.NewSession(context.Background())
6868
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
@@ -104,7 +104,7 @@ func TestFetchGraphToUniqueBlocks(t *testing.T) {
104104
hasBlock := peers[0]
105105
defer hasBlock.Exchange.Close()
106106

107-
err := hasBlock.Blockstore().PutMany(bg, []blocks.Block{block1, block2, block3})
107+
err := hasBlock.Blockstore.PutMany(bg, []blocks.Block{block1, block2, block3})
108108
require.NoError(t, err)
109109

110110
err = hasBlock.Exchange.NotifyNewBlocks(bg, block1, block2, block3)
@@ -113,7 +113,7 @@ func TestFetchGraphToUniqueBlocks(t *testing.T) {
113113
wantsBlock := peers[1]
114114
defer wantsBlock.Exchange.Close()
115115

116-
wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
116+
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
117117
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
118118
session := fetcherConfig.NewSession(context.Background())
119119
ctx, cancel := context.WithTimeout(context.Background(), time.Second)

fetcher/impl/blockservice/fetcher_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func TestFetchIPLDPrimeNode(t *testing.T) {
4747
hasBlock := peers[0]
4848
defer hasBlock.Exchange.Close()
4949

50-
err := hasBlock.Blockstore().Put(bg, block)
50+
err := hasBlock.Blockstore.Put(bg, block)
5151
require.NoError(t, err)
5252

5353
err = hasBlock.Exchange.NotifyNewBlocks(bg, block)
@@ -56,7 +56,7 @@ func TestFetchIPLDPrimeNode(t *testing.T) {
5656
wantsBlock := peers[1]
5757
defer wantsBlock.Exchange.Close()
5858

59-
wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
59+
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
6060
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
6161
session := fetcherConfig.NewSession(context.Background())
6262

@@ -98,15 +98,15 @@ func TestFetchIPLDGraph(t *testing.T) {
9898
defer hasBlock.Exchange.Close()
9999

100100
blocks := []blocks.Block{block1, block2, block3, block4}
101-
err := hasBlock.Blockstore().PutMany(bg, blocks)
101+
err := hasBlock.Blockstore.PutMany(bg, blocks)
102102
require.NoError(t, err)
103103
err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...)
104104
require.NoError(t, err)
105105

106106
wantsBlock := peers[1]
107107
defer wantsBlock.Exchange.Close()
108108

109-
wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
109+
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
110110
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
111111
session := fetcherConfig.NewSession(context.Background())
112112
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
@@ -155,15 +155,15 @@ func TestFetchIPLDPath(t *testing.T) {
155155
defer hasBlock.Exchange.Close()
156156

157157
blocks := []blocks.Block{block1, block2, block3, block4, block5}
158-
err := hasBlock.Blockstore().PutMany(bg, blocks)
158+
err := hasBlock.Blockstore.PutMany(bg, blocks)
159159
require.NoError(t, err)
160160
err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...)
161161
require.NoError(t, err)
162162

163163
wantsBlock := peers[1]
164164
defer wantsBlock.Exchange.Close()
165165

166-
wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
166+
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
167167
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
168168
session := fetcherConfig.NewSession(context.Background())
169169
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
@@ -219,15 +219,15 @@ func TestHelpers(t *testing.T) {
219219
defer hasBlock.Exchange.Close()
220220

221221
blocks := []blocks.Block{block1, block2, block3, block4}
222-
err := hasBlock.Blockstore().PutMany(bg, blocks)
222+
err := hasBlock.Blockstore.PutMany(bg, blocks)
223223
require.NoError(t, err)
224224
err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...)
225225
require.NoError(t, err)
226226

227227
wantsBlock := peers[1]
228228
defer wantsBlock.Exchange.Close()
229229

230-
wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
230+
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
231231

232232
t.Run("Block retrieves node", func(t *testing.T) {
233233
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
@@ -334,15 +334,15 @@ func TestNodeReification(t *testing.T) {
334334
defer hasBlock.Exchange.Close()
335335

336336
blocks := []blocks.Block{block2, block3, block4}
337-
err := hasBlock.Blockstore().PutMany(bg, blocks)
337+
err := hasBlock.Blockstore.PutMany(bg, blocks)
338338
require.NoError(t, err)
339339
err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...)
340340
require.NoError(t, err)
341341

342342
wantsBlock := peers[1]
343343
defer wantsBlock.Exchange.Close()
344344

345-
wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
345+
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
346346
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
347347
nodeReifier := func(lnkCtx ipld.LinkContext, nd ipld.Node, ls *ipld.LinkSystem) (ipld.Node, error) {
348348
return &selfLoader{Node: nd, ctx: lnkCtx.Ctx, ls: ls}, nil

routing/mock/centralized_server.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func (rs *s) Announce(p peer.AddrInfo, c cid.Cid) error {
3939
rs.lock.Lock()
4040
defer rs.lock.Unlock()
4141

42-
k := c.KeyString()
42+
k := c.Hash().String()
4343

4444
_, ok := rs.providers[k]
4545
if !ok {
@@ -54,16 +54,16 @@ func (rs *s) Announce(p peer.AddrInfo, c cid.Cid) error {
5454

5555
func (rs *s) Providers(c cid.Cid) []peer.AddrInfo {
5656
rs.delayConf.Query.Wait() // before locking
57-
5857
rs.lock.RLock()
5958
defer rs.lock.RUnlock()
60-
k := c.KeyString()
59+
k := c.Hash().String()
6160

6261
var ret []peer.AddrInfo
6362
records, ok := rs.providers[k]
6463
if !ok {
6564
return ret
6665
}
66+
6767
for _, r := range records {
6868
if time.Since(r.Created) > rs.delayConf.ValueVisibility.Get() {
6969
ret = append(ret, r.Peer)
@@ -74,7 +74,6 @@ func (rs *s) Providers(c cid.Cid) []peer.AddrInfo {
7474
j := rand.Intn(i + 1)
7575
ret[i], ret[j] = ret[j], ret[i]
7676
}
77-
7877
return ret
7978
}
8079

0 commit comments

Comments
 (0)