From 0d920a39b154561c7398348c1a142e23f973e030 Mon Sep 17 00:00:00 2001 From: Adam Ullman Date: Fri, 4 Sep 2020 16:21:47 +1000 Subject: [PATCH] Retain retained messages and send them to new subscriptions --- src/mqtt-pubsub.ts | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/src/mqtt-pubsub.ts b/src/mqtt-pubsub.ts index 1153d6a..8603325 100644 --- a/src/mqtt-pubsub.ts +++ b/src/mqtt-pubsub.ts @@ -1,5 +1,5 @@ import { PubSubEngine } from 'graphql-subscriptions/dist/pubsub-engine'; -import { connect, Client, ISubscriptionGrant, IClientPublishOptions, IClientSubscribeOptions } from 'mqtt'; +import { connect, Client, ISubscriptionGrant, IClientPublishOptions, IClientSubscribeOptions, IPublishPacket } from 'mqtt'; import { PubSubAsyncIterator } from './pubsub-async-iterator'; export interface PubSubMQTTOptions { @@ -22,6 +22,7 @@ export class MQTTPubSub implements PubSubEngine { private mqttConnection: Client; private subscriptionMap: { [subId: number]: [string, Function] }; private subsRefsMap: { [trigger: string]: Array }; + private retainedMessagesMap: { [trigger: string]: any }; private currentSubscriptionId: number; private parseMessageWithEncoding: string; @@ -70,6 +71,7 @@ export class MQTTPubSub implements PubSubEngine { this.subscriptionMap = {}; this.subsRefsMap = {}; + this.retainedMessagesMap = {}; this.currentSubscriptionId = 0; this.onMQTTSubscribe = options.onMQTTSubscribe || (() => null); this.publishOptionsResolver = options.publishOptions || (() => Promise.resolve({} as IClientPublishOptions)); @@ -95,6 +97,13 @@ export class MQTTPubSub implements PubSubEngine { if (refs && refs.length > 0) { const newRefs = [...refs, id]; this.subsRefsMap[triggerName] = newRefs; + // If we have a retained message then send it to this new subscription + const retainedMessage = this.retainedMessagesMap[triggerName]; + if (retainedMessage) { + setTimeout(() => { + onMessage(retainedMessage); + }, 0); + } return Promise.resolve(id); } else { @@ -136,6 +145,9 @@ export class MQTTPubSub implements PubSubEngine { if (refs.length === 1) { this.mqttConnection.unsubscribe(triggerName); newRefs = []; + // Delete the retained messages for this subscription + // we will get sent any retained messages when we resubscribe + delete this.retainedMessagesMap[triggerName]; } else { const index = refs.indexOf(subId); @@ -152,7 +164,7 @@ export class MQTTPubSub implements PubSubEngine { return new PubSubAsyncIterator(this, triggers); } - private onMessage(topic: string, message: Buffer) { + private onMessage(topic: string, message: Buffer, packet: IPublishPacket) { const subscribers = [].concat( ...Object.keys(this.subsRefsMap) .filter((key) => MQTTPubSub.matches(key, topic)) @@ -175,6 +187,10 @@ export class MQTTPubSub implements PubSubEngine { const listener = this.subscriptionMap[subId][1]; listener(parsedMessage); } + if (packet.retain) { + // Retain the message if it is meant to be retained + this.retainedMessagesMap[topic] = parsedMessage + } } }