diff --git a/config.js b/config.js index c1518ee9..f2474511 100644 --- a/config.js +++ b/config.js @@ -181,6 +181,14 @@ */ lmstEpoch: Date.UTC(2020, 2, 18, 0, 0, 0), + /* + * subscriptionMCWSFilterDelay: delay in milliseconds for combining filters for the same subscription + * endpoint connection. Smaller value = quicker display of realtime data (ex, 10ms in a + * low latency environment), higher value = avoids potentially creating and subsequently tearing down new websocket connections if filter changes are happening faster than server response times + * (ex, 100ms+ in a high latency environment) + */ + subscriptionMCWSFilterDelay: 100, + /** * timeSystems: specify the time systems to use. * Options are 'scet', 'ert', 'sclk', 'msl.sol' and 'lmst'. @@ -193,83 +201,137 @@ * * key property is required and other options are optional * timeSystem: - * * key: string, required + * * key: string, required. Time system. Options are 'scet', 'ert', 'sclk', 'msl.sol' and 'lmst'. * * limit: number, optional - maximum duration between start and end bounds allow - * * presets: array, optional - preset bounds for convenience - * * * preset: - * * * * label: string, descriptive label for preset + * * modeSettings: object, optional - presets for convenience. + * * * fixed: object, optional - valid objects are bounds objects and presets array. + * * * realtime: object, optional - valid objects are clockOffsets and presets array. + * * * lad:object, optional - valid objects are clockoffsets. + * * * * + * * * * Optional objects: * * * * bounds: start and end bounds for preset as numbers - * * * * * * * * start and end can be declared as a number or a function returning a number - * + * * * * * * * * start: and end: can be declared as a number or a function returning a number + * * * * presets: array of objects consisting of: + * * * * * bounds: - required. + * * * * * label: - required, string + * * * * clockOffsets: object, optional. Start and end relative to active clock. + * * * * start: and end: numbers relative to active clock's 0. Start is negative, end is positive. * *advanced** example configuration below * timeSystems: [ - { - key:'scet', - presets: [ - { - label: 'Last 2 hours', - bounds: { - start: Date.now() - 1000 * 60 * 60 * 2, - end: Date.now() - } + { + key:'scet', + modeSettings:{ + fixed:{ + bounds:{ + // 1 day ago + start: new Date( + Date.UTC( + new Date().getUTCFullYear(), + new Date().getUTCMonth(), + new Date().getUTCDate() + ) - 1 * 864e5 + ).getTime(), + end: new Date( + Date.UTC( + new Date().getUTCFullYear(), + new Date().getUTCMonth(), + new Date().getUTCDate() + ) + 864e5 - 1 + ).getTime() }, - { - label: 'Last 1 hour', - bounds: { - start: Date.now() - 1000 * 60 * 60, - end: Date.now() - } + presets:[ + { + label: 'Last 2 hours (SCET Recorded)', + bounds: { + start: () => Date.now() - 1000 * 60 * 60 * 2, + end: () => Date.now() } - ], - limit: 1000 * 60 * 60 * 6 - }, - { - key:'ert', - presets: [ - { - label: 'Last 2 hours', - bounds: { - start: Date.now() - 1000 * 60 * 60 * 2, - end: Date.now() - } - }, - { - label: 'Last 1 hour', - bounds: { - start: Date.now() - 1000 * 60 * 60, - end: Date.now() - } + }, + ] + }, + realtime:{ + clockOffsets:{ + start: -60 * 60 * 1000, + end: 5 * 60 * 1000 + }, + presets:[ + { + label: 'Last 2 hours (SCET Realtime)', + bounds: { + start: -60 * 60 * 1000 * 2, + end: 5 * 60 * 1000 } - ], - limit: 1000 * 60 * 60 * 6 + } + ] + }, + lad:{ + clockOffsets:{ + start: -60 * 60 * 1000, + end: 5 * 60 * 1000 + }, + }, + }, + limit: 1000 * 60 * 60 * 60 + }, + { + key:'ert', + modeSettings:{ + fixed:{ + bounds:{ + // 1 day ago + start: new Date( + Date.UTC( + new Date().getUTCFullYear(), + new Date().getUTCMonth(), + new Date().getUTCDate() + ) - 1 * 864e5 + ).getTime(), + // today + end: new Date( + Date.UTC( + new Date().getUTCFullYear(), + new Date().getUTCMonth(), + new Date().getUTCDate() + ) + 864e5 - 1 + ).getTime() }, - { - key:'sclk', - presets: [ - { - label: 'Last 2 hours', - bounds: { - start: Date.now() - 1000 * 60 * 60 * 2, - end: Date.now() - } - }, - { - label: 'Last 1 hour', - bounds: { - start: Date.now() - 1000 * 60 * 60, - end: Date.now() - } - } - ], - limit: 1000 / 5 * 60 * 60 * 6 + presets:[ + { + label: 'Last 2 hours (ERT Recorded)', + bounds: { + start: Date.now() - 1000 * 60 * 60 * 2, + end: Date.now() + } + }, + ] }, - { - key:'lmst', - presets: [] - } - ], - */ + realtime:{ + clockOffsets:{ + start: -60 * 60 * 1000, + end: 5 * 60 * 1000 + }, + presets:[ + { + label: 'Last 2 hours (ERT Realtime)', + bounds: { + start: -60 * 60 * 1000 * 2, + end: 5 * 60 * 1000 + } + } + ] + }, + lad:{ + clockOffsets:{ + start: -60 * 60 * 1000, + end: 5 * 60 * 1000 + }, + }, + }, + limit: 1000 * 60 * 60 * 60 + } + ], + */ /** * allowRealtime: whether or not to allow utc-relative time conductor. diff --git a/package.json b/package.json index d9284339..765d1b4a 100644 --- a/package.json +++ b/package.json @@ -39,7 +39,7 @@ "mini-css-extract-plugin": "2.7.6", "moment": "2.30.1", "node-bourbon": "^4.2.3", - "openmct": "nasa/openmct#omm-r5.3.0-rc3", + "openmct": "nasa/openmct#omm-r5.3.1", "prettier": "3.4.2", "printj": "1.3.1", "raw-loader": "^0.5.1", diff --git a/src/AMMOSPlugins.js b/src/AMMOSPlugins.js index 8339a3bf..76a4d105 100644 --- a/src/AMMOSPlugins.js +++ b/src/AMMOSPlugins.js @@ -95,7 +95,7 @@ define([ openmct.install(RealtimeSessions.default()); openmct.install(new HistoricalTelemetryPlugin(options)); - openmct.install(new RealtimeTelemetryPlugin(vistaTime, options)); + openmct.install(new RealtimeTelemetryPlugin.default(vistaTime, options)); openmct.install(new TypePlugin.default()); openmct.install(new TaxonomyPlugin(options.taxonomy)); openmct.install(new LinkPlugin(options)); diff --git a/src/realtime/MCWSAlarmMessageStreamProvider.js b/src/realtime/MCWSAlarmMessageStreamProvider.js index a5184e4e..90783538 100644 --- a/src/realtime/MCWSAlarmMessageStreamProvider.js +++ b/src/realtime/MCWSAlarmMessageStreamProvider.js @@ -1,35 +1,23 @@ -define(['./MCWSStreamProvider'], function (MCWSStreamProvider) { - 'use strict'; +import MCWSStreamProvider from './MCWSStreamProvider'; - /** - * Provides real-time streaming DataProduct data. - * @constructor - * @augments {MCWSStreamProvider} - * @memberof {vista/telemetry} - */ - var MCWSAlarmMessageStreamProvider = MCWSStreamProvider.extend({ - constructor: function (openmct, vistaTime) { - MCWSStreamProvider.call(this, openmct, vistaTime); - } - }); - - MCWSAlarmMessageStreamProvider.prototype.getUrl = function (domainObject) { - return domainObject.telemetry && domainObject.telemetry.alarmMessageStreamUrl; - }; +/** + * Provides real-time streaming DataProduct data. + * @memberof {vista/telemetry} + */ +class MCWSAlarmMessageStreamProvider extends MCWSStreamProvider { + getUrl(domainObject) { + return domainObject.telemetry?.alarmMessageStreamUrl; + } - MCWSAlarmMessageStreamProvider.prototype.getKey = function (domainObject) { + getKey(domainObject) { return domainObject.telemetry.key; - }; + } - MCWSAlarmMessageStreamProvider.prototype.getProperty = function (domainObject) { + getProperty(domainObject) { return domainObject.telemetry.property; - }; - - MCWSAlarmMessageStreamProvider.prototype.notifyWorker = function (key, value) { - MCWSStreamProvider.prototype.notifyWorker.call(this, key, value); - }; + } - MCWSAlarmMessageStreamProvider.prototype.subscribe = function (domainObject, callback, options) { + subscribe(domainObject, callback, options) { let { telemetry: { alarmLevel = 'any' } = {} } = domainObject; alarmLevel = alarmLevel.toUpperCase(); let objects = [ @@ -68,14 +56,12 @@ define(['./MCWSStreamProvider'], function (MCWSStreamProvider) { object.telemetry.values = domainObject.telemetry.values; }); - let unsubscribers = objects.map((object) => - MCWSStreamProvider.prototype.subscribe.call(this, object, callback, options) - ); + let unsubscribers = objects.map((object) => super.subscribe(object, callback, options)); return () => { unsubscribers.forEach((unsubscribe) => unsubscribe()); }; - }; + } +} - return MCWSAlarmMessageStreamProvider; -}); +export default MCWSAlarmMessageStreamProvider; diff --git a/src/realtime/MCWSChannelStreamProvider.js b/src/realtime/MCWSChannelStreamProvider.js index 27838ba1..45738ac2 100644 --- a/src/realtime/MCWSChannelStreamProvider.js +++ b/src/realtime/MCWSChannelStreamProvider.js @@ -1,25 +1,21 @@ -define(['./MCWSStreamProvider'], function (MCWSStreamProvider) { - 'use strict'; +import MCWSStreamProvider from './MCWSStreamProvider'; - /** - * Provides real-time streaming channel data. - * @constructor - * @augments {MCWSStreamProvider} - * @memberof {vista/telemetry} - */ - var MCWSChannelStreamProvider = MCWSStreamProvider.extend({}); +/** + * Provides real-time streaming channel data. + * @memberof {vista/telemetry} + */ +class MCWSChannelStreamProvider extends MCWSStreamProvider { + getUrl(domainObject) { + return domainObject.telemetry?.channelStreamUrl; + } - MCWSChannelStreamProvider.prototype.getUrl = function (domainObject) { - return domainObject.telemetry && domainObject.telemetry.channelStreamUrl; - }; - - MCWSChannelStreamProvider.prototype.getKey = function (domainObject) { + getKey(domainObject) { return domainObject.telemetry.channel_id; - }; + } - MCWSChannelStreamProvider.prototype.getProperty = function () { + getProperty() { return 'channel_id'; - }; + } +} - return MCWSChannelStreamProvider; -}); +export default MCWSChannelStreamProvider; diff --git a/src/realtime/MCWSCommandStreamProvider.js b/src/realtime/MCWSCommandStreamProvider.js index 7ba58038..0451e611 100644 --- a/src/realtime/MCWSCommandStreamProvider.js +++ b/src/realtime/MCWSCommandStreamProvider.js @@ -1,28 +1,24 @@ -define(['./MCWSStreamProvider'], function (MCWSStreamProvider) { - 'use strict'; +import MCWSStreamProvider from './MCWSStreamProvider'; - /** - * Provides real-time streaming CommandEvent data. - * @constructor - * @augments {MCWSStreamProvider} - * @memberof {vista/telemetry} - */ - var MCWSCommandStreamProvider = MCWSStreamProvider.extend({}); +/** + * Provides real-time streaming CommandEvent data. + * @memberof {vista/telemetry} + */ +class MCWSCommandStreamProvider extends MCWSStreamProvider { + getUrl(domainObject) { + return domainObject.telemetry?.commandEventStreamUrl; + } - MCWSCommandStreamProvider.prototype.getUrl = function (domainObject) { - return domainObject.telemetry && domainObject.telemetry.commandEventStreamUrl; - }; - - MCWSCommandStreamProvider.prototype.getKey = function (domainObject) { + getKey() { // We return undefined so that we can match on undefined properties. return undefined; - }; + } - MCWSCommandStreamProvider.prototype.getProperty = function () { + getProperty() { // We just want something that returns undefined so it matches the // key above. Hacky. return 'some_undefined_property'; - }; + } +} - return MCWSCommandStreamProvider; -}); +export default MCWSCommandStreamProvider; diff --git a/src/realtime/MCWSDataProductStreamProvider.js b/src/realtime/MCWSDataProductStreamProvider.js index a54b1766..9cd6df7e 100644 --- a/src/realtime/MCWSDataProductStreamProvider.js +++ b/src/realtime/MCWSDataProductStreamProvider.js @@ -1,35 +1,26 @@ -define(['./MCWSStreamProvider'], function (MCWSStreamProvider) { - 'use strict'; - - /** - * Provides real-time streaming DataProduct data. - * @constructor - * @augments {MCWSStreamProvider} - * @memberof {vista/telemetry} - */ - var MCWSDataProductStreamProvider = MCWSStreamProvider.extend({ - constructor: function (openmct, vistaTime, options) { - this.options = options; - MCWSStreamProvider.call(this, openmct, vistaTime); - } - }); - - MCWSDataProductStreamProvider.prototype.getUrl = function (domainObject) { - return domainObject.telemetry && domainObject.telemetry.dataProductStreamUrl; - }; - - MCWSDataProductStreamProvider.prototype.getKey = function (domainObject) { +import MCWSStreamProvider from './MCWSStreamProvider'; + +/** + * Provides real-time streaming DataProduct data. + * @memberof {vista/telemetry} + */ +class MCWSDataProductStreamProvider extends MCWSStreamProvider { + getUrl(domainObject) { + return domainObject.telemetry?.dataProductStreamUrl; + } + + getKey() { // We return undefined so that we can match on undefined properties. return undefined; - }; + } - MCWSDataProductStreamProvider.prototype.getProperty = function () { + getProperty() { // We just want something that returns undefined so it matches the // key above. Hacky. return 'some_undefined_property'; - }; + } - MCWSDataProductStreamProvider.prototype.subscribe = function (domainObject, callback, options) { + subscribe(domainObject, callback, options) { function wrappedCallback(datum) { let sessionId = datum.session_id; @@ -47,22 +38,17 @@ define(['./MCWSStreamProvider'], function (MCWSStreamProvider) { callback(datum); } - return MCWSStreamProvider.prototype.subscribe.call( - this, - domainObject, - wrappedCallback, - options - ); - }; + return super.subscribe(domainObject, wrappedCallback, options); + } - MCWSDataProductStreamProvider.prototype.notifyWorker = function (key, value) { + notifyWorker(key, value) { if (key === 'subscribe' && this.options.realtimeProductAPIDs && value.mcwsVersion === 3.2) { value.extraFilterTerms = { apid: '(' + this.options.realtimeProductAPIDs.join(',') + ')' }; } - MCWSStreamProvider.prototype.notifyWorker.call(this, key, value); - }; + super.notifyWorker(key, value); + } +} - return MCWSDataProductStreamProvider; -}); +export default MCWSDataProductStreamProvider; diff --git a/src/realtime/MCWSEVRLevelStreamProvider.js b/src/realtime/MCWSEVRLevelStreamProvider.js index 50971f12..36ae75a1 100644 --- a/src/realtime/MCWSEVRLevelStreamProvider.js +++ b/src/realtime/MCWSEVRLevelStreamProvider.js @@ -1,27 +1,38 @@ -define(['./MCWSStreamProvider', 'lodash'], function (MCWSStreamProvider, _) { - 'use strict'; +import MCWSStreamProvider from './MCWSStreamProvider'; +/** + * Provides real-time streaming EVR data by level. + * @memberof {vista/telemetry} + */ +class MCWSEVRLevelStreamProvider extends MCWSStreamProvider { /** - * Provides real-time streaming EVR data by level. - * @constructor - * @augments {MCWSStreamProvider} - * @memberof {vista/telemetry} + * Get the URL for streaming data for this domain object + * @param {Object} domainObject The domain object + * @returns {String} The URL to use for streaming */ - var MCWSEVRLevelStreamProvider = MCWSStreamProvider.extend({}); - - MCWSEVRLevelStreamProvider.prototype.getUrl = function (domainObject) { - if (domainObject.telemetry && domainObject.telemetry.level) { + getUrl(domainObject) { + if (domainObject.telemetry?.evrStreamUrl && domainObject.telemetry?.level) { return domainObject.telemetry.evrStreamUrl; } - }; + } - MCWSEVRLevelStreamProvider.prototype.getProperty = function (domainObject) { + /** + * Get the property to use for this stream + * @param {Object} domainObject The domain object + * @returns {String} The property name + */ + getProperty() { return 'level'; - }; + } - MCWSEVRLevelStreamProvider.prototype.getKey = function (domainObject) { + /** + * Get the key to use for this stream + * @param {Object} domainObject The domain object + * @returns {String} The key + */ + getKey(domainObject) { return domainObject.telemetry.level; - }; + } +} - return MCWSEVRLevelStreamProvider; -}); +export default MCWSEVRLevelStreamProvider; diff --git a/src/realtime/MCWSEVRStreamProvider.js b/src/realtime/MCWSEVRStreamProvider.js index 26f524cc..dc18827e 100644 --- a/src/realtime/MCWSEVRStreamProvider.js +++ b/src/realtime/MCWSEVRStreamProvider.js @@ -1,25 +1,36 @@ -define(['./MCWSStreamProvider', 'lodash'], function (MCWSStreamProvider, _) { - 'use strict'; +import MCWSStreamProvider from './MCWSStreamProvider'; +/** + * Provides real-time streaming EVR data. + * @memberof {vista/telemetry} + */ +class MCWSEVRStreamProvider extends MCWSStreamProvider { /** - * Provides real-time streaming EVR data. - * @constructor - * @augments {MCWSStreamProvider} - * @memberof {vista/telemetry} + * Get the URL for streaming data for this domain object + * @param {Object} domainObject The domain object + * @returns {String} The URL to use for streaming */ - var MCWSEVRStreamProvider = MCWSStreamProvider.extend({}); - - MCWSEVRStreamProvider.prototype.getUrl = function (domainObject) { - if (domainObject.telemetry && !domainObject.telemetry.level) { + getUrl(domainObject) { + if (domainObject.telemetry?.evrStreamUrl && !domainObject.telemetry?.level) { return domainObject.telemetry.evrStreamUrl; } - }; + } - MCWSEVRStreamProvider.prototype.getProperty = function (domainObject) { + /** + * Get the property to use for this stream + * @param {Object} domainObject The domain object + * @returns {String} The property name + */ + getProperty() { return 'module'; - }; + } - MCWSEVRStreamProvider.prototype.getKey = function (domainObject) { + /** + * Get the key to use for this stream + * @param {Object} domainObject The domain object + * @returns {String} The key + */ + getKey(domainObject) { // Can subscribe only by EVR module even if subscribing by EVR let module = domainObject.telemetry && @@ -37,48 +48,57 @@ define(['./MCWSStreamProvider', 'lodash'], function (MCWSStreamProvider, _) { } return module; - }; + } - MCWSEVRStreamProvider.prototype.subscribe = function (domainObject, callback, options) { + /** + * Subscribe to real-time updates for this domain object + * @param {Object} domainObject The domain object + * @param {Function} callback The callback to invoke with new data + * @param {Object} options Additional options + * @returns {Function} A function that will unsubscribe when called + */ + subscribe(domainObject, callback, options) { // EVR Source subscription if (domainObject.telemetry.modules) { return this.multiSubscribe(domainObject, callback, options); } if (domainObject.telemetry.evr_name) { - var wrappedCallback = function (value) { + const wrappedCallback = (value) => { if (value.name === domainObject.telemetry.evr_name) { callback(value); } }; - return MCWSStreamProvider.prototype.subscribe.call( - this, - domainObject, - wrappedCallback, - options - ); + return super.subscribe(domainObject, wrappedCallback, options); } - return MCWSStreamProvider.prototype.subscribe.call(this, domainObject, callback, options); - }; + return super.subscribe(domainObject, callback, options); + } - MCWSEVRStreamProvider.prototype.multiSubscribe = function (domainObject, callback, options) { - var unsubscribes = domainObject.telemetry.modules.map(function (module) { - var moduleObject = { + /** + * Subscribe to multiple modules + * @param {Object} domainObject The domain object + * @param {Function} callback The callback to invoke with new data + * @param {Object} options Additional options + * @returns {Function} A function that will unsubscribe when called + */ + multiSubscribe(domainObject, callback, options) { + const unsubscribes = domainObject.telemetry.modules.map((module) => { + const moduleObject = { telemetry: { evrStreamUrl: domainObject.telemetry.evrStreamUrl, module: module } }; - return MCWSStreamProvider.prototype.subscribe.call(this, moduleObject, callback, options); - }, this); + return super.subscribe(moduleObject, callback, options); + }); - return function () { - unsubscribes.forEach(function (unsubscribe) { + return () => { + unsubscribes.forEach((unsubscribe) => { unsubscribe(); }); }; - }; + } +} - return MCWSEVRStreamProvider; -}); +export default MCWSEVRStreamProvider; diff --git a/src/realtime/MCWSFrameEventStreamProvider.js b/src/realtime/MCWSFrameEventStreamProvider.js index 52e8deba..40e3f3fe 100644 --- a/src/realtime/MCWSFrameEventStreamProvider.js +++ b/src/realtime/MCWSFrameEventStreamProvider.js @@ -1,42 +1,45 @@ -define(['./MCWSStreamProvider'], function (MCWSStreamProvider) { - 'use strict'; +import MCWSStreamProvider from './MCWSStreamProvider'; +/** + * Provides real-time streaming DataProduct data. + * @memberof {vista/telemetry} + */ +class MCWSFrameEventStreamProvider extends MCWSStreamProvider { /** - * Provides real-time streaming DataProduct data. - * @constructor - * @augments {MCWSStreamProvider} - * @memberof {vista/telemetry} + * Get the URL for streaming data for this domain object + * @param {Object} domainObject The domain object + * @returns {String} The URL to use for streaming */ - var MCWSFrameEventStreamProvider = MCWSStreamProvider.extend({ - constructor: function (openmct, vistaTime) { - MCWSStreamProvider.call(this, openmct, vistaTime); - } - }); - - MCWSFrameEventStreamProvider.prototype.getUrl = function (domainObject) { - return domainObject.telemetry && domainObject.telemetry.frameEventStreamUrl; - }; + getUrl(domainObject) { + return domainObject.telemetry?.frameEventStreamUrl; + } - MCWSFrameEventStreamProvider.prototype.getKey = function (domainObject) { + /** + * Get the key to use for this stream + * @param {Object} domainObject The domain object + * @returns {String} The key + */ + getKey(domainObject) { if (domainObject.type === 'vista.frame-event-filter') { - var frameEventType = domainObject.identifier.key.split(':')[0]; + const frameEventType = domainObject.identifier.key.split(':')[0]; return frameEventType; } else { return undefined; } - }; + } - MCWSFrameEventStreamProvider.prototype.getProperty = function (domainObject) { + /** + * Get the property to use for this stream + * @param {Object} domainObject The domain object + * @returns {String} The property name + */ + getProperty(domainObject) { if (domainObject.type === 'vista.frame-event-filter') { return 'message_type'; } else { return 'some_undefined_property'; } - }; - - MCWSFrameEventStreamProvider.prototype.notifyWorker = function (key, value) { - MCWSStreamProvider.prototype.notifyWorker.call(this, key, value); - }; + } +} - return MCWSFrameEventStreamProvider; -}); +export default MCWSFrameEventStreamProvider; diff --git a/src/realtime/MCWSFrameSummaryStreamProvider.js b/src/realtime/MCWSFrameSummaryStreamProvider.js index 3e3f9b76..9ed50bba 100644 --- a/src/realtime/MCWSFrameSummaryStreamProvider.js +++ b/src/realtime/MCWSFrameSummaryStreamProvider.js @@ -1,36 +1,38 @@ -define(['./MCWSStreamProvider'], function (MCWSStreamProvider) { - 'use strict'; +import MCWSStreamProvider from './MCWSStreamProvider'; +/** + * Provides real-time streaming DataProduct data. + * @memberof {vista/telemetry} + */ +class MCWSFrameSummaryStreamProvider extends MCWSStreamProvider { /** - * Provides real-time streaming DataProduct data. - * @constructor - * @augments {MCWSStreamProvider} - * @memberof {vista/telemetry} + * Get the URL for streaming data for this domain object + * @param {Object} domainObject The domain object + * @returns {String} The URL to use for streaming */ - var MCWSFrameSummaryStreamProvider = MCWSStreamProvider.extend({ - constructor: function (openmct, vistaTime) { - MCWSStreamProvider.call(this, openmct, vistaTime); - } - }); + getUrl(domainObject) { + return domainObject.telemetry?.frameSummaryStreamUrl; + } - MCWSFrameSummaryStreamProvider.prototype.getUrl = function (domainObject) { - return domainObject.telemetry && domainObject.telemetry.frameSummaryStreamUrl; - }; - - MCWSFrameSummaryStreamProvider.prototype.getKey = function (domainObject) { + /** + * Get the key to use for this stream + * @param {Object} domainObject The domain object + * @returns {undefined} Always returns undefined to match on undefined properties + */ + getKey() { // We return undefined so that we can match on undefined properties. return undefined; - }; + } - MCWSFrameSummaryStreamProvider.prototype.getProperty = function () { + /** + * Get the property to use for this stream + * @returns {String} The property name + */ + getProperty() { // We just want something that returns undefined so it matches the - // key above. Hacky. + // key above. Hacky. return 'some_undefined_property'; - }; - - MCWSFrameSummaryStreamProvider.prototype.notifyWorker = function (key, value) { - MCWSStreamProvider.prototype.notifyWorker.call(this, key, value); - }; + } +} - return MCWSFrameSummaryStreamProvider; -}); +export default MCWSFrameSummaryStreamProvider; diff --git a/src/realtime/MCWSMessageStreamProvider.js b/src/realtime/MCWSMessageStreamProvider.js index 0bcb3aa8..3f28f446 100644 --- a/src/realtime/MCWSMessageStreamProvider.js +++ b/src/realtime/MCWSMessageStreamProvider.js @@ -1,20 +1,19 @@ -define(['./MCWSStreamProvider'], function (MCWSStreamProvider) { - 'use strict'; +import MCWSStreamProvider from './MCWSStreamProvider'; +/** + * Provides real-time streaming DataProduct data. + * @memberof {vista/telemetry} + */ +class MCWSMessageStreamProvider extends MCWSStreamProvider { /** - * Provides real-time streaming DataProduct data. - * @constructor - * @augments {MCWSStreamProvider} - * @memberof {vista/telemetry} + * Subscribe to real-time updates for this domain object + * @param {Object} domainObject The domain object + * @param {Function} callback The callback to invoke with new data + * @param {Object} options Additional options + * @returns {Function} A function that will unsubscribe when called */ - var MCWSMessageStreamProvider = MCWSStreamProvider.extend({ - constructor: function (openmct, vistaTime) { - MCWSStreamProvider.call(this, openmct, vistaTime); - } - }); - - MCWSMessageStreamProvider.prototype.subscribe = function (domainObject, callback, options) { - var messageType = domainObject.identifier.key.split(':')[0]; + subscribe(domainObject, callback, options) { + const messageType = domainObject.identifier.key.split(':')[0]; options.filters = options.filters || {}; if (messageType === 'CommandMessages') { @@ -32,33 +31,44 @@ define(['./MCWSStreamProvider'], function (MCWSStreamProvider) { }; } - return MCWSStreamProvider.prototype.subscribe.call(this, domainObject, callback, options); - }; + return super.subscribe(domainObject, callback, options); + } - MCWSMessageStreamProvider.prototype.getUrl = function (domainObject) { - return domainObject.telemetry && domainObject.telemetry.messageStreamUrl; - }; + /** + * Get the URL for streaming data for this domain object + * @param {Object} domainObject The domain object + * @returns {String} The URL to use for streaming + */ + getUrl(domainObject) { + return domainObject.telemetry?.messageStreamUrl; + } - MCWSMessageStreamProvider.prototype.getKey = function (domainObject) { + /** + * Get the key to use for this stream + * @param {Object} domainObject The domain object + * @returns {String|undefined} The key + */ + getKey(domainObject) { //if it is the mock domainObject used in ClearDataOnMessage.js if (domainObject.identifier.key === 'message::clear-data-static-object') { return domainObject.telemetry.key; } else { return undefined; } - }; + } - MCWSMessageStreamProvider.prototype.getProperty = function (domainObject) { + /** + * Get the property to use for this stream + * @param {Object} domainObject The domain object + * @returns {String} The property name + */ + getProperty(domainObject) { if (domainObject.identifier.key === 'message::clear-data-static-object') { return domainObject.telemetry.property; } else { return 'some_undefined_property'; } - }; - - MCWSMessageStreamProvider.prototype.notifyWorker = function (key, value) { - MCWSStreamProvider.prototype.notifyWorker.call(this, key, value); - }; + } +} - return MCWSMessageStreamProvider; -}); +export default MCWSMessageStreamProvider; diff --git a/src/realtime/MCWSPacketSummaryEventProvider.js b/src/realtime/MCWSPacketSummaryEventProvider.js index 908f437c..581acc94 100644 --- a/src/realtime/MCWSPacketSummaryEventProvider.js +++ b/src/realtime/MCWSPacketSummaryEventProvider.js @@ -1,28 +1,38 @@ -define(['./MCWSStreamProvider'], function (MCWSStreamProvider) { - 'use strict'; +import MCWSStreamProvider from './MCWSStreamProvider'; +/** + * Provides real-time streaming PacketSummaryEvent data. + * @memberof {vista/telemetry} + */ +class MCWSPacketSummaryEventProvider extends MCWSStreamProvider { /** - * Provides real-time streaming PacketSummaryEvent data. - * @constructor - * @augments {MCWSStreamProvider} - * @memberof {vista/telemetry} + * Get the URL for streaming data for this domain object + * @param {Object} domainObject The domain object + * @returns {String} The URL to use for streaming */ - var MCWSPacketSummaryEventProvider = MCWSStreamProvider.extend({}); + getUrl(domainObject) { + return domainObject.telemetry?.packetSummaryEventStreamUrl; + } - MCWSPacketSummaryEventProvider.prototype.getUrl = function (domainObject) { - return domainObject.telemetry && domainObject.telemetry.packetSummaryEventStreamUrl; - }; - - MCWSPacketSummaryEventProvider.prototype.getKey = function (domainObject) { + /** + * Get the key to use for this stream + * @param {Object} domainObject The domain object + * @returns {undefined} Always returns undefined to match on undefined properties + */ + getKey() { // We return undefined so that we can match on undefined properties. return undefined; - }; + } - MCWSPacketSummaryEventProvider.prototype.getProperty = function () { + /** + * Get the property to use for this stream + * @returns {String} The property name + */ + getProperty() { // We just want something that returns undefined so it matches the // key above. Hacky. return 'some_undefined_property'; - }; + } +} - return MCWSPacketSummaryEventProvider; -}); +export default MCWSPacketSummaryEventProvider; diff --git a/src/realtime/MCWSStreamProvider.js b/src/realtime/MCWSStreamProvider.js index da3ddcab..4067dec6 100644 --- a/src/realtime/MCWSStreamProvider.js +++ b/src/realtime/MCWSStreamProvider.js @@ -1,48 +1,39 @@ -define([ - '../lib/extend', - 'lodash', - './MCWSStreamWorker', - 'services/session/SessionService', - 'services/filtering/FilterService', - 'services/globalStaleness/globalStaleness' -], function ( - extend, - _, - runMCWSStreamWorker, - sessionServiceDefault, - filterServiceDefault, - GlobalStaleness -) { - 'use strict'; - - /** - * Provides real-time streaming telemetry for channels/EVRs with an - * associated WebSocket URL. Uses user selection from `sessionService` - * in order to filter down to an appropriate topic. - * - * @param {vista/sessions.SessionService} sessions service providing - * information about user-selected topics/sessions - * @constructor - * @implements {TelemetryService} - * @memberof vista/telemetry - */ - function MCWSStreamProvider(openmct, vistaTime) { +import runMCWSStreamWorker from './MCWSStreamWorker'; +import sessionService from 'services/session/SessionService'; +import filterService from 'services/filtering/FilterService'; +import GlobalStaleness from 'services/globalStaleness/globalStaleness'; + +/** + * Provides real-time streaming telemetry for channels/EVRs with an + * associated WebSocket URL. Uses user selection from `sessionService` + * in order to filter down to an appropriate topic. + * + * @param {vista/sessions.SessionService} sessions service providing + * information about user-selected topics/sessions + * @constructor + * @implements {TelemetryService} + * @memberof vista/telemetry + */ + +class MCWSStreamProvider { + constructor(openmct, vistaTime, options) { this.openmct = openmct; this.vistaTime = function () { return vistaTime; }; + this.options = options; - this.sessions = sessionServiceDefault.default(); - this.filterService = filterServiceDefault.default(); + this.sessions = sessionService(); + this.filterService = filterService(); this.subscriptions = {}; this.requests = {}; - } - MCWSStreamProvider.extend = extend; + this.subscriptionMCWSFilterDelay = options?.time?.subscriptionMCWSFilterDelay; + } - MCWSStreamProvider.prototype.processGlobalStaleness = function (data, latestTimestamp) { - const globalStaleness = GlobalStaleness.default(); + processGlobalStaleness(data, latestTimestamp) { + const globalStaleness = GlobalStaleness(); if (globalStaleness === null) { return; @@ -64,40 +55,42 @@ define([ } globalStaleness.updateLatestTimestamp(latestTimestamp); - }; + } - MCWSStreamProvider.prototype.onmessage = function (message) { - var data = message.data; - var url = data.url; - var key = data.key; - var values = data.values; - var subscriptions = (this.subscriptions[url] || {})[key] || []; - var timestamp = Date.now(); + onmessage(message) { + const data = message.data; + const { url, key, values } = data; + const subscriptions = (this.subscriptions[url] ?? {})[key] ?? []; + const timestamp = Date.now(); - this.processGlobalStaleness(values || [], timestamp); + this.processGlobalStaleness(values ?? [], timestamp); subscriptions.forEach(function (subscription) { + // ticks the clock for ert, scet, and sclk if they are present this.vistaTime().update(values[0]); values.forEach(subscription.callback); }, this); //Communicate websocket timeout and errors to users if (data.onclose && data.code === 1006) { + const message = `Real-time data connection lost - data may not be displayed as expected. Code: 1006`; + + this.openmct.notifications.error(message); + console.error(message); + } else if (data.onerror) { this.openmct.notifications.error( - 'Real-time data connection lost - data may not be displayed as expected.' + `Websocket Error for ${url}?${data.query}, please see console for details` ); - console.error(`Real-time data connection lost - data may not be displayed as expected.`); - } else if (data.onerror) { - this.openmct.notifications.error('Websocket Error, please see console for details'); - console.error(`Websocket Error - Code:${data.code}, Error:${data.reason}`); + console.error(`Websocket Error - Code: ${data.code}, Error: ${data.reason}`); } - }; + } - MCWSStreamProvider.prototype.worker = function () { - const worker = runMCWSStreamWorker.default(); + worker() { + const worker = runMCWSStreamWorker(); worker.onmessage = this.onmessage.bind(this); + // cache worker this.worker = function () { return worker; }; @@ -124,7 +117,7 @@ define([ } return worker; - }; + } /** * Post a message to the associated worker. @@ -132,9 +125,24 @@ define([ * @param {string} value data associated with the message * @private */ - MCWSStreamProvider.prototype.notifyWorker = function (key, value) { - this.worker().postMessage({ key: key, value: value }); - }; + notifyWorker(key, value) { + this.worker().postMessage({ key, value }); + } + + /** + * Initialize the subscription for a given URL and key. + * @param {string} url the URL to initialize + * @param {string} key the key to initialize + * @private + */ + initializeSubscription(url, key) { + if (!Object.hasOwn(this.subscriptions, url)) { + this.subscriptions[url] = {}; + } + if (!Object.hasOwn(this.subscriptions[url], key)) { + this.subscriptions[url][key] = []; + } + } /** * Add a callback function associated with a specific domain object. @@ -142,18 +150,17 @@ define([ * @param {Function} callback the callback to add * @private */ - MCWSStreamProvider.prototype.addCallback = function (domainObject, callback) { - var url = this.getUrl(domainObject), - key = this.getKey(domainObject), - subscriptions = this.subscriptions; - - subscriptions[url] = subscriptions[url] || {}; - subscriptions[url][key] = subscriptions[url][key] || []; - subscriptions[url][key].push({ - callback: callback, - domainObject: domainObject + addCallback(domainObject, callback) { + const url = this.getUrl(domainObject); + const key = this.getKey(domainObject); + + this.initializeSubscription(url, key); + + this.subscriptions[url][key].push({ + callback, + domainObject }); - }; + } /** * Remove a callback function associated with a specific domain object. @@ -161,30 +168,44 @@ define([ * @param {Function} callback the callback to remove * @private */ - MCWSStreamProvider.prototype.removeCallback = function (domainObject, callback) { - var url = this.getUrl(domainObject), - key = this.getKey(domainObject), - subscriptions = this.subscriptions; - - subscriptions[url] = subscriptions[url] || {}; - subscriptions[url][key] = subscriptions[url][key] || []; - subscriptions[url][key] = subscriptions[url][key].filter(function (c) { - return c.callback !== callback; - }); + removeCallback(domainObject, callback) { + const url = this.getUrl(domainObject); + const key = this.getKey(domainObject); + + this.initializeSubscription(url, key); - if (subscriptions[url][key].length < 1) { - delete subscriptions[url][key]; - if (Object.keys(subscriptions[url]).length < 1) { - delete subscriptions[url]; + this.subscriptions[url][key] = this.subscriptions[url][key].filter( + (c) => c.callback !== callback + ); + + if (this.subscriptions[url][key].length < 1) { + delete this.subscriptions[url][key]; + + if (Object.keys(this.subscriptions[url]).length < 1) { + delete this.subscriptions[url]; } } - }; + } - MCWSStreamProvider.prototype.supportsSubscribe = function (domainObject) { - return !!this.getUrl(domainObject); - }; + /** + * Check if the provider supports subscribing to a domain object. + * @param {DomainObject} domainObject the requested object + * @returns {boolean} true if the provider supports subscribing, false otherwise + * @private + */ + supportsSubscribe(domainObject) { + return Boolean(this.getUrl(domainObject)); + } - MCWSStreamProvider.prototype.subscribe = function (domainObject, callback, options) { + /** + * Subscribe to a domain object. + * @param {DomainObject} domainObject the requested object + * @param {Function} callback the callback to add + * @param {Object} options additional options + * @returns {Function} a function to unsubscribe + * @private + */ + subscribe(domainObject, callback, options) { if (options) { options = { ...options }; if (options.filters) { @@ -198,7 +219,8 @@ define([ key: this.getKey(domainObject), property: this.getProperty(domainObject), mcwsVersion: domainObject.telemetry.mcwsVersion, - extraFilterTerms: options && options.filters && this.serializeFilters(options.filters) + extraFilterTerms: options?.filters ? this.serializeFilters(options.filters) : undefined, + subscriptionMCWSFilterDelay: this.subscriptionMCWSFilterDelay }; function unsubscribe() { @@ -214,10 +236,17 @@ define([ this.addCallback(domainObject, callback); this.notifyWorker('subscribe', message); - return _.bind(unsubscribe, this); - }; + return unsubscribe.bind(this); + } - MCWSStreamProvider.prototype.removeFiltersIfAllSelected = function (domainObject, filters) { + /** + * Remove filters if all selected. + * @param {DomainObject} domainObject the requested object + * @param {Object} filters the filters to remove + * @returns {Object} the updated filters + * @private + */ + removeFiltersIfAllSelected(domainObject, filters) { let valuesWithFilters = this.openmct.telemetry .getMetadata(domainObject) .values() @@ -244,9 +273,9 @@ define([ }); return filters; - }; + } - MCWSStreamProvider.prototype.serializeFilters = function (filters) { + serializeFilters(filters) { let attributeKeys = Object.keys(filters); let keysToFilterStringsMap = attributeKeys.reduce((extraFilterTerms, attributeKey) => { let filtersForAttribute = filters[attributeKey]; @@ -268,7 +297,7 @@ define([ return extraFilterTerms; }, {}); return keysToFilterStringsMap; - }; + } /** * Get the WebSocket URL for streaming data associated with this request. @@ -277,9 +306,9 @@ define([ * @param {DomainObject} domainObject the requested object * @returns {string} the WebSocket URL */ - MCWSStreamProvider.prototype.getUrl = function (domainObject) { + getUrl(domainObject) { throw new Error('getUrl not implemented.'); - }; + } /** * Get a key which identifies this request (relative to other requests @@ -288,9 +317,9 @@ define([ * @param {DomainObject} domainObject the requested object * @returns {string} the key */ - MCWSStreamProvider.prototype.getKey = function (domainObject) { + getKey(domainObject) { throw new Error('getKey not implemented.'); - }; + } /** * Get the name of the property of telemetry data points which will @@ -299,9 +328,9 @@ define([ * @private * @returns {string} the property name */ - MCWSStreamProvider.prototype.getProperty = function (domainObject) { + getProperty(domainObject) { throw new Error('getProperty not implemented.'); - }; + } +} - return MCWSStreamProvider; -}); +export default MCWSStreamProvider; diff --git a/src/realtime/MCWSStreamWorkerScript.js b/src/realtime/MCWSStreamWorkerScript.js index c82f7604..7824a279 100644 --- a/src/realtime/MCWSStreamWorkerScript.js +++ b/src/realtime/MCWSStreamWorkerScript.js @@ -1,8 +1,17 @@ (function (self, WebSocket) { 'use strict'; - var worker; + let worker; + /** + * Represents a subscription to streaming channel data for + * a specific EVR or Channel. + * @typedef MCWSStreamSubscription + * @property {string} url the WebSocket URL to request data from + * @property {string} key the identifier for the specific Channel or EVR + * @property {string} property the name of the property to use when + * filtering, and when looking up keys from data points + */ /** * Represents a subscription to streaming channel data for * a specific EVR or Channel. @@ -18,195 +27,224 @@ * streaming channel data. Post messages from the worker thread * as data arrives. Recreates the underlying WebSocket * as necessary when query parameters change. - * @param {string} url the WebSocket URL - * @param {string} property the property to filter on - * @param {Object} topic metadata about the topic to listen on - * @constructor - * @private - */ - function MCWSConnection(url, property, topic, extraFilterTerms, globalFilters) { - this.url = url; - this.topic = topic; - this.subscribers = {}; - this.property = property; - this.extraFilterTerms = extraFilterTerms; - this.globalFilters = globalFilters; - } - - /** - * Notify the connection of a new subscription to the specified channel. - * MCWSConnection keeps a count of active subscriptions in order to - * adjust filtering parameters as necessary. - * @param {string} key the channel or module to subscribe to - * @private */ - MCWSConnection.prototype.subscribe = function (key) { - this.subscribers[key] = (this.subscribers[key] || 0) + 1; - if (this.subscribers[key] === 1) { - this.scheduleReconnect(); + class MCWSConnection { + /** + * @param {string} url the WebSocket URL + * @param {string} property the property to filter on + * @param {Object} topic metadata about the topic to listen on + * @param {Object} extraFilterTerms additional filter terms + * @param {Object} globalFilters global filters to apply + */ + constructor( + url, + property, + topic, + extraFilterTerms, + globalFilters, + subscriptionMCWSFilterDelay + ) { + this.url = url; + this.topic = topic; + this.subscribers = {}; + this.property = property; + this.extraFilterTerms = extraFilterTerms; + this.globalFilters = globalFilters; + this.subscriptionMCWSFilterDelay = subscriptionMCWSFilterDelay ?? 100; } - }; - /** - * Notify the connection that a subscription to the specified channel - * has ended. - * MCWSConnection keeps a count of active subscriptions in order to - * adjust filtering parameters as necessary. - * @param {string} key the channel or module to unsubscribe to - * @private - */ - MCWSConnection.prototype.unsubscribe = function (key) { - this.subscribers[key] = (this.subscribers[key] || 0) - 1; - if (this.subscribers[key] < 1) { - delete this.subscribers[key]; - this.scheduleReconnect(); + /** + * Notify the connection of a new subscription to the specified endpoint. + * MCWSConnection keeps a count of active subscriptions in order to + * adjust filtering parameters as necessary. + * @param {string} key the endpoint to subscribe to + * @private + */ + subscribe(key) { + this.subscribers[key] = (this.subscribers[key] ?? 0) + 1; + + if (this.subscribers[key] === 1) { + this.scheduleReconnect(); + } } - }; - /** - * Construct a query string for this connection's current topic, session - * and subscription state. - * @returns {string} the query string - * @private - */ - MCWSConnection.prototype.query = function () { - var filter = { - session_id: this.topic && this.topic.number, - topic: this.topic && this.topic.topic - }; - - if (this.property !== 'some_undefined_property') { - filter[this.property] = '(' + Object.keys(this.subscribers).join(',') + ')'; + /** + * Notify the connection that a subscription to the specified endpoint + * has ended. + * MCWSConnection keeps a count of active subscriptions in order to + * adjust filtering parameters as necessary. + * @param {string} key the endpoint to unsubscribe to + * @private + */ + unsubscribe(key) { + this.subscribers[key] = (this.subscribers[key] ?? 0) - 1; + + if (this.subscribers[key] < 1) { + delete this.subscribers[key]; + this.scheduleReconnect(); + } } - if (this.extraFilterTerms) { - Object.keys(this.extraFilterTerms).forEach(function (k) { - filter[k] = this.extraFilterTerms[k]; - }, this); - } + /** + * Construct a query string for this connection's current topic, session + * and subscription state. + * @returns {string} the query string + * @private + */ + getQueryString() { + const filter = { + session_id: this.topic?.number, + topic: this.topic?.topic + }; + + if (this.property !== 'some_undefined_property') { + filter[this.property] = `(${Object.keys(this.subscribers).join(',')})`; + } - if (this.globalFilters) { - Object.entries(this.globalFilters).forEach(([key, value]) => { - if (filter[key]) { - console.warn(`Global filter not applied for existing persisted filter for ${key}.`); - } else { - filter[key] = value; - } - }); - } + if (this.extraFilterTerms) { + Object.keys(this.extraFilterTerms).forEach((key) => { + filter[key] = this.extraFilterTerms[key]; + }); + } - return ( - 'filter=(' + - Object.keys(filter) - .filter(function (key) { - return !!filter[key]; - }) - .map(function (key) { - return key + '=' + filter[key]; - }) - .join(',') + - ')' - ); - }; + if (this.globalFilters) { + Object.entries(this.globalFilters).forEach(([key, value]) => { + if (filter[key]) { + console.warn(`Global filter not applied for existing persisted filter for ${key}.`); + } else { + filter[key] = value; + } + }); + } - /** - * Close any active WebSocket associated with this connection. - * @private - */ - MCWSConnection.prototype.destroy = function () { - if (this.socket) { - this.socket.close(); - delete this.socket; + return `filter=(${Object.keys(filter) + .filter((key) => Boolean(filter[key])) + .map((key) => `${key}=${filter[key]}`) + .join(',')})`; } - }; - - /** - * Set the topic for the active session. - * @param {Object} topic metadata for the selected topic, as provided - * by MCWS - * @private - */ - MCWSConnection.prototype.setTopic = function (topic) { - this.topic = topic; - this.scheduleReconnect(); - }; - MCWSConnection.prototype.setGlobalFilters = function (filters) { - this.globalFilters = filters; - this.scheduleReconnect(); - }; + /** + * Set the topic for the active session. + * @param {Object} topic metadata for the selected topic, as provided + * by MCWS + * @private + */ + setTopic(topic) { + this.topic = topic; + this.scheduleReconnect(); + } - MCWSConnection.prototype.scheduleReconnect = function () { - if (this.pending) { - clearTimeout(this.pending); + /** + * Set the global filters for the active session. + * @param {Object} filters metadata for the selected filters, as provided + * by MCWS + * @private + */ + setGlobalFilters(filters) { + this.globalFilters = filters; + this.scheduleReconnect(); } - this.pending = setTimeout( - function () { - this.pending = undefined; - this.reconnect(); - }.bind(this), - 10 - ); - }; - /** - * Reestablish the connection to the WebSocket (typically called because - * filtering parameters have changed.) - * @private - */ - MCWSConnection.prototype.reconnect = function () { - var oldSocket = this.socket, - url = this.url, - subscribers = this.subscribers, - property = this.property; - - if (Object.keys(subscribers).length < 1 || !this.topic) { - if (oldSocket) { - oldSocket.close(); - delete this.socket; + /** + * Schedule a reconnection to the WebSocket. + * @private + */ + scheduleReconnect() { + if (this.pending) { + clearTimeout(this.pending); } - return; - } - this.socket = new WebSocket(this.url + '?' + this.query()); + this.pending = setTimeout(() => { + this.pending = undefined; + this.reconnect(); + }, this.subscriptionMCWSFilterDelay); + } - this.socket.onopen = function () { - if (oldSocket) { - oldSocket.close(); - } - }; - - this.socket.onmessage = function (message) { - var data = JSON.parse(message.data); - - data.forEach(function (datum) { - var key = datum[property]; - if (subscribers[key] > 0) { - self.postMessage({ - url: url, - key: key, - values: [datum] - }); + /** + * Reestablish the connection to the WebSocket (typically called because + * filtering parameters have changed.) + * @private + */ + reconnect() { + let oldSocket = this.socket; + const { url, subscribers, property } = this; + + // no subscribers or no topic close existing socket + // suppress errors as they are not useful + if (Object.keys(subscribers).length < 1 || !this.topic) { + if (oldSocket) { + try { + oldSocket.onclose = null; + oldSocket.onerror = null; + oldSocket.close(); + // eslint-disable-next-line no-unused-vars + } catch (e) { + // Suppress errors + } + + oldSocket = undefined; + this.socket = undefined; } - }); - }; - this.socket.onclose = function (message) { - self.postMessage({ - onclose: true, - code: message.code, - reason: message.reason - }); - }; + return; + } - this.socket.onerror = function (error) { - self.postMessage({ - onerror: true, - code: error.code, - reason: error.reason - }); - }; - }; + // Create a new WebSocket connection with the updated query parameters + this.socket = new WebSocket(`${this.url}?${this.getQueryString()}`); + + // close old socket in new socket open to ensure + // no data is lost + this.socket.onopen = async () => { + if (oldSocket) { + try { + oldSocket.onclose = null; + oldSocket.onerror = null; + oldSocket.close(); + // eslint-disable-next-line no-unused-vars + } catch (e) { + // Suppress errors + } + oldSocket = undefined; + } + }; + + this.socket.onmessage = (message) => { + const data = JSON.parse(message.data); + + data.forEach((datum) => { + const key = datum[property]; + + if (subscribers[key] > 0) { + self.postMessage({ + url: url, + key: key, + values: [datum] + }); + } + }); + }; + + this.socket.onclose = (message) => { + self.postMessage({ + onclose: true, + code: message.code, + reason: message.reason + }); + }; + + this.socket.onerror = (error) => { + self.postMessage({ + onerror: true, + url: this.url, + query: this.getQueryString(), + code: error.code ?? 'unavailable', + reason: + error.reason ?? + 'WebSocket error occurred, but browser did not provide detailed error information' + }); + }; + } + } /** * Manages connections for streaming channel data on a background. @@ -216,110 +254,91 @@ * Methods may be invoked by posting a message to the worker * with an object containing `key` and `value` properties, where * `key` is the method name and `value` is the argument to provide. - * @constructor */ - function MCWSStreamWorker() { - this.connections = {}; - } + class MCWSStreamWorker { + constructor() { + this.connections = {}; + } - /** - * Release all active WebSocket connections. - */ - MCWSStreamWorker.prototype.reset = function () { - Object.keys(this.connections).forEach( - function (url) { - this.connections[url].destroy(); - delete this.connections[url]; - }.bind(this) - ); - delete this.activeTopic; - delete this.activeGlobalFilters; - }; + /** + * Add a new active subscription. + * @param {MCWSStreamSubscription} subscription the subscription to obtain + */ + subscribe(subscription) { + const { url, key, property, extraFilterTerms, subscriptionMCWSFilterDelay } = subscription; + const cacheKey = this.generateCacheKey(url, property, extraFilterTerms); + + if (!this.connections[cacheKey]) { + this.connections[cacheKey] = new MCWSConnection( + url, + property, + this.activeTopic, + extraFilterTerms, + this.activeGlobalFilters, + subscriptionMCWSFilterDelay + ); + } - /** - * Add a new active subscription. - * @param {MCWSStreamSubscription} subscription the subscription to obtain - */ - MCWSStreamWorker.prototype.subscribe = function (subscription) { - const url = subscription.url; - const key = subscription.key; - const property = subscription.property; - const extraFilterTerms = subscription.extraFilterTerms; - const cacheKey = this.generateCacheKey(url, property, extraFilterTerms); - - if (!this.connections[cacheKey]) { - this.connections[cacheKey] = new MCWSConnection( - url, - property, - this.activeTopic, - extraFilterTerms, - this.activeGlobalFilters - ); + this.connections[cacheKey].subscribe(key); } - this.connections[cacheKey].subscribe(key); - }; + generateCacheKey(url, property, extraFilterTerms) { + let filterComponent = + extraFilterTerms && + Object.keys(extraFilterTerms) + .sort() + .map((filterKey) => `${filterKey}=${extraFilterTerms[filterKey]}`) + .join('&'); + let cacheKey = `${url}__${property}`; + + if (filterComponent?.length > 0) { + cacheKey += `__${filterComponent}`; + } - MCWSStreamWorker.prototype.generateCacheKey = function (url, property, extraFilterTerms) { - let filterComponent = - extraFilterTerms && - Object.keys(extraFilterTerms) - .sort() - .map((filterKey) => filterKey + '=' + extraFilterTerms[filterKey]) - .join('&'); - let cacheKey = url + '__' + property; - if (filterComponent && filterComponent.length > 0) { - cacheKey += '__' + filterComponent; + return cacheKey; } - return cacheKey; - }; - /** - * Add a new active subscription. - * @param {MCWSStreamSubscription} subscription the subscription to release - */ - MCWSStreamWorker.prototype.unsubscribe = function (subscription) { - var url = subscription.url, - key = subscription.key, - property = subscription.property, - extraFilterTerms = subscription.extraFilterTerms, - cacheKey = this.generateCacheKey(url, property, extraFilterTerms); - - if (this.connections[cacheKey]) { - this.connections[cacheKey].unsubscribe(key); + /** + * Add a new active subscription. + * @param {MCWSStreamSubscription} subscription the subscription to release + */ + unsubscribe(subscription) { + const { url, key, property, extraFilterTerms } = subscription; + const cacheKey = this.generateCacheKey(url, property, extraFilterTerms); + + if (this.connections[cacheKey]) { + this.connections[cacheKey].unsubscribe(key); + } } - }; - /** - * Change the current topic selection. - * @param {Object} topic metadata about the selected topic - */ - MCWSStreamWorker.prototype.topic = function (topic) { - this.activeTopic = topic; - Object.keys(this.connections).forEach( - function (cacheKey) { + /** + * Change the current topic selection. + * @param {Object} topic metadata about the selected topic + */ + topic(topic) { + this.activeTopic = topic; + Object.keys(this.connections).forEach((cacheKey) => { this.connections[cacheKey].setTopic(topic); - }.bind(this) - ); - }; + }); + } - /** - * Change the global filters. - * @param {Object} filters metadata about the filters - */ - MCWSStreamWorker.prototype.globalFilters = function (filters) { - this.activeGlobalFilters = filters; - Object.keys(this.connections).forEach( - function (cacheKey) { + /** + * Change the global filters. + * @param {Object} filters metadata about the filters + */ + globalFilters(filters) { + this.activeGlobalFilters = filters; + Object.keys(this.connections).forEach((cacheKey) => { this.connections[cacheKey].setGlobalFilters(filters); - }.bind(this) - ); - }; + }); + } + } worker = new MCWSStreamWorker(); self.onmessage = function (messageEvent) { - var data = messageEvent.data, - method = worker[data.key]; + const data = messageEvent.data; + const method = worker[data.key]; + if (method) { method.call(worker, data.value); } diff --git a/src/realtime/plugin.js b/src/realtime/plugin.js index 208c5fcd..132fc535 100644 --- a/src/realtime/plugin.js +++ b/src/realtime/plugin.js @@ -1,44 +1,30 @@ -define([ - 'services/filtering/FilterService', - './MCWSChannelStreamProvider', - './MCWSEVRStreamProvider', - './MCWSEVRLevelStreamProvider', - './MCWSCommandStreamProvider', - './MCWSPacketSummaryEventProvider', - './MCWSDataProductStreamProvider', - './MCWSMessageStreamProvider', - './MCWSFrameSummaryStreamProvider', - './MCWSFrameEventStreamProvider', - './MCWSAlarmMessageStreamProvider' -], function ( - filterServiceDefault, - MCWSChannelStreamProvider, - MCWSEVRStreamProvider, - MCWSEVRLevelStreamProvider, - MCWSCommandStreamProvider, - MCWSPacketSummaryEventProvider, - MCWSDataProductStreamProvider, - MCWSMessageStreamProvider, - MCWSFrameSummaryStreamProvider, - MCWSFrameEventStreamProvider, - MCWSAlarmMessageStreamProvider -) { - function RealtimeTelemetryPlugin(vistaTime, options) { - return function install(openmct) { - filterServiceDefault.default(openmct, options.globalFilters); +import filterService from 'services/filtering/FilterService'; +import MCWSChannelStreamProvider from './MCWSChannelStreamProvider'; +import MCWSEVRStreamProvider from './MCWSEVRStreamProvider'; +import MCWSEVRLevelStreamProvider from './MCWSEVRLevelStreamProvider'; +import MCWSCommandStreamProvider from './MCWSCommandStreamProvider'; +import MCWSPacketSummaryEventProvider from './MCWSPacketSummaryEventProvider'; +import MCWSDataProductStreamProvider from './MCWSDataProductStreamProvider'; +import MCWSMessageStreamProvider from './MCWSMessageStreamProvider'; +import MCWSFrameSummaryStreamProvider from './MCWSFrameSummaryStreamProvider'; +import MCWSFrameEventStreamProvider from './MCWSFrameEventStreamProvider'; +import MCWSAlarmMessageStreamProvider from './MCWSAlarmMessageStreamProvider'; - openmct.telemetry.addProvider(new MCWSChannelStreamProvider(openmct, vistaTime)); - openmct.telemetry.addProvider(new MCWSEVRStreamProvider(openmct, vistaTime)); - openmct.telemetry.addProvider(new MCWSEVRLevelStreamProvider(openmct, vistaTime)); - openmct.telemetry.addProvider(new MCWSCommandStreamProvider(openmct, vistaTime)); - openmct.telemetry.addProvider(new MCWSPacketSummaryEventProvider(openmct, vistaTime)); - openmct.telemetry.addProvider(new MCWSDataProductStreamProvider(openmct, vistaTime, options)); - openmct.telemetry.addProvider(new MCWSMessageStreamProvider(openmct, vistaTime)); - openmct.telemetry.addProvider(new MCWSFrameSummaryStreamProvider(openmct, vistaTime)); - openmct.telemetry.addProvider(new MCWSFrameEventStreamProvider(openmct, vistaTime)); - openmct.telemetry.addProvider(new MCWSAlarmMessageStreamProvider(openmct, vistaTime)); - }; - } +function RealtimeTelemetryPlugin(vistaTime, options) { + return function install(openmct) { + filterService(openmct, options.globalFilters); - return RealtimeTelemetryPlugin; -}); + openmct.telemetry.addProvider(new MCWSChannelStreamProvider(openmct, vistaTime, options)); + openmct.telemetry.addProvider(new MCWSEVRStreamProvider(openmct, vistaTime, options)); + openmct.telemetry.addProvider(new MCWSEVRLevelStreamProvider(openmct, vistaTime, options)); + openmct.telemetry.addProvider(new MCWSCommandStreamProvider(openmct, vistaTime, options)); + openmct.telemetry.addProvider(new MCWSPacketSummaryEventProvider(openmct, vistaTime, options)); + openmct.telemetry.addProvider(new MCWSDataProductStreamProvider(openmct, vistaTime, options)); + openmct.telemetry.addProvider(new MCWSMessageStreamProvider(openmct, vistaTime, options)); + openmct.telemetry.addProvider(new MCWSFrameSummaryStreamProvider(openmct, vistaTime, options)); + openmct.telemetry.addProvider(new MCWSFrameEventStreamProvider(openmct, vistaTime, options)); + openmct.telemetry.addProvider(new MCWSAlarmMessageStreamProvider(openmct, vistaTime, options)); + }; +} + +export default RealtimeTelemetryPlugin; diff --git a/src/services/mcws/MCWSClient.js b/src/services/mcws/MCWSClient.js index 08aa4db6..fb0edbac 100644 --- a/src/services/mcws/MCWSClient.js +++ b/src/services/mcws/MCWSClient.js @@ -59,7 +59,7 @@ class MCWSClient { delete options.params; } - + // Keepalive options.keepalive = true; diff --git a/src/time/plugin.js b/src/time/plugin.js index b018a7ab..29e37dd2 100644 --- a/src/time/plugin.js +++ b/src/time/plugin.js @@ -19,17 +19,15 @@ const SYSTEM_MAP = { export default function TimePlugin(options) { return function install(openmct) { - var TODAY_BOUNDS = { + const TODAY_BOUNDS = { start: moment.utc().startOf('day').valueOf(), end: moment.utc().endOf('day').valueOf() }; - - var solFormat = new MSLSolFormat(openmct); - var lmstFormat = new LMSTFormat(openmct); - var nowLST = solFormat.format(moment.utc()); - var sol = Number(/SOL-(\d+)M/.exec(nowLST)[1]); - - var BOUNDS_MAP = { + const solFormat = new MSLSolFormat(openmct); + const lmstFormat = new LMSTFormat(openmct); + const nowLST = solFormat.format(moment.utc()); + const sol = Number(/SOL-(\d+)M/.exec(nowLST)[1]); + const BOUNDS_MAP = { ert: TODAY_BOUNDS, scet: TODAY_BOUNDS, sclk: { @@ -51,7 +49,7 @@ export default function TimePlugin(options) { } if (options.lmstEpoch) { - var lmstFormatWithEpoch = new LMSTFormat(options.lmstEpoch); + const lmstFormatWithEpoch = new LMSTFormat(options.lmstEpoch); BOUNDS_MAP.lmst = { start: lmstFormatWithEpoch.parse('SOL-' + sol), @@ -61,74 +59,107 @@ export default function TimePlugin(options) { install.ladClocks = {}; install.timeSystems = options.timeSystems; + let useUTCClock = false; let menuOptions = []; options.timeSystems.forEach(function (timeSystem) { - const key = timeSystem.key || timeSystem; + const key = timeSystem.key ?? timeSystem; if (!SYSTEM_MAP[key]) { console.error('Invalid timeSystem specified: ' + key); + return; } const system = new SYSTEM_MAP[key](options.utcFormat); - openmct.time.addTimeSystem(system); - const systemOptions = { timeSystem: system.key, - bounds: BOUNDS_MAP[key] + name: 'fixed' }; - if (timeSystem.presets) { - systemOptions.presets = timeSystem.presets; + openmct.time.addTimeSystem(system); + + if (timeSystem.modeSettings?.fixed?.bounds) { + systemOptions.bounds = timeSystem.modeSettings.fixed.bounds; + } else { + systemOptions.bounds = BOUNDS_MAP[key]; } + + if (timeSystem.modeSettings?.fixed?.presets) { + systemOptions.presets = timeSystem.modeSettings.fixed.presets; + } + if (timeSystem.limit) { systemOptions.limit = timeSystem.limit; } + if (options.records) { systemOptions.records = options.records; } menuOptions.push(systemOptions); + const DEFAULT_OFFSET_CONFIG = { + start: -30 * 60 * 1000, + end: 5 * 60 * 1000 + }; + if (options.allowRealtime && system.isUTCBased) { + let offsetConfig = DEFAULT_OFFSET_CONFIG; + let presetConfig = []; + + if (timeSystem.modeSettings?.realtime?.clockOffsets) { + offsetConfig = timeSystem.modeSettings.realtime.clockOffsets; + } + + if (timeSystem.modeSettings?.realtime?.presets) { + presetConfig = timeSystem.modeSettings.realtime.presets; + } + useUTCClock = true; menuOptions.push({ + name: 'realtime', timeSystem: system.key, clock: 'utc.local', - clockOffsets: { - start: -30 * 60 * 1000, - end: 5 * 60 * 1000 - } + clockOffsets: offsetConfig, + presets: presetConfig }); } + if (options.allowRealtime && options.allowLAD) { - var ladClock = new LADClock(key); + const ladClock = new LADClock(key); + let offsetConfig = DEFAULT_OFFSET_CONFIG; + + if (timeSystem.modeSettings?.lad?.clockOffsets) { + offsetConfig = timeSystem.modeSettings.lad.clockOffsets; + } + install.ladClocks[key] = ladClock; openmct.time.addClock(ladClock); menuOptions.push({ timeSystem: system.key, clock: ladClock.key, - clockOffsets: { - start: -30 * 60 * 1000, - end: 5 * 60 * 1000 - } + clockOffsets: offsetConfig }); } }); + if (options.defaultMode) { - let matchingConfigIndex = menuOptions.findIndex( - (menuOption) => menuOption.clock === options.defaultMode + const isFixedMode = options.defaultMode === 'fixed'; + const matchingConfigIndex = menuOptions.findIndex((menuOption) => + isFixedMode ? !menuOption.clock : menuOption.clock === options.defaultMode ); if (matchingConfigIndex !== -1) { - let matchingConfig = menuOptions[matchingConfigIndex]; + const matchingConfig = menuOptions[matchingConfigIndex]; + menuOptions.splice(matchingConfigIndex, 1); menuOptions.unshift(matchingConfig); } else { - console.warn(`Default mode '${options.defaultMode}' specified in configuration could not be applied. - Are LAD or realtime enabled? Does the defaultMode contain a typo?`); + console.warn( + `Default mode '${options.defaultMode}' specified in configuration could not be applied. Are LAD or realtime enabled? Does the defaultMode contain a typo?` + ); } }