Skip to content

Commit ec364a3

Browse files
committed
wip
1 parent c0dea03 commit ec364a3

File tree

5 files changed

+149
-61
lines changed

5 files changed

+149
-61
lines changed

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
services:
22
emqx:
3-
image: emqx/emqx-enterprise:5.8.4
3+
image: emqx/emqx-enterprise:5.8.5
44
container_name: emqx
55
environment:
66
EMQX_LOG__CONSOLE__LEVEL: debug

src/emqx_omp_mysql.erl

Lines changed: 70 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
-include_lib("emqx/include/logger.hrl").
99
-include_lib("emqx/include/emqx_hooks.hrl").
1010

11+
-include("emqx_omp.hrl").
12+
1113
-export([
1214
on_config_changed/2
1315
]).
@@ -24,6 +26,31 @@
2426
-define(RESOURCE_GROUP, <<"omp">>).
2527
-define(TIMEOUT, 1000).
2628

29+
% -define(INIT_SQL, [
30+
% """
31+
% CREATE TABLE IF NOT EXISTS `mqtt_msg` (
32+
% `id` bigint unsigned NOT NULL AUTO_INCREMENT,
33+
% `msgid` varchar(64) DEFAULT NULL,
34+
% `topic` varchar(180) NOT NULL,
35+
% `sender` varchar(64) DEFAULT NULL,
36+
% `qos` tinyint(1) NOT NULL DEFAULT '0',
37+
% `retain` tinyint(1) DEFAULT NULL,
38+
% `payload` blob,
39+
% `arrived` datetime NOT NULL,
40+
% PRIMARY KEY (`id`),
41+
% INDEX topic_index(`topic`)
42+
% ) ENGINE=InnoDB DEFAULT CHARSET=utf8MB4;
43+
% """,
44+
% """
45+
% CREATE TABLE IF NOT EXISTS `mqtt_sub` (
46+
% `clientid` varchar(64) NOT NULL,
47+
% `topic` varchar(180) NOT NULL,
48+
% `qos` tinyint(1) NOT NULL DEFAULT '0',
49+
% PRIMARY KEY (`clientid`, `topic`)
50+
% ) ENGINE=InnoDB DEFAULT CHARSET=utf8MB4;
51+
% """
52+
% ]).
53+
2754
-type statement() :: emqx_template_sql:statement().
2855
-type param_template() :: emqx_template_sql:row_template().
2956

@@ -150,10 +177,15 @@ fetch_and_deliver_messages(
150177
case sync_query(Sql, Params) of
151178
{ok, Columns, Rows} ->
152179
Messages = to_messages(Columns, Rows),
180+
?SLOG(debug, #{
181+
msg => omp_mysql_fetch_and_deliver_messages,
182+
topic => Topic,
183+
messages => length(Messages)
184+
}),
153185
ok = emqx_omp_utils:deliver_messages(Topic, Messages),
154-
emqx_metrics_worker:inc(emqx_omp_metrics_worker, session_subscribed, success);
186+
emqx_metrics_worker:inc(?METRICS_WORKER, session_subscribed, success);
155187
{error, Reason} ->
156-
emqx_metrics_worker:inc(emqx_omp_metrics_worker, session_subscribed, fail),
188+
emqx_metrics_worker:inc(?METRICS_WORKER, session_subscribed, fail),
157189
?SLOG(error, #{
158190
msg => "omp_mysql_on_session_subscribed_error",
159191
reason => Reason
@@ -205,9 +237,14 @@ on_message_acked(
205237
_ =
206238
case sync_query(Sql, Params) of
207239
ok ->
208-
emqx_metrics_worker:inc(emqx_omp_metrics_worker, message_acked, success);
240+
emqx_metrics_worker:inc(?METRICS_WORKER, message_acked, success),
241+
?SLOG(debug, #{
242+
msg => "omp_mysql_message_puback_success",
243+
message => emqx_message:to_map(Message),
244+
clientid => ClientId
245+
});
209246
{error, Reason} ->
210-
emqx_metrics_worker:inc(emqx_omp_metrics_worker, message_acked, fail),
247+
emqx_metrics_worker:inc(?METRICS_WORKER, message_acked, fail),
211248
?SLOG(error, #{
212249
msg => "omp_mysql_on_message_acked_error",
213250
reason => Reason
@@ -329,33 +366,6 @@ stop_resource() ->
329366
}),
330367
emqx_resource:remove_local(?RESOURCE_ID).
331368

