Skip to content

Commit 3001245

Browse files
authored
feat: Blobs are automatically replicated at 3 (#135)
Closes #119 Notably, while the original issue was titled "Replicate at end of guppy upload", I've opted to replicate at the end of each blob add and not all at once at the end. I think that's the right call? Happy to talk it out if there's disagreement, though. #### PR Dependency Tree * **PR #134** * **PR #135** 👈 * **PR #136** * **PR #137** * **PR #138** * **PR #139** * **PR #140** This tree was auto-generated by [Charcoal](https://github.com/danerwilliams/charcoal)
1 parent b259c6a commit 3001245

File tree

9 files changed

+426
-57
lines changed

9 files changed

+426
-57
lines changed

pkg/client/client.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,16 +123,34 @@ func invokeAndExecute[Caveats, Out any](
123123
successType schema.Type,
124124
options ...delegation.Option,
125125
) (result.Result[Out, failure.IPLDBuilderFailure], fx.Effects, error) {
126+
inv, err := invoke[Caveats, Out](c, capParser, with, caveats, options...)
127+
if err != nil {
128+
return nil, nil, fmt.Errorf("invoking `%s`: %w", capParser.Can(), err)
129+
}
130+
return execute[Caveats, Out](ctx, c, capParser, inv, successType)
131+
}
132+
133+
func invoke[Caveats, Out any](
134+
c *Client,
135+
capParser validator.CapabilityParser[Caveats],
136+
with ucan.Resource,
137+
caveats Caveats,
138+
options ...delegation.Option,
139+
) (invocation.IssuedInvocation, error) {
126140
pfs := make([]delegation.Proof, 0, len(c.Proofs()))
127141
for _, del := range c.Proofs() {
128142
pfs = append(pfs, delegation.FromDelegation(del))
129143
}
144+
return capParser.Invoke(c.Issuer(), c.Connection().ID(), with, caveats, append(options, delegation.WithProof(pfs...))...)
145+
}
130146

131-
inv, err := capParser.Invoke(c.Issuer(), c.Connection().ID(), with, caveats, append(options, delegation.WithProof(pfs...))...)
132-
if err != nil {
133-
return nil, nil, fmt.Errorf("generating invocation: %w", err)
134-
}
135-
147+
func execute[Caveats, Out any](
148+
ctx context.Context,
149+
c *Client,
150+
capParser validator.CapabilityParser[Caveats],
151+
inv invocation.IssuedInvocation,
152+
successType schema.Type,
153+
) (result.Result[Out, failure.IPLDBuilderFailure], fx.Effects, error) {
136154
resp, err := uclient.Execute(ctx, []invocation.Invocation{inv}, c.Connection())
137155
if err != nil {
138156
return nil, nil, fmt.Errorf("sending invocation: %w", err)

pkg/client/requestaccess_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func TestRequestAccess(t *testing.T) {
4949
c := uhelpers.Must(client.NewClient(client.WithConnection(connection)))
5050

5151
authOk, err := c.RequestAccess(testContext(t), "did:mailto:example.com:alice")
52+
require.NoError(t, err)
5253

5354
require.Len(t, invokedInvocations, 1, "expected exactly one invocation to be invoked")
5455
invocation := invokedInvocations[0]

pkg/client/spaceblobreplicate.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package client
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
spaceblobcap "github.com/storacha/go-libstoracha/capabilities/space/blob"
8+
"github.com/storacha/go-libstoracha/capabilities/types"
9+
"github.com/storacha/go-ucanto/core/delegation"
10+
"github.com/storacha/go-ucanto/core/receipt/fx"
11+
"github.com/storacha/go-ucanto/core/result"
12+
"github.com/storacha/go-ucanto/did"
13+
)
14+
15+
func (c *Client) SpaceBlobReplicate(ctx context.Context, space did.DID, blob types.Blob, replicaCount uint, locationCommitment delegation.Delegation) (spaceblobcap.ReplicateOk, fx.Effects, error) {
16+
caveats := spaceblobcap.ReplicateCaveats{
17+
Blob: blob,
18+
Replicas: replicaCount,
19+
Site: locationCommitment.Link(),
20+
}
21+
22+
inv, err := invoke[spaceblobcap.ReplicateCaveats, spaceblobcap.ReplicateOk](
23+
c,
24+
spaceblobcap.Replicate,
25+
space.String(),
26+
caveats,
27+
)
28+
if err != nil {
29+
return spaceblobcap.ReplicateOk{}, nil, fmt.Errorf("invoking `space/blob/replicate`: %w", err)
30+
}
31+
for b, err := range locationCommitment.Blocks() {
32+
if err != nil {
33+
return spaceblobcap.ReplicateOk{}, nil, fmt.Errorf("getting block from location commitment: %w", err)
34+
}
35+
inv.Attach(b)
36+
}
37+
38+
res, fx, err := execute[spaceblobcap.ReplicateCaveats, spaceblobcap.ReplicateOk](
39+
ctx,
40+
c,
41+
spaceblobcap.Replicate,
42+
inv,
43+
spaceblobcap.ReplicateOkType(),
44+
)
45+
if err != nil {
46+
return spaceblobcap.ReplicateOk{}, nil, fmt.Errorf("executing `space/blob/replicate`: %w", err)
47+
}
48+
49+
replicateOk, failErr := result.Unwrap(res)
50+
if failErr != nil {
51+
return spaceblobcap.ReplicateOk{}, nil, fmt.Errorf("`space/blob/replicate` failed: %w", failErr)
52+
}
53+
54+
return replicateOk, fx, nil
55+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package client_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"testing"
7+
8+
"github.com/ipfs/go-cid"
9+
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
10+
"github.com/multiformats/go-multihash"
11+
spaceblobcap "github.com/storacha/go-libstoracha/capabilities/space/blob"
12+
"github.com/storacha/go-libstoracha/capabilities/types"
13+
libtestutil "github.com/storacha/go-libstoracha/testutil"
14+
"github.com/storacha/go-ucanto/core/dag/blockstore"
15+
"github.com/storacha/go-ucanto/core/invocation"
16+
"github.com/storacha/go-ucanto/core/receipt/fx"
17+
"github.com/storacha/go-ucanto/core/result"
18+
"github.com/storacha/go-ucanto/core/result/failure"
19+
ed25519signer "github.com/storacha/go-ucanto/principal/ed25519/signer"
20+
"github.com/storacha/go-ucanto/server"
21+
uhelpers "github.com/storacha/go-ucanto/testing/helpers"
22+
"github.com/storacha/go-ucanto/ucan"
23+
"github.com/storacha/guppy/pkg/client"
24+
"github.com/storacha/guppy/pkg/client/testutil"
25+
"github.com/stretchr/testify/require"
26+
)
27+
28+
func TestSpaceBlobReplicate(t *testing.T) {
29+
t.Run("invokes `space/blob/replicate`", func(t *testing.T) {
30+
space, err := ed25519signer.Generate()
31+
require.NoError(t, err)
32+
33+
invocations := []invocation.Invocation{}
34+
invokedCapabilities := []ucan.Capability[spaceblobcap.ReplicateCaveats]{}
35+
36+
connection := testutil.NewTestServerConnection(
37+
server.WithServiceMethod(
38+
spaceblobcap.Replicate.Can(),
39+
server.Provide(
40+
spaceblobcap.Replicate,
41+
func(
42+
ctx context.Context,
43+
cap ucan.Capability[spaceblobcap.ReplicateCaveats],
44+
inv invocation.Invocation,
45+
context server.InvocationContext,
46+
) (result.Result[spaceblobcap.ReplicateOk, failure.IPLDBuilderFailure], fx.Effects, error) {
47+
invocations = append(invocations, inv)
48+
invokedCapabilities = append(invokedCapabilities, cap)
49+
sitePromises := make([]types.Promise, cap.Nb().Replicas)
50+
for i := range sitePromises {
51+
siteDigest, err := multihash.Encode(fmt.Appendf(nil, "test-replicated-site-%d", i), multihash.IDENTITY)
52+
if err != nil {
53+
return nil, nil, fmt.Errorf("encoding site digest: %w", err)
54+
}
55+
sitePromises[i] = types.Promise{
56+
UcanAwait: types.Await{
57+
Selector: ".out.ok.site",
58+
Link: cidlink.Link{Cid: cid.NewCidV1(cid.Raw, siteDigest)},
59+
},
60+
}
61+
}
62+
return result.Ok[spaceblobcap.ReplicateOk, failure.IPLDBuilderFailure](
63+
spaceblobcap.ReplicateOk{
64+
Site: sitePromises,
65+
},
66+
), nil, nil
67+
},
68+
),
69+
),
70+
)
71+
72+
// Act as the space itself for auth simplicity
73+
c := uhelpers.Must(client.NewClient(client.WithConnection(connection), client.WithPrincipal(space)))
74+
75+
digest, err := multihash.Encode([]byte("test-digest"), multihash.IDENTITY)
76+
blob := types.Blob{Digest: digest, Size: 123}
77+
78+
location := libtestutil.RandomLocationDelegation(t)
79+
replicateOk, _, err := c.SpaceBlobReplicate(t.Context(), space.DID(), blob, 5, location)
80+
require.NoError(t, err)
81+
82+
require.Len(t, invocations, 1, "expected exactly one invocation to be made")
83+
inv := invocations[0]
84+
require.Len(t, invokedCapabilities, 1, "expected exactly one capability to be invoked")
85+
capability := invokedCapabilities[0]
86+
87+
nb := uhelpers.Must(spaceblobcap.ReplicateCaveatsReader.Read(capability.Nb()))
88+
require.Equal(t, blob, nb.Blob, "expected to replicate the correct blob")
89+
require.Equal(t, uint(5), nb.Replicas, "expected to replicate the correct number of replicas")
90+
require.Equal(t, location.Link(), nb.Site, "expected to replicate from the correct site")
91+
92+
// Get the location claim from the invocation's extra blocks.
93+
br, err := blockstore.NewBlockReader(blockstore.WithBlocksIterator(inv.Blocks()))
94+
require.NoError(t, err)
95+
attachedLocation, err := invocation.NewInvocationView(nb.Site, br)
96+
require.NoError(t, err)
97+
require.Equal(t, location.Root().Bytes(), attachedLocation.Root().Bytes(), "expected the invocation to be attached to the location commitment")
98+
99+
// This is somewhat testing the test, but we want to make sure we get out
100+
// whatever the server sent.
101+
require.Len(t, replicateOk.Site, 5, "expected to receive the correct number of site promises")
102+
for i, p := range replicateOk.Site {
103+
require.Equal(t, ".out.ok.site", p.UcanAwait.Selector, "expected to receive the correct selector")
104+
expectedSiteDigest, err := multihash.Encode(fmt.Appendf(nil, "test-replicated-site-%d", i), multihash.IDENTITY)
105+
require.NoError(t, err)
106+
expectedSite := cidlink.Link{Cid: cid.NewCidV1(cid.Raw, expectedSiteDigest)}
107+
require.Equal(t, expectedSite, p.UcanAwait.Link, "expected to receive the correct site promise")
108+
}
109+
})
110+
}

pkg/preparation/internal/mockclient/mockclient.go

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,33 +4,52 @@ import (
44
"context"
55
"fmt"
66
"io"
7+
"testing"
78

9+
"github.com/ipfs/go-cid"
810
"github.com/ipld/go-ipld-prime"
11+
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
912
"github.com/multiformats/go-multihash"
13+
spaceblobcap "github.com/storacha/go-libstoracha/capabilities/space/blob"
14+
"github.com/storacha/go-libstoracha/capabilities/types"
1015
"github.com/storacha/go-libstoracha/capabilities/upload"
16+
"github.com/storacha/go-libstoracha/testutil"
1117
"github.com/storacha/go-ucanto/core/delegation"
18+
"github.com/storacha/go-ucanto/core/receipt/fx"
1219
"github.com/storacha/go-ucanto/did"
1320
"github.com/storacha/guppy/pkg/client"
1421
"github.com/storacha/guppy/pkg/preparation/storacha"
22+
"github.com/stretchr/testify/require"
1523
_ "modernc.org/sqlite"
1624
)
1725

1826
type MockClient struct {
19-
SpaceBlobAddInvocations []spaceBlobAddInvocation
20-
SpaceIndexAddInvocations []spaceIndexAddInvocation
21-
UploadAddInvocations []uploadAddInvocation
27+
T *testing.T
28+
SpaceBlobAddInvocations []spaceBlobAddInvocation
29+
SpaceIndexAddInvocations []spaceIndexAddInvocation
30+
SpaceBlobReplicateInvocations []spaceBlobReplicateInvocation
31+
UploadAddInvocations []uploadAddInvocation
2232
}
2333

2434
type spaceBlobAddInvocation struct {
2535
Space did.DID
2636
BlobAdded []byte
37+
38+
ReturnedLocation delegation.Delegation
2739
}
2840

2941
type spaceIndexAddInvocation struct {
3042
Space did.DID
3143
IndexLink ipld.Link
3244
}
3345

46+
type spaceBlobReplicateInvocation struct {
47+
Space did.DID
48+
Blob types.Blob
49+
ReplicaCount uint
50+
LocationCommitment delegation.Delegation
51+
}
52+
3453
type uploadAddInvocation struct {
3554
Space did.DID
3655
Root ipld.Link
@@ -41,17 +60,21 @@ var _ storacha.Client = (*MockClient)(nil)
4160

4261
func (m *MockClient) SpaceBlobAdd(ctx context.Context, content io.Reader, space did.DID, options ...client.SpaceBlobAddOption) (multihash.Multihash, delegation.Delegation, error) {
4362
contentBytes, err := io.ReadAll(content)
44-
if err != nil {
45-
return nil, nil, fmt.Errorf("reading content for SpaceBlobAdd: %w", err)
46-
}
63+
require.NoError(m.T, err, "reading content for SpaceBlobAdd")
64+
65+
location := testutil.RandomLocationDelegation(m.T)
4766

4867
m.SpaceBlobAddInvocations = append(m.SpaceBlobAddInvocations, spaceBlobAddInvocation{
4968
Space: space,
5069
BlobAdded: contentBytes,
70+
71+
ReturnedLocation: location,
5172
})
5273

5374
digest, err := multihash.Sum(contentBytes, multihash.SHA2_256, -1)
54-
return digest, nil, nil
75+
require.NoError(m.T, err, "summing digest for SpaceBlobAdd")
76+
77+
return digest, location, nil
5578
}
5679

5780
func (m *MockClient) SpaceIndexAdd(ctx context.Context, indexLink ipld.Link, space did.DID) error {
@@ -63,6 +86,28 @@ func (m *MockClient) SpaceIndexAdd(ctx context.Context, indexLink ipld.Link, spa
6386
return nil
6487
}
6588

89+
func (m *MockClient) SpaceBlobReplicate(ctx context.Context, space did.DID, blob types.Blob, replicaCount uint, locationCommitment delegation.Delegation) (spaceblobcap.ReplicateOk, fx.Effects, error) {
90+
m.SpaceBlobReplicateInvocations = append(m.SpaceBlobReplicateInvocations, spaceBlobReplicateInvocation{
91+
Space: space,
92+
Blob: blob,
93+
ReplicaCount: replicaCount,
94+
LocationCommitment: locationCommitment,
95+
})
96+
97+
sitePromises := make([]types.Promise, replicaCount)
98+
for i := range sitePromises {
99+
siteDigest, err := multihash.Encode(fmt.Appendf(nil, "test-replicated-site-%d", i), multihash.IDENTITY)
100+
require.NoError(m.T, err, "encoding site digest")
101+
sitePromises[i] = types.Promise{
102+
UcanAwait: types.Await{
103+
Selector: ".out.ok.site",
104+
Link: cidlink.Link{Cid: cid.NewCidV1(cid.Raw, siteDigest)},
105+
},
106+
}
107+
}
108+
return spaceblobcap.ReplicateOk{Site: sitePromises}, nil, nil
109+
}
110+
66111
func (m *MockClient) UploadAdd(ctx context.Context, space did.DID, root ipld.Link, shards []ipld.Link) (upload.AddOk, error) {
67112
m.UploadAddInvocations = append(m.UploadAddInvocations, uploadAddInvocation{
68113
Space: space,

0 commit comments

Comments
 (0)