Skip to content

Commit 0a8677f

Browse files
partially resolve #100 (#113)
* custom msgpackv5 decoders * use StorageCallVShardError instead of bucketStatError
1 parent a486c03 commit 0a8677f

File tree

5 files changed

+99
-38
lines changed

5 files changed

+99
-38
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ REFACTOR:
2323
* Use constants for vshard error names and codes.
2424
* Reduce SLOC by using CallAsync method.
2525
* BucketForceCreate optimization: don't decode tnt response.
26+
* Remove bucketStatError type, use StorageCallVShardError type instead.
27+
* Add custom msgpackv5 decoder for 'vshard.storage.bucket_stat' response (partially #100).
28+
* Add custom msgpackv5 decoder for 'BucketStatInfo', since msgpackv5 library has an issue (see commit content).
2629

2730
TESTS:
2831
* Rename bootstrap_test.go -> tarantool_test.go and new test in this file.

discovery.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Repl
8888

8989
for _, rsFuture := range rsFutures {
9090
if _, err := bucketStatWait(rsFuture.future); err != nil {
91-
var bsError bucketStatError
92-
if !errors.As(err, &bsError) {
91+
var vshardError StorageCallVShardError
92+
if !errors.As(err, &vshardError) {
9393
r.log().Errorf(ctx, "bucketSearchLegacy: bucketStatWait call error for %v: %v", rsFuture.rsID, err)
9494
}
9595
// just skip, bucket may not belong to this replicaset

error.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -92,20 +92,6 @@ const (
9292
VShardErrNameInstanceNameMismatch = "INSTANCE_NAME_MISMATCH"
9393
)
9494

95-
type bucketStatError struct {
96-
BucketID uint64 `msgpack:"bucket_id"`
97-
Reason string `msgpack:"reason"`
98-
Code int `msgpack:"code"`
99-
Type string `msgpack:"type"`
100-
Message string `msgpack:"message"`
101-
Name string `msgpack:"name"`
102-
}
103-
104-
func (bse bucketStatError) Error() string {
105-
type alias bucketStatError
106-
return fmt.Sprintf("%+v", alias(bse))
107-
}
108-
10995
func newVShardErrorNoRouteToBucket(bucketID uint64) error {
11096
return &StorageCallVShardError{
11197
Name: VShardErrNameNoRouteToBucket,

replicaset.go

Lines changed: 51 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ import (
77
"time"
88

99
"github.com/google/uuid"
10-
"github.com/mitchellh/mapstructure"
1110
"github.com/tarantool/go-tarantool/v2"
1211
"github.com/tarantool/go-tarantool/v2/pool"
12+
"github.com/vmihailenco/msgpack/v5"
13+
"github.com/vmihailenco/msgpack/v5/msgpcode"
1314
)
1415

1516
type ReplicasetInfo struct {
@@ -51,41 +52,71 @@ func (rs *Replicaset) bucketStatAsync(ctx context.Context, bucketID uint64) *tar
5152
return rs.CallAsync(ctx, ReplicasetCallOpts{PoolMode: pool.RO}, bucketStatFnc, []interface{}{bucketID})
5253
}
5354

54-
func bucketStatWait(future *tarantool.Future) (BucketStatInfo, error) {
55-
var bsInfo BucketStatInfo
55+
type vshardStorageBucketStatResponseProto struct {
56+
ok bool
57+
info BucketStatInfo
58+
err StorageCallVShardError
59+
}
5660

57-
respData, err := future.Get()
61+
func (r *vshardStorageBucketStatResponseProto) DecodeMsgpack(d *msgpack.Decoder) error {
62+
// bucket_stat returns pair: stat, err
63+
// https://github.com/tarantool/vshard/blob/e1c806e1d3d2ce8a4e6b4d498c09051bf34ab92a/vshard/storage/init.lua#L1413
64+
65+
respArrayLen, err := d.DecodeArrayLen()
5866
if err != nil {
59-
return bsInfo, err
67+
return err
6068
}
6169

62-
if len(respData) == 0 {
63-
return bsInfo, fmt.Errorf("protocol violation bucketStatWait: empty response")
70+
if respArrayLen == 0 {
71+
return fmt.Errorf("protocol violation bucketStatWait: empty response")
6472
}
6573

66-
if respData[0] == nil {
67-
if len(respData) != 2 {
68-
return bsInfo, fmt.Errorf("protocol violation bucketStatWait: invalid response length %d when respData[0] is nil", len(respData))
74+
code, err := d.PeekCode()
75+
if err != nil {
76+
return err
77+
}
78+
79+
if code == msgpcode.Nil {
80+
err = d.DecodeNil()
81+
if err != nil {
82+
return err
83+
}
84+
85+
if respArrayLen != 2 {
86+
return fmt.Errorf("protocol violation bucketStatWait: length is %d on vshard error case", respArrayLen)
6987
}
7088

71-
var bsError bucketStatError
72-
err = mapstructure.Decode(respData[1], &bsError)
89+
err = d.Decode(&r.err)
7390
if err != nil {
74-
// We could not decode respData[1] as bsError, so return respData[1] as is, add info why we could not decode.
75-
return bsInfo, fmt.Errorf("bucketStatWait error: %v (can't decode into bsError: %v)", respData[1], err)
91+
return fmt.Errorf("failed to decode storage vshard error: %w", err)
7692
}
7793

78-
return bsInfo, bsError
94+
return nil
7995
}
8096

81-
// A problem with key-code 1
82-
// todo: fix after https://github.com/tarantool/go-tarantool/issues/368
83-
err = mapstructure.Decode(respData[0], &bsInfo)
97+
err = d.Decode(&r.info)
8498
if err != nil {
85-
return bsInfo, fmt.Errorf("can't decode bsInfo: %w", err)
99+
return fmt.Errorf("failed to decode bucket stat info: %w", err)
100+
}
101+
102+
r.ok = true
103+
104+
return nil
105+
}
106+
107+
func bucketStatWait(future *tarantool.Future) (BucketStatInfo, error) {
108+
var bucketStatResponse vshardStorageBucketStatResponseProto
109+
110+
err := future.GetTyped(&bucketStatResponse)
111+
if err != nil {
112+
return BucketStatInfo{}, err
113+
}
114+
115+
if !bucketStatResponse.ok {
116+
return BucketStatInfo{}, bucketStatResponse.err
86117
}
87118

88-
return bsInfo, nil
119+
return bucketStatResponse.info, nil
89120
}
90121

91122
// ReplicaCall perform function on remote storage

vshard.go

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/google/uuid"
1111
"github.com/snksoft/crc"
12+
"github.com/vmihailenco/msgpack/v5"
1213

1314
tarantool "github.com/tarantool/go-tarantool/v2"
1415
)
@@ -122,8 +123,48 @@ type Config struct {
122123
}
123124

124125
type BucketStatInfo struct {
125-
BucketID uint64 `mapstructure:"id"`
126-
Status string `mapstructure:"status"`
126+
BucketID uint64 `msgpack:"id"`
127+
Status string `msgpack:"status"`
128+
}
129+
130+
// tnt vshard storage returns map with 'int' keys for bucketStatInfo,
131+
// example: map[id:48 status:active 1:48 2:active].
132+
// But msgpackv5 supports only string keys when decoding maps into structs,
133+
// see issue: https://github.com/vmihailenco/msgpack/issues/372
134+
// To workaround this we decode BucketStatInfo manually.
135+
// When the issue above will be resolved, this code can be (and should be) deleted.
136+
func (bsi *BucketStatInfo) DecodeMsgpack(d *msgpack.Decoder) error {
137+
nKeys, err := d.DecodeMapLen()
138+
if err != nil {
139+
return err
140+
}
141+
142+
for i := 0; i < nKeys; i++ {
143+
key, err := d.DecodeInterface()
144+
if err != nil {
145+
return err
146+
}
147+
148+
keyName, _ := key.(string)
149+
switch keyName {
150+
case "id":
151+
if err := d.Decode(&bsi.BucketID); err != nil {
152+
return err
153+
}
154+
case "status":
155+
if err := d.Decode(&bsi.Status); err != nil {
156+
return err
157+
}
158+
default:
159+
// skip unused value
160+
if err := d.Skip(); err != nil {
161+
return err
162+
}
163+
}
164+
165+
}
166+
167+
return nil
127168
}
128169

129170
type InstanceInfo struct {

0 commit comments

Comments
 (0)