diff --git a/.gitignore b/.gitignore index 48619f4..5c96d9c 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ npm-debug.log node_modules test.js rethinkdbdash.js +package-lock.json rethinkdbdash_datadir* diff --git a/lib/connection.js b/lib/connection.js index 9200422..9314c41 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -28,8 +28,9 @@ function Connection(r, options, resolve, reject) { this.r = r; this.state = 0; // Track the progress of the handshake. -1 will be used for an error state. - // Set default options - We have to save them in case the user tries to reconnect - if (!helper.isPlainObject(options)) options = {}; + // Retain `options` for reconnecting + this.options = options || (options = {}); + this.host = options.host || r._host; this.port = options.port || r._port; if (options.authKey != null) { @@ -52,6 +53,7 @@ function Connection(r, options, resolve, reject) { } this.authKey = options.authKey || r._authKey; + this.releaseFeed = options.releaseFeed || r._releaseFeed; // period in *seconds* for the connection to be opened this.timeoutConnect = options.timeout || r._timeoutConnect; // The connection will be pinged every seconds @@ -212,6 +214,9 @@ function Connection(r, options, resolve, reject) { self.connection.toJSON = function() { // We want people to be able to jsonify a cursor return '"A socket object cannot be converted to JSON due to circular references."' } + // For the pool implementation + this.node = null; + this.id = Math.random(); } util.inherits(Connection, events.EventEmitter); @@ -518,6 +523,10 @@ Connection.prototype._processResponse = function(response, token) { if (includesStates === true) { cursor.setIncludesStates(); } + if ((cursor.getType() !== 'Cursor') && (self.releaseFeed === true)) { + self.metadata[token].released = true; + self.emit('release-feed'); + } if ((self.metadata[token].options.cursor === true) || ((self.metadata[token].options.cursor === undefined) && (self.r._options.cursor === true))) { // Return a cursor if (self.metadata[token].options.profile === true) { @@ -579,7 +588,9 @@ Connection.prototype._processResponse = function(response, token) { } } else if (type === responseTypes.SUCCESS_SEQUENCE) { - self.emit('release'); + if (self.metadata[token].released === false) { + self.emit('release'); + } if (typeof self.metadata[token].resolve === 'function') { currentResolve = self.metadata[token].resolve; @@ -678,6 +689,11 @@ Connection.prototype._processResponse = function(response, token) { Connection.prototype.reconnect = function(options, callback) { var self = this; + // When `options.connection` is defined, you must create a new socket to reconnect. + if (self.options.connection) { + throw new Err.ReqlRuntimeError('Cannot call `reconnect` if `options.connection` was defined'); + } + if (typeof options === 'function') { callback = options; options = {}; @@ -688,12 +704,7 @@ Connection.prototype.reconnect = function(options, callback) { if (options.noreplyWait === true) { var p = new Promise(function(resolve, reject) { self.close(options).then(function() { - self.r.connect({ - host: self.host, - port: self.port, - authKey: self.authKey, - db: self.db - }).then(function(c) { + self.r.connect(self.options).then(function(c) { resolve(c); }).error(function(e) { reject(e); @@ -704,12 +715,7 @@ Connection.prototype.reconnect = function(options, callback) { }).nodeify(callback); } else { - return self.r.connect({ - host: self.host, - port: self.port, - authKey: self.authKey, - db: self.db - }, callback); + return self.r.connect(self.options, callback); } return p; diff --git a/lib/cursor.js b/lib/cursor.js index 3cd9c77..9db9885 100644 --- a/lib/cursor.js +++ b/lib/cursor.js @@ -2,6 +2,7 @@ var Promise = require('bluebird'); var Err = require(__dirname+'/error.js'); var helper = require(__dirname+'/helper.js'); var EventEmitter = require('events').EventEmitter; +var $$asyncIterator = require('iterall').$$asyncIterator; var MAX_CALL_STACK = 1000; @@ -366,6 +367,37 @@ Cursor.prototype._eachCb = function(err, data) { } } +Cursor.prototype.asyncIterator = function() { + var self = this; + return { + next: function() { + var iter = this; + return self._next().then(value => { + return { + done: false, + value: value + }; + }).error(function(error) { + if ((error.message !== 'You cannot retrieve data from a cursor that is closed.') && + (error.message.match(/You cannot call `next` on a closed/) === null)) { + return iter.throw(error); + } else { + return iter.return(); + } + }); + }, + return: function() { + return Promise.resolve({ value: undefined, done: true }); + }, + throw: function(err) { + return Promise.reject(err); + }, + [$$asyncIterator]: function() { + return this; + } + +}} + var methods = [ 'addListener', 'on', diff --git a/lib/helper.js b/lib/helper.js index 06db60e..d254e94 100644 --- a/lib/helper.js +++ b/lib/helper.js @@ -2,6 +2,7 @@ var protodef = require(__dirname+'/protodef.js'); var termTypes = protodef.Term.TermType; var datumTypes = protodef.Datum.DatumType; var net = require('net'); +var luxon = require('luxon'); function createLogger(poolMaster, silent) { @@ -61,10 +62,11 @@ module.exports.loopKeys = loopKeys; function convertPseudotype(obj, options) { var reqlType = obj['$reql_type$']; - if (reqlType === 'TIME' && options['timeFormat'] !== 'raw') { + if (reqlType === 'TIME' && (options['timeFormat'] === 'native' || !options['timeFormat'])) { return new Date(obj['epoch_time'] * 1000); - } - else if (reqlType === 'GROUPED_DATA' && options['groupFormat'] !== 'raw') { + } else if (reqlType === 'TIME' && options['timeFormat'] === 'ISO8601') { + return luxon.DateTime.fromMillis(obj['epoch_time'] * 1000).setZone('UTC' + obj['timezone']).toISO() + } else if (reqlType === 'GROUPED_DATA' && options['groupFormat'] !== 'raw') { var result = []; for (var i = 0, len = obj['data'].length, ref; i < len; i++) { ref = obj.data[i]; diff --git a/lib/index.js b/lib/index.js index 1cc3e89..a5c5efe 100644 --- a/lib/index.js +++ b/lib/index.js @@ -53,6 +53,7 @@ r.prototype._user = 'admin'; r.prototype._password = ''; r.prototype._timeoutConnect = 20; // seconds r.prototype._pingInterval = -1; // seconds +r.prototype._releaseFeed = false; r.prototype._nestingLevel = 100; r.prototype._arrayLimit = 100000; diff --git a/lib/linked_list.js b/lib/linked_list.js new file mode 100644 index 0000000..d7c5514 --- /dev/null +++ b/lib/linked_list.js @@ -0,0 +1,110 @@ +function LinkedList() { + this.root = null; + this.last = null; + this.length = 0; +} + +LinkedList.prototype.getLength = function() { + return this.length; +} + +LinkedList.prototype.push = function(connection) { + var node = new Node(this, connection, this.last, null); + connection.node = node; + if (this.root === null) { + this.root = node; + this.last = node; + } + else { + this.last.next = node; + this.last = node; + } + this.length++; + // Keep a reference to the node in the connection + return node; +} + +LinkedList.prototype.unshift = function(connection) { + var node = new Node(this, connection, null, this.root); + connection.node = node; + if (this.root) { + this.root.prev = node; + } + this.root = node; + if (this.last === null) { + this.last = node; + } + this.length++; + return node; +} + +// Pop a node and return the connection (not the node) +LinkedList.prototype.pop = function() { + if (this.last === null) { + return null; + } + + var last = this.last + if (this.last.prev === null) { + // this.last is the root + this.root = null; + this.last = null; + } + else { + this.last = this.last.prev; + this.last.next = null; + } + this.length--; + last.removed = true; + return last.connection; +} + +LinkedList.prototype.shift = function() { + if (this.root === null) { + return null; + } + + var result = this.root; + this.root = this.root.next; + this.length--; + result.removed = true; + return result.connection; +} + +function Node(list, connection, prev, next) { + this.list = list; + this.connection = connection; + this.prev = prev; + this.next = next; + this.removed = false; +} + +Node.prototype.remove = function() { + if (this.removed === true) { + return this.connection; + } + this.removed = true; + + if (this.prev === null) { + if (this.next === null) { + // The node is the root and has no children + this.root = null; + this.last = null; + } + else { + // The node is the root + this.root = this.next; + this.next.prev = null; + } + } + else { + this.prev.next = this.next; + if (this.next) { + this.next.prev = this.prev + } + } + this.list.length--; + return this.connection; +} + +module.exports = LinkedList; diff --git a/lib/metadata.js b/lib/metadata.js index 757cf73..fc6532c 100644 --- a/lib/metadata.js +++ b/lib/metadata.js @@ -5,6 +5,7 @@ function Metadata(resolve, reject, query, options) { this.query = query; // The query in case we have to build a backtrace this.options = options || {}; this.cursor = false; + this.released = false; } Metadata.prototype.setCursor = function() { diff --git a/lib/pool.js b/lib/pool.js index 85e1412..da89e5f 100644 --- a/lib/pool.js +++ b/lib/pool.js @@ -1,5 +1,6 @@ var Promise = require('bluebird'); var Dequeue = require(__dirname+'/dequeue.js'); +var LinkedList = require(__dirname+'/linked_list.js'); var helper = require(__dirname+'/helper.js'); var Err = require(__dirname+'/error.js'); var events = require('events'); @@ -27,14 +28,14 @@ function Pool(r, options) { authKey: options.authKey, user: options.user, password: options.password, - cursor: options.cursor || false, - stream: options.stream || false, ssl: options.ssl || false, - pingInterval: options.pingInterval || this._r._pingInterval + pingInterval: options.pingInterval || this._r._pingInterval, + releaseFeed: options.releaseFeed || this._r._releaseFeed } this._log = options._log; - this._pool = new Dequeue(this.options.buffer+1); + //this._pool = new Dequeue(this.options.buffer+1); + this._pool = new LinkedList(); this._draining = false; this._drainingHandlers = null; // Store the resolve/reject methods once draining is called this._localhostToDrain = 0; // number of connections to "localhost" to remove @@ -125,7 +126,7 @@ Pool.prototype._increaseNumConnections = function() { } -Pool.prototype.putConnection = function(connection) { +Pool.prototype.putConnection = function(connection, shift) { var self = this; if (connection.end === false) { // Temporary attempt to fix #192 - this should not happen. @@ -149,52 +150,43 @@ Pool.prototype.putConnection = function(connection) { self._drainingHandlers.resolve(); } } - else if (self._extraConnections > 0) { + else if ((self._extraConnections > 0) && (Object.keys(connection.metadata).length === 0)) { self._extraConnections--; connection.close().error(function(error) { self._log('Fail to properly close a connection. Error:'+JSON.stringify(error)); }); clearTimeout(connection.timeout); } - /* - // We let the pool garbage collect these connections - else if (self.getAvailableLength()+1 > self.options.buffer) { // +1 for the connection we may put back - // Note that because we have available connections here, the pool master has no pending - // queries. - connection.close().error(function(error) { - self._log('Fail to properly close a connection. Error:'+JSON.stringify(error)); - }); - clearTimeout(connection.timeout); - } - */ else { - self._pool.push(connection); + var connectionLink; + if (shift === true) { + connectionLink = self._pool.unshift(connection); + } + else { + connectionLink = self._pool.push(connection); + } self.emit('available-size', self._pool.getLength()); self.emit('available-size-diff', 1); - self.emit('new-connection', connection); clearTimeout(connection.timeout); var timeoutCb = function() { - if (self._pool.get(0) === connection) { - if (self._pool.getLength() > self.options.buffer) { - self._pool.shift().close(); - self.emit('available-size', self._pool.getLength()); - self.emit('available-size-diff', -1); - } - else { - connection.timeout = setTimeout(timeoutCb, self.options.timeoutGb); - } + if (self._pool.getLength() > self.options.buffer) { + connectionLink.remove().close(); + self.emit('available-size', self._pool.getLength()); + self.emit('available-size-diff', -1); } else { - // This should technically never happens connection.timeout = setTimeout(timeoutCb, self.options.timeoutGb); } } connection.timeout = setTimeout(timeoutCb, self.options.timeoutGb); + self.emit('new-connection', connection); } }; +var index = 0; Pool.prototype.createConnection = function() { + index++; var self = this; self._increaseNumConnections(); self._openingConnections++; @@ -203,7 +195,7 @@ Pool.prototype.createConnection = function() { if (self._draining === true) { return; // Do not create a new connection if we are draining the pool. } - + (function(index) { return self._r.connect(self.options.connection).then(function(connection) { self.emit('created-connection', self); @@ -232,6 +224,10 @@ Pool.prototype.createConnection = function() { // We are going to close connection, but we don't want another process to use it before // So we remove it from the pool now (if it's inside) self._log('Error emitted by a connection: '+JSON.stringify(error)); + connection.node.remove(); + self.emit('available-size', self._pool.getLength()); + self.emit('available-size-diff', -1); + /* for(var i=0; i= 3.0.1" + "bluebird": ">= 3.0.1", + "luxon": "1.2.0", + "iterall": "^1.2.2" }, "devDependencies": { "mocha": ">= 1.20.0", diff --git a/test/backtrace.js b/test/backtrace.js index 14d8ed5..e306798 100644 --- a/test/backtrace.js +++ b/test/backtrace.js @@ -1090,7 +1090,7 @@ It('Test backtrace for r.expr([1,2,3]).orderBy("foo").add(1)', function* (done) done(new Error("Should have thrown an error")) } catch(e) { - if (e.message === "Cannot perform get_field on a non-object non-sequence `2` in:\nr.expr([1, 2, 3]).orderBy(\"foo\").add(1)\n ^^^^^ \n") { + if (e.message === "Cannot perform get_field on a non-object non-sequence `3` in:\nr.expr([1, 2, 3]).orderBy(\"foo\").add(1)\n ^^^^^ \n") { done() } else { @@ -3164,7 +3164,52 @@ It('Test backtrace for r.table("foo").add(1).add(1).add("hello-super-long-string done(new Error("Should have thrown an error")) } catch(e) { - if (e.message === "Table `test.foo` does not exist in:\nr.table(\"foo\").add(1).add(1).add(\"hello-super-long-string\").add(\"another-long-string\")\n^^^^^^^^^^^^^^ \n .add(\"one-last-string\").map(function(var_1) {\n return r.expr([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]).map(function(var_2) {\n return var_2(\"b\").add(\"hello-super-long-string\").add(\"another-long-string\").add(\"one-last-string\")\n .add(\"hello-super-long-string\").add(\"another-long-string\").add(\"one-last-string\")\n .add(\"hello-super-long-string\").add(\"another-long-string\").add(\"one-last-string\")\n .add(\"hello-super-long-string\").add(\"another-long-string\").add(\"one-last-string\")\n .add(\"hello-super-long-string\").add(\"another-long-string\").add(\"one-last-string\")\n .mul(var_2(\"b\")).merge({\n firstName: \"xxxxxx\",\n lastName: \"yyyy\",\n email: \"xxxxx@yyyy.com\",\n phone: \"xxx-xxx-xxxx\"\n })\n }).add(2).map(function(var_3) {\n return var_3.add(\"hello-super-long-string\").add(\"another-long-string\").add(\"one-last-string\")\n .add(\"hello-super-long-string\").add(\"another-long-string\").add(\"one-last-string\")\n .add(\"hello-super-long-string\").add(\"another-long-string\").add(\"one-last-string\")\n .add(\"hello-super-long-string\").add(\"another-long-string\").add(\"one-last-string\")\n .add(\"hello-super-long-string\").add(\"another-long-string\").add(\"one-last-string\")\n })\n })\n") { + if (e.message === `Database \`test\` does not exist in: +r.table("foo").add(1).add(1).add("hello-super-long-string").add("another-long-string") +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + .add("one-last-string").map(function(var_1) { + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + return r.expr([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]).map(function(var_2) { + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + return var_2("b").add("hello-super-long-string").add("another-long-string").add("one-last-string") + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + .add("hello-super-long-string").add("another-long-string").add("one-last-string") + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + .add("hello-super-long-string").add("another-long-string").add("one-last-string") + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + .add("hello-super-long-string").add("another-long-string").add("one-last-string") + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + .add("hello-super-long-string").add("another-long-string").add("one-last-string") + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + .mul(var_2("b")).merge({ + ^^^^^^^^^^^^^^^^^^^^^^^^ + firstName: "xxxxxx", + ^^^^^^^^^^^^^^^^^^^^ + lastName: "yyyy", + ^^^^^^^^^^^^^^^^^ + email: "xxxxx@yyyy.com", + ^^^^^^^^^^^^^^^^^^^^^^^^ + phone: "xxx-xxx-xxxx" + ^^^^^^^^^^^^^^^^^^^^^ + }) + ^^ + }).add(2).map(function(var_3) { + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + return var_3.add("hello-super-long-string").add("another-long-string").add("one-last-string") + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + .add("hello-super-long-string").add("another-long-string").add("one-last-string") + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + .add("hello-super-long-string").add("another-long-string").add("one-last-string") + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + .add("hello-super-long-string").add("another-long-string").add("one-last-string") + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + .add("hello-super-long-string").add("another-long-string").add("one-last-string") + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + }) + ^^ + }) + ^^ +`) { done() } else { diff --git a/test/cursor.js b/test/cursor.js index 73e9589..e4865e0 100644 --- a/test/cursor.js +++ b/test/cursor.js @@ -2,6 +2,7 @@ var config = require('./config.js'); var r = require('../lib')(config); var util = require(__dirname+'/util/common.js'); var assert = require('assert'); +var iterall = require('iterall'); var uuid = util.uuid; var It = util.It @@ -544,6 +545,7 @@ It('`changes` with `includeTypes` should work', function* (done) { It('`next` should work on a feed', function* (done) { try { + yield util.sleep(1000); feed = yield r.db(dbName).table(tableName2).changes().run(); setTimeout(function() { r.db(dbName).table(tableName2).update({foo: r.now()}).run(); @@ -841,5 +843,35 @@ It('`eachAync` should return an error if the connection dies', function* (done) // Kill the TCP connection connection.connection.end() }) +It('`asyncIterator` should return an async iterator', function* (done) { + try { + var connection = yield r.connect({host: config.host, port: config.port, authKey: config.authKey}); + assert(connection); - + var feed = yield r.db(dbName).table(tableName).changes().run(connection); + var iterator = feed.asyncIterator(); + assert(iterall.isAsyncIterable(iterator)) + connection.connection.end() + done(); + } catch(err) { + done(err); + } +}) +It('`asyncIterator` should have a working `next`method', function* (done) { + try { + feed = yield r.db(dbName).table(tableName2).changes().run(); + var value = 1; + setTimeout(function() { + r.db(dbName).table(tableName2).update({foo: value}).run(); + }, 100) + assert(feed); + var iterator = feed.asyncIterator(); + assert(iterator); + var i=0; + var result = yield iterator.next(); + assert(result.value.new_val.foo === value); + done(); + } catch(err) { + done(err); + } +}) diff --git a/test/dates-and-times.js b/test/dates-and-times.js index 1ce8d60..ea61838 100644 --- a/test/dates-and-times.js +++ b/test/dates-and-times.js @@ -471,3 +471,15 @@ It('`epochTime` should work', function* (done) { done(e); } }) + +It('`ISO8601` run parameter should work', function* (done) { + try { + result = yield r.time(2018,5,2,13,0,0,"-03:00").run({timeFormat: "ISO8601"}); + assert.equal(typeof result, "string"); + assert.equal(result, "2018-05-02T13:00:00.000-03:00") + done(); + } + catch(e) { + done(e); + } +}) \ No newline at end of file diff --git a/test/pool_legacy.js b/test/pool_legacy.js index 8f5a84f..7f27a99 100644 --- a/test/pool_legacy.js +++ b/test/pool_legacy.js @@ -164,7 +164,7 @@ It('The pool should shrink if a connection is not used for some time', function* assert.equal(r.getPool(0).getAvailableLength(), options.buffer) assert.equal(r.getPool(0).getLength(), options.buffer) done() - },400) + },1000) } catch(e) { done(e);