Skip to content

Commit ec731f6

Browse files
committed
Polish code by extracting func and extracting struct
Signed-off-by: Jiangnan Jia <[email protected]>
1 parent f7928b3 commit ec731f6

File tree

4 files changed

+395
-200
lines changed

4 files changed

+395
-200
lines changed

pkg/client/opensergo_client.go

Lines changed: 48 additions & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -15,40 +15,48 @@
1515
package client
1616

1717
import (
18-
"context"
19-
"strconv"
20-
"sync/atomic"
21-
"time"
22-
2318
"github.com/golang/groupcache"
2419
"github.com/opensergo/opensergo-go/pkg/api"
20+
"github.com/opensergo/opensergo-go/pkg/client/stream"
2521
"github.com/opensergo/opensergo-go/pkg/common/logging"
22+
"github.com/opensergo/opensergo-go/pkg/global"
2623
"github.com/opensergo/opensergo-go/pkg/model"
2724
transportPb "github.com/opensergo/opensergo-go/pkg/proto/transport/v1"
2825
"github.com/opensergo/opensergo-go/pkg/transport/subscribe"
29-
"github.com/pkg/errors"
3026
"google.golang.org/grpc"
3127
"google.golang.org/grpc/credentials/insecure"
28+
"strconv"
3229
)
3330

3431
// OpenSergoClient is the client to communicate with opensergo-control-plane.
3532
type OpenSergoClient struct {
36-
host string
37-
port uint32
38-
transportServiceClient transportPb.OpenSergoUniversalTransportServiceClient
33+
host string
34+
port uint32
3935

40-
subscribeConfigStreamPtr atomic.Value // type of value is *client.subscribeConfigStream
41-
subscribeConfigStreamObserverPtr atomic.Value // type of value is *client.SubscribeConfigStreamObserver
42-
subscribeConfigStreamStatus atomic.Value // type of value is client.OpensergoClientStreamStatus
36+
opensergoOptions *api.OpensergoOptions
37+
clientOptions *api.ClientOptions
4338

44-
subscribeDataCache *subscribe.SubscribeDataCache
45-
subscriberRegistry *subscribe.SubscriberRegistry
39+
transportServiceClient transportPb.OpenSergoUniversalTransportServiceClient
40+
subscribeConfigOutsStream *stream.SubscribeConfigOutsStream
4641

4742
requestId groupcache.AtomicInt
4843
}
4944

