Skip to content

Commit c1f38a4

Browse files
committed
feat: add option to initialize default schema
1 parent 7315cd8 commit c1f38a4

File tree

8 files changed

+72
-50
lines changed

8 files changed

+72
-50
lines changed

docker-compose.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@ services:
3939
MYSQL_PASSWORD: public
4040
ports:
4141
- "3306:3306"
42-
volumes:
43-
- ./docker/mysql/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d
4442
networks:
4543
- emqx_network
4644
healthcheck:
@@ -68,7 +66,6 @@ services:
6866
- "3307:3306"
6967
volumes:
7068
- ./test/assets/certs:/certs:ro
71-
- ./docker/mysql/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d
7269
networks:
7370
- emqx_network
7471
healthcheck:

docker/mysql/docker-entrypoint-initdb.d/init.sql

Lines changed: 0 additions & 21 deletions
This file was deleted.

priv/config.hocon

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ mysql {
1414
username = "emqx"
1515
password = "public"
1616
database = "emqx"
17+
init_default_schema = false
1718
insert_message_sql = "insert into mqtt_msg(msgid, sender, topic, qos, retain, payload, arrived) values(${id}, ${from}, ${topic}, ${qos}, ${flags.retain}, ${payload}, FROM_UNIXTIME(${timestamp}/1000))"
1819
delete_message_sql = "delete from mqtt_msg where msgid = ${id}"
1920
select_message_sql = "select * from mqtt_msg where topic = ${topic}"

priv/config_i18n.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,14 @@
1515
"en": "Topic filters for topics to persist, separated by commas",
1616
"zh": "要持久化的主题过滤器,用逗号分隔"
1717
},
18+
"$mysql_init_default_schema_label": {
19+
"en": "Initialize Default Schema",
20+
"zh": "初始化默认架构"
21+
},
22+
"$mysql_init_default_schema_desc": {
23+
"en": "Initialize the default schema for the database",
24+
"zh": "初始化数据库的默认架构"
25+
},
1826
"$mysql_insert_message_sql_label": {
1927
"en": "MySQL Insert Message SQL",
2028
"zh": "MySQL 插入消息 SQL"

priv/config_schema.avsc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,17 @@
1919
"description": "$mysql_enable_desc"
2020
}
2121
},
22+
{
23+
"name": "init_default_schema",
24+
"type": "boolean",
25+
"default": false,
26+
"$ui": {
27+
"component": "switch",
28+
"required": true,
29+
"label": "$mysql_init_default_schema_label",
30+
"description": "$mysql_init_default_schema_desc"
31+
}
32+
},
2233
{
2334
"name": "insert_message_sql",
2435
"type": "string",

src/emqx_omp_mysql.erl

Lines changed: 49 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,33 +23,32 @@
2323
]).
2424

2525
-define(RESOURCE_ID, <<"omp_mysql">>).
26+
-define(RESOURCE_ID_INIT, <<"omp_mysql_init">>).
2627
-define(RESOURCE_GROUP, <<"omp">>).
2728
-define(TIMEOUT, 1000).
2829

29-
% -define(INIT_SQL, [
30-
% """
31-
% CREATE TABLE IF NOT EXISTS `mqtt_msg` (
32-
% `id` bigint unsigned NOT NULL AUTO_INCREMENT,
33-
% `msgid` varchar(64) DEFAULT NULL,
34-
% `topic` varchar(180) NOT NULL,
35-
% `sender` varchar(64) DEFAULT NULL,
36-
% `qos` tinyint(1) NOT NULL DEFAULT '0',
37-
% `retain` tinyint(1) DEFAULT NULL,
38-
% `payload` blob,
39-
% `arrived` datetime NOT NULL,
40-
% PRIMARY KEY (`id`),
41-
% INDEX topic_index(`topic`)
42-
% ) ENGINE=InnoDB DEFAULT CHARSET=utf8MB4;
43-
% """,
44-
% """
45-
% CREATE TABLE IF NOT EXISTS `mqtt_sub` (
46-
% `clientid` varchar(64) NOT NULL,
47-
% `topic` varchar(180) NOT NULL,
48-
% `qos` tinyint(1) NOT NULL DEFAULT '0',
49-
% PRIMARY KEY (`clientid`, `topic`)
50-
% ) ENGINE=InnoDB DEFAULT CHARSET=utf8MB4;
51-
% """
52-
% ]).
30+
-define(INIT_SQL, [
31+
<<"CREATE TABLE IF NOT EXISTS `mqtt_msg` ("
32+
"`id` bigint unsigned NOT NULL AUTO_INCREMENT,"
33+
"`msgid` varchar(64) DEFAULT NULL,"
34+
"`topic` varchar(180) NOT NULL,"
35+
"`sender` varchar(64) DEFAULT NULL,"
36+
"`qos` tinyint(1) NOT NULL DEFAULT '0',"
37+
"`retain` tinyint(1) DEFAULT NULL,"
38+
"`payload` blob,"
39+
"`arrived` datetime NOT NULL,"
40+
"PRIMARY KEY (`id`),"
41+
"INDEX topic_index(`topic`)"
42+
")"
43+
"ENGINE=InnoDB DEFAULT CHARSET=utf8MB4;">>,
44+
45+
<<"CREATE TABLE IF NOT EXISTS `mqtt_sub` ("
46+
"`clientid` varchar(64) NOT NULL,"
47+
"`topic` varchar(180) NOT NULL,"
48+
"`qos` tinyint(1) NOT NULL DEFAULT '0',"
49+
"PRIMARY KEY (`clientid`, `topic`)"
50+
") ENGINE=InnoDB DEFAULT CHARSET=utf8MB4;">>
51+
]).
5352

