Skip to content

Commit f2bb253

Browse files
authored
Merge pull request #1 from savonarola/250402-offline-plugin-healthcheck
feat: implement plugin health check
2 parents efe2dab + ebeb524 commit f2bb253

10 files changed

+127
-10
lines changed

docker-compose.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
services:
22
emqx:
33
image: emqx/emqx-enterprise:5.8.5
4+
# TODO: uncomment when 5.9.0 image is released
5+
# image: docker.io/emqx/emqx-enterprise:5.9.0
46
container_name: emqx
57
environment:
68
EMQX_LOG__CONSOLE__LEVEL: debug
File renamed without changes.

src/emqx_offline_message_plugin_app.erl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717
]).
1818

1919
%% EMQX Plugin callbacks
20-
-export([on_config_changed/2]).
20+
-export([
21+
on_config_changed/2,
22+
on_health_check/1
23+
]).
2124

2225
start(_StartType, _StartArgs) ->
2326
{ok, Sup} = emqx_omp_sup:start_link(),
@@ -28,3 +31,6 @@ stop(_State) ->
2831

2932
on_config_changed(OldConf, NewConf) ->
3033
emqx_omp:on_config_changed(OldConf, NewConf).
34+
35+
on_health_check(_Options) ->
36+
emqx_omp:on_health_check().

src/emqx_omp.erl

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919

2020
%% Plugin callbacks
2121
-export([
22-
on_config_changed/2
22+
on_config_changed/2,
23+
on_health_check/0
2324
]).
2425

2526
%% gen_server callbacks
@@ -39,11 +40,14 @@
3940
%%--------------------------------------------------------------------
4041

4142
-record(state, {}).
43+
4244
-record(on_config_changed, {
4345
old_conf :: map(),
4446
new_conf :: map()
4547
}).
4648

49+
-record(on_health_check, {}).
50+
4751
%%--------------------------------------------------------------------
4852
%% API
4953
%%--------------------------------------------------------------------
@@ -68,7 +72,20 @@ child_spec() ->
6872
%%--------------------------------------------------------------------
6973