5045
// NewOpenSergoClient returns an instance of OpenSergoClient, and init some properties.
51-
func NewOpenSergoClient(host string, port uint32) (*OpenSergoClient, error) {
46+
func NewOpenSergoClient(host string, port uint32, opts ...api.ClientOption) (*OpenSergoClient, error) {
47+
// globalOpts
48+
globalOptions := global.GetGlobalOptions()
49+
// OpensergoOptions from GetGlobalOptions
50+
opensergoOptions := globalOptions.OpensergoOptions()
51+
// ClientOptions from GetGlobalOptions
52+
clientOptions := globalOptions.ClientOptions()
53+
// override default ClientOptions by params
54+
if len(opts) > 0 {
55+
for _, opt := range opts {
56+
opt(clientOptions)
57+
}
58+
}
59+
5260
address := host + ":" + strconv.FormatUint(uint64(port), 10)
5361
clientConn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
5462
if err != nil {
@@ -57,165 +65,58 @@ func NewOpenSergoClient(host string, port uint32) (*OpenSergoClient, error) {
5765
transportServiceClient := transportPb.NewOpenSergoUniversalTransportServiceClient(clientConn)
5866

5967
openSergoClient := &OpenSergoClient{
60-
host: host,
61-
port: port,
62-
transportServiceClient: transportServiceClient,
63-
subscribeDataCache: &subscribe.SubscribeDataCache{},
64-
subscriberRegistry: &subscribe.SubscriberRegistry{},
65-
requestId: groupcache.AtomicInt(0),
68+
host: host,
69+
port: port,
70+
opensergoOptions: opensergoOptions,
71+
clientOptions: clientOptions,
72+
transportServiceClient: transportServiceClient,
73+
subscribeConfigOutsStream: stream.NewSubscribeConfigOutsStream(opensergoOptions, clientOptions, transportServiceClient),
74+
requestId: groupcache.AtomicInt(0),
6675
}
67-
openSergoClient.subscribeConfigStreamPtr.Store(&subscribeConfigStream{})
68-
openSergoClient.subscribeConfigStreamObserverPtr.Store(&subscribeConfigStreamObserver{})
69-
openSergoClient.subscribeConfigStreamStatus.Store(initial)
7076
return openSergoClient, nil
7177
}
7278

7379
func (c *OpenSergoClient) SubscribeDataCache() *subscribe.SubscribeDataCache {
74-
return c.subscribeDataCache
80+
return c.subscribeConfigOutsStream.SubscribeDataCache
7581
}
7682

7783
func (c *OpenSergoClient) SubscriberRegistry() *subscribe.SubscriberRegistry {
78-
return c.subscriberRegistry
79-
}
80-
81-
// KeepAlive
82-
//
83-
// keepalive OpenSergoClient
84-
func (c *OpenSergoClient) keepAlive() {
85-
// TODO change to event-driven-model instead of for-loop
86-
for {
87-
status := c.CurrentStreamStatus()
88-
if status == interrupted {
89-
logging.Info("Try to restart OpenSergoClient...")
90-
// We do not handle error here because error has print in Start()
91-
_ = c.Start()
92-
}
93-
time.Sleep(time.Duration(10) * time.Second)
94-
}
95-
}
96-
97-
func (c *OpenSergoClient) CurrentStreamStatus() OpensergoClientStreamStatus {
98-
return c.subscribeConfigStreamStatus.Load().(OpensergoClientStreamStatus)
84+
return c.subscribeConfigOutsStream.SubscriberRegistry
9985
}
10086

10187
// Start the OpenSergoClient.
10288
func (c *OpenSergoClient) Start() error {
10389
logging.Info("OpenSergoClient is starting...")
10490

105-
// keepalive OpensergoClient
106-
if c.CurrentStreamStatus() == initial {
107-
logging.Info("Start a keepalive() daemon goroutine to keep OpenSergoClient alive")
108-
go c.keepAlive()
91+
if err := c.subscribeConfigOutsStream.Start(); err != nil {
92+
logging.Error(err, "OpenSergoClient cannot start", "host", c.host, "port", c.port)
93+
return err
10994
}
11095

111-
c.subscribeConfigStreamStatus.Store(starting)
96+
logging.Info("OpenSergoClient has been started", "host", c.host, "port", c.port)
97+
return nil
98+
}
11299

113-
stream, err := c.transportServiceClient.SubscribeConfig(context.Background())
114-
if err != nil {
115-
logging.Error(err, "[OpenSergo SDK] SubscribeConfigStream can not connect.")
116-
c.subscribeConfigStreamStatus.Store(interrupted)
117-
return err
118-
}
119-
c.subscribeConfigStreamPtr.Store(&subscribeConfigStream{stream: stream})
100+
// Close the OpenSergoClient.
101+
func (c *OpenSergoClient) Close() error {
102+
logging.Info("OpenSergoClient is closing...")
120103

121-
if c.subscribeConfigStreamObserverPtr.Load().(*subscribeConfigStreamObserver).observer == nil {
122-
c.subscribeConfigStreamObserverPtr.Store(&subscribeConfigStreamObserver{observer: NewSubscribeConfigStreamObserver(c)})
123-
c.subscribeConfigStreamObserverPtr.Load().(*subscribeConfigStreamObserver).observer.Start()
124-
}
104+
// TODO do close
125105

126-
logging.Info("[OpenSergo SDK] begin to subscribe config-data...")
127-
// TODO: handle error inside the ForEach here.
128-
c.subscriberRegistry.ForEachSubscribeKey(func(key model.SubscribeKey) bool {
129-
err := c.SubscribeConfig(key)
130-
if err != nil {
131-
logging.Error(err, "Failed to SubscribeConfig for key", "namespace", key.Namespace(), "app", key.App(), "kind", key.Kind())
132-
}
133-
return true
134-
})
106+
c.subscribeConfigOutsStream.Shutdown()
135107

136-
logging.Info("OpenSergoClient has been started", "host", c.host, "port", c.port)
137-
c.subscribeConfigStreamStatus.Store(started)
108+
logging.Info("OpenSergoClient has been closed", "host", c.host, "port", c.port)
138109
return nil
139110
}
140111

141112
// SubscribeConfig sends a subscribe request to opensergo-control-plane,
142113
// and return the result of subscribe config-data from opensergo-control-plane.
143114
func (c *OpenSergoClient) SubscribeConfig(subscribeKey model.SubscribeKey, opts ...api.SubscribeOption) error {
144-
configStream := c.subscribeConfigStreamPtr.Load().(*subscribeConfigStream)
145-
if configStream == nil || configStream.stream == nil {
146-
logging.Warn("gRPC stream in OpenSergoClient is nil! Cannot subscribe config-data. waiting for keepalive goroutine to restart...", "namespace", subscribeKey.Namespace(), "app", subscribeKey.App(), "kinds", subscribeKey.Kind().GetName())
147-
c.subscribeConfigStreamStatus.Store(interrupted)
148-
return errors.New("configStream not ready (nil)")
149-
}
150-
151-
options := &api.SubscribeOptions{}
152-
if len(opts) > 0 {
153-
for _, opt := range opts {
154-
opt(options)
155-
}
156-
}
157-
158-
// Register subscribers.
159-
if len(options.Subscribers) > 0 {
160-
for _, subscriber := range options.Subscribers {
161-
c.subscriberRegistry.RegisterSubscriber(subscribeKey, subscriber)
162-
}
163-
}
164-
165-
subscribeRequestTarget := transportPb.SubscribeRequestTarget{
166-
Namespace: subscribeKey.Namespace(),
167-
App: subscribeKey.App(),
168-
Kinds: []string{subscribeKey.Kind().GetName()},
169-
}
170-
171-
subscribeRequest := &transportPb.SubscribeRequest{
172-
Target: &subscribeRequestTarget,
173-
OpType: transportPb.SubscribeOpType_SUBSCRIBE,
174-
}
175-
176-
err := c.subscribeConfigStreamPtr.Load().(*subscribeConfigStream).stream.Send(subscribeRequest)
177-
if err != nil {
178-
logging.Error(err, "Failed to send or handle SubscribeConfig request", "namespace", subscribeKey.Namespace(), "app", subscribeKey.App(), "kinds", subscribeKey.Kind().GetName())
179-
return err
180-
}
181-
182-
logging.Info("Subscribe success", "namespace", subscribeKey.Namespace(), "app", subscribeKey.App(), "kinds", subscribeKey.Kind().GetName())
183-
184-
return nil
115+
return c.subscribeConfigOutsStream.SubscribeConfig(subscribeKey, opts...)
185116
}
186117

187-
// UnsubscribeConfig sends an un-subscribe request to opensergo-control-plane
118+
// UnSubscribeConfig sends an un-subscribe request to opensergo-control-plane
188119
// and remove all the subscribers by subscribeKey.
189-
func (c *OpenSergoClient) UnsubscribeConfig(subscribeKey model.SubscribeKey) error {
190-
191-
if c.subscribeConfigStreamPtr.Load().(*subscribeConfigStream).stream == nil {
192-
logging.Warn("gRPC stream in OpenSergoClient is nil! Cannot unsubscribe config-data. waiting for keepalive goroutine to restart...", "namespace", subscribeKey.Namespace(), "app", subscribeKey.App(), "kinds", subscribeKey.Kind().GetName())
193-
c.subscribeConfigStreamStatus.Store(interrupted)
194-
return errors.New("configStream not ready (nil)")
195-
}
196-
197-
subscribeRequestTarget := transportPb.SubscribeRequestTarget{
198-
Namespace: subscribeKey.Namespace(),
199-
App: subscribeKey.App(),
200-
}
201-
subscribeRequestTarget.Kinds = append(subscribeRequestTarget.Kinds, subscribeKey.Kind().GetName())
202-
203-
subscribeRequest := &transportPb.SubscribeRequest{
204-
Target: &subscribeRequestTarget,
205-
OpType: transportPb.SubscribeOpType_UNSUBSCRIBE,
206-
}
207-
208-
// Send SubscribeRequest (unsubscribe command)
209-
err := c.subscribeConfigStreamPtr.Load().(*subscribeConfigStream).stream.Send(subscribeRequest)
210-
if err != nil {
211-
logging.Error(err, "Failed to send or handle UnsubscribeConfig request", "namespace", subscribeKey.Namespace(), "app", subscribeKey.App(), "kinds", subscribeKey.Kind().GetName())
212-
return err
213-
}
214-
215-
logging.Info("Unsubscribe success", "namespace", subscribeKey.Namespace(), "app", subscribeKey.App(), "kinds", subscribeKey.Kind().GetName())
216-
217-
// Remove subscribers of the subscribe target.
218-
c.subscriberRegistry.RemoveSubscribers(subscribeKey)
219-
220-
return nil
120+
func (c *OpenSergoClient) UnSubscribeConfig(subscribeKey model.SubscribeKey) error {
121+
return c.subscribeConfigOutsStream.UnSubscribeConfig(subscribeKey)
221122
}

pkg/client/opensergo_client_model.go renamed to pkg/client/stream/stream_model.go

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,23 +12,32 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
package client
15+
package stream
1616

17-
import transportPb "github.com/opensergo/opensergo-go/pkg/proto/transport/v1"
17+
type keepalive struct {
18+
flag bool
19+
}
1820

19-
type OpensergoClientStreamStatus uint8
21+
// PbStreamStatus is the stream status of OpenSergo Universal Transport Service
22+
type PbStreamStatus uint8
2023

2124
const (
22-
initial OpensergoClientStreamStatus = iota
23-
starting
24-
started
25-
interrupted
25+
PbStreamInitial PbStreamStatus = iota
26+
PbStreamStarting
27+
PbStreamStarted
28+
PbStreamInterrupted
29+
PbStreamShutdown
2630
)
2731

28-
type subscribeConfigStream struct {
29-
stream transportPb.OpenSergoUniversalTransportService_SubscribeConfigClient
30-
}
32+
// PbStreamObserverStatus is the stream status of OpenSergo Universal Transport Service
33+
type PbStreamObserverStatus uint8
34+
35+
const (
36+
PbStreamObserverInitial PbStreamObserverStatus = iota
37+
PbStreamObserverRunning
38+
PbStreamObserverStopped
39+
)
3140

32-
type subscribeConfigStreamObserver struct {
33-
observer *SubscribeConfigStreamObserver
41+
type outsStream interface {
42+
StreamStatus() PbStreamStatus
3443
}

0 commit comments

Comments
 (0)