Skip to content

Commit 94d755b

Browse files
committed
Remove Provider from Blockservice. Revert Session changes.
1 parent da1e2d2 commit 94d755b

File tree

3 files changed

+110
-151
lines changed

3 files changed

+110
-151
lines changed

blockservice/blockservice.go

Lines changed: 63 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313

1414
"github.com/ipfs/boxo/blockstore"
1515
"github.com/ipfs/boxo/exchange"
16-
"github.com/ipfs/boxo/provider"
1716
"github.com/ipfs/boxo/verifcid"
1817
blocks "github.com/ipfs/go-block-format"
1918
"github.com/ipfs/go-cid"
@@ -74,21 +73,10 @@ type BoundedBlockService interface {
7473

7574
var _ BoundedBlockService = (*blockService)(nil)
7675

77-
// ProvidingBlockService is a Blockservice which provides new blocks to a provider.
78-
type ProvidingBlockService interface {
79-
BlockService
80-
81-
// Provider can return nil, then no provider is used.
82-
Provider() provider.Provider
83-
}
84-
85-
var _ ProvidingBlockService = (*blockService)(nil)
86-
8776
type blockService struct {
8877
allowlist verifcid.Allowlist
8978
blockstore blockstore.Blockstore
9079
exchange exchange.Interface
91-
provider provider.Provider
9280
// If checkFirst is true then first check that a block doesn't
9381
// already exist to avoid republishing the block on the exchange.
9482
checkFirst bool
@@ -111,13 +99,6 @@ func WithAllowlist(allowlist verifcid.Allowlist) Option {
11199
}
112100
}
113101

114-
// WithProvider allows to advertise anything that is added through the blockservice.
115-
func WithProvider(prov provider.Provider) Option {
116-
return func(bs *blockService) {
117-
bs.provider = prov
118-
}
119-
}
120-
121102
// New creates a BlockService with given datastore instance.
122103
func New(bs blockstore.Blockstore, exchange exchange.Interface, opts ...Option) BlockService {
123104
if exchange == nil {
@@ -140,11 +121,6 @@ func New(bs blockstore.Blockstore, exchange exchange.Interface, opts ...Option)
140121

141122
// Blockstore returns the blockstore behind this blockservice.
142123
func (s *blockService) Blockstore() blockstore.Blockstore {
143-
if s.provider != nil {
144-
// FIXME: this is a hack remove once ipfs/boxo#567 is solved.
145-
return providingBlockstore{s.blockstore, s.provider}
146-
}
147-
148124
return s.blockstore
149125
}
150126

@@ -157,13 +133,23 @@ func (s *blockService) Allowlist() verifcid.Allowlist {
157133
return s.allowlist
158134
}
159135

160-
func (s *blockService) Provider() provider.Provider {
161-
return s.provider
136+
// NewSession creates a new session that allows for
137+
// controlled exchange of wantlists to decrease the bandwidth overhead.
138+
// If the current exchange is a SessionExchange, a new exchange
139+
// session will be created. Otherwise, the current exchange will be used
140+
// directly.
141+
// Sessions are lazily setup, this is cheap.
142+
func NewSession(ctx context.Context, bs BlockService) *Session {
143+
ses := grabSessionFromContext(ctx, bs)
144+
if ses != nil {
145+
return ses
146+
}
147+
148+
return newSession(ctx, bs)
162149
}
163150

164-
// NewSession creates a new session that allows for controlled exchange of
165-
// wantlists to decrease the bandwidth overhead.
166-
func NewSession(ctx context.Context, bs BlockService) *Session {
151+
// newSession is like [NewSession] but it does not attempt to reuse session from the existing context.
152+
func newSession(ctx context.Context, bs BlockService) *Session {
167153
return &Session{bs: bs, sesctx: ctx}
168154
}
169155

@@ -183,7 +169,7 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error {
183169
}
184170
}
185171

186-
if err = s.blockstore.Put(ctx, o); err != nil {
172+
if err := s.blockstore.Put(ctx, o); err != nil {
187173
return err
188174
}
189175

@@ -194,11 +180,6 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error {
194180
logger.Errorf("NotifyNewBlocks: %s", err.Error())
195181
}
196182
}
197-
if s.provider != nil {
198-
if err := s.provider.Provide(ctx, o.Cid(), true); err != nil {
199-
logger.Errorf("Provide: %s", err.Error())
200-
}
201-
}
202183

203184
return nil
204185
}
@@ -245,19 +226,16 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error {
245226
logger.Errorf("NotifyNewBlocks: %s", err.Error())
246227
}
247228
}
248-
if s.provider != nil {
249-
for _, o := range toput {
250-
if err := s.provider.Provide(ctx, o.Cid(), true); err != nil {
251-
logger.Errorf("Provide: %s", err.Error())
252-
}
253-
}
254-
}
255229
return nil
256230
}
257231

258232
// GetBlock retrieves a particular block from the service,
259233
// Getting it from the datastore using the key (hash).
260234
func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
235+
if ses := grabSessionFromContext(ctx, s); ses != nil {
236+
return ses.GetBlock(ctx, c)
237+
}
238+
261239
ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
262240
defer span.End()
263241

@@ -275,7 +253,7 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
275253
return nil, err
276254
}
277255