7074
on_config_changed(OldConf, NewConf) ->
71-
gen_server:call(?SERVER, #on_config_changed{old_conf = OldConf, new_conf = NewConf}, ?TIMEOUT).
75+
try
76+
gen_server:call(?SERVER, #on_config_changed{old_conf = OldConf, new_conf = NewConf}, ?TIMEOUT)
77+
catch
78+
exit:{noproc, _} ->
79+
ok
80+
end.
81+
82+
on_health_check() ->
83+
try
84+
gen_server:call(?SERVER, #on_health_check{}, ?TIMEOUT)
85+
catch
86+
exit:{noproc, _} ->
87+
{error, <<"Plugin is not running">>}
88+
end.
7289

7390
%%--------------------------------------------------------------------
7491
%% gen_server callbacks
@@ -83,6 +100,8 @@ init([]) ->
83100

84101
handle_call(#on_config_changed{old_conf = OldConf, new_conf = NewConf}, _From, State) ->
85102
{reply, handle_on_config_changed(OldConf, NewConf), State};
103+
handle_call(#on_health_check{}, _From, State) ->
104+
{reply, handle_on_health_check(), State};
86105
handle_call(Request, From, State) ->
87106
?SLOG(error, #{
88107
msg => "offline_message_plugin_unexpected_call", request => Request, from => From
@@ -125,9 +144,32 @@ handle_on_config_changed(OldConf, NewConf) ->
125144
ok = emqx_omp_redis:on_config_changed(OldRedisConf, NewRedisConf),
126145
ok.
127146

147+
handle_on_health_check() ->
148+
Config = current_config(),
149+
MysqlConf = maps:get(<<"mysql">>, Config, #{}),
150+
RedisConf = maps:get(<<"redis">>, Config, #{}),
151+
MysqlStatus = emqx_omp_mysql:on_health_check(MysqlConf),
152+
RedisStatus = emqx_omp_redis:on_health_check(RedisConf),
153+
Errors = status_to_error_list(MysqlStatus) ++ status_to_error_list(RedisStatus),
154+
case Errors of
155+
[] ->
156+
ok;
157+
Errors ->
158+
{error, iolist_to_binary(lists:join(",", Errors))}
159+
end.
160+
161+
status_to_error_list(ok) -> [];
162+
status_to_error_list({error, Error}) -> [Error].
163+
128164
current_config() ->
129-
{ok, Config} = emqx_plugins:get_config(?PLUGIN_NAME_VSN),
130-
Config.
165+
case emqx_plugins:get_config(?PLUGIN_NAME_VSN) of
166+
%% Pre 5.9.0
167+
{ok, Config} when is_map(Config) ->
168+
Config;
169+
%% 5.9.0 and later
170+
Config when is_map(Config) ->
171+
Config
172+
end.
131173

132174
init_metrics() ->
133175
?SLOG(info, #{msg => "omp_init_metrics"}),

src/emqx_omp_mysql.erl

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
-include("emqx_omp.hrl").
1212

1313
-export([
14-
on_config_changed/2
14+
on_config_changed/2,
15+
on_health_check/1
1516
]).
1617

1718
-export([
@@ -78,6 +79,12 @@ on_config_changed(#{<<"enable">> := true} = _OldConf, #{<<"enable">> := false} =
7879
on_config_changed(#{<<"enable">> := false} = _OldConf, #{<<"enable">> := true} = NewConf) ->
7980
ok = start(NewConf).
8081

82+
-spec on_health_check(map()) -> ok | {error, binary()}.
83+
on_health_check(#{<<"enable">> := false}) ->
84+
ok;
85+
on_health_check(#{<<"enable">> := true}) ->
86+
emqx_omp_utils:resource_health_status(<<"MySQL">>, ?RESOURCE_ID).
87+
8188
%%-------------------------------------------------------------------
8289
%% start/stop
8390
%%-------------------------------------------------------------------

src/emqx_omp_redis.erl

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
-include("emqx_omp.hrl").
1212

1313
-export([
14-
on_config_changed/2
14+
on_config_changed/2,
15+
on_health_check/1
1516
]).
1617

1718
-export([
@@ -49,6 +50,12 @@ on_config_changed(#{<<"enable">> := true} = _OldConf, #{<<"enable">> := false} =
4950
on_config_changed(#{<<"enable">> := false} = _OldConf, #{<<"enable">> := true} = NewConf) ->
5051
ok = start(NewConf).
5152

53+
-spec on_health_check(map()) -> ok | {error, binary()}.
54+
on_health_check(#{<<"enable">> := false}) ->
55+
ok;
56+
on_health_check(#{<<"enable">> := true}) ->
57+
emqx_omp_utils:resource_health_status(<<"Redis">>, ?RESOURCE_ID).
58+
5259
%%--------------------------------------------------------------------
5360
%% start/stop
5461
%%--------------------------------------------------------------------

src/emqx_omp_utils.erl

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
deliver_messages/2,
1414
induce_subscriptions/1,
1515
need_persist_message/2,
16-
topic_filters/1
16+
topic_filters/1,
17+
resource_health_status/2
1718
]).
1819

1920
fix_ssl_config(#{<<"ssl">> := SslConfig0} = RawConfig) ->
@@ -90,3 +91,13 @@ is_message_qos_nonzero(Message) ->
9091
does_message_topic_match(Message, TopicFilters) ->
9192
Topic = emqx_message:topic(Message),
9293
lists:any(fun(Filter) -> emqx_topic:match(Topic, Filter) end, TopicFilters).
94+
95+
resource_health_status(Name, ResourceId) ->
96+
case emqx_resource:health_check(ResourceId) of
97+
{ok, connected} ->
98+
ok;
99+
{ok, OtherStatus} ->
100+
{error, iolist_to_binary(io_lib:format("Resource ~s is not connected, status: ~p", [Name, OtherStatus]))};
101+
{error, Reason} ->
102+
{error, iolist_to_binary(io_lib:format("Resource ~s health check failed: ~p", [Name, Reason]))}
103+
end.

test/emqx_omp_SUITE.erl

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,17 @@ groups() ->
3333
[mysql_tcp, mysql_ssl, redis_tcp, redis_ssl],
3434
[sync, async],
3535
[buffered, unbuffered],
36-
emqx_omp_test_helpers:all(?MODULE)
36+
%% TODO: restore t_health_check when 5.9.0 image is released
37+
%% and used in the Docker Compose file
38+
emqx_omp_test_helpers:all(?MODULE) -- [t_health_check]
3739
]).
3840

3941
init_per_suite(Config) ->
4042
ok = emqx_omp_test_helpers:start(),
4143

4244
%% clean up
4345
ok = emqx_omp_test_api_helpers:delete_all_plugins(),
46+
ok = emqx_omp_test_helpers:allow_plugin_install(),
4447

4548
%% install plugin
4649
{PluginId, Filename} = emqx_omp_test_api_helpers:find_plugin(),
@@ -211,6 +214,25 @@ t_subscribition_persistence(_Config) ->
211214
ok = emqtt:stop(ClientPub),
212215
ok = emqtt:stop(ClientSub2).
213216

217+
t_health_check(Config) ->
218+
PluginId = ?config(plugin_id, Config),
219+
?assertMatch(
220+
#{
221+
<<"running_status">> :=
222+
[#{<<"health_status">> := #{<<"status">> := <<"ok">>}}]
223+
},
224+
emqx_omp_test_api_helpers:get_plugin(PluginId)
225+
),
226+
Config0 = ?config(plugin_config, Config),
227+
Config1 = emqx_utils_maps:deep_put([mysql, server], Config0, <<"bad-host:3306">>),
228+
Config2 = emqx_utils_maps:deep_put([redis, servers], Config1, <<"bad-host:6379">>),
229+
ok = emqx_omp_test_api_helpers:configure_plugin(PluginId, Config2),
230+
?assertMatch(
231+
#{<<"running_status">> := [#{<<"health_status">> := #{<<"status">> := <<"error">>}}]},
232+
emqx_omp_test_api_helpers:get_plugin(PluginId)
233+
),
234+
ok.
235+
214236
t_message_order(_Config) ->
215237
Topic = unique_topic(),
216238

test/emqx_omp_test_api_helpers.erl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
find_plugin/0,
1818
upload_plugin/1,
1919
list_plugins/0,
20+
get_plugin/1,
2021
start_plugin/1,
2122
stop_plugin/1,
2223
delete_plugin/1,
@@ -106,6 +107,14 @@ list_plugins() ->
106107
error(Error)
107108
end.
108109

110+
get_plugin(PluginId) ->
111+
case emqx_omp_test_helpers:api_get({plugins, PluginId}) of
112+
{ok, Plugin} ->
113+
Plugin;
114+
{error, Error} ->
115+
error(Error)
116+
end.
117+
109118
plugin_id(#{<<"name">> := Name, <<"rel_vsn">> := RelVsn}) ->
110119
<<Name/binary, "-", RelVsn/binary>>.
111120

test/emqx_omp_test_helpers.erl

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
-compile(nowarn_export_all).
88
-compile(export_all).
99

10+
-include("emqx_omp.hrl").
11+
1012
all(Suite) ->
1113
lists:usort([
1214
F
@@ -33,6 +35,13 @@ start() ->
3335
stop() ->
3436
ok.
3537

38+
allow_plugin_install() ->
39+
Command = "docker compose exec emqx /opt/emqx/bin/emqx ctl plugins allow " ++ binary_to_list(?PLUGIN_NAME_VSN),
40+
ct:print("Command: ~s~n", [Command]),
41+
os:cmd(Command),
42+
timer:sleep(1000),
43+
ok.
44+
3645
api_get(Path) ->
3746
Result = make_request({get, Path}),
3847
handle_result(Result).
@@ -94,7 +103,9 @@ handle_result({error, Reason}) ->
94103

95104
handle_result_raw({ok, Code, _Headers, ClientRef}) when Code >= 200 andalso Code < 300 ->
96105
hackney:body(ClientRef);
97-
handle_result_raw({ok, Code, _Headers, _Body}) ->
106+
handle_result_raw({ok, Code, _Headers, ClientRef}) ->
107+
{ok, Body} = hackney:body(ClientRef),
108+
ct:pal("Response body:~n~s~n", [Body]),
98109
{error, {http_status, Code}};
99110
handle_result_raw({error, Reason}) ->
100111
{error, Reason}.

0 commit comments

Comments
 (0)