@@ -2,6 +2,8 @@ package main
22
33import (
44 "context"
5+ "github.com/ipfs/boxo/routing/providerquerymanager"
6+ "github.com/libp2p/go-libp2p/core/peerstore"
57 "time"
68
79 "github.com/ipfs/boxo/bitswap"
@@ -23,6 +25,12 @@ import (
2325
2426func setupBitswapExchange (ctx context.Context , cfg Config , h host.Host , cr routing.ContentRouting , bstore blockstore.Blockstore ) exchange.Interface {
2527 bsctx := metri .CtxScope (ctx , "ipfs_bitswap" )
28+ n := & providerQueryNetwork {cr , h }
29+ pqm , err := providerquerymanager .New (ctx , n , providerquerymanager .WithMaxInProcessRequests (100 ))
30+ if err != nil {
31+ panic (err )
32+ }
33+ cr = & wrapProv {pqm : pqm }
2634 bn := bsnet .NewFromIpfsHost (h , cr )
2735
2836 // --- Client Options
@@ -33,6 +41,14 @@ func setupBitswapExchange(ctx context.Context, cfg Config, h host.Host, cr routi
3341 // bitswap.ProviderSearchDelay: default is 1 second.
3442 providerSearchDelay := 1 * time .Second
3543
44+ // --- Bitswap Client Options
45+ clientOpts := []bsclient.Option {
46+ bsclient .RebroadcastDelay (rebroadcastDelay ),
47+ bsclient .ProviderSearchDelay (providerSearchDelay ),
48+ bsclient .WithoutDuplicatedBlockStats (),
49+ bsclient .WithDefaultLookupManagement (false ),
50+ }
51+
3652 // If peering and shared cache are both enabled, we initialize both a
3753 // Client and a Server with custom request filter and custom options.
3854 // client+server is more expensive but necessary when deployment requires
@@ -50,37 +66,88 @@ func setupBitswapExchange(ctx context.Context, cfg Config, h host.Host, cr routi
5066 return ok
5167 }
5268
53- // Initialize client+server
54- bswap := bitswap .New (bsctx , bn , bstore ,
55- // --- Client Options
56- bitswap .RebroadcastDelay (rebroadcastDelay ),
57- bitswap .ProviderSearchDelay (providerSearchDelay ),
58- bitswap .WithoutDuplicatedBlockStats (),
69+ // turn bitswap clients option into bitswap options
70+ var opts []bitswap.Option
71+ for _ , o := range clientOpts {
72+ opts = append (opts , bitswap .WithClientOption (o ))
73+ }
5974
60- // ---- Server Options
75+ // ---- Server Options
76+ opts = append (opts ,
6177 bitswap .WithPeerBlockRequestFilter (peerBlockRequestFilter ),
6278 bitswap .ProvideEnabled (false ),
6379 // Do not keep track of other peer's wantlists, we only want to reply if we
6480 // have a block. If we get it later, it's no longer relevant.
6581 bitswap .WithPeerLedger (& noopPeerLedger {}),
6682 // When we don't have a block, don't reply. This reduces processment.
67- bitswap .SetSendDontHaves (false ),
68- )
83+ bitswap .SetSendDontHaves (false ))
84+
85+ // Initialize client+server
86+ bswap := bitswap .New (bsctx , bn , bstore , opts ... )
6987 bn .Start (bswap )
7088 return & noNotifyExchange {bswap }
7189 }
7290
7391 // By default, rainbow runs with bitswap client alone
74- bswap := bsclient .New (bsctx , bn , bstore ,
75- // --- Client Options
76- bsclient .RebroadcastDelay (rebroadcastDelay ),
77- bsclient .ProviderSearchDelay (providerSearchDelay ),
78- bsclient .WithoutDuplicatedBlockStats (),
79- )
92+ bswap := bsclient .New (bsctx , bn , bstore , clientOpts ... )
8093 bn .Start (bswap )
8194 return bswap
8295}
8396
97+ type providerQueryNetwork struct {
98+ routing.ContentRouting
99+ host.Host
100+ }
101+
102+ func (p * providerQueryNetwork ) ConnectTo (ctx context.Context , id peer.ID ) error {
103+ return p .Host .Connect (ctx , peer.AddrInfo {ID : id })
104+ }
105+
106+ func (p * providerQueryNetwork ) FindProvidersAsync (ctx context.Context , c cid.Cid , i int ) <- chan peer.ID {
107+ out := make (chan peer.ID , i )
108+ go func () {
109+ defer close (out )
110+ providers := p .ContentRouting .FindProvidersAsync (ctx , c , i )
111+ for info := range providers {
112+ if info .ID == p .Host .ID () {
113+ continue // ignore self as provider
114+ }
115+ p .Host .Peerstore ().AddAddrs (info .ID , info .Addrs , peerstore .TempAddrTTL )
116+ select {
117+ case <- ctx .Done ():
118+ return
119+ case out <- info .ID :
120+ }
121+ }
122+ }()
123+ return out
124+ }
125+
126+ type wrapProv struct {
127+ pqm * providerquerymanager.ProviderQueryManager
128+ }
129+
130+ var _ routing.ContentRouting = (* wrapProv )(nil )
131+
132+ func (r * wrapProv ) Provide (ctx context.Context , c cid.Cid , b bool ) error {
133+ return routing .ErrNotSupported
134+ }
135+
136+ func (r * wrapProv ) FindProvidersAsync (ctx context.Context , c cid.Cid , _ int ) <- chan peer.AddrInfo {
137+ retCh := make (chan peer.AddrInfo )
138+ go func () {
139+ defer close (retCh )
140+ provsCh := r .pqm .FindProvidersAsync (ctx , c )
141+ for p := range provsCh {
142+ select {
143+ case retCh <- peer.AddrInfo {ID : p }:
144+ case <- ctx .Done ():
145+ }
146+ }
147+ }()
148+ return retCh
149+ }
150+
84151type noopPeerLedger struct {}
85152
86153func (* noopPeerLedger ) Wants (p peer.ID , e wl.Entry ) {}
0 commit comments