Skip to content

Commit c0dea03

Browse files
committed
feat: add topic filter setting
1 parent f30fedb commit c0dea03

11 files changed

+101
-29
lines changed

priv/config.hocon

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ mysql {
99
keyfile = "/certs/mysql-client.key"
1010
}
1111
server = "mysql:3306"
12+
topics = []
1213
pool_size = 8
1314
username = "emqx"
1415
password = "public"

priv/config_i18n.json

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,14 @@
77
"en": "Enable MySQL integration",
88
"zh": "启用 MySQL 集成"
99
},
10+
"$mysql_topics_label": {
11+
"en": "Topics to persist",
12+
"zh": "要持久化的主题"
13+
},
14+
"$mysql_topics_desc": {
15+
"en": "Topic filters for topics to persist, separated by commas",
16+
"zh": "要持久化的主题过滤器,用逗号分隔"
17+
},
1018
"$mysql_insert_message_sql_label": {
1119
"en": "MySQL Insert Message SQL",
1220
"zh": "MySQL 插入消息 SQL"
@@ -209,6 +217,14 @@
209217
"en": "Redis Database",
210218
"zh": "Redis 数据库"
211219
},
220+
"$redis_topics_label": {
221+
"en": "Topics to persist",
222+
"zh": "要持久化的主题"
223+
},
224+
"$redis_topics_desc": {
225+
"en": "Topic filters for topics to persist, separated by commas",
226+
"zh": "要持久化的主题过滤器,用逗号分隔"
227+
},
212228
"$redis_pool_size_label": {
213229
"en": "Redis Pool Size",
214230
"zh": "Redis 连接池大小"

priv/config_schema.avsc

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,20 @@
129129
"description": "$mysql_password_desc"
130130
}
131131
},
132+
{
133+
"name": "topics",
134+
"type": {
135+
"type": "array",
136+
"items": "string"
137+
},
138+
"default": [],
139+
"$ui": {
140+
"component": "input-array",
141+
"required": false,
142+
"label": "$mysql_topics_label",
143+
"description": "$mysql_topics_desc"
144+
}
145+
},
132146
{
133147
"name": "batch_size",
134148
"type": "int",
@@ -309,6 +323,20 @@
309323
"description": "$redis_database_desc"
310324
}
311325
},
326+
{
327+
"name": "topics",
328+
"type": {
329+
"type": "array",
330+
"items": "string"
331+
},
332+
"default": [],
333+
"$ui": {
334+
"component": "input-array",
335+
"required": false,
336+
"label": "$redis_topics_label",
337+
"description": "$redis_topics_desc"
338+
}
339+
},
312340
{
313341
"name": "pool_size",
314342
"type": "int",

src/emqx_omp.erl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,3 @@ on_config_changed(OldConf, NewConf) ->
6363
NewRedisConf = maps:get(<<"redis">>, NewConf, DefaultConf),
6464
ok = emqx_omp_redis:on_config_changed(OldRedisConf, NewRedisConf),
6565
ok.
66-

src/emqx_omp_mysql.erl

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,11 @@ start(ConfigRaw) ->
7070
[delete_message_sql, select_message_sql, insert_subscription_sql, select_subscriptions_sql],
7171
ConfigRaw
7272
),
73-
Context = Statements,
73+
TopicFilters = emqx_omp_utils:topic_filters(ConfigRaw),
74+
Context = #{
75+
statements => Statements,
76+
topic_filters => TopicFilters
77+
},
7478
hook(Context).
7579

