@@ -48,13 +48,7 @@ func (rs *Replicaset) BucketStat(ctx context.Context, bucketID uint64) (BucketSt
4848func (rs * Replicaset ) bucketStatAsync (ctx context.Context , bucketID uint64 ) * tarantool.Future {
4949 const bucketStatFnc = "vshard.storage.bucket_stat"
5050
51- req := tarantool .NewCallRequest (bucketStatFnc ).
52- Args ([]interface {}{bucketID }).
53- Context (ctx )
54-
55- future := rs .conn .Do (req , pool .RO )
56-
57- return future
51+ return rs .CallAsync (ctx , ReplicasetCallOpts {PoolMode : pool .RO }, bucketStatFnc , []interface {}{bucketID })
5852}
5953
6054func bucketStatWait (future * tarantool.Future ) (BucketStatInfo , error ) {
@@ -103,20 +97,11 @@ func (rs *Replicaset) ReplicaCall(
10397 fnc string ,
10498 args interface {},
10599) (interface {}, StorageResultTypedFunc , error ) {
106- timeout := CallTimeoutMin
107-
108- if opts .Timeout > 0 {
109- timeout = opts .Timeout
100+ if opts .Timeout == 0 {
101+ opts .Timeout = CallTimeoutMin
110102 }
111103
112- ctx , cancel := context .WithTimeout (ctx , timeout )
113- defer cancel ()
114-
115- req := tarantool .NewCallRequest (fnc ).
116- Context (ctx ).
117- Args (args )
118-
119- future := rs .conn .Do (req , opts .PoolMode )
104+ future := rs .CallAsync (ctx , opts , fnc , args )
120105
121106 respData , err := future .Get ()
122107 if err != nil {
@@ -158,13 +143,8 @@ func (rs *Replicaset) bucketsDiscoveryAsync(ctx context.Context, from uint64) *t
158143 From uint64 `msgpack:"from"`
159144 }{From : from }
160145
161- req := tarantool .NewCallRequest (bucketsDiscoveryFnc ).
162- Context (ctx ).
163- Args ([]interface {}{& bucketsDiscoveryPaginationRequest })
164-
165- future := rs .conn .Do (req , pool .PreferRO )
166-
167- return future
146+ return rs .CallAsync (ctx , ReplicasetCallOpts {PoolMode : pool .PreferRO }, bucketsDiscoveryFnc ,
147+ []interface {}{bucketsDiscoveryPaginationRequest })
168148}
169149
170150type bucketsDiscoveryResp struct {
@@ -279,28 +259,19 @@ func CalculateEtalonBalance(replicasets []Replicaset, bucketCount uint64) error
279259func (rs * Replicaset ) BucketsCount (ctx context.Context ) (uint64 , error ) {
280260 const bucketCountFnc = "vshard.storage.buckets_count"
281261
282- req := tarantool .NewCallRequest (bucketCountFnc )
283- req = req .Context (ctx )
262+ var bucketCount uint64
284263
285- fut := rs .conn .Do (req , pool .ANY )
264+ fut := rs .CallAsync (ctx , ReplicasetCallOpts {PoolMode : pool .ANY }, bucketCountFnc , nil )
265+ err := fut .GetTyped (& []interface {}{& bucketCount })
286266
287- bucketCount := new (uint64 )
288-
289- err := fut .GetTyped (& []interface {}{bucketCount })
290-
291- return * bucketCount , err
267+ return bucketCount , err
292268}
293269
294270func (rs * Replicaset ) BucketForceCreate (ctx context.Context , firstBucketID , count uint64 ) error {
295- const bucketCountFnc = "vshard.storage.bucket_force_create"
296-
297- req := tarantool .NewCallRequest (bucketCountFnc )
298- req = req .Context (ctx )
299- req = req .Args (& []interface {}{firstBucketID , count })
300-
301- fut := rs .conn .Do (req , pool .RW )
271+ const bucketForceCreateFnc = "vshard.storage.bucket_force_create"
302272
303- _ , err := fut .Get ()
273+ fut := rs .CallAsync (ctx , ReplicasetCallOpts {PoolMode : pool .RW }, bucketForceCreateFnc , []interface {}{firstBucketID , count })
274+ _ , err := fut .GetResponse ()
304275
305276 return err
306277}
0 commit comments