Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ npm-debug.log
node_modules
test.js
rethinkdbdash.js
package-lock.json
rethinkdbdash_datadir*
36 changes: 21 additions & 15 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ function Connection(r, options, resolve, reject) {
this.r = r;
this.state = 0; // Track the progress of the handshake. -1 will be used for an error state.

// Set default options - We have to save them in case the user tries to reconnect
if (!helper.isPlainObject(options)) options = {};
// Retain `options` for reconnecting
this.options = options || (options = {});

this.host = options.host || r._host;
this.port = options.port || r._port;
if (options.authKey != null) {
Expand All @@ -52,6 +53,7 @@ function Connection(r, options, resolve, reject) {
}

this.authKey = options.authKey || r._authKey;
this.releaseFeed = options.releaseFeed || r._releaseFeed;
// period in *seconds* for the connection to be opened
this.timeoutConnect = options.timeout || r._timeoutConnect;
// The connection will be pinged every <pingInterval> seconds
Expand Down Expand Up @@ -212,6 +214,9 @@ function Connection(r, options, resolve, reject) {
self.connection.toJSON = function() { // We want people to be able to jsonify a cursor
return '"A socket object cannot be converted to JSON due to circular references."'
}
// For the pool implementation
this.node = null;
this.id = Math.random();
}

util.inherits(Connection, events.EventEmitter);
Expand Down Expand Up @@ -518,6 +523,10 @@ Connection.prototype._processResponse = function(response, token) {
if (includesStates === true) {
cursor.setIncludesStates();
}
if ((cursor.getType() !== 'Cursor') && (self.releaseFeed === true)) {
self.metadata[token].released = true;
self.emit('release-feed');
}
if ((self.metadata[token].options.cursor === true) || ((self.metadata[token].options.cursor === undefined) && (self.r._options.cursor === true))) {
// Return a cursor
if (self.metadata[token].options.profile === true) {
Expand Down Expand Up @@ -579,7 +588,9 @@ Connection.prototype._processResponse = function(response, token) {
}
}
else if (type === responseTypes.SUCCESS_SEQUENCE) {
self.emit('release');
if (self.metadata[token].released === false) {
self.emit('release');
}

if (typeof self.metadata[token].resolve === 'function') {
currentResolve = self.metadata[token].resolve;
Expand Down Expand Up @@ -678,6 +689,11 @@ Connection.prototype._processResponse = function(response, token) {
Connection.prototype.reconnect = function(options, callback) {
var self = this;

// When `options.connection` is defined, you must create a new socket to reconnect.
if (self.options.connection) {
throw new Err.ReqlRuntimeError('Cannot call `reconnect` if `options.connection` was defined');
}

if (typeof options === 'function') {
callback = options;
options = {};
Expand All @@ -688,12 +704,7 @@ Connection.prototype.reconnect = function(options, callback) {
if (options.noreplyWait === true) {
var p = new Promise(function(resolve, reject) {
self.close(options).then(function() {
self.r.connect({
host: self.host,
port: self.port,
authKey: self.authKey,
db: self.db
}).then(function(c) {
self.r.connect(self.options).then(function(c) {
resolve(c);
}).error(function(e) {
reject(e);
Expand All @@ -704,12 +715,7 @@ Connection.prototype.reconnect = function(options, callback) {
}).nodeify(callback);
}
else {
return self.r.connect({
host: self.host,
port: self.port,
authKey: self.authKey,
db: self.db
}, callback);
return self.r.connect(self.options, callback);
}

return p;
Expand Down
32 changes: 32 additions & 0 deletions lib/cursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ var Promise = require('bluebird');
var Err = require(__dirname+'/error.js');
var helper = require(__dirname+'/helper.js');
var EventEmitter = require('events').EventEmitter;
var $$asyncIterator = require('iterall').$$asyncIterator;

var MAX_CALL_STACK = 1000;

Expand Down Expand Up @@ -366,6 +367,37 @@ Cursor.prototype._eachCb = function(err, data) {
}
}

Cursor.prototype.asyncIterator = function() {
var self = this;
return {
next: function() {
var iter = this;
return self._next().then(value => {
return {
done: false,
value: value
};
}).error(function(error) {
if ((error.message !== 'You cannot retrieve data from a cursor that is closed.') &&
(error.message.match(/You cannot call `next` on a closed/) === null)) {
return iter.throw(error);
} else {
return iter.return();
}
});
},
return: function() {
return Promise.resolve({ value: undefined, done: true });
},
throw: function(err) {
return Promise.reject(err);
},
[$$asyncIterator]: function() {
return this;
}

}}

var methods = [
'addListener',
'on',
Expand Down
8 changes: 5 additions & 3 deletions lib/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ var protodef = require(__dirname+'/protodef.js');
var termTypes = protodef.Term.TermType;
var datumTypes = protodef.Datum.DatumType;
var net = require('net');
var luxon = require('luxon');


function createLogger(poolMaster, silent) {
Expand Down Expand Up @@ -61,10 +62,11 @@ module.exports.loopKeys = loopKeys;

function convertPseudotype(obj, options) {
var reqlType = obj['$reql_type$'];
if (reqlType === 'TIME' && options['timeFormat'] !== 'raw') {
if (reqlType === 'TIME' && (options['timeFormat'] === 'native' || !options['timeFormat'])) {
return new Date(obj['epoch_time'] * 1000);
}
else if (reqlType === 'GROUPED_DATA' && options['groupFormat'] !== 'raw') {
} else if (reqlType === 'TIME' && options['timeFormat'] === 'ISO8601') {
return luxon.DateTime.fromMillis(obj['epoch_time'] * 1000).setZone('UTC' + obj['timezone']).toISO()
} else if (reqlType === 'GROUPED_DATA' && options['groupFormat'] !== 'raw') {
var result = [];
for (var i = 0, len = obj['data'].length, ref; i < len; i++) {
ref = obj.data[i];
Expand Down
1 change: 1 addition & 0 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ r.prototype._user = 'admin';
r.prototype._password = '';
r.prototype._timeoutConnect = 20; // seconds
r.prototype._pingInterval = -1; // seconds
r.prototype._releaseFeed = false;

r.prototype._nestingLevel = 100;
r.prototype._arrayLimit = 100000;
Expand Down
110 changes: 110 additions & 0 deletions lib/linked_list.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
function LinkedList() {
this.root = null;
this.last = null;
this.length = 0;
}

LinkedList.prototype.getLength = function() {
return this.length;
}

LinkedList.prototype.push = function(connection) {
var node = new Node(this, connection, this.last, null);
connection.node = node;
if (this.root === null) {
this.root = node;
this.last = node;
}
else {
this.last.next = node;
this.last = node;
}
this.length++;
// Keep a reference to the node in the connection
return node;
}

LinkedList.prototype.unshift = function(connection) {
var node = new Node(this, connection, null, this.root);
connection.node = node;
if (this.root) {
this.root.prev = node;
}
this.root = node;
if (this.last === null) {
this.last = node;
}
this.length++;
return node;
}

// Pop a node and return the connection (not the node)
LinkedList.prototype.pop = function() {
if (this.last === null) {
return null;
}

var last = this.last
if (this.last.prev === null) {
// this.last is the root
this.root = null;
this.last = null;
}
else {
this.last = this.last.prev;
this.last.next = null;
}
this.length--;
last.removed = true;
return last.connection;
}

LinkedList.prototype.shift = function() {
if (this.root === null) {
return null;
}

var result = this.root;
this.root = this.root.next;
this.length--;
result.removed = true;
return result.connection;
}

function Node(list, connection, prev, next) {
this.list = list;
this.connection = connection;
this.prev = prev;
this.next = next;
this.removed = false;
}

Node.prototype.remove = function() {
if (this.removed === true) {
return this.connection;
}
this.removed = true;

if (this.prev === null) {
if (this.next === null) {
// The node is the root and has no children
this.root = null;
this.last = null;
}
else {
// The node is the root
this.root = this.next;
this.next.prev = null;
}
}
else {
this.prev.next = this.next;
if (this.next) {
this.next.prev = this.prev
}
}
this.list.length--;
return this.connection;
}

module.exports = LinkedList;
1 change: 1 addition & 0 deletions lib/metadata.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ function Metadata(resolve, reject, query, options) {
this.query = query; // The query in case we have to build a backtrace
this.options = options || {};
this.cursor = false;
this.released = false;
}

Metadata.prototype.setCursor = function() {
Expand Down
Loading