7680
%%-------------------------------------------------------------------
@@ -80,7 +84,7 @@ start(ConfigRaw) ->
8084
on_client_connected(
8185
ClientInfo = #{clientid := ClientId},
8286
ConnInfo,
83-
#{select_subscriptions_sql := {Sql, ParamTemplate}} = _Context
87+
#{statements := #{select_subscriptions_sql := {Sql, ParamTemplate}}} = _Context
8488
) ->
8589
?SLOG(info, #{
8690
msg => omp_mysql_client_connected,
@@ -119,7 +123,10 @@ on_session_subscribed(
119123
ok = fetch_and_deliver_messages(ClientId, Topic, Context).
120124

121125
insert_subscription(
122-
ClientId, Topic, SubOpts, #{insert_subscription_sql := {Sql, ParamTemplate}} = _Context
126+
ClientId,
127+
Topic,
128+
SubOpts,
129+
#{statements := #{insert_subscription_sql := {Sql, ParamTemplate}}} = _Context
123130
) ->
124131
Qos = maps:get(qos, SubOpts, 0),
125132
Params = render_row(ParamTemplate, #{clientid => ClientId, topic => Topic, qos => Qos}),
@@ -136,7 +143,7 @@ insert_subscription(
136143
ok.
137144

138145
fetch_and_deliver_messages(
139-
ClientId, Topic, #{select_message_sql := {Sql, ParamTemplate}} = _Context
146+
ClientId, Topic, #{statements := #{select_message_sql := {Sql, ParamTemplate}}} = _Context
140147
) ->
141148
Params = render_row(ParamTemplate, #{clientid => ClientId, topic => Topic}),
142149
_ =
@@ -154,7 +161,7 @@ fetch_and_deliver_messages(
154161
end,
155162
ok.
156163

157-
on_session_unsubscribed(#{clientid := ClientId}, Topic, Opts, _Env) ->
164+
on_session_unsubscribed(#{clientid := ClientId}, Topic, Opts, _Context) ->
158165
?SLOG(info, #{
159166
msg => omp_mysql_session_unsubscribed,
160167
clientid => ClientId,
@@ -163,17 +170,17 @@ on_session_unsubscribed(#{clientid := ClientId}, Topic, Opts, _Env) ->
163170
}),
164171
ok.
165172

166-
on_message_publish(Message = #message{topic = <<"$SYS/", _/binary>>}, _Env) ->
173+
on_message_publish(Message = #message{topic = <<"$SYS/", _/binary>>}, _Context) ->
167174
{ok, Message};
168-
on_message_publish(Message, _Context) ->
175+
on_message_publish(Message, #{topic_filters := TopicFilters} = _Context) ->
169176
_ =
170-
case emqx_message:qos(Message) of
171-
0 ->
177+
case emqx_omp_utils:need_persist_message(Message, TopicFilters) of
178+
false ->
172179
?SLOG(debug, #{
173-
msg => omp_mysql_message_publish_qos0,
180+
msg => omp_mysql_message_publish_skipped,
174181
message => Message
175182
});
176-
_ ->
183+
true ->
177184
MessageMap = message_to_map(Message),
178185
Res = emqx_resource:query(?RESOURCE_ID, {insert_message, MessageMap}),
179186
?SLOG(info, #{
@@ -187,9 +194,7 @@ on_message_publish(Message, _Context) ->
187194
on_message_acked(
188195
_ClientInfo = #{clientid := ClientId},
189196
#message{id = MsgId} = Message,
190-
#{
191-
delete_message_sql := {Sql, ParamTemplate}
192-
}
197+
#{statements := #{delete_message_sql := {Sql, ParamTemplate}}} = _Context
193198
) ->
194199
?SLOG(info, #{
195200
msg => omp_mysql_message_puback,
@@ -377,7 +382,6 @@ make_mysql_resource_config(#{<<"insert_message_sql">> := InsertMessageStatement}
377382

378383
{MysqlConfig, ResourceOpts}.
379384

380-
381385
sync_query(Sql, Params) ->
382386
emqx_resource:simple_sync_query(?RESOURCE_ID, {sql, Sql, Params, ?TIMEOUT}).
383387

src/emqx_omp_redis.erl

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ start(ConfigRaw) ->
7171
),
7272
message_ttl => maps:get(
7373
<<"message_ttl">>, ConfigRaw, ?DEFAULT_MESSAGE_TTL
74-
)
74+
),
75+
topic_filters => emqx_omp_utils:topic_filters(ConfigRaw)
7576
},
7677
hook(Context).
7778

@@ -229,15 +230,15 @@ on_session_unsubscribed(#{clientid := ClientId}, Topic, Opts, _Env) ->
229230

230231
on_message_publish(Message = #message{topic = <<"$SYS/", _/binary>>}, _Env) ->
231232
{ok, Message};
232-
on_message_publish(Message, #{message_ttl := TTL} = Context) ->
233+
on_message_publish(Message, #{message_ttl := TTL, topic_filters := TopicFilters} = Context) ->
233234
_ =
234-
case emqx_message:qos(Message) of
235-
0 ->
235+
case emqx_omp_utils:need_persist_message(Message, TopicFilters) of
236+
false ->
236237
?SLOG(debug, #{
237238
msg => omp_redis_message_publish_qos0,
238239
message => Message
239240
});
240-
_ ->
241+
true ->
241242
Topic = emqx_message:topic(Message),
242243
MsgId = emqx_message:id(Message),
243244
Now = erlang:system_time(millisecond),

src/emqx_omp_redis_connector.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,10 @@ on_batch_query(InstId, Batch, State) ->
3939
{cmd, Cmd} -> [Cmd];
4040
{cmds, Cmds} -> Cmds
4141
end
42-
end, Batch),
42+
end,
43+
Batch
44+
),
4345
emqx_redis:on_query(InstId, {cmds, Cmds}, State).
4446

4547
on_get_status(InstId, State) ->
4648
emqx_redis:on_get_status(InstId, State).
47-

src/emqx_omp_utils.erl

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,16 @@
44

55
-module(emqx_omp_utils).
66

7+
-include_lib("emqx/include/logger.hrl").
8+
79
-export([
810
fix_ssl_config/1,
911
make_resource_opts/1,
1012
check_config/2,
1113
deliver_messages/2,
12-
induce_subscriptions/1
14+
induce_subscriptions/1,
15+
need_persist_message/2,
16+
topic_filters/1
1317
]).
1418

1519
fix_ssl_config(#{<<"ssl">> := SslConfig0} = RawConfig) ->
@@ -66,3 +70,23 @@ induce_subscriptions([]) ->
6670
induce_subscriptions(Subscriptions) ->
6771
erlang:send(self(), {subscribe, Subscriptions}),
6872
ok.
73+
74+
topic_filters(ConfigRaw) ->
75+
TopicFiltersRaw = maps:get(<<"topics">>, ConfigRaw, []),
76+
[emqx_topic:words(TopicFilterRaw) || TopicFilterRaw <- TopicFiltersRaw].
77+
78+
need_persist_message(Message, TopicFilters) ->
79+
?SLOG(debug, #{
80+
msg => omp_utils_need_persist_message,
81+
message => emqx_message:to_map(Message),
82+
topic_filters => TopicFilters,
83+
topic => emqx_message:topic(Message)
84+
}),
85+
is_message_qos_nonzero(Message) andalso does_message_topic_match(Message, TopicFilters).
86+
87+
is_message_qos_nonzero(Message) ->
88+
emqx_message:qos(Message) =/= 0.
89+
90+
does_message_topic_match(Message, TopicFilters) ->
91+
Topic = emqx_message:topic(Message),
92+
lists:any(fun(Filter) -> emqx_topic:match(Topic, Filter) end, TopicFilters).

test/emqx_omp_SUITE.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ init_per_suite(Config) ->
4747
ok = emqx_omp_test_api_helpers:upload_plugin(Filename),
4848
ok = emqx_omp_test_api_helpers:start_plugin(PluginId),
4949
PluginConfig = plugin_config(),
50-
5150
[{plugin_id, PluginId}, {plugin_filename, Filename}, {plugin_config, PluginConfig} | Config].
5251

5352
end_per_suite(_Config) ->
@@ -237,6 +236,7 @@ plugin_config() ->
237236
enable => false,
238237
ssl => DefaultSSLConfig#{server_name_indication => <<"redis-server">>},
239238
servers => <<"invalid-host:6379">>,
239+
topics => [<<"t/#">>],
240240
redis_type => <<"single">>,
241241
pool_size => 8,
242242
username => <<"">>,
@@ -252,6 +252,7 @@ plugin_config() ->
252252
mysql => #{
253253
enable => false,
254254
ssl => DefaultSSLConfig#{server_name_indication => <<"mysql-server">>},
255+
topics => [<<"t/#">>],
255256
server => <<"invalid-host:3306">>,
256257
password => <<"public">>,
257258
username => <<"emqx">>,

test/emqx_omp_test_api_helpers.erl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,4 +219,3 @@ delete_all_actions() ->
219219

220220
asset_path() ->
221221
filename:join([code:lib_dir(emqx_offline_message_plugin, test), "assets"]).
222-

0 commit comments

Comments
 (0)