5453
-type statement() :: emqx_template_sql:statement().
5554
-type param_template() :: emqx_template_sql:row_template().
@@ -90,9 +89,11 @@ stop() ->
9089

9190
-spec start(map()) -> ok.
9291
start(ConfigRaw) ->
92+
ok = init_default_schema(ConfigRaw),
9393
{MysqlConfig, ResourceOpts} = make_mysql_resource_config(ConfigRaw),
9494
ok = start_resource(MysqlConfig, ResourceOpts),
9595

96+
9697
Statements = parse_statements(
9798
[delete_message_sql, select_message_sql, insert_subscription_sql, select_subscriptions_sql],
9899
ConfigRaw
@@ -392,6 +393,30 @@ make_mysql_resource_config(#{<<"insert_message_sql">> := InsertMessageStatement}
392393

393394
{MysqlConfig, ResourceOpts}.
394395

396+
init_default_schema(ConfigRaw) ->
397+
{MysqlConfig0, ResourceOpts} = make_mysql_resource_config(ConfigRaw),
398+
MysqlConfig = MysqlConfig0#{prepare_statement => #{}},
399+
{ok, _} = emqx_resource:create_local(
400+
?RESOURCE_ID_INIT,
401+
?RESOURCE_GROUP,
402+
emqx_bridge_mysql_connector,
403+
MysqlConfig,
404+
ResourceOpts
405+
),
406+
ok = lists:foreach(fun(Sql) ->
407+
case emqx_resource:simple_sync_query(?RESOURCE_ID_INIT, {sql, Sql, [], ?TIMEOUT}) of
408+
{error, Reason} ->
409+
?SLOG(error, #{
410+
msg => "omp_mysql_init_default_schema_error",
411+
sql => Sql,
412+
reason => Reason
413+
});
414+
_ ->
415+
ok
416+
end
417+
end, ?INIT_SQL),
418+
ok = emqx_resource:remove_local(?RESOURCE_ID_INIT).
419+
395420
sync_query(Sql, Params) ->
396421
emqx_resource:simple_sync_query(?RESOURCE_ID, {sql, Sql, Params, ?TIMEOUT}).
397422

src/emqx_omp_redis.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ on_message_publish(Message, #{message_ttl := TTL, topic_filters := TopicFilters}
252252
[<<"EXPIRE">>, msg_table(Context, MsgIDb62), TTL],
253253
[<<"ZREMRANGEBYSCORE">>, msg_table(Context, Topic), 2, Now]
254254
],
255-
case emqx_resource:query(?RESOURCE_ID, {cmds, Cmds}) of
255+
case sync_cmds(Cmds) of
256256
{ok, _} ->
257257
ok;
258258
{error, Reason} ->
@@ -276,7 +276,7 @@ on_message_acked(
276276
[<<"DEL">>, msg_table(Context, MsgIDb62)],
277277
[<<"ZREM">>, msg_table(Context, Topic), MsgIDb62]
278278
],
279-
case emqx_resource:simple_sync_query(?RESOURCE_ID, {cmds, Cmds}) of
279+
case sync_cmds(Cmds) of
280280
{ok, _} ->
281281
emqx_metrics_worker:inc(?METRICS_WORKER, message_acked, success);
282282
{error, Reason} ->

test/emqx_omp_SUITE.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,7 @@ plugin_config() ->
291291
username => <<"emqx">>,
292292
pool_size => 8,
293293
database => <<"emqx">>,
294+
init_default_schema => true,
294295
select_message_sql => <<"select * from mqtt_msg where topic = ${topic}">>,
295296
delete_message_sql => <<"delete from mqtt_msg where msgid = ${id}">>,
296297
insert_message_sql => <<

0 commit comments

Comments
 (0)