332-
parse_statements(Keys, BinMap) ->
333-
lists:foldl(
334-
fun(Key, StatementsAcc) ->
335-
BinKey = atom_to_binary(Key, utf8),
336-
StatementRaw = maps:get(BinKey, BinMap),
337-
Parsed = parse_statement(StatementRaw),
338-
StatementsAcc#{Key => Parsed}
339-
end,
340-
#{},
341-
Keys
342-
).
343-
344-
parse_statement(StatementRaw) ->
345-
{Statement, RowTamplate} = emqx_template_sql:parse_prepstmt(
346-
StatementRaw,
347-
#{parameters => '?', strip_double_quote => true}
348-
),
349-
{Statement, RowTamplate}.
350-
351-
render_row(RowTemplate, Map) ->
352-
{Row, _Errors} = emqx_template:render(
353-
RowTemplate,
354-
Map,
355-
#{var_trans => fun(_Name, Value) -> emqx_utils_sql:to_sql_value(Value) end}
356-
),
357-
Row.
358-
359369
make_mysql_resource_config(#{<<"insert_message_sql">> := InsertMessageStatement} = RawConfig0) ->
360370
RawMysqlConfig0 = maps:with(
361371
[
@@ -385,6 +395,35 @@ make_mysql_resource_config(#{<<"insert_message_sql">> := InsertMessageStatement}
385395
sync_query(Sql, Params) ->
386396
emqx_resource:simple_sync_query(?RESOURCE_ID, {sql, Sql, Params, ?TIMEOUT}).
387397

398+
%% Render helpers
399+
400+
parse_statements(Keys, BinMap) ->
401+
lists:foldl(
402+
fun(Key, StatementsAcc) ->
403+
BinKey = atom_to_binary(Key, utf8),
404+
StatementRaw = maps:get(BinKey, BinMap),
405+
Parsed = parse_statement(StatementRaw),
406+
StatementsAcc#{Key => Parsed}
407+
end,
408+
#{},
409+
Keys
410+
).
411+
412+
parse_statement(StatementRaw) ->
413+
{Statement, RowTamplate} = emqx_template_sql:parse_prepstmt(
414+
StatementRaw,
415+
#{parameters => '?', strip_double_quote => true}
416+
),
417+
{Statement, RowTamplate}.
418+
419+
render_row(RowTemplate, Map) ->
420+
{Row, _Errors} = emqx_template:render(
421+
RowTemplate,
422+
Map,
423+
#{var_trans => fun(_Name, Value) -> emqx_utils_sql:to_sql_value(Value) end}
424+
),
425+
Row.
426+
388427
%% Hook helpers
389428

390429
unhook() ->

src/emqx_omp_redis.erl

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
-include_lib("emqx/include/logger.hrl").
99
-include_lib("emqx/include/emqx_hooks.hrl").
1010

11+
-include("emqx_omp.hrl").
12+
1113
-export([
1214
on_config_changed/2
1315
]).
@@ -138,9 +140,9 @@ fetch_and_deliver_messages(Topic, Context) ->
138140
{ok, MsgIds} ->
139141
Messages = fetch_messages(MsgIds, Context),
140142
ok = emqx_omp_utils:deliver_messages(Topic, Messages),
141-
emqx_metrics_worker:inc(emqx_omp_metrics_worker, session_subscribed, success);
143+
emqx_metrics_worker:inc(?METRICS_WORKER, session_subscribed, success);
142144
{error, Reason} ->
143-
emqx_metrics_worker:inc(emqx_omp_metrics_worker, session_subscribed, fail),
145+
emqx_metrics_worker:inc(?METRICS_WORKER, session_subscribed, fail),
144146
?SLOG(error, #{
145147
msg => "omp_redis_fetch_message_ids_error",
146148
reason => Reason
@@ -183,7 +185,7 @@ fetch_messages(MsgIds, Context) ->
183185
fetch_messages(MsgIds, [], [], Context).
184186

185187
fetch_messages([], [], Acc, _Context) ->
186-
Acc;
188+
lists:reverse(Acc);
187189
fetch_messages(MsgIds, MsgIdBatchAcc, Acc0, Context) when
188190
length(MsgIdBatchAcc) >= ?FETCH_MSG_BATCH_SIZE orelse
189191
(length(MsgIds) =:= 0 andalso length(MsgIdBatchAcc) > 0)
@@ -194,7 +196,7 @@ fetch_messages(MsgIds, MsgIdBatchAcc, Acc0, Context) when
194196
],
195197
case sync_cmds(Cmds) of
196198
{ok, Results} ->
197-
Acc = append_results(Results, Acc0),
199+
Acc = append_results(lists:reverse(Results), Acc0),
198200
fetch_messages(MsgIds, [], Acc, Context);
199201
{error, Reason} ->
200202
?SLOG(error, #{
@@ -274,13 +276,17 @@ on_message_acked(
274276
[<<"DEL">>, msg_table(Context, MsgIDb62)],
275277
[<<"ZREM">>, msg_table(Context, Topic), MsgIDb62]
276278
],
277-
Res = emqx_resource:simple_sync_query(?RESOURCE_ID, {cmds, Cmds}),
278-
?SLOG(info, #{
279-
msg => omp_redis_message_puback,
280-
message => Message,
281-
result => Res,
282-
clientid => ClientId
283-
}).
279+
case emqx_resource:simple_sync_query(?RESOURCE_ID, {cmds, Cmds}) of
280+
{ok, _} ->
281+
emqx_metrics_worker:inc(?METRICS_WORKER, message_acked, success);
282+
{error, Reason} ->
283+
emqx_metrics_worker:inc(?METRICS_WORKER, message_acked, fail),
284+
?SLOG(error, #{
285+
msg => "omp_redis_message_puback_error",
286+
reason => Reason
287+
})
288+
end,
289+
ok.
284290

285291
%%--------------------------------------------------------------------
286292
%% Internal functions

src/emqx_omp_sup.erl

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,5 @@ start_link() ->
1616
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
1717

1818
init([]) ->
19-
ChildSpec = emqx_metrics_worker:child_spec(
20-
?METRICS_WORKER,
21-
emqx_omp_metrics_worker
22-
),
19+
ChildSpec = emqx_metrics_worker:child_spec(?METRICS_WORKER),
2320
{ok, {{one_for_all, 0, 1}, [ChildSpec]}}.

test/emqx_omp_SUITE.erl

Lines changed: 60 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ init_per_suite(Config) ->
5050
[{plugin_id, PluginId}, {plugin_filename, Filename}, {plugin_config, PluginConfig} | Config].
5151

5252
end_per_suite(_Config) ->
53-
ok = emqx_omp_test_api_helpers:delete_all_plugins(),
53+
% ok = emqx_omp_test_api_helpers:delete_all_plugins(),
5454
ok = emqx_omp_test_helpers:stop(),
5555
ok.
5656

@@ -127,16 +127,18 @@ end_per_testcase(_Case, _Config) ->
127127
%%--------------------------------------------------------------------
128128

129129
t_different_subscribers(_Config) ->
130+
Topic = unique_topic(),
131+
130132
% publish message
131-
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
133+
Payload = unique_payload(),
132134
ClientPub = emqtt_connect(),
133-
_ = emqtt:publish(ClientPub, <<"t/1">>, Payload, 1),
135+
_ = emqtt:publish(ClientPub, Topic, Payload, 1),
134136
ok = emqtt:stop(ClientPub),
135137
ct:sleep(500),
136138

137139
% A new subscriber should receive the message
138140
ClientSub0 = emqtt_connect(),
139-
_ = emqtt:subscribe(ClientSub0, <<"t/1">>, 1),
141+
_ = emqtt:subscribe(ClientSub0, Topic, 1),
140142
receive
141143
{publish, #{payload := Payload}} ->
142144
ok
@@ -149,7 +151,7 @@ t_different_subscribers(_Config) ->
149151
%% Another subscriber should NOT receive the message:
150152
%% it should be deleted.
151153
ClientSub1 = emqtt_connect(),
152-
_ = emqtt:subscribe(ClientSub1, <<"t/1">>, 1),
154+
_ = emqtt:subscribe(ClientSub1, Topic, 1),
153155
receive
154156
{publish, #{payload := Payload} = Msg1} ->
155157
ct:fail("Message received: ~p", [Msg1])
@@ -159,17 +161,19 @@ t_different_subscribers(_Config) ->
159161
ok = emqtt:stop(ClientSub1).
160162

161163
t_subscribition_persistence(_Config) ->
162-
SubscriberOpts = [{clientid, <<"subscriber">>}, {clean_start, true}],
164+
ClientId = unique_clientid(),
165+
Topic = unique_topic(),
166+
SubscriberOpts = [{clientid, ClientId}, {clean_start, true}],
163167

164168
%% Subscribe to topic and disconnect loosing session (clean_start = true)
165169
ClientSub0 = emqtt_connect(SubscriberOpts),
166-
_ = emqtt:subscribe(ClientSub0, <<"t/2">>, 1),
170+
_ = emqtt:subscribe(ClientSub0, Topic, 1),
167171
ok = emqtt:stop(ClientSub0),
168172

169173
%% Publish message to topic
170-
Payload0 = emqx_guid:to_hexstr(emqx_guid:gen()),
174+
Payload0 = unique_payload(),
171175
ClientPub = emqtt_connect(),
172-
_ = emqtt:publish(ClientPub, <<"t/2">>, Payload0, 1),
176+
_ = emqtt:publish(ClientPub, Topic, Payload0, 1),
173177
ct:sleep(500),
174178

175179
%% Reconnect subscriber
@@ -194,8 +198,8 @@ t_subscribition_persistence(_Config) ->
194198
after 1000 ->
195199
ok
196200
end,
197-
Payload1 = emqx_guid:to_hexstr(emqx_guid:gen()),
198-
_ = emqtt:publish(ClientPub, <<"t/2">>, Payload1, 1),
201+
Payload1 = unique_payload(),
202+
_ = emqtt:publish(ClientPub, Topic, Payload1, 1),
199203
receive
200204
{publish, #{payload := Payload1}} ->
201205
ok
@@ -207,8 +211,37 @@ t_subscribition_persistence(_Config) ->
207211
ok = emqtt:stop(ClientPub),
208212
ok = emqtt:stop(ClientSub2).
209213

210-
%% TODO
211-
%% Test message order
214+
t_message_order(_Config) ->
215+
Topic = unique_topic(),
216+
217+
% publish message
218+
ClientPub = emqtt_connect(),
219+
lists:foreach(
220+
fun(I) ->
221+
Payload = integer_to_binary(I),
222+
_ = emqtt:publish(ClientPub, Topic, Payload, 1)
223+
end,
224+
lists:seq(1, 200)
225+
),
226+
ok = emqtt:stop(ClientPub),
227+
ct:sleep(500),
228+
229+
%% Collect messages
230+
ClientSub = emqtt_connect(),
231+
_ = emqtt:subscribe(ClientSub, Topic, 1),
232+
Messages = receive_messages(),
233+
ok = emqtt:stop(ClientSub),
234+
235+
%% Check messages order
236+
?assertEqual(lists:seq(1, 200), Messages).
237+
238+
receive_messages() ->
239+
receive
240+
{publish, #{payload := Payload}} ->
241+
[binary_to_integer(Payload) | receive_messages()]
242+
after 500 ->
243+
[]
244+
end.
212245

213246
%%--------------------------------------------------------------------
214247
%% Internal functions
@@ -262,7 +295,8 @@ plugin_config() ->
262295
delete_message_sql => <<"delete from mqtt_msg where msgid = ${id}">>,
263296
insert_message_sql => <<
264297
"insert into mqtt_msg(msgid, sender, topic, qos, retain, payload, arrived)"
265-
"values(${id}, ${from}, ${topic}, ${qos}, ${flags.retain}, ${payload}, FROM_UNIXTIME(${timestamp}/1000))"
298+
"values(${id}, ${from}, ${topic}, ${qos}, ${flags.retain}, "
299+
"${payload}, FROM_UNIXTIME(${timestamp}/1000))"
266300
>>,
267301
insert_subscription_sql => <<
268302
"insert into mqtt_sub(clientid, topic, qos)"
@@ -291,3 +325,15 @@ set_server(redis_tcp, Config) ->
291325
emqx_utils_maps:deep_put([redis, servers], Config, <<"redis:6379">>);
292326
set_server(redis_ssl, Config) ->
293327
emqx_utils_maps:deep_put([redis, servers], Config, <<"redis-ssl:6380">>).
328+
329+
unique_id() ->
330+
<<(emqx_guid:to_hexstr(emqx_guid:gen()))/binary>>.
331+
332+
unique_topic() ->
333+
<<"t/", (unique_id())/binary>>.
334+
335+
unique_clientid() ->
336+
<<"c/", (unique_id())/binary>>.
337+
338+
unique_payload() ->
339+
<<"p/", (unique_id())/binary>>.

0 commit comments

Comments
 (0)