278-
provider, blockstore := grabProviderAndBlockstoreFromBlockservice(bs)
256+
blockstore := bs.Blockstore()
279257

280258
block, err := blockstore.Get(ctx, c)
281259
switch {
@@ -309,12 +287,6 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
309287
return nil, err
310288
}
311289
}
312-
if provider != nil {
313-
err = provider.Provide(ctx, blk.Cid(), true)
314-
if err != nil {
315-
return nil, err
316-
}
317-
}
318290
logger.Debugf("BlockService.BlockFetched %s", c)
319291
return blk, nil
320292
}
@@ -323,6 +295,10 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
323295
// the returned channel.
324296
// NB: No guarantees are made about order.
325297
func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
298+
if ses := grabSessionFromContext(ctx, s); ses != nil {
299+
return ses.GetBlocks(ctx, ks)
300+
}
301+
326302
ctx, span := internal.StartSpan(ctx, "blockService.GetBlocks")
327303
defer span.End()
328304

@@ -360,7 +336,7 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
360336
ks = ks2
361337
}
362338

363-
provider, bs := grabProviderAndBlockstoreFromBlockservice(blockservice)
339+
bs := blockservice.Blockstore()
364340

365341
var misses []cid.Cid
366342
for _, c := range ks {
@@ -419,14 +395,6 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
419395
cache[0] = nil // early gc
420396
}
421397

