@@ -50,10 +50,6 @@ var raiseWarningLevelLimiter = rate.NewLimiter(rate.Every(3*time.Second), 1)
5050
5151func nopCover (v fixedpoint.Value ) {}
5252
53- type connector interface {
54- Connect (ctx context.Context ) error
55- }
56-
5753type cancelOrdersBySymbolSide interface {
5854 CancelOrdersBySymbolSide (
5955 ctx context.Context , symbol string , side types.SideType ,
@@ -299,7 +295,7 @@ type Strategy struct {
299295
300296 simpleHedgeMode bool
301297
302- connectors [] connector
298+ connectorManager * types. ConnectorManager
303299
304300 profitChanged int64
305301}
@@ -435,6 +431,9 @@ func (s *Strategy) Initialize() error {
435431 for _ , sig := range s .SignalConfigList .Signals {
436432 s .logger .Infof ("using signal provider: %s" , sig )
437433 }
434+
435+ // initialize connector manager
436+ s .connectorManager = types .NewConnectorManager ()
438437 return nil
439438}
440439
@@ -2509,7 +2508,7 @@ func (s *Strategy) CrossRun(
25092508 s .marketTradeStream .OnMarketTrade (func (trade types.Trade ) {
25102509 s .lastPrice .Set (trade .Price )
25112510 })
2512- s .addConnector (s .marketTradeStream )
2511+ s .connectorManager . Add (s .marketTradeStream )
25132512 }
25142513
25152514 if s .FastCancel != nil && s .FastCancel .Enabled {
@@ -2522,15 +2521,15 @@ func (s *Strategy) CrossRun(
25222521 s .makerBookStream = bbgo .NewBookStream (s .hedgeSession , s .SourceSymbol )
25232522 s .makerBook = types .NewStreamBook (s .Symbol , s .makerSession .ExchangeName )
25242523 s .makerBook .BindStream (s .makerBookStream )
2525- s .addConnector (s .makerBookStream )
2524+ s .connectorManager . Add (s .makerBookStream )
25262525 }
25272526
25282527 // TODO: replace the following stream book with HedgeMarket integration
25292528 s .sourceBookStream = bbgo .NewBookStream (s .hedgeSession , s .SourceSymbol , types.SubscribeOptions {
25302529 Depth : types .DepthLevelFull ,
25312530 Speed : types .SpeedLow ,
25322531 })
2533- s .addConnector (s .sourceBookStream )
2532+ s .connectorManager . Add (s .sourceBookStream )
25342533
25352534 s .sourceBook = types .NewStreamBook (s .SourceSymbol , s .hedgeSession .ExchangeName )
25362535 s .sourceBook .BindStream (s .sourceBookStream )
@@ -2730,8 +2729,13 @@ func (s *Strategy) CrossRun(
27302729 )
27312730
27322731 // TODO: use connectivity group to ensure all connections are ready
2733- for _ , ctr := range s .connectors {
2734- if err := ctr .Connect (s .tradingCtx ); err != nil {
2732+ // for _, ctr := range s.connectors {
2733+ // if err := ctr.Connect(s.tradingCtx); err != nil {
2734+ // return err
2735+ // }
2736+ // }
2737+ if s .connectorManager != nil {
2738+ if err := s .connectorManager .Connect (s .tradingCtx ); err != nil {
27352739 return err
27362740 }
27372741 }
@@ -2901,7 +2905,3 @@ func parseSymbolSelector(
29012905 }
29022906 return session , market , nil
29032907}
2904-
2905- func (s * Strategy ) addConnector (ctr connector ) {
2906- s .connectors = append (s .connectors , ctr )
2907- }
0 commit comments