Skip to content

Commit c287bcb

Browse files
committed
fix: fix message persistence logic
1 parent e217597 commit c287bcb

File tree

1 file changed

+24
-10
lines changed

1 file changed

+24
-10
lines changed

src/emqx_omp_redis.erl

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -156,23 +156,22 @@ fetch_and_deliver_messages(Topic, Context) ->
156156
})
157157
end.
158158

159-
fetch_message_ids(Topic, Context) ->
159+
fetch_message_ids(Topic, #{message_ttl := TTL} = Context) ->
160160
MsgTab = msg_table(Context, Topic),
161161
case sync_cmd([<<"ZRANGE">>, MsgTab, 0, -1, <<"WITHSCORES">>]) of
162162
{ok, RawMsgIds} ->
163163
?SLOG(debug, #{
164164
msg => "omp_redis_fetch_message_ids",
165165
raw_msg_ids => RawMsgIds
166166
}),
167-
MsgIdsWithExpireTS = parse_msg_ids(RawMsgIds),
167+
MsgIdsWithCreatedTS = parse_msg_ids(RawMsgIds),
168168
Now = erlang:system_time(millisecond),
169-
%% Don't know what MsgId =:= <<"1">> is for.
170-
%% The logic is kept from v4
169+
Deadline = Now - erlang:convert_time_unit(TTL, second, millisecond),
171170
%% NOTE
172171
%% The MsgIds here are base62 encoded
173172
MsgIds = [
174173
MsgId
175-
|| {MsgId, ExpireTS} <- MsgIdsWithExpireTS, ExpireTS > Now orelse MsgId =:= <<"1">>
174+
|| {MsgId, CreatedTS} <- MsgIdsWithCreatedTS, CreatedTS > Deadline
176175
],
177176
?SLOG(debug, #{
178177
msg => "omp_redis_fetch_message_ids_parsed",
@@ -184,10 +183,18 @@ fetch_message_ids(Topic, Context) ->
184183
end.
185184

186185
parse_msg_ids([MsgId, TS | KVs]) ->
187-
[{MsgId, binary_to_integer(TS)} | parse_msg_ids(KVs)];
186+
[{MsgId, parse_zscore(TS)} | parse_msg_ids(KVs)];
188187
parse_msg_ids([]) ->
189188
[].
190189

190+
parse_zscore(ZScore) when is_binary(ZScore) ->
191+
try
192+
binary_to_float(ZScore)
193+
catch
194+
error:badarg ->
195+
binary_to_integer(ZScore)
196+
end.
197+
191198
fetch_messages(MsgIds, Context) ->
192199
fetch_messages(MsgIds, [], [], Context).
193200

@@ -264,14 +271,21 @@ on_message_publish(Message, #{message_ttl := TTL, topic_filters := TopicFilters}
264271
true ->
265272
Topic = emqx_message:topic(Message),
266273
MsgId = emqx_message:id(Message),
267-
Now = erlang:system_time(millisecond),
268-
ExpireTime = Now + erlang:convert_time_unit(TTL, second, millisecond),
274+
Now =
275+
erlang:system_time(native) /
276+
erlang:convert_time_unit(1, millisecond, native),
277+
Deadline = Now - erlang:convert_time_unit(TTL, second, millisecond),
269278
MsgIDb62 = to_b62(MsgId),
270279
Cmds = [
271280
[<<"HMSET">>, msg_table(Context, MsgIDb62)] ++ message_to_hash(Message),
272-
[<<"ZADD">>, msg_table(Context, Topic), ExpireTime, MsgIDb62],
281+
[<<"ZADD">>, msg_table(Context, Topic), float_to_binary(Now), MsgIDb62],
273282
[<<"EXPIRE">>, msg_table(Context, MsgIDb62), TTL],
274-
[<<"ZREMRANGEBYSCORE">>, msg_table(Context, Topic), 2, Now]
283+
[
284+
<<"ZREMRANGEBYSCORE">>,
285+
msg_table(Context, Topic),
286+
<<"-inf">>,
287+
float_to_binary(Deadline)
288+
]
275289
],
276290
case sync_cmds(Cmds) of
277291
{ok, _} ->

0 commit comments

Comments
 (0)