@@ -15,11 +15,9 @@ import (
1515const timeout = 5 * time .Second
1616
1717type QueueManager struct {
18- ActiveStakingQueue client.QueueClient
19- UnbondingStakingQueue client.QueueClient
20- WithdrawableStakingQueue client.QueueClient
21- WithdrawnStakingQueue client.QueueClient
22- logger * zap.Logger
18+ ActiveStakingQueue client.QueueClient
19+ UnbondingStakingQueue client.QueueClient
20+ logger * zap.Logger
2321}
2422
2523func NewQueueManager (cfg * config.QueueConfig , logger * zap.Logger ) (* QueueManager , error ) {
@@ -33,22 +31,10 @@ func NewQueueManager(cfg *config.QueueConfig, logger *zap.Logger) (*QueueManager
3331 return nil , fmt .Errorf ("failed to create unbonding staking queue: %w" , err )
3432 }
3533
36- withdrawableStakingQueue , err := client .NewQueueClient (cfg , client .WithdrawableStakingQueueName )
37- if err != nil {
38- return nil , fmt .Errorf ("failed to create withdrawable staking queue: %w" , err )
39- }
40-
41- withdrawnStakingQueue , err := client .NewQueueClient (cfg , client .WithdrawnStakingQueueName )
42- if err != nil {
43- return nil , fmt .Errorf ("failed to create withdrawn staking queue: %w" , err )
44- }
45-
4634 return & QueueManager {
47- ActiveStakingQueue : activeStakingQueue ,
48- UnbondingStakingQueue : unbondingStakingQueue ,
49- WithdrawableStakingQueue : withdrawableStakingQueue ,
50- WithdrawnStakingQueue : withdrawnStakingQueue ,
51- logger : logger .With (zap .String ("module" , "queue consumer" )),
35+ ActiveStakingQueue : activeStakingQueue ,
36+ UnbondingStakingQueue : unbondingStakingQueue ,
37+ logger : logger .With (zap .String ("module" , "queue consumer" )),
5238 }, nil
5339}
5440
@@ -95,41 +81,13 @@ func (qc *QueueManager) PushUnbondingStakingEvent(ctx context.Context, ev *clien
9581 return nil
9682}
9783
98- func (qc * QueueManager ) PushWithdrawableStakingEvent (ctx context.Context , ev * client.StakingEvent ) error {
99- qc .logger .Debug ("pushing withdrawable staking event" , zap .String ("tx_hash" , ev .StakingTxHashHex ))
100-
101- err := pushEvent (ctx , qc .WithdrawableStakingQueue , ev )
102- if err != nil {
103- return fmt .Errorf ("failed to push staking event: %w" , err )
104- }
105-
106- qc .logger .Debug ("successfully pushed withdrawable staking event" , zap .String ("tx_hash" , ev .StakingTxHashHex ))
107- return nil
108- }
109-
110- func (qc * QueueManager ) PushWithdrawnStakingEvent (ctx context.Context , ev * client.StakingEvent ) error {
111- qc .logger .Debug ("pushing withdrawn staking event" , zap .String ("tx_hash" , ev .StakingTxHashHex ))
112-
113- err := pushEvent (ctx , qc .WithdrawnStakingQueue , ev )
114- if err != nil {
115- return fmt .Errorf ("failed to push staking event: %w" , err )
116- }
117-
118- qc .logger .Debug ("successfully pushed withdrawn staking event" , zap .String ("tx_hash" , ev .StakingTxHashHex ))
119- return nil
120- }
121-
12284// requeue message
12385func (qc * QueueManager ) ReQueueMessage (ctx context.Context , message client.QueueMessage , queueName string ) error {
12486 switch queueName {
12587 case client .ActiveStakingQueueName :
12688 return qc .ActiveStakingQueue .ReQueueMessage (ctx , message )
12789 case client .UnbondingStakingQueueName :
12890 return qc .UnbondingStakingQueue .ReQueueMessage (ctx , message )
129- case client .WithdrawableStakingQueueName :
130- return qc .WithdrawableStakingQueue .ReQueueMessage (ctx , message )
131- case client .WithdrawnStakingQueueName :
132- return qc .WithdrawnStakingQueue .ReQueueMessage (ctx , message )
13391 default :
13492 return fmt .Errorf ("unknown queue name: %s" , queueName )
13593 }
@@ -144,14 +102,6 @@ func (qc *QueueManager) Stop() error {
144102 return err
145103 }
146104
147- if err := qc .WithdrawableStakingQueue .Stop (); err != nil {
148- return err
149- }
150-
151- if err := qc .WithdrawnStakingQueue .Stop (); err != nil {
152- return err
153- }
154-
155105 return nil
156106}
157107
@@ -160,8 +110,6 @@ func (qc *QueueManager) Ping() error {
160110 queues := []client.QueueClient {
161111 qc .ActiveStakingQueue ,
162112 qc .UnbondingStakingQueue ,
163- qc .WithdrawableStakingQueue ,
164- qc .WithdrawnStakingQueue ,
165113 }
166114
167115 for _ , queue := range queues {
0 commit comments