Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,67 @@ import { $build, Strophe } from 'strophe.js';
*
* @class streamManagement
*/
Strophe.addConnectionPlugin('streamManagement', {
interface IStreamManagementPlugin {
logging: boolean;
autoSendCountOnEveryIncomingStanza: boolean;
requestResponseInterval: number;
_c: any;
_NS: string;
_isStreamManagementEnabled: boolean;
_serverProcesssedStanzasCounter: number | null;
_clientProcessedStanzasCounter: number | null;
_clientSentStanzasCounter: number | null;
_originalXMLOutput: ((elem: Element) => any) | null;
_requestHandler: any;
_incomingHandler: any;
_requestResponseIntervalCount: number;
_isSupported: boolean;
_unacknowledgedStanzas: Element[];
_acknowledgedStanzaListeners: ((stanza: Element) => void)[];
_resumeToken?: string;
_connectionStatus?: number;
_resuming?: boolean;
_connectArgs?: IArguments;
_originalConnect?: (...args: any[]) => any;
_originalOnStreamFeaturesAfterSASL?: (...args: any[]) => any;
_originalDoDisconnect?: (...args: any[]) => any;
_originalDisconnect?: (...args: any[]) => any;
_resumeState?: any;
_storedJid?: string;
_ackHandler?: any;
_enabledHandler?: any;
_resumeFailedHandler?: any;
_resumedHandler?: any;

addAcknowledgedStanzaListener(listener: (stanza: Element) => void): void;
enable(resume: boolean): void;
getResumeToken(): string | undefined;
isSupported(): boolean;
resume(): void;
requestAcknowledgement(): void;
getOutgoingCounter(): number | null;
getIncomingCounter(): number | null;
init(conn: any): void;
statusChanged(status: number): void;
xmlOutput(elem: Element): any;
_interceptDisconnect(): void;
_interceptDoDisconnect(): void;
_interceptConnectArgs(): void;
_onStreamFeaturesAfterSASL(elem: Element): any;
_incomingStanzaHandler(elem: Element): boolean;
_handleEnabled(elem: Element): boolean;
_handleResumeFailed(elem: Element): boolean;
_handleResumed(elem: Element): boolean;
_handleAcknowledgedStanzas(reportedHandledCount: number, lastKnownHandledCount: number): void;
_handleServerRequestHandler(): boolean;
_handleServerAck(elem: Element): boolean;
_answerProcessedStanzas(): void;
_increaseSentStanzasCounter(elem: Element): void;
_increaseReceivedStanzasCounter(): void;
_throwError(msg: string): never;
}

const streamManagement: IStreamManagementPlugin = {

/**
* @property {Boolean} logging: Set to true to enable logging regarding out of sync stanzas.
Expand Down Expand Up @@ -117,11 +177,11 @@ Strophe.addConnectionPlugin('streamManagement', {
*/
_acknowledgedStanzaListeners: [],

addAcknowledgedStanzaListener: function(listener) {
addAcknowledgedStanzaListener: function(listener: (stanza: Element) => void): void {
this._acknowledgedStanzaListeners.push(listener);
},

enable: function(resume) {
enable: function(resume: boolean): void {
if (!this._isSupported) {
throw new Error('The server doesn\'t support urn:xmpp:sm:3 namespace');
} else if (this._connectionStatus !== Strophe.Status.CONNECTED) {
Expand All @@ -132,15 +192,15 @@ Strophe.addConnectionPlugin('streamManagement', {
this._c.pause();
},

getResumeToken: function() {
getResumeToken: function(): Optional<string> {
return this._resumeToken;
},

isSupported() {
isSupported(): boolean {
return this._isSupported;
},

resume: function() {
resume: function(): void {
if (!this.getResumeToken()) {
throw new Error('No resume token');
}
Expand All @@ -151,26 +211,26 @@ Strophe.addConnectionPlugin('streamManagement', {
this._c.options.explicitResourceBinding = true;
this._resuming = true;

this._originalConnect.apply(this._c, this._connectArgs);
this._originalConnect!.apply(this._c, this._connectArgs);
},

requestAcknowledgement: function() {
requestAcknowledgement: function(): void {
if (this._connectionStatus !== Strophe.Status.CONNECTED) {
throw new Error('requestAcknowledgement() can only be called in the CONNECTED state');
}
this._requestResponseIntervalCount = 0;
this._c.send($build('r', { xmlns: this._NS }));
},

getOutgoingCounter: function() {
getOutgoingCounter: function(): Nullable<number> {
return this._clientSentStanzasCounter;
},

getIncomingCounter: function() {
getIncomingCounter: function(): Nullable<number> {
return this._clientProcessedStanzasCounter;
},

init: function(conn) {
init: function(conn: any): void {
this._c = conn;
Strophe.addNamespace('SM', this._NS);

Expand All @@ -191,12 +251,12 @@ Strophe.addConnectionPlugin('streamManagement', {
this._c.disconnect = this._interceptDisconnect.bind(this);
},

_interceptDisconnect: function() {
_interceptDisconnect: function(): void {
this._resumeToken = undefined;
this._originalDisconnect.apply(this._c, arguments);
this._originalDisconnect!.apply(this._c, arguments);
},

_interceptDoDisconnect: function() {
_interceptDoDisconnect: function(): void {
if (this.getResumeToken()
&& !this._resuming
&& this._c.connected && !this._c.disconnecting) {
Expand All @@ -217,22 +277,22 @@ Strophe.addConnectionPlugin('streamManagement', {
// as they would interfere with the resume flow. They will be resent anyway.
this._c._data = [];

this._originalDoDisconnect.apply(this._c, arguments);
this._originalDoDisconnect!.apply(this._c, arguments);
},

_interceptConnectArgs: function() {
_interceptConnectArgs: function(): void {
this._connectArgs = arguments;

this._originalConnect.apply(this._c, arguments);
this._originalConnect!.apply(this._c, arguments);
},

_onStreamFeaturesAfterSASL: function(elem) {
_onStreamFeaturesAfterSASL: function(elem: Element): any {
this._isSupported = elem.getElementsByTagNameNS(this._NS, "sm").length > 0;

return this._originalOnStreamFeaturesAfterSASL.apply(this._c, arguments);
return this._originalOnStreamFeaturesAfterSASL!.apply(this._c, arguments);
},

statusChanged: function (status) {
statusChanged: function (status: number): void {
this._connectionStatus = status;
if (!this.getResumeToken()
&& (status === Strophe.Status.CONNECTED || status === Strophe.Status.DISCONNECTED)) {
Expand Down Expand Up @@ -309,41 +369,41 @@ Strophe.addConnectionPlugin('streamManagement', {
* @method Send
* @public
*/
xmlOutput: function(elem) {
xmlOutput: function(elem: Element): any {
if (Strophe.isTagEqual(elem, 'iq') ||
Strophe.isTagEqual(elem, 'presence') ||
Strophe.isTagEqual(elem, 'message')) {
this._increaseSentStanzasCounter(elem);
}

return this._originalXMLOutput.call(this._c, elem);
return this._originalXMLOutput!.call(this._c, elem);
},

_handleEnabled: function(elem) {
_handleEnabled: function(elem: Element): boolean {
this._isStreamManagementEnabled = true;
// FIXME fail if requested, but not enabled
this._resumeToken = elem.getAttribute('resume') === 'true' && elem.getAttribute('id');
this._resumeToken = elem.getAttribute('resume') === 'true' && elem.getAttribute('id') || undefined;

this._c.resume();

return true;
},

_handleResumeFailed: function(elem) {
_handleResumeFailed: function(elem: Element): boolean {
const error = elem && (
(elem.firstElementChild && elem.firstElementChild.tagName)
|| (elem.firstChild && elem.firstChild.tagName));
(elem.firstElementChild && (elem.firstElementChild as Element).tagName)
|| (elem.firstChild && (elem.firstChild as Element).tagName));

this._c._changeConnectStatus(Strophe.Status.ERROR, error, elem);
this._c._doDisconnect();

return true;
},

_handleResumed: function(elem) {
_handleResumed: function(elem: Element): boolean {
// FIXME check if in the correct state
var handledCount = parseInt(elem.getAttribute('h'));
this._handleAcknowledgedStanzas(handledCount, this._serverProcesssedStanzasCounter);
var handledCount = parseInt(elem.getAttribute('h')!);
this._handleAcknowledgedStanzas(handledCount, this._serverProcesssedStanzasCounter!);

this._resuming = false;
this._c.do_bind = false; // No need to bind our resource anymore
Expand All @@ -364,7 +424,7 @@ Strophe.addConnectionPlugin('streamManagement', {
return true;
},

_incomingStanzaHandler: function(elem) {
_incomingStanzaHandler: function(elem: Element): boolean {
if (Strophe.isTagEqual(elem, 'iq') || Strophe.isTagEqual(elem, 'presence') || Strophe.isTagEqual(elem, 'message')) {
this._increaseReceivedStanzasCounter();

Expand All @@ -376,7 +436,7 @@ Strophe.addConnectionPlugin('streamManagement', {
return true;
},

_handleAcknowledgedStanzas: function(reportedHandledCount, lastKnownHandledCount) {
_handleAcknowledgedStanzas: function(reportedHandledCount: number, lastKnownHandledCount: number): void {
var delta = reportedHandledCount - lastKnownHandledCount;

if (delta < 0) {
Expand All @@ -388,7 +448,7 @@ Strophe.addConnectionPlugin('streamManagement', {
}

for(var i = 0; i < delta; i++) {
var stanza = this._unacknowledgedStanzas.shift();
var stanza = this._unacknowledgedStanzas.shift()!;
for (var j = 0; j < this._acknowledgedStanzaListeners.length; j++) {
this._acknowledgedStanzaListeners[j](stanza);
}
Expand All @@ -405,34 +465,34 @@ Strophe.addConnectionPlugin('streamManagement', {
}
},

_handleServerRequestHandler: function() {
_handleServerRequestHandler: function(): boolean {
this._answerProcessedStanzas();

return true;
},

_handleServerAck: function(elem){
var handledCount = parseInt(elem.getAttribute('h'));
this._handleAcknowledgedStanzas(handledCount, this._serverProcesssedStanzasCounter);
_handleServerAck: function(elem: Element): boolean {
var handledCount = parseInt(elem.getAttribute('h')!);
this._handleAcknowledgedStanzas(handledCount, this._serverProcesssedStanzasCounter!);

return true;
},

_answerProcessedStanzas: function() {
_answerProcessedStanzas: function(): void {
if (this._isStreamManagementEnabled) {
this._c.send($build('a', { xmlns: this._NS, h: this._clientProcessedStanzasCounter }));
}
},

_increaseSentStanzasCounter: function(elem) {
_increaseSentStanzasCounter: function(elem: Element): void {
if (this._isStreamManagementEnabled) {
if (this._unacknowledgedStanzas.indexOf(elem) !== -1) {

return;
}

this._unacknowledgedStanzas.push(elem);
this._clientSentStanzasCounter++;
this._clientSentStanzasCounter!++;

if (this.requestResponseInterval > 0) {
this._requestResponseIntervalCount++;
Expand All @@ -449,15 +509,17 @@ Strophe.addConnectionPlugin('streamManagement', {
}
},

_increaseReceivedStanzasCounter: function() {
_increaseReceivedStanzasCounter: function(): void {
if (this._isStreamManagementEnabled) {
this._clientProcessedStanzasCounter++;
this._clientProcessedStanzasCounter!++;
}
},

_throwError: function(msg) {
_throwError: function(msg: string): never {
Strophe.error(msg);
throw new Error(msg);
}

});
};

Strophe.addConnectionPlugin('streamManagement', streamManagement);