422-
if provider != nil {
423-
err = provider.Provide(ctx, b.Cid(), true)
424-
if err != nil {
425-
logger.Errorf("could not tell the provider about new blocks: %s", err)
426-
return
427-
}
428-
}
429-
430398
select {
431399
case out <- b:
432400
case <-ctx.Done():
@@ -506,21 +474,47 @@ func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Blo
506474

507475
var _ BlockGetter = (*Session)(nil)
508476

477+
// ContextWithSession is a helper which creates a context with an embded session,
478+
// future calls to [BlockGetter.GetBlock], [BlockGetter.GetBlocks] and [NewSession] with the same [BlockService]
479+
// will be redirected to this same session instead.
480+
// Sessions are lazily setup, this is cheap.
481+
// It wont make a new session if one exists already in the context.
482+
func ContextWithSession(ctx context.Context, bs BlockService) context.Context {
483+
if grabSessionFromContext(ctx, bs) != nil {
484+
return ctx
485+
}
486+
return EmbedSessionInContext(ctx, newSession(ctx, bs))
487+
}
488+
489+
// EmbedSessionInContext is like [ContextWithSession] but it allows to embed an existing session.
490+
func EmbedSessionInContext(ctx context.Context, ses *Session) context.Context {
491+
// use ses.bs as a key, so if multiple blockservices use embeded sessions it gets dispatched to the matching blockservice.
492+
return context.WithValue(ctx, ses.bs, ses)
493+
}
494+
495+
// grabSessionFromContext returns nil if the session was not found
496+
// This is a private API on purposes, I dislike when consumers tradeoff compiletime typesafety with runtime typesafety,
497+
// if this API is public it is too easy to forget to pass a [BlockService] or [Session] object around in your app.
498+
// By having this private we allow consumers to follow the trace of where the blockservice is passed and used.
499+
func grabSessionFromContext(ctx context.Context, bs BlockService) *Session {
500+
s := ctx.Value(bs)
501+
if s == nil {
502+
return nil
503+
}
504+
505+
ss, ok := s.(*Session)
506+
if !ok {
507+
// idk what to do here, that kinda sucks, giveup
508+
return nil
509+
}
510+
511+
return ss
512+
}
513+
509514
// grabAllowlistFromBlockservice never returns nil
510515
func grabAllowlistFromBlockservice(bs BlockService) verifcid.Allowlist {
511516
if bbs, ok := bs.(BoundedBlockService); ok {
512517
return bbs.Allowlist()
513518
}
514519
return verifcid.DefaultAllowlist
515520
}
516-
517-
// grabProviderAndBlockstoreFromBlockservice can return nil if no provider is used.
518-
func grabProviderAndBlockstoreFromBlockservice(bs BlockService) (provider.Provider, blockstore.Blockstore) {
519-
if bbs, ok := bs.(*blockService); ok {
520-
return bbs.provider, bbs.blockstore
521-
}
522-
if bbs, ok := bs.(ProvidingBlockService); ok {
523-
return bbs.Provider(), bbs.Blockstore()
524-
}
525-
return nil, bs.Blockstore()
526-
}

blockservice/blockservice_test.go

Lines changed: 46 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -288,102 +288,67 @@ func TestAllowlist(t *testing.T) {
288288
check(NewSession(ctx, blockservice).GetBlock)
289289
}
290290

291-
type wrappedBlockservice struct {
292-
BlockService
291+
type fakeIsNewSessionCreateExchange struct {
292+
ses exchange.Fetcher
293+
newSessionWasCalled bool
293294
}
294295

295-
type mockProvider []cid.Cid
296+
var _ exchange.SessionExchange = (*fakeIsNewSessionCreateExchange)(nil)
296297

297-
func (p *mockProvider) Provide(ctx context.Context, c cid.Cid, announce bool) error {
298-
*p = append(*p, c)
298+
func (*fakeIsNewSessionCreateExchange) Close() error {
299299
return nil
300300
}
301301

302-
func TestProviding(t *testing.T) {
303-
t.Parallel()
304-
a := assert.New(t)
305-
306-
ctx, cancel := context.WithCancel(context.Background())
307-
defer cancel()
302+
func (*fakeIsNewSessionCreateExchange) GetBlock(context.Context, cid.Cid) (blocks.Block, error) {
303+
panic("should call on the session")
304+
}
308305

309-
blocks := random.BlocksOfSize(12, blockSize)
306+
func (*fakeIsNewSessionCreateExchange) GetBlocks(context.Context, []cid.Cid) (<-chan blocks.Block, error) {
307+
panic("should call on the session")
308+
}
310309

311-
exchange := blockstore.NewBlockstore(ds.NewMapDatastore())
310+
func (f *fakeIsNewSessionCreateExchange) NewSession(context.Context) exchange.Fetcher {
311+
f.newSessionWasCalled = true
312+
return f.ses
313+
}
312314

313-
prov := mockProvider{}
314-
blockservice := New(blockstore.NewBlockstore(ds.NewMapDatastore()), offline.Exchange(exchange), WithProvider(&prov))
315-
var added []cid.Cid
315+
func (*fakeIsNewSessionCreateExchange) NotifyNewBlocks(context.Context, ...blocks.Block) error {
316+
return nil
317+
}
316318

317-
// Adding one block provide it.
318-
a.NoError(blockservice.AddBlock(ctx, blocks[0]))
319-
added = append(added, blocks[0].Cid())
320-
blocks = blocks[1:]
319+
func TestContextSession(t *testing.T) {
320+
t.Parallel()
321+
a := assert.New(t)
321322

322-
// Adding multiple blocks provide them.
323-
a.NoError(blockservice.AddBlocks(ctx, blocks[0:2]))
324-
added = append(added, blocks[0].Cid(), blocks[1].Cid())
325-
blocks = blocks[2:]
323+
ctx, cancel := context.WithCancel(context.Background())
324+
defer cancel()
326325

327-
// Downloading one block provide it.
328-
a.NoError(exchange.Put(ctx, blocks[0]))
329-
_, err := blockservice.GetBlock(ctx, blocks[0].Cid())
330-
a.NoError(err)
331-
added = append(added, blocks[0].Cid())
332-
blocks = blocks[1:]
333-
334-
// Downloading multiple blocks provide them.
335-
a.NoError(exchange.PutMany(ctx, blocks[0:2]))
336-
cids := []cid.Cid{blocks[0].Cid(), blocks[1].Cid()}
337-
var got []cid.Cid
338-
for b := range blockservice.GetBlocks(ctx, cids) {
339-
got = append(got, b.Cid())
340-
}
341-
added = append(added, cids...)
342-
a.ElementsMatch(cids, got)
343-
blocks = blocks[2:]
326+
blks := random.BlocksOfSize(2, blockSize)
327+
block1 := blks[0]
328+
block2 := blks[1]
344329

345-
session := NewSession(ctx, blockservice)
330+
bs := blockstore.NewBlockstore(ds.NewMapDatastore())
331+
a.NoError(bs.Put(ctx, block1))
332+
a.NoError(bs.Put(ctx, block2))
333+
sesEx := &fakeIsNewSessionCreateExchange{ses: offline.Exchange(bs)}
346334

347-
// Downloading one block over a session provide it.
348-
a.NoError(exchange.Put(ctx, blocks[0]))
349-
_, err = session.GetBlock(ctx, blocks[0].Cid())
350-
a.NoError(err)
351-
added = append(added, blocks[0].Cid())
352-
blocks = blocks[1:]
353-
354-
// Downloading multiple blocks over a session provide them.
355-
a.NoError(exchange.PutMany(ctx, blocks[0:2]))
356-
cids = []cid.Cid{blocks[0].Cid(), blocks[1].Cid()}
357-
got = nil
358-
for b := range session.GetBlocks(ctx, cids) {
359-
got = append(got, b.Cid())
360-
}
361-
a.ElementsMatch(cids, got)
362-
added = append(added, cids...)
363-
blocks = blocks[2:]
335+
service := New(blockstore.NewBlockstore(ds.NewMapDatastore()), sesEx)
364336

365-
// Test wrapping the blockservice like nopfs does.
366-
session = NewSession(ctx, wrappedBlockservice{blockservice})
337+
ctx = ContextWithSession(ctx, service)
367338

368-
// Downloading one block over a wrapped blockservice session provide it.
369-
a.NoError(exchange.Put(ctx, blocks[0]))
370-
_, err = session.GetBlock(ctx, blocks[0].Cid())
339+
b, err := service.GetBlock(ctx, block1.Cid())
371340
a.NoError(err)
372-
added = append(added, blocks[0].Cid())
373-
blocks = blocks[1:]
374-
375-
// Downloading multiple blocks over a wrapped blockservice session provide them.
376-
a.NoError(exchange.PutMany(ctx, blocks[0:2]))
377-
cids = []cid.Cid{blocks[0].Cid(), blocks[1].Cid()}
378-
got = nil
379-
for b := range session.GetBlocks(ctx, cids) {
380-
got = append(got, b.Cid())
381-
}
382-
a.ElementsMatch(cids, got)
383-
added = append(added, cids...)
384-
blocks = blocks[2:]
385-
386-
a.Empty(blocks)
387-
388-
a.ElementsMatch(added, []cid.Cid(prov))
341+
a.Equal(b.RawData(), block1.RawData())
342+
a.True(sesEx.newSessionWasCalled, "new session from context should be created")
343+
sesEx.newSessionWasCalled = false
344+
345+
bchan := service.GetBlocks(ctx, []cid.Cid{block2.Cid()})
346+
a.Equal((<-bchan).RawData(), block2.RawData())
347+
a.False(sesEx.newSessionWasCalled, "session should be reused in context")
348+
349+
a.Equal(
350+
NewSession(ctx, service),
351+
NewSession(ContextWithSession(ctx, service), service),
352+
"session must be deduped in all invocations on the same context",
353+
)
389354
}

0 commit comments

Comments
 (0)