@@ -304,7 +304,8 @@ lookup_dlx(#state{exchange_ref = DLXRef} = State0) ->
304304 State = log_missing_dlx_once (State0 ),
305305 {not_found , State };
306306 {ok , X } ->
307- {X , State0 }
307+ State = clear_log_missing_dlx_once (State0 ),
308+ {X , State }
308309 end .
309310
310311-spec forward (mc :state (), non_neg_integer (), rabbit_amqqueue :name (),
@@ -344,7 +345,7 @@ forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason,
344345 [] ->
345346 log_no_route_once (State1 );
346347 _ ->
347- State1
348+ clear_log_no_route_once ( State1 )
348349 end ,
349350 {RouteToQs , State2 }
350351 end ,
@@ -499,8 +500,9 @@ redeliver0(#pending{delivery = Msg0,
499500 % % Routes changed dynamically so that we don't await any publisher confirms anymore.
500501 % % Since we also received at least one publisher confirm (mandatory flag semantics),
501502 % % we can ack the message to the source quorum queue.
502- State0 # state {pendings = maps :remove (OutSeq , Pendings ),
503- settled_ids = [ConsumedId | SettledIds ]};
503+ State = State0 # state {pendings = maps :remove (OutSeq , Pendings ),
504+ settled_ids = [ConsumedId | SettledIds ]},
505+ clear_log_no_route_once (State );
504506 _ ->
505507 % % Do not redeliver message to a target queue
506508 % % 1. for which we already received a publisher confirm, or
@@ -513,7 +515,7 @@ redeliver0(#pending{delivery = Msg0,
513515 State1 = log_cycles (Cycles , DLRKeys , State0 ),
514516 case RouteToQs of
515517 [] ->
516- State1 ;
518+ log_no_route_once ( State1 ) ;
517519 _ ->
518520 Pend = Pend0 # pending {publish_count = PublishCount + 1 ,
519521 last_published_at = os :system_time (millisecond ),
@@ -523,7 +525,7 @@ redeliver0(#pending{delivery = Msg0,
523525 % % Any target queue that rejected previously and still need
524526 % % to be routed to is moved back to 'unsettled'.
525527 rejected = []},
526- State = State0 # state {pendings = maps :update (OutSeq , Pend , Pendings )},
528+ State = clear_log_no_route_once ( State0 # state {pendings = maps :update (OutSeq , Pend , Pendings )}) ,
527529 Options = #{correlation => OutSeq },
528530 deliver_to_queues (Msg ,
529531 Options ,
@@ -633,6 +635,19 @@ log_missing_dlx_once(#state{exchange_ref = DlxResource,
633635 [rabbit_misc :rs (QueueResource ), rabbit_misc :rs (DlxResource )]),
634636 State # state {logged = maps :put (missing_dlx , DlxResource , Logged )}.
635637
638+ clear_log_missing_dlx_once (# state {exchange_ref = DlxResource ,
639+ queue_ref = QueueResource ,
640+ pendings = Pendings ,
641+ logged = #{missing_dlx := MissingDlx } = Logged } = State ) ->
642+ ? LOG_INFO (" Dead-letter-exchange ~ts found for quorum ~ts . Forwarding was previously "
643+ " blocked since the configured dead-letter-exchange ~ts could not be found. "
644+ " Forwarding of ~b pending dead-letter messages will be attempted." ,
645+ [rabbit_misc :rs (DlxResource ), rabbit_misc :rs (QueueResource ),
646+ rabbit_misc :rs (MissingDlx ), maps :size (Pendings )]),
647+ State # state {logged = maps :remove (missing_dlx , Logged )};
648+ clear_log_missing_dlx_once (State ) ->
649+ State .
650+
636651log_no_route_once (# state {exchange_ref = SameDlx ,
637652 routing_key = SameRoutingKey ,
638653 logged = #{no_route := {SameDlx , SameRoutingKey }}} = State ) ->
@@ -653,6 +668,22 @@ log_no_route_once(#state{queue_ref = QueueResource,
653668 [rabbit_misc :rs (QueueResource ), rabbit_misc :rs (DlxResource ), RoutingKey ]),
654669 State # state {logged = maps :put (no_route , {DlxResource , RoutingKey }, Logged )}.
655670
671+ clear_log_no_route_once (# state {exchange_ref = DlxResource ,
672+ routing_key = RoutingKey ,
673+ queue_ref = QueueResource ,
674+ pendings = Pendings ,
675+ logged = #{no_route := {OldDlx , OldRoutingKey }} = Logged } = State ) ->
676+ ? LOG_INFO (" Discovered a route to forward dead-letter messages from quorum ~ts on "
677+ " configured dead-letter-exchange ~ts and dead-letter-routing-key '~ts '. "
678+ " Previously dead-letter messages could not be forwarded on configured "
679+ " dead-letter-exchange ~ts and dead-letter-routing-key '~ts '. "
680+ " Forwarding of ~b pending dead-letter messages will be attempted." ,
681+ [rabbit_misc :rs (QueueResource ), rabbit_misc :rs (DlxResource ),
682+ RoutingKey , rabbit_misc :rs (OldDlx ), OldRoutingKey , maps :size (Pendings )]),
683+ State # state {logged = maps :remove (no_route , Logged )};
684+ clear_log_no_route_once (State ) ->
685+ State .
686+
656687log_cycles (Cycles , RoutingKeys , State ) ->
657688 lists :foldl (fun (Cycle , S ) -> log_cycle_once (Cycle , RoutingKeys , S ) end , State , Cycles ).
658689
0 commit comments