Skip to content

Commit 8b0785d

Browse files
committed
feat: add ssl support and test
1 parent d5de53f commit 8b0785d

File tree

5 files changed

+198
-48
lines changed

5 files changed

+198
-48
lines changed

docker-compose.yml

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,30 @@
11
services:
2-
# emqx:
3-
# image: emqx/emqx-enterprise:5.8.4
4-
# container_name: emqx
5-
# environment:
6-
# EMQX_LOG__CONSOLE__LEVEL: debug
7-
# EMQX_API_KEY__BOOTSTRAP_FILE: "/opt/emqx-bootstrap/api-keys.txt"
8-
# ports:
9-
# - "1883:1883"
10-
# - "18083:18083"
11-
# networks:
12-
# - emqx_network
13-
# healthcheck:
14-
# test: ["CMD", "/opt/emqx/bin/emqx", "ctl", "status"]
15-
# interval: 5s
16-
# timeout: 25s
17-
# retries: 5
18-
# volumes:
19-
# - ./test/assets/api-keys.txt:/opt/emqx-bootstrap/api-keys.txt
20-
# depends_on:
21-
# mysql:
22-
# condition: service_healthy
23-
# redis:
24-
# condition: service_healthy
2+
emqx:
3+
image: emqx/emqx-enterprise:5.8.4
4+
container_name: emqx
5+
environment:
6+
EMQX_LOG__CONSOLE__LEVEL: debug
7+
EMQX_API_KEY__BOOTSTRAP_FILE: "/opt/emqx-bootstrap/api-keys.txt"
8+
ports:
9+
- "1883:1883"
10+
- "18083:18083"
11+
networks:
12+
- emqx_network
13+
healthcheck:
14+
test: ["CMD", "/opt/emqx/bin/emqx", "ctl", "status"]
15+
interval: 5s
16+
timeout: 25s
17+
retries: 5
18+
volumes:
19+
- ./test/assets/certs:/certs:ro
20+
- ./test/assets/api-keys.txt:/opt/emqx-bootstrap/api-keys.txt:ro
21+
depends_on:
22+
mysql:
23+
condition: service_healthy
24+
mysql-ssl:
25+
condition: service_healthy
26+
redis:
27+
condition: service_healthy
2528

2629
mysql:
2730
image: mysql:8.0

