1
0
mirror of https://github.com/mgerb/mywebsite synced 2026-01-12 02:42:48 +00:00
Files
mywebsite/node_modules/mongodb-core/lib/connection/connection.js

489 lines
18 KiB
JavaScript

"use strict";
var inherits = require('util').inherits
, EventEmitter = require('events').EventEmitter
, net = require('net')
, tls = require('tls')
, f = require('util').format
, getSingleProperty = require('./utils').getSingleProperty
, debugOptions = require('./utils').debugOptions
, Response = require('./commands').Response
, MongoError = require('../error')
, Logger = require('./logger');
var _id = 0;
var debugFields = ['host', 'port', 'size', 'keepAlive', 'keepAliveInitialDelay', 'noDelay'
, 'connectionTimeout', 'socketTimeout', 'singleBufferSerializtion', 'ssl', 'ca', 'cert'
, 'rejectUnauthorized', 'promoteLongs', 'checkServerIdentity'];
/**
* Creates a new Connection instance
* @class
* @param {string} options.host The server host
* @param {number} options.port The server port
* @param {number} [options.size=5] Server connection pool size
* @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
* @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
* @param {boolean} [options.noDelay=true] TCP Connection no delay
* @param {number} [options.connectionTimeout=0] TCP Connection timeout setting
* @param {number} [options.socketTimeout=0] TCP Socket timeout setting
* @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed
* @param {boolean} [options.ssl=false] Use SSL for connection
* @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
* @param {Buffer} [options.ca] SSL Certificate store binary buffer
* @param {Buffer} [options.cert] SSL Certificate binary buffer
* @param {Buffer} [options.key] SSL Key file binary buffer
* @param {string} [options.passphrase] SSL Certificate pass phrase
* @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
* @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
* @fires Connection#connect
* @fires Connection#close
* @fires Connection#error
* @fires Connection#timeout
* @fires Connection#parseError
* @return {Connection} A cursor instance
*/
var Connection = function(options) {
// Add event listener
EventEmitter.call(this);
// Set empty if no options passed
this.options = options || {};
// Identification information
this.id = _id++;
// Logger instance
this.logger = Logger('Connection', options);
// No bson parser passed in
if(!options.bson) throw new Error("must pass in valid bson parser");
// Get bson parser
this.bson = options.bson;
// Grouping tag used for debugging purposes
this.tag = options.tag;
// Message handler
this.messageHandler = options.messageHandler;
// Max BSON message size
this.maxBsonMessageSize = options.maxBsonMessageSize || (1024 * 1024 * 16 * 4);
// Debug information
if(this.logger.isDebug()) this.logger.debug(f('creating connection %s with options [%s]', this.id, JSON.stringify(debugOptions(debugFields, options))));
// Default options
this.port = options.port || 27017;
this.host = options.host || 'localhost';
this.keepAlive = typeof options.keepAlive == 'boolean' ? options.keepAlive : true;
this.keepAliveInitialDelay = options.keepAliveInitialDelay || 0;
this.noDelay = typeof options.noDelay == 'boolean' ? options.noDelay : true;
this.connectionTimeout = options.connectionTimeout || 0;
this.socketTimeout = options.socketTimeout || 0;
// If connection was destroyed
this.destroyed = false;
// Check if we have a domain socket
this.domainSocket = this.host.indexOf('\/') != -1;
// Serialize commands using function
this.singleBufferSerializtion = typeof options.singleBufferSerializtion == 'boolean' ? options.singleBufferSerializtion : true;
this.serializationFunction = this.singleBufferSerializtion ? 'toBinUnified' : 'toBin';
// SSL options
this.ca = options.ca || null;
this.cert = options.cert || null;
this.key = options.key || null;
this.passphrase = options.passphrase || null;
this.ssl = typeof options.ssl == 'boolean' ? options.ssl : false;
this.rejectUnauthorized = typeof options.rejectUnauthorized == 'boolean' ? options.rejectUnauthorized : true;
this.checkServerIdentity = typeof options.checkServerIdentity == 'boolean'
|| typeof options.checkServerIdentity == 'function' ? options.checkServerIdentity : true;
// If ssl not enabled
if(!this.ssl) this.rejectUnauthorized = false;
// Response options
this.responseOptions = {
promoteLongs: typeof options.promoteLongs == 'boolean' ? options.promoteLongs : true
}
// Flushing
this.flushing = false;
this.queue = [];
// Internal state
this.connection = null;
this.writeStream = null;
}
inherits(Connection, EventEmitter);
//
// Connection handlers
var errorHandler = function(self) {
return function(err) {
// Debug information
if(self.logger.isDebug()) self.logger.debug(f('connection %s for [%s:%s] errored out with [%s]', self.id, self.host, self.port, JSON.stringify(err)));
// Emit the error
if(self.listeners('error').length > 0) self.emit("error", MongoError.create(err), self);
}
}
var timeoutHandler = function(self) {
return function() {
// Debug information
if(self.logger.isDebug()) self.logger.debug(f('connection %s for [%s:%s] timed out', self.id, self.host, self.port));
// Emit timeout error
self.emit("timeout"
, MongoError.create(f("connection %s to %s:%s timed out", self.id, self.host, self.port))
, self);
}
}
var closeHandler = function(self) {
return function(hadError) {
// Debug information
if(self.logger.isDebug()) self.logger.debug(f('connection %s with for [%s:%s] closed', self.id, self.host, self.port));
// Emit close event
if(!hadError) {
self.emit("close"
, MongoError.create(f("connection %s to %s:%s closed", self.id, self.host, self.port))
, self);
}
}
}
var dataHandler = function(self) {
return function(data) {
// Parse until we are done with the data
while(data.length > 0) {
// If we still have bytes to read on the current message
if(self.bytesRead > 0 && self.sizeOfMessage > 0) {
// Calculate the amount of remaining bytes
var remainingBytesToRead = self.sizeOfMessage - self.bytesRead;
// Check if the current chunk contains the rest of the message
if(remainingBytesToRead > data.length) {
// Copy the new data into the exiting buffer (should have been allocated when we know the message size)
data.copy(self.buffer, self.bytesRead);
// Adjust the number of bytes read so it point to the correct index in the buffer
self.bytesRead = self.bytesRead + data.length;
// Reset state of buffer
data = new Buffer(0);
} else {
// Copy the missing part of the data into our current buffer
data.copy(self.buffer, self.bytesRead, 0, remainingBytesToRead);
// Slice the overflow into a new buffer that we will then re-parse
data = data.slice(remainingBytesToRead);
// Emit current complete message
try {
var emitBuffer = self.buffer;
// Reset state of buffer
self.buffer = null;
self.sizeOfMessage = 0;
self.bytesRead = 0;
self.stubBuffer = null;
// Emit the buffer
self.messageHandler(new Response(self.bson, emitBuffer, self.responseOptions), self);
} catch(err) {
var errorObject = {err:"socketHandler", trace:err, bin:self.buffer, parseState:{
sizeOfMessage:self.sizeOfMessage,
bytesRead:self.bytesRead,
stubBuffer:self.stubBuffer}};
// We got a parse Error fire it off then keep going
self.emit("parseError", errorObject, self);
}
}
} else {
// Stub buffer is kept in case we don't get enough bytes to determine the
// size of the message (< 4 bytes)
if(self.stubBuffer != null && self.stubBuffer.length > 0) {
// If we have enough bytes to determine the message size let's do it
if(self.stubBuffer.length + data.length > 4) {
// Prepad the data
var newData = new Buffer(self.stubBuffer.length + data.length);
self.stubBuffer.copy(newData, 0);
data.copy(newData, self.stubBuffer.length);
// Reassign for parsing
data = newData;
// Reset state of buffer
self.buffer = null;
self.sizeOfMessage = 0;
self.bytesRead = 0;
self.stubBuffer = null;
} else {
// Add the the bytes to the stub buffer
var newStubBuffer = new Buffer(self.stubBuffer.length + data.length);
// Copy existing stub buffer
self.stubBuffer.copy(newStubBuffer, 0);
// Copy missing part of the data
data.copy(newStubBuffer, self.stubBuffer.length);
// Exit parsing loop
data = new Buffer(0);
}
} else {
if(data.length > 4) {
// Retrieve the message size
// var sizeOfMessage = data.readUInt32LE(0);
var sizeOfMessage = data[0] | data[1] << 8 | data[2] << 16 | data[3] << 24;
// If we have a negative sizeOfMessage emit error and return
if(sizeOfMessage < 0 || sizeOfMessage > self.maxBsonMessageSize) {
var errorObject = {err:"socketHandler", trace:'', bin:self.buffer, parseState:{
sizeOfMessage: sizeOfMessage,
bytesRead: self.bytesRead,
stubBuffer: self.stubBuffer}};
// We got a parse Error fire it off then keep going
self.emit("parseError", errorObject, self);
return;
}
// Ensure that the size of message is larger than 0 and less than the max allowed
if(sizeOfMessage > 4 && sizeOfMessage < self.maxBsonMessageSize && sizeOfMessage > data.length) {
self.buffer = new Buffer(sizeOfMessage);
// Copy all the data into the buffer
data.copy(self.buffer, 0);
// Update bytes read
self.bytesRead = data.length;
// Update sizeOfMessage
self.sizeOfMessage = sizeOfMessage;
// Ensure stub buffer is null
self.stubBuffer = null;
// Exit parsing loop
data = new Buffer(0);
} else if(sizeOfMessage > 4 && sizeOfMessage < self.maxBsonMessageSize && sizeOfMessage == data.length) {
try {
var emitBuffer = data;
// Reset state of buffer
self.buffer = null;
self.sizeOfMessage = 0;
self.bytesRead = 0;
self.stubBuffer = null;
// Exit parsing loop
data = new Buffer(0);
// Emit the message
self.messageHandler(new Response(self.bson, emitBuffer, self.responseOptions), self);
} catch (err) {
var errorObject = {err:"socketHandler", trace:err, bin:self.buffer, parseState:{
sizeOfMessage:self.sizeOfMessage,
bytesRead:self.bytesRead,
stubBuffer:self.stubBuffer}};
// We got a parse Error fire it off then keep going
self.emit("parseError", errorObject, self);
}
} else if(sizeOfMessage <= 4 || sizeOfMessage > self.maxBsonMessageSize) {
var errorObject = {err:"socketHandler", trace:null, bin:data, parseState:{
sizeOfMessage:sizeOfMessage,
bytesRead:0,
buffer:null,
stubBuffer:null}};
// We got a parse Error fire it off then keep going
self.emit("parseError", errorObject, self);
// Clear out the state of the parser
self.buffer = null;
self.sizeOfMessage = 0;
self.bytesRead = 0;
self.stubBuffer = null;
// Exit parsing loop
data = new Buffer(0);
} else {
var emitBuffer = data.slice(0, sizeOfMessage);
// Reset state of buffer
self.buffer = null;
self.sizeOfMessage = 0;
self.bytesRead = 0;
self.stubBuffer = null;
// Copy rest of message
data = data.slice(sizeOfMessage);
// Emit the message
self.messageHandler(new Response(self.bson, emitBuffer, self.responseOptions), self);
}
} else {
// Create a buffer that contains the space for the non-complete message
self.stubBuffer = new Buffer(data.length)
// Copy the data to the stub buffer
data.copy(self.stubBuffer, 0);
// Exit parsing loop
data = new Buffer(0);
}
}
}
}
}
}
/**
* Connect
* @method
*/
Connection.prototype.connect = function(_options) {
var self = this;
_options = _options || {};
// Check if we are overriding the promoteLongs
if(typeof _options.promoteLongs == 'boolean') {
self.responseOptions.promoteLongs = _options.promoteLongs;
}
// Create new connection instance
self.connection = self.domainSocket
? net.createConnection(self.host)
: net.createConnection(self.port, self.host);
// Set the options for the connection
self.connection.setKeepAlive(self.keepAlive, self.keepAliveInitialDelay);
self.connection.setTimeout(self.connectionTimeout);
self.connection.setNoDelay(self.noDelay);
// If we have ssl enabled
if(self.ssl) {
var sslOptions = {
socket: self.connection
, rejectUnauthorized: self.rejectUnauthorized
}
if(self.ca) sslOptions.ca = self.ca;
if(self.cert) sslOptions.cert = self.cert;
if(self.key) sslOptions.key = self.key;
if(self.passphrase) sslOptions.passphrase = self.passphrase;
// Override checkServerIdentity behavior
if(self.checkServerIdentity == false) {
// Skip the identiy check by retuning undefined as per node documents
// https://nodejs.org/api/tls.html#tls_tls_connect_options_callback
sslOptions.checkServerIdentity = function(servername, cert) {
return undefined;
}
} else if(typeof self.checkServerIdentity == 'function') {
sslOptions.checkServerIdentity = self.checkServerIdentity;
}
// Attempt SSL connection
self.connection = tls.connect(self.port, self.host, sslOptions, function() {
// Error on auth or skip
if(self.connection.authorizationError && self.rejectUnauthorized) {
return self.emit("error", self.connection.authorizationError, self, {ssl:true});
}
// Set socket timeout instead of connection timeout
self.connection.setTimeout(self.socketTimeout);
// We are done emit connect
self.emit('connect', self);
});
self.connection.setTimeout(self.connectionTimeout);
} else {
self.connection.on('connect', function() {
// Set socket timeout instead of connection timeout
self.connection.setTimeout(self.socketTimeout);
// Emit connect event
self.emit('connect', self);
});
}
// Add handlers for events
self.connection.once('error', errorHandler(self));
self.connection.once('timeout', timeoutHandler(self));
self.connection.once('close', closeHandler(self));
self.connection.on('data', dataHandler(self));
}
/**
* Destroy connection
* @method
*/
Connection.prototype.destroy = function() {
if(this.connection) {
this.connection.end();
this.connection.destroy();
}
this.destroyed = true;
}
/**
* Write to connection
* @method
* @param {Command} command Command to write out need to implement toBin and toBinUnified
*/
Connection.prototype.write = function(buffer) {
// Debug Log
if(this.logger.isDebug()) {
if(!Array.isArray(buffer)) {
this.logger.debug(f('writing buffer [%s] to %s:%s', buffer.toString('hex'), this.host, this.port));
} else {
for(var i = 0; i < buffer.length; i++)
this.logger.debug(f('writing buffer [%s] to %s:%s', buffer[i].toString('hex'), this.host, this.port));
}
}
// Write out the command
if(!Array.isArray(buffer)) return this.connection.write(buffer, 'binary');
// Iterate over all buffers and write them in order to the socket
for(var i = 0; i < buffer.length; i++) this.connection.write(buffer[i], 'binary');
}
/**
* Return id of connection as a string
* @method
* @return {string}
*/
Connection.prototype.toString = function() {
return "" + this.id;
}
/**
* Return json object of connection
* @method
* @return {object}
*/
Connection.prototype.toJSON = function() {
return {id: this.id, host: this.host, port: this.port};
}
/**
* Is the connection connected
* @method
* @return {boolean}
*/
Connection.prototype.isConnected = function() {
if(this.destroyed) return false;
return !this.connection.destroyed && this.connection.writable;
}
/**
* A server connect event, used to verify that the connection is up and running
*
* @event Connection#connect
* @type {Connection}
*/
/**
* The server connection closed, all pool connections closed
*
* @event Connection#close
* @type {Connection}
*/
/**
* The server connection caused an error, all pool connections closed
*
* @event Connection#error
* @type {Connection}
*/
/**
* The server connection timed out, all pool connections closed
*
* @event Connection#timeout
* @type {Connection}
*/
/**
* The driver experienced an invalid message, all pool connections closed
*
* @event Connection#parseError
* @type {Connection}
*/
module.exports = Connection;