Skip to content

Commit acffa0a

Browse files
committed
fix:
update middleware update errors
1 parent 1b70895 commit acffa0a

File tree

32 files changed

+550
-566
lines changed

32 files changed

+550
-566
lines changed

clients/grpcc/middleware.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,26 +4,27 @@ import (
44
"context"
55
"time"
66

7+
"github.com/pubgo/xerror"
78
"google.golang.org/grpc"
89
"google.golang.org/grpc/metadata"
910
"google.golang.org/grpc/peer"
1011

12+
"github.com/pubgo/lava/pkg/encoding"
1113
"github.com/pubgo/lava/types"
1214
)
1315

1416
func unaryInterceptor(middlewares []types.Middleware) grpc.UnaryClientInterceptor {
15-
var wrapperUnary = func(ctx context.Context, req types.Request, rsp func(response types.Response) error) error {
17+
var unaryWrapper = func(ctx context.Context, req types.Request, rsp func(response types.Response) error) error {
1618
var reqCtx = req.(*request)
1719
ctx = metadata.NewOutgoingContext(ctx, reqCtx.Header())
1820
if err := reqCtx.invoker(ctx, reqCtx.method, reqCtx.req, reqCtx.reply, reqCtx.cc); err != nil {
1921
return err
2022
}
21-
2223
return rsp(&response{req: reqCtx, resp: reqCtx.reply})
2324
}
2425

2526
for i := len(middlewares) - 1; i >= 0; i-- {
26-
wrapperUnary = middlewares[i](wrapperUnary)
27+
unaryWrapper = middlewares[i](unaryWrapper)
2728
}
2829

2930
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) (err error) {
@@ -58,14 +59,17 @@ func unaryInterceptor(middlewares []types.Middleware) grpc.UnaryClientIntercepto
5859
if dur, err := time.ParseDuration(to[0]); err == nil {
5960
var cancel context.CancelFunc
6061
ctx, cancel = context.WithTimeout(ctx, dur)
61-
_ = cancel
62+
defer cancel()
6263
}
6364
}
6465

65-
return wrapperUnary(
66-
ctx,
66+
var cdc = encoding.GetCdc(ct)
67+
xerror.Assert(cdc == nil, "contentType(%s) codec not found", ct)
68+
69+
return unaryWrapper(ctx,
6770
&request{
6871
ct: ct,
72+
cdc: cdc,
6973
header: md,
7074
service: serviceFromMethod(method),
7175
opts: opts,
@@ -128,14 +132,17 @@ func streamInterceptor(middlewares []types.Middleware) grpc.StreamClientIntercep
128132
if dur, err := time.ParseDuration(to[0]); err == nil {
129133
var cancel context.CancelFunc
130134
ctx, cancel = context.WithTimeout(ctx, dur)
131-
_ = cancel
135+
defer cancel()
132136
}
133137
}
134138

135-
return nil, wrapperStream(
136-
ctx,
139+
var cdc = encoding.GetCdc(ct)
140+
xerror.Assert(cdc == nil, "contentType(%s) codec not found", ct)
141+
142+
return nil, wrapperStream(ctx,
137143
&request{
138144
ct: ct,
145+
cdc: cdc,
139146
service: serviceFromMethod(method),
140147
header: md,
141148
opts: opts,
@@ -144,10 +151,7 @@ func streamInterceptor(middlewares []types.Middleware) grpc.StreamClientIntercep
144151
method: method,
145152
streamer: streamer,
146153
},
147-
func(rsp types.Response) error {
148-
resp = rsp.(*response).stream
149-
return nil
150-
},
154+
func(rsp types.Response) error { resp = rsp.(*response).stream; return nil },
151155
)
152156
}
153157
}

clients/grpcc/request.go

Lines changed: 12 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ var _ types.Request = (*request)(nil)
1111

1212
type request struct {
1313
ct string
14+
cdc encoding.Codec
1415
opts []grpc.CallOption
1516
method string
1617
service string
@@ -22,46 +23,14 @@ type request struct {
2223
header types.Header
2324
}
2425

25-
func (r *request) Kind() string {
26-
return Name
27-
}
28-
29-
func (r *request) Codec() encoding.Codec {
30-
return encoding.Get(encoding.Mapping[r.ct])
31-
}
32-
33-
func (r *request) Client() bool {
34-
return true
35-
}
36-
37-
func (r *request) Service() string {
38-
return r.service
39-
}
40-
41-
func (r *request) Method() string {
42-
return r.method
43-
}
44-
45-
func (r *request) Endpoint() string {
46-
return r.method
47-
}
48-
49-
func (r *request) ContentType() string {
50-
return r.ct
51-
}
52-
53-
func (r *request) Header() types.Header {
54-
return r.header
55-
}
56-
57-
func (r *request) Payload() interface{} {
58-
return r.req
59-
}
60-
61-
func (r *request) Body() ([]byte, error) {
62-
return nil, nil
63-
}
64-
65-
func (r *request) Stream() bool {
66-
return false
67-
}
26+
func (r *request) Kind() string { return Name }
27+
func (r *request) Codec() encoding.Codec { return r.cdc }
28+
func (r *request) Client() bool { return true }
29+
func (r *request) Service() string { return r.service }
30+
func (r *request) Method() string { return r.method }
31+
func (r *request) Endpoint() string { return r.method }
32+
func (r *request) ContentType() string { return r.ct }
33+
func (r *request) Header() types.Header { return r.header }
34+
func (r *request) Payload() interface{} { return r.req }
35+
func (r *request) Read() ([]byte, error) { return nil, nil }
36+
func (r *request) Stream() bool { return r.desc != nil }

clients/grpcc/response.go

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package grpcc
22

33
import (
4-
"encoding/json"
5-
64
"github.com/pubgo/lava/types"
75
"google.golang.org/grpc"
86
)
@@ -15,18 +13,6 @@ type response struct {
1513
resp interface{}
1614
}
1715

18-
func (r *response) Stream() bool {
19-
return r.stream != nil
20-
}
21-
22-
func (r *response) Header() types.Header {
23-
return r.req.header
24-
}
25-
26-
func (r *response) Body() ([]byte, error) {
27-
return json.Marshal(r.resp)
28-
}
29-
30-
func (r *response) Payload() interface{} {
31-
return r.resp
32-
}
16+
func (r *response) Stream() bool { return r.stream != nil }
17+
func (r *response) Header() types.Header { return r.req.header }
18+
func (r *response) Payload() interface{} { return r.resp }

clients/orm/config.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/pubgo/lava/internal/logz"
1212
"github.com/pubgo/lava/pkg/merge"
1313
"github.com/pubgo/lava/plugins/tracing"
14+
"github.com/pubgo/lava/runenv"
1415
)
1516

1617
var logs = logz.New(Name)
@@ -36,20 +37,26 @@ type Cfg struct {
3637

3738
func (t Cfg) Build(dialect gorm.Dialector) *gorm.DB {
3839
var log = merge.Struct(&gorm.Config{}, t).(*gorm.Config)
40+
41+
var level = logger.Info
42+
if runenv.IsProd() || runenv.IsRelease() {
43+
level = logger.Error
44+
}
45+
3946
log.Logger = logger.New(
4047
logPrintf(logs.Infof),
4148
logger.Config{
42-
SlowThreshold: time.Second, // Slow SQL threshold
43-
LogLevel: logger.Silent, // Log level
44-
IgnoreRecordNotFoundError: true, // Ignore ErrRecordNotFound error for logger
45-
Colorful: false, // Disable color
49+
SlowThreshold: 200 * time.Millisecond,
50+
LogLevel: level,
51+
IgnoreRecordNotFoundError: false,
52+
Colorful: true,
4653
},
4754
)
4855

4956
db, err := gorm.Open(dialect, log)
5057
xerror.Panic(err)
5158

52-
// 添加链路
59+
// 添加链路追踪
5360
xerror.Panic(db.Use(opentracing.New(
5461
opentracing.WithErrorTagHook(tracing.SetIfErr),
5562
)))

clients/restc/client.go

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,16 @@ import (
44
"bytes"
55
"context"
66
"io"
7+
"io/ioutil"
78
"net/http"
89
"net/url"
910
"time"
1011

1112
"github.com/pubgo/x/strutil"
1213
"github.com/pubgo/xerror"
1314

15+
"github.com/pubgo/lava/pkg/encoding"
1416
"github.com/pubgo/lava/pkg/httpx"
15-
"github.com/pubgo/lava/pkg/retry"
1617
"github.com/pubgo/lava/types"
1718
)
1819

@@ -32,10 +33,25 @@ type clientImpl struct {
3233
}
3334

3435
func (c *clientImpl) Do(ctx context.Context, req *Request) (resp *Response, err error) {
35-
return resp, c.do(ctx, &request{req: req}, func(res types.Response) error {
36-
resp = res.(*response).resp
37-
return nil
38-
})
36+
var ct = filterFlags(req.Header.Get(httpx.HeaderContentType))
37+
var cdc = encoding.GetCdc(ct)
38+
xerror.Assert(cdc == nil, "contentType(%s) codec not found", ct)
39+
40+
var data []byte
41+
if req.Body != nil {
42+
data, err = ioutil.ReadAll(req.Body)
43+
xerror.Panic(err)
44+
req.Body = ioutil.NopCloser(bytes.NewReader(data))
45+
}
46+
47+
return resp, c.do(ctx,
48+
&request{
49+
ct: ct,
50+
cdc: cdc,
51+
req: req,
52+
},
53+
func(res types.Response) error { resp = res.(*response).resp; return nil },
54+
)
3955
}
4056

4157
func (c *clientImpl) Get(ctx context.Context, url string, requests ...func(req *Request)) (*Response, error) {
@@ -92,15 +108,11 @@ func doRequest(ctx context.Context, c *clientImpl, mth string, url string, reque
92108
return resp, nil
93109
}
94110

95-
func doFunc(c *clientImpl) types.MiddleNext {
96-
var r = retry.New(retry.WithMaxRetries(c.cfg.RetryCount, c.cfg.backoff))
97-
return func(ctx context.Context, req types.Request, callback func(rsp types.Response) error) error {
98-
return r.Do(func(i int) error {
99-
resp, err := c.client.Do(req.(*request).req)
100-
if err != nil {
101-
return err
102-
}
103-
return callback(&response{resp: resp})
104-
})
111+
func filterFlags(content string) string {
112+
for i, char := range content {
113+
if char == ' ' || char == ';' {
114+
return content[:i]
115+
}
105116
}
117+
return content
106118
}

clients/restc/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func (t Cfg) Build(opts ...func(cfg *Cfg)) (_ Client, err error) {
6969
if mid == nil {
7070
continue
7171
}
72-
middlewares = append(middlewares, plugin.Get(name).Middleware())
72+
middlewares = append(middlewares, mid)
7373
}
7474

7575
// 加载插件

clients/restc/middleware.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package restc
2+
3+
import (
4+
"context"
5+
6+
"github.com/pubgo/lava/pkg/retry"
7+
"github.com/pubgo/lava/types"
8+
)
9+
10+
func doFunc(c *clientImpl) types.MiddleNext {
11+
var r = retry.New(retry.WithMaxRetries(c.cfg.RetryCount, c.cfg.backoff))
12+
return func(ctx context.Context, req types.Request, callback func(rsp types.Response) error) error {
13+
return r.Do(func(i int) error {
14+
resp, err := c.client.Do(req.(*request).req)
15+
if err != nil {
16+
return err
17+
}
18+
return callback(&response{resp: resp})
19+
})
20+
}
21+
}

clients/restc/request.go

Lines changed: 17 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -8,49 +8,20 @@ import (
88
var _ types.Request = (*request)(nil)
99

1010
type request struct {
11-
req *Request
12-
}
13-
14-
func (r *request) Kind() string {
15-
return Name
16-
}
17-
18-
func (r *request) Codec() encoding.Codec {
19-
return encoding.Get(encoding.Mapping[r.ContentType()])
20-
}
21-
22-
func (r *request) Client() bool {
23-
return true
24-
}
25-
26-
func (r *request) Service() string {
27-
return ""
28-
}
29-
30-
func (r *request) Method() string {
31-
return r.req.Method
32-
}
33-
34-
func (r *request) Endpoint() string {
35-
return r.req.RequestURI
36-
}
37-
38-
func (r *request) ContentType() string {
39-
return r.ContentType()
40-
}
41-
42-
func (r *request) Header() types.Header {
43-
return types.Header(r.req.Header)
44-
}
45-
46-
func (r *request) Payload() interface{} {
47-
return nil
48-
}
49-
50-
func (r *request) Body() ([]byte, error) {
51-
return nil, nil
52-
}
53-
54-
func (r *request) Stream() bool {
55-
return false
56-
}
11+
service string
12+
req *Request
13+
ct string
14+
cdc encoding.Codec
15+
}
16+
17+
func (r *request) Kind() string { return Name }
18+
func (r *request) Codec() encoding.Codec { return r.cdc }
19+
func (r *request) Client() bool { return true }
20+
func (r *request) Service() string { return r.service }
21+
func (r *request) Method() string { return r.req.Method }
22+
func (r *request) Endpoint() string { return r.req.RequestURI }
23+
func (r *request) ContentType() string { return r.ct }
24+
func (r *request) Header() types.Header { return types.Header(r.req.Header) }
25+
func (r *request) Payload() interface{} { return nil }
26+
func (r *request) Read() ([]byte, error) { return nil, nil }
27+
func (r *request) Stream() bool { return false }

0 commit comments

Comments
 (0)