priv/config_i18n.json

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,48 @@
111111
"en": "MySQL SSL Enable",
112112
"zh": "MySQL SSL 启用"
113113
},
114+
"$mysql_ssl_server_name_indication_label": {
115+
"en": "MySQL SSL Server Name Indication",
116+
"zh": "MySQL SSL 服务器名称指示"
117+
},
118+
"$mysql_ssl_server_name_indication_desc": {
119+
"en": "MySQL SSL Server Name Indication",
120+
"zh": "MySQL SSL 服务器名称指示"
121+
},
122+
"$mysql_ssl_verify_label": {
123+
"en": "MySQL SSL Verify",
124+
"zh": "MySQL SSL 验证"
125+
},
126+
"$mysql_ssl_verify_desc": {
127+
"en": "MySQL SSL Verify",
128+
"zh": "MySQL SSL 验证"
129+
},
130+
"$mysql_ssl_cacertfile_label": {
131+
"en": "MySQL SSL CA Cert File",
132+
"zh": "MySQL SSL CA 证书文件"
133+
},
134+
"$mysql_ssl_cacertfile_desc": {
135+
"en": "MySQL SSL CA Cert File",
136+
"zh": "MySQL SSL CA 证书文件"
137+
},
138+
"$mysql_ssl_certfile_label": {
139+
"en": "MySQL SSL Cert File",
140+
"zh": "MySQL SSL 证书文件"
141+
},
142+
"$mysql_ssl_certfile_desc": {
143+
"en": "MySQL SSL Cert File",
144+
"zh": "MySQL SSL 证书文件"
145+
},
146+
"$mysql_ssl_keyfile_label": {
147+
"en": "MySQL SSL Key File",
148+
"zh": "MySQL SSL 密钥文件"
149+
},
150+
"$mysql_ssl_keyfile_desc": {
151+
"en": "MySQL SSL Key File",
152+
"zh": "MySQL SSL 密钥文件"
153+
},
154+
155+
114156
"$enable_redis_label": {
115157
"en": "Enable Redis",
116158
"zh": "启用 Redis"

priv/config_schema.avsc

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@
7878
"name": "server",
7979
"type": "string",
8080
"default": "127.0.0.1:3306",
81-
"required": true,
8281
"$ui": {
8382
"component": "input",
8483
"required": true,
@@ -134,7 +133,6 @@
134133
"name": "batch_size",
135134
"type": "int",
136135
"default": 0,
137-
"required": true,
138136
"$ui": {
139137
"component": "input-number",
140138
"required": true,
@@ -146,7 +144,6 @@
146144
"name": "batch_time",
147145
"type": "int",
148146
"default": 100,
149-
"required": true,
150147
"$ui": {
151148
"component": "input-number",
152149
"required": true,
@@ -170,11 +167,75 @@
170167
"label": "$mysql_ssl_enable_label",
171168
"description": "$mysql_ssl_enable_desc"
172169
}
170+
},
171+
{
172+
"name": "server_name_indication",
173+
"type": "string",
174+
"default": "disable",
175+
"$ui": {
176+
"component": "input",
177+
"required": false,
178+
"label": "$mysql_ssl_server_name_indication_label",
179+
"description": "$mysql_ssl_server_name_indication_desc"
180+
}
181+
},
182+
{
183+
"name": "cacertfile",
184+
"type": "string",
185+
"default": "",
186+
"$ui": {
187+
"component": "input",
188+
"required": false,
189+
"label": "$mysql_ssl_cacertfile_label",
190+
"description": "$mysql_ssl_cacertfile_desc"
191+
}
192+
},
193+
{
194+
"name": "certfile",
195+
"type": "string",
196+
"default": "",
197+
"$ui": {
198+
"component": "input",
199+
"required": false,
200+
"label": "$mysql_ssl_certfile_label",
201+
"description": "$mysql_ssl_certfile_desc"
202+
}
203+
},
204+
{
205+
"name": "keyfile",
206+
"type": "string",
207+
"default": "",
208+
"$ui": {
209+
"component": "input",
210+
"required": false,
211+
"label": "$mysql_ssl_keyfile_label",
212+
"description": "$mysql_ssl_keyfile_desc"
213+
}
214+
},
215+
{
216+
"name": "verify",
217+
"type": "string",
218+
"default": "verify_none",
219+
"$ui": {
220+
"component": "select",
221+
"required": false,
222+
"label": "$mysql_ssl_verify_label",
223+
"description": "$mysql_ssl_verify_desc",
224+
"options": [
225+
{
226+
"label": "verify_none",
227+
"value": "verify_none"
228+
},
229+
{
230+
"label": "verify_peer",
231+
"value": "verify_peer"
232+
}
233+
]
234+
}
173235
}
174236
]
175237
}
176238
}
177-
178239
]
179240
}
180241
},

src/emqx_omp_mysql.erl

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
-define(RESOURCE_ID, <<"omp_mysql">>).
2424
-define(RESOURCE_GROUP, <<"omp">>).
25+
-define(TIMEOUT, 1000).
2526

