2424-define (RESOURCE_GROUP , <<" omp" >>).
2525-define (FETCH_MSG_BATCH_SIZE , 100 ).
2626
27+ -define (DEFAULT_MESSAGE_TTL , 7200 ).
28+ -define (DEFAULT_SUBSCRIPTION_KEY_PREFIX , <<" mqtt:sub" >>).
29+ -define (DEFAULT_MESSAGE_KEY_PREFIX , <<" mqtt:msg" >>).
30+
2731-type context () :: map ().
2832
2933% %--------------------------------------------------------------------
@@ -43,6 +47,34 @@ on_config_changed(#{<<"enable">> := true} = _OldConf, #{<<"enable">> := false} =
4347on_config_changed (#{<<" enable" >> := false } = _OldConf , #{<<" enable" >> := true } = NewConf ) ->
4448 ok = start (NewConf ).
4549
50+ % %--------------------------------------------------------------------
51+ % % start/stop
52+ % %--------------------------------------------------------------------
53+
54+ -spec stop () -> ok .
55+ stop () ->
56+ unhook (),
57+ ok = stop_resource ().
58+
59+ -spec start (map ()) -> ok .
60+ start (ConfigRaw ) ->
61+ ? SLOG (info , #{msg => omp_redis_start , config => ConfigRaw }),
62+ {RedisConfig , ResourceOpts } = make_redis_resource_config (ConfigRaw ),
63+ ok = start_resource (RedisConfig , ResourceOpts ),
64+
65+ Context = #{
66+ message_key_prefix => maps :get (
67+ <<" message_key_prefix" >>, ConfigRaw , ? DEFAULT_MESSAGE_KEY_PREFIX
68+ ),
69+ subscription_key_prefix => maps :get (
70+ <<" subscription_key_prefix" >>, ConfigRaw , ? DEFAULT_SUBSCRIPTION_KEY_PREFIX
71+ ),
72+ message_ttl => maps :get (
73+ <<" message_ttl" >>, ConfigRaw , ? DEFAULT_MESSAGE_TTL
74+ )
75+ },
76+ hook (Context ).
77+
4678% %--------------------------------------------------------------------
4779% % Callbacks
4880% %--------------------------------------------------------------------
@@ -101,8 +133,6 @@ insert_subscription(
101133 ok .
102134
103135fetch_and_deliver_messages (Topic , Context ) ->
104- % Topic = ?C(topic, Msg),
105- % MsgTab = table_name(?MSG,Topic),
106136 case fetch_message_ids (Topic , Context ) of
107137 {ok , MsgIds } ->
108138 Messages = fetch_messages (MsgIds , Context ),
@@ -317,30 +347,6 @@ to_subscriptions([Topic, QoSBin | KVs]) ->
317347to_subscriptions ([]) ->
318348 [].
319349
320- % %--------------------------------------------------------------------
321- % % start/stop
322- % %--------------------------------------------------------------------
323-
324- -spec stop () -> ok .
325- stop () ->
326- ok = stop_resource (),
327- unhook ().
328-
329- -spec start (map ()) -> ok .
330- start (ConfigRaw ) ->
331- ? SLOG (info , #{msg => omp_redis_start , config => ConfigRaw }),
332- {RedisConfig , ResourceOpts } = make_redis_resource_config (ConfigRaw ),
333- ok = start_resource (RedisConfig , ResourceOpts ),
334-
335- Context = #{
336- message_key_prefix => maps :get (<<" message_key_prefix" >>, ConfigRaw , <<" mqtt:msg" >>),
337- subscription_key_prefix => maps :get (
338- <<" subscription_key_prefix" >>, ConfigRaw , <<" mqtt:sub" >>
339- ),
340- message_ttl => maps :get (<<" message_ttl" >>, ConfigRaw , 7200 )
341- },
342- hook (Context ).
343-
344350% % Resource helpers
345351
346352make_redis_resource_config (ConfigRaw0 ) ->
0 commit comments