@@ -10,6 +10,8 @@ import (
1010
1111 "github.com/tarantool/go-tarantool/v2"
1212 "github.com/tarantool/go-tarantool/v2/pool"
13+ "github.com/vmihailenco/msgpack/v5"
14+ "github.com/vmihailenco/msgpack/v5/msgpcode"
1315)
1416
1517// --------------------------------------------------------------------------------
@@ -27,32 +29,135 @@ func (c VshardMode) String() string {
2729 return string (c )
2830}
2931
30- type storageCallAssertError struct {
32+ type vshardStorageCallResponseProto struct {
33+ AssertError * assertError // not nil if there is assert error
34+ VshardError * StorageCallVShardError // not nil if there is vshard response
35+ Data []interface {} // raw response data
36+ }
37+
38+ func (r * vshardStorageCallResponseProto ) DecodeMsgpack (d * msgpack.Decoder ) error {
39+ /* vshard.storage.call(func) response has the next 4 possbile formats:
40+ See: https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L3130
41+ 1. vshard error has occurred:
42+ array[nil, vshard_error]
43+ 2. User method has finished with some error:
44+ array[false, assert error]
45+ 3. User mehod has finished successfully
46+ a) but has not returned anything
47+ array[true]
48+ b) has returned 1 element
49+ array[true, elem1]
50+ c) has returned 2 element
51+ array[true, elem1, elem2]
52+ d) has returned 3 element
53+ array[true, elem1, elem2, elem3]
54+ */
55+
56+ // Ensure it is an array and get array len for protocol violation check
57+ respArrayLen , err := d .DecodeArrayLen ()
58+ if err != nil {
59+ return err
60+ }
61+
62+ if respArrayLen == 0 {
63+ return fmt .Errorf ("protocol violation: invalid array length: %d" , respArrayLen )
64+ }
65+
66+ // we need peek code to make our check faster than decode interface
67+ // later we will check if code nil or bool
68+ code , err := d .PeekCode ()
69+ if err != nil {
70+ return err
71+ }
72+
73+ // this is storage error
74+ if code == msgpcode .Nil {
75+ err = d .DecodeNil ()
76+ if err != nil {
77+ return err
78+ }
79+
80+ if respArrayLen != 2 {
81+ return fmt .Errorf ("protocol violation: length is %d on vshard error case" , respArrayLen )
82+ }
83+
84+ var vshardError StorageCallVShardError
85+
86+ err = d .Decode (& vshardError )
87+ if err != nil {
88+ return fmt .Errorf ("failed to decode storage vshard error: %w" , err )
89+ }
90+
91+ r .VshardError = & vshardError
92+
93+ return nil
94+ }
95+
96+ isVShardRespOk , err := d .DecodeBool ()
97+ if err != nil {
98+ return err
99+ }
100+
101+ if ! isVShardRespOk {
102+ // that means we have an assert errors and response is not ok
103+ if respArrayLen != 2 {
104+ return fmt .Errorf ("protocol violation: length is %d on assert error case" , respArrayLen )
105+ }
106+
107+ var assertError assertError
108+ err = d .Decode (& assertError )
109+ if err != nil {
110+ return fmt .Errorf ("failed to decode storage assert error: %w" , err )
111+ }
112+
113+ r .AssertError = & assertError
114+
115+ return nil
116+ }
117+
118+ // isVShardRespOk is true
119+ r .Data = make ([]interface {}, 0 , respArrayLen - 1 )
120+
121+ for i := 1 ; i < respArrayLen ; i ++ {
122+ elem , err := d .DecodeInterface ()
123+ if err != nil {
124+ return fmt .Errorf ("failed to decode into interface element #%d of response array" , i + 1 )
125+ }
126+ r .Data = append (r .Data , elem )
127+ }
128+
129+ return nil
130+ }
131+
132+ type assertError struct {
31133 Code int `msgpack:"code"`
32134 BaseType string `msgpack:"base_type"`
33135 Type string `msgpack:"type"`
34136 Message string `msgpack:"message"`
35137 Trace interface {} `msgpack:"trace"`
36138}
37139
38- func (s storageCallAssertError ) Error () string {
39- type alias storageCallAssertError
140+ func (s assertError ) Error () string {
141+ // Just print struct as is, use hack with alias type to avoid recursion:
142+ // %v attempts to call Error() method for s, which is recursion.
143+ // This alias doesn't have method Error().
144+ type alias assertError
40145 return fmt .Sprintf ("%+v" , alias (s ))
41146}
42147
43148type StorageCallVShardError struct {
44- BucketID uint64 `msgpack:"bucket_id" mapstructure:"bucket_id" `
149+ BucketID uint64 `msgpack:"bucket_id"`
45150 Reason string `msgpack:"reason"`
46151 Code int `msgpack:"code"`
47152 Type string `msgpack:"type"`
48153 Message string `msgpack:"message"`
49154 Name string `msgpack:"name"`
50155 // These 3 fields below are send as string by vshard storage, so we decode them into string, not uuid.UUID type
51156 // Example: 00000000-0000-0002-0002-000000000000
52- MasterUUID string `msgpack:"master" mapstructure:"master" `
53- ReplicasetUUID string `msgpack:"replicaset" mapstructure:"replicaset" `
54- ReplicaUUID string `msgpack:"replica" mapstructure:"replica" `
55- Destination string `msgpack:"destination" mapstructure:"destination" `
157+ MasterUUID string `msgpack:"master"`
158+ ReplicasetUUID string `msgpack:"replicaset"`
159+ ReplicaUUID string `msgpack:"replica"`
160+ Destination string `msgpack:"destination"`
56161}
57162
58163func (s StorageCallVShardError ) Error () string {
@@ -108,7 +213,6 @@ func (r *Router) RouterCallImpl(ctx context.Context,
108213 })
109214
110215 var err error
111- var vshardError StorageCallVShardError
112216
113217 for {
114218 if since := time .Since (timeStart ); since > timeout {
@@ -141,32 +245,20 @@ func (r *Router) RouterCallImpl(ctx context.Context,
141245
142246 future := rs .conn .Do (req , opts .PoolMode )
143247
144- var respData [] interface {}
145- respData , err = future .Get ( )
248+ var storageCallResponse vshardStorageCallResponseProto
249+ err = future .GetTyped ( & storageCallResponse )
146250 if err != nil {
147251 return nil , nil , fmt .Errorf ("got error on future.Get(): %w" , err )
148252 }
149253
150- r .log ().Debugf (ctx , "Got call result response data %v" , respData )
254+ r .log ().Debugf (ctx , "Got call result response data %+ v" , storageCallResponse )
151255
152- if len (respData ) == 0 {
153- // vshard.storage.call(func) returns up to two values:
154- // - true/false/nil
155- // - func result, omitted if func does not return anything
156- return nil , nil , fmt .Errorf ("protocol violation %s: got empty response" , vshardStorageClientCall )
256+ if storageCallResponse .AssertError != nil {
257+ return nil , nil , fmt .Errorf ("%s: %s failed: %+v" , vshardStorageClientCall , fnc , storageCallResponse .AssertError )
157258 }
158259
159- if respData [0 ] == nil {
160- if len (respData ) != 2 {
161- return nil , nil , fmt .Errorf ("protocol violation %s: length is %d when respData[0] is nil" , vshardStorageClientCall , len (respData ))
162- }
163-
164- err = mapstructure .Decode (respData [1 ], & vshardError )
165- if err != nil {
166- // Something unexpected happened: we couldn't decode respData[1] as a vshardError,
167- // so return reason why and respData[1], that is supposed to be a vshardError.
168- return nil , nil , fmt .Errorf ("cant decode vhsard err by trarantool with err: %v (%v)" , err , respData [1 ])
169- }
260+ if storageCallResponse .VshardError != nil {
261+ vshardError := storageCallResponse .VshardError
170262
171263 switch vshardError .Name {
172264 case VShardErrNameWrongBucket , VShardErrNameBucketIsLocked :
@@ -201,7 +293,7 @@ func (r *Router) RouterCallImpl(ctx context.Context,
201293 time .Sleep (defaultPoolingPause )
202294
203295 if time .Since (timeStart ) > timeout {
204- return nil , nil , & vshardError
296+ return nil , nil , vshardError
205297 }
206298 }
207299 }
@@ -210,60 +302,36 @@ func (r *Router) RouterCallImpl(ctx context.Context,
210302
211303 r .metrics ().RetryOnCall ("bucket_migrate" )
212304
213- r .log ().Debugf (ctx , "Retrying fnc '%s' cause got vshard error: %v" , fnc , & vshardError )
305+ r .log ().Debugf (ctx , "Retrying fnc '%s' cause got vshard error: %v" , fnc , vshardError )
214306
215307 // this vshardError will be returned to a caller in case of timeout
216- err = & vshardError
308+ err = vshardError
217309 continue
218310 case VShardErrNameTransferIsInProgress :
219311 // Since lua vshard router doesn't retry here, we don't retry too.
220312 // There is a comment why lua vshard router doesn't retry:
221313 // https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L697
222314 r .BucketReset (bucketID )
223- return nil , nil , & vshardError
315+ return nil , nil , vshardError
224316 case VShardErrNameNonMaster :
225317 // vshard.storage has returned NON_MASTER error, lua vshard router updates info about master in this case:
226318 // See: https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L704.
227319 // Since we use go-tarantool library, and go-tarantool library doesn't provide API to update info about current master,
228320 // we just return this error as is.
229- return nil , nil , & vshardError
321+ return nil , nil , vshardError
230322 default :
231- return nil , nil , & vshardError
323+ return nil , nil , vshardError
232324 }
233325 }
234326
235- var isVShardRespOk bool
236- err = future .GetTyped (& []interface {}{& isVShardRespOk })
237- if err != nil {
238- return nil , nil , fmt .Errorf ("protocol violation %s: can't decode respData[0] as boolean: %v" , vshardStorageClientCall , err )
239- }
240-
241- if ! isVShardRespOk {
242- // Since we got respData[0] == false, it means that an error has happened
243- // while executing user-defined function on vshard storage.
244- // In this case, vshard storage must return a pair: false, error.
245- if len (respData ) != 2 {
246- return nil , nil , fmt .Errorf ("protocol violation %s: response length is %d when respData[0] is false" , vshardStorageClientCall , len (respData ))
247- }
248-
249- var assertError storageCallAssertError
250- err = mapstructure .Decode (respData [1 ], & assertError )
251- if err != nil {
252- // We could not decode respData[1] as assertError, so return respData[1] as is, add info why we could not decode.
253- return nil , nil , fmt .Errorf ("%s: %s failed %v (decoding to assertError failed %v)" , vshardStorageClientCall , fnc , respData [1 ], err )
254- }
255-
256- return nil , nil , fmt .Errorf ("%s: %s failed: %+v" , vshardStorageClientCall , fnc , assertError )
257- }
258-
259327 r .metrics ().RequestDuration (time .Since (timeStart ), true , false )
260328
261- return respData [ 1 :] , func (result interface {}) error {
262- if len (respData ) < 2 {
329+ return storageCallResponse . Data , func (result interface {}) error {
330+ if len (storageCallResponse . Data ) == 0 {
263331 return nil
264332 }
265333
266- var stub interface {}
334+ var stub bool
267335
268336 return future .GetTyped (& []interface {}{& stub , result })
269337 }, nil
@@ -399,7 +467,7 @@ func (r *Router) RouterMapCallRWImpl(
399467 return nil , fmt .Errorf ("protocol violation: invalid respData length when respData[0] == nil, must be = 2, current: %d" , len (respData ))
400468 }
401469
402- var assertError storageCallAssertError
470+ var assertError assertError
403471 err = mapstructure .Decode (respData [1 ], & assertError )
404472 if err != nil {
405473 // We could not decode respData[1] as assertError, so return respData[1] as is, add info why we could not decode.
0 commit comments