2627
-type statement() :: emqx_template_sql:statement().
2728
-type param_template() :: emqx_template_sql:row_template().
@@ -89,7 +90,7 @@ on_client_connected(
8990
}),
9091
Params = render_row(ParamTemplate, #{clientid => ClientId}),
9192
_ =
92-
case emqx_resource:simple_sync_query(?RESOURCE_ID, {sql, Sql, Params}) of
93+
case sync_query(Sql, Params) of
9394
{ok, Columns, Rows} ->
9495
Subscriptions = to_subscriptions(Columns, Rows),
9596
ok = induce_subscriptions(Subscriptions),
@@ -117,24 +118,29 @@ on_session_subscribed(
117118
ok = insert_subscription(ClientId, Topic, SubOpts, Context),
118119
ok = fetch_and_deliver_messages(ClientId, Topic, Context).
119120

120-
insert_subscription(ClientId, Topic, SubOpts, #{insert_subscription_sql := {Sql, ParamTemplate}} = _Context) ->
121+
insert_subscription(
122+
ClientId, Topic, SubOpts, #{insert_subscription_sql := {Sql, ParamTemplate}} = _Context
123+
) ->
121124
Qos = maps:get(qos, SubOpts, 0),
122125
Params = render_row(ParamTemplate, #{clientid => ClientId, topic => Topic, qos => Qos}),
123-
_ = case emqx_resource:simple_sync_query(?RESOURCE_ID, {sql, Sql, Params}) of
124-
ok ->
125-
ok;
126-
{error, Reason} ->
127-
?SLOG(error, #{
128-
msg => "omp_mysql_insert_subscription_error",
129-
reason => Reason
130-
})
126+
_ =
127+
case sync_query(Sql, Params) of
128+
ok ->
129+
ok;
130+
{error, Reason} ->
131+
?SLOG(error, #{
132+
msg => "omp_mysql_insert_subscription_error",
133+
reason => Reason
134+
})
131135
end,
132136
ok.
133137

134-
fetch_and_deliver_messages(ClientId, Topic, #{select_message_sql := {Sql, ParamTemplate}} = _Context) ->
138+
fetch_and_deliver_messages(
139+
ClientId, Topic, #{select_message_sql := {Sql, ParamTemplate}} = _Context
140+
) ->
135141
Params = render_row(ParamTemplate, #{clientid => ClientId, topic => Topic}),
136142
_ =
137-
case emqx_resource:simple_sync_query(?RESOURCE_ID, {sql, Sql, Params}) of
143+
case sync_query(Sql, Params) of
138144
{ok, Columns, Rows} ->
139145
Messages = to_messages(Columns, Rows),
140146
ok = deliver_messages(Topic, Messages),
@@ -280,8 +286,11 @@ message_to_map(Message) ->
280286

281287
%% Subscription helpers
282288

283-
induce_subscriptions([]) -> ok;
284-
induce_subscriptions(Subscriptions) -> erlang:send(self(), {subscribe, Subscriptions}).
289+
induce_subscriptions([]) ->
290+
ok;
291+
induce_subscriptions(Subscriptions) ->
292+
erlang:send(self(), {subscribe, Subscriptions}),
293+
ok.
285294

286295
to_subscriptions(Columns, Rows) ->
287296
lists:flatmap(fun(Row) -> record_to_subscription(lists:zip(Columns, Row)) end, Rows).
@@ -357,7 +366,7 @@ render_row(RowTemplate, Map) ->
357366
Row.
358367

359368
make_mysql_resource_config(#{<<"insert_message_sql">> := InsertMessageStatement} = RawConfig0) ->
360-
RawMysqlConfig = maps:with(
369+
RawMysqlConfig0 = maps:with(
361370
[
362371
<<"server">>,
363372
<<"database">>,
@@ -368,6 +377,8 @@ make_mysql_resource_config(#{<<"insert_message_sql">> := InsertMessageStatement}
368377
],
369378
RawConfig0
370379
),
380+
SslConfig = make_ssl_config(RawMysqlConfig0),
381+
RawMysqlConfig = RawMysqlConfig0#{<<"ssl">> => SslConfig},
371382

372383
MysqlConfig0 =
373384
case
@@ -397,6 +408,22 @@ make_mysql_resource_config(#{<<"insert_message_sql">> := InsertMessageStatement}
397408

398409
{MysqlConfig, ResourceOpts}.
399410

411+
make_ssl_config(#{<<"ssl">> := SslConfig}) ->
412+
maps:filter(
413+
fun
414+
(_K, <<>>) ->
415+
false;
416+
(_K, _V) ->
417+
true
418+
end,
419+
SslConfig
420+
);
421+
make_ssl_config(_) ->
422+
#{<<"enable">> => false}.
423+
424+
sync_query(Sql, Params) ->
425+
emqx_resource:simple_sync_query(?RESOURCE_ID, {sql, Sql, Params, ?TIMEOUT}).
426+
400427
%% Hook helpers
401428

402429
unhook() ->

test/emqx_omp_SUITE.erl

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,10 @@ groups() ->
3131
[
3232
{mysql, [], [{group, buffered}, {group, unbuffered}]},
3333
{redis, [], [{group, buffered}, {group, unbuffered}]},
34-
{buffered, [], All},
35-
{unbuffered, [], All}
34+
{buffered, [], [{group, tcp}, {group, ssl}]},
35+
{unbuffered, [], [{group, tcp}, {group, ssl}]},
36+
{tcp, [], All},
37+
{ssl, [], All}
3638
].
3739

3840
init_per_suite(Config) ->
@@ -50,7 +52,7 @@ init_per_suite(Config) ->
5052
[{plugin_id, PluginId}, {plugin_filename, Filename}, {plugin_config, PluginConfig} | Config].
5153

5254
end_per_suite(_Config) ->
53-
% ok = emqx_omp_test_api_helpers:delete_all_plugins(),
55+
ok = emqx_omp_test_api_helpers:delete_all_plugins(),
5456
ok = emqx_omp_test_helpers:stop(),
5557
ok.
5658

@@ -65,7 +67,17 @@ init_per_group(buffered, Config) ->
6567
init_per_group(unbuffered, Config) ->
6668
PluginConfig0 = ?config(plugin_config, Config),
6769
PluginConfig = emqx_utils_maps:deep_put([mysql, batch_size], PluginConfig0, 0),
68-
?set_config(plugin_config, PluginConfig, Config).
70+
?set_config(plugin_config, PluginConfig, Config);
71+
init_per_group(tcp, Config) ->
72+
PluginConfig0 = ?config(plugin_config, Config),
73+
PluginConfig1 = emqx_utils_maps:deep_put([mysql, ssl, enable], PluginConfig0, false),
74+
PluginConfig2 = emqx_utils_maps:deep_put([mysql, server], PluginConfig1, <<"mysql:3306">>),
75+
?set_config(plugin_config, PluginConfig2, Config);
76+
init_per_group(ssl, Config) ->
77+
PluginConfig0 = ?config(plugin_config, Config),
78+
PluginConfig1 = emqx_utils_maps:deep_put([mysql, ssl, enable], PluginConfig0, true),
79+
PluginConfig2 = emqx_utils_maps:deep_put([mysql, server], PluginConfig1, <<"mysql-ssl:3306">>),
80+
?set_config(plugin_config, PluginConfig2, Config).
6981

7082
end_per_group(_Group, _Config) ->
7183
ok.
@@ -185,13 +197,18 @@ plugin_config() ->
185197
},
186198
mysql => #{
187199
ssl => #{
188-
enable => false
200+
enable => false,
201+
server_name_indication => <<"mysql-server">>,
202+
verify => <<"verify_peer">>,
203+
cacertfile => <<"/certs/ca.crt">>,
204+
certfile => <<"/certs/mysql-client.crt">>,
205+
keyfile => <<"/certs/mysql-client.key">>
189206
},
190207
password => <<"public">>,
191208
username => <<"emqx">>,
192209
pool_size => 8,
193210
database => <<"emqx">>,
194-
server => <<"localhost:3306">>,
211+
server => <<"invalid-host:3306">>,
195212
select_message_sql => <<"select * from mqtt_msg where topic = ${topic}">>,
196213
delete_message_sql => <<"delete from mqtt_msg where msgid = ${id}">>,
197214
insert_message_sql => <<

0 commit comments

Comments
 (0)