| /** |
| * Module dependencies. |
| */ |
| |
| var transports = require('./transports/index'); |
| var Emitter = require('component-emitter'); |
| var debug = require('debug')('engine.io-client:socket'); |
| var index = require('indexof'); |
| var parser = require('engine.io-parser'); |
| var parseuri = require('parseuri'); |
| var parseqs = require('parseqs'); |
| |
| /** |
| * Module exports. |
| */ |
| |
| module.exports = Socket; |
| |
| /** |
| * Socket constructor. |
| * |
| * @param {String|Object} uri or options |
| * @param {Object} options |
| * @api public |
| */ |
| |
| function Socket (uri, opts) { |
| if (!(this instanceof Socket)) return new Socket(uri, opts); |
| |
| opts = opts || {}; |
| |
| if (uri && 'object' === typeof uri) { |
| opts = uri; |
| uri = null; |
| } |
| |
| if (uri) { |
| uri = parseuri(uri); |
| opts.hostname = uri.host; |
| opts.secure = uri.protocol === 'https' || uri.protocol === 'wss'; |
| opts.port = uri.port; |
| if (uri.query) opts.query = uri.query; |
| } else if (opts.host) { |
| opts.hostname = parseuri(opts.host).host; |
| } |
| |
| this.secure = null != opts.secure ? opts.secure |
| : (global.location && 'https:' === location.protocol); |
| |
| if (opts.hostname && !opts.port) { |
| // if no port is specified manually, use the protocol default |
| opts.port = this.secure ? '443' : '80'; |
| } |
| |
| this.agent = opts.agent || false; |
| this.hostname = opts.hostname || |
| (global.location ? location.hostname : 'localhost'); |
| this.port = opts.port || (global.location && location.port |
| ? location.port |
| : (this.secure ? 443 : 80)); |
| this.query = opts.query || {}; |
| if ('string' === typeof this.query) this.query = parseqs.decode(this.query); |
| this.upgrade = false !== opts.upgrade; |
| this.path = (opts.path || '/engine.io').replace(/\/$/, '') + '/'; |
| this.forceJSONP = !!opts.forceJSONP; |
| this.jsonp = false !== opts.jsonp; |
| this.forceBase64 = !!opts.forceBase64; |
| this.enablesXDR = !!opts.enablesXDR; |
| this.timestampParam = opts.timestampParam || 't'; |
| this.timestampRequests = opts.timestampRequests; |
| this.transports = opts.transports || ['polling', 'websocket']; |
| this.transportOptions = opts.transportOptions || {}; |
| this.readyState = ''; |
| this.writeBuffer = []; |
| this.prevBufferLen = 0; |
| this.policyPort = opts.policyPort || 843; |
| this.rememberUpgrade = opts.rememberUpgrade || false; |
| this.binaryType = null; |
| this.onlyBinaryUpgrades = opts.onlyBinaryUpgrades; |
| this.perMessageDeflate = false !== opts.perMessageDeflate ? (opts.perMessageDeflate || {}) : false; |
| |
| if (true === this.perMessageDeflate) this.perMessageDeflate = {}; |
| if (this.perMessageDeflate && null == this.perMessageDeflate.threshold) { |
| this.perMessageDeflate.threshold = 1024; |
| } |
| |
| // SSL options for Node.js client |
| this.pfx = opts.pfx || null; |
| this.key = opts.key || null; |
| this.passphrase = opts.passphrase || null; |
| this.cert = opts.cert || null; |
| this.ca = opts.ca || null; |
| this.ciphers = opts.ciphers || null; |
| this.rejectUnauthorized = opts.rejectUnauthorized === undefined ? true : opts.rejectUnauthorized; |
| this.forceNode = !!opts.forceNode; |
| |
| // other options for Node.js client |
| var freeGlobal = typeof global === 'object' && global; |
| if (freeGlobal.global === freeGlobal) { |
| if (opts.extraHeaders && Object.keys(opts.extraHeaders).length > 0) { |
| this.extraHeaders = opts.extraHeaders; |
| } |
| |
| if (opts.localAddress) { |
| this.localAddress = opts.localAddress; |
| } |
| } |
| |
| // set on handshake |
| this.id = null; |
| this.upgrades = null; |
| this.pingInterval = null; |
| this.pingTimeout = null; |
| |
| // set on heartbeat |
| this.pingIntervalTimer = null; |
| this.pingTimeoutTimer = null; |
| |
| this.open(); |
| } |
| |
| Socket.priorWebsocketSuccess = false; |
| |
| /** |
| * Mix in `Emitter`. |
| */ |
| |
| Emitter(Socket.prototype); |
| |
| /** |
| * Protocol version. |
| * |
| * @api public |
| */ |
| |
| Socket.protocol = parser.protocol; // this is an int |
| |
| /** |
| * Expose deps for legacy compatibility |
| * and standalone browser access. |
| */ |
| |
| Socket.Socket = Socket; |
| Socket.Transport = require('./transport'); |
| Socket.transports = require('./transports/index'); |
| Socket.parser = require('engine.io-parser'); |
| |
| /** |
| * Creates transport of the given type. |
| * |
| * @param {String} transport name |
| * @return {Transport} |
| * @api private |
| */ |
| |
| Socket.prototype.createTransport = function (name) { |
| debug('creating transport "%s"', name); |
| var query = clone(this.query); |
| |
| // append engine.io protocol identifier |
| query.EIO = parser.protocol; |
| |
| // transport name |
| query.transport = name; |
| |
| // per-transport options |
| var options = this.transportOptions[name] || {}; |
| |
| // session id if we already have one |
| if (this.id) query.sid = this.id; |
| |
| var transport = new transports[name]({ |
| query: query, |
| socket: this, |
| agent: options.agent || this.agent, |
| hostname: options.hostname || this.hostname, |
| port: options.port || this.port, |
| secure: options.secure || this.secure, |
| path: options.path || this.path, |
| forceJSONP: options.forceJSONP || this.forceJSONP, |
| jsonp: options.jsonp || this.jsonp, |
| forceBase64: options.forceBase64 || this.forceBase64, |
| enablesXDR: options.enablesXDR || this.enablesXDR, |
| timestampRequests: options.timestampRequests || this.timestampRequests, |
| timestampParam: options.timestampParam || this.timestampParam, |
| policyPort: options.policyPort || this.policyPort, |
| pfx: options.pfx || this.pfx, |
| key: options.key || this.key, |
| passphrase: options.passphrase || this.passphrase, |
| cert: options.cert || this.cert, |
| ca: options.ca || this.ca, |
| ciphers: options.ciphers || this.ciphers, |
| rejectUnauthorized: options.rejectUnauthorized || this.rejectUnauthorized, |
| perMessageDeflate: options.perMessageDeflate || this.perMessageDeflate, |
| extraHeaders: options.extraHeaders || this.extraHeaders, |
| forceNode: options.forceNode || this.forceNode, |
| localAddress: options.localAddress || this.localAddress, |
| requestTimeout: options.requestTimeout || this.requestTimeout, |
| protocols: options.protocols || void (0) |
| }); |
| |
| return transport; |
| }; |
| |
| function clone (obj) { |
| var o = {}; |
| for (var i in obj) { |
| if (obj.hasOwnProperty(i)) { |
| o[i] = obj[i]; |
| } |
| } |
| return o; |
| } |
| |
| /** |
| * Initializes transport to use and starts probe. |
| * |
| * @api private |
| */ |
| Socket.prototype.open = function () { |
| var transport; |
| if (this.rememberUpgrade && Socket.priorWebsocketSuccess && this.transports.indexOf('websocket') !== -1) { |
| transport = 'websocket'; |
| } else if (0 === this.transports.length) { |
| // Emit error on next tick so it can be listened to |
| var self = this; |
| setTimeout(function () { |
| self.emit('error', 'No transports available'); |
| }, 0); |
| return; |
| } else { |
| transport = this.transports[0]; |
| } |
| this.readyState = 'opening'; |
| |
| // Retry with the next transport if the transport is disabled (jsonp: false) |
| try { |
| transport = this.createTransport(transport); |
| } catch (e) { |
| this.transports.shift(); |
| this.open(); |
| return; |
| } |
| |
| transport.open(); |
| this.setTransport(transport); |
| }; |
| |
| /** |
| * Sets the current transport. Disables the existing one (if any). |
| * |
| * @api private |
| */ |
| |
| Socket.prototype.setTransport = function (transport) { |
| debug('setting transport %s', transport.name); |
| var self = this; |
| |
| if (this.transport) { |
| debug('clearing existing transport %s', this.transport.name); |
| this.transport.removeAllListeners(); |
| } |
| |
| // set up transport |
| this.transport = transport; |
| |
| // set up transport listeners |
| transport |
| .on('drain', function () { |
| self.onDrain(); |
| }) |
| .on('packet', function (packet) { |
| self.onPacket(packet); |
| }) |
| .on('error', function (e) { |
| self.onError(e); |
| }) |
| .on('close', function () { |
| self.onClose('transport close'); |
| }); |
| }; |
| |
| /** |
| * Probes a transport. |
| * |
| * @param {String} transport name |
| * @api private |
| */ |
| |
| Socket.prototype.probe = function (name) { |
| debug('probing transport "%s"', name); |
| var transport = this.createTransport(name, { probe: 1 }); |
| var failed = false; |
| var self = this; |
| |
| Socket.priorWebsocketSuccess = false; |
| |
| function onTransportOpen () { |
| if (self.onlyBinaryUpgrades) { |
| var upgradeLosesBinary = !this.supportsBinary && self.transport.supportsBinary; |
| failed = failed || upgradeLosesBinary; |
| } |
| if (failed) return; |
| |
| debug('probe transport "%s" opened', name); |
| transport.send([{ type: 'ping', data: 'probe' }]); |
| transport.once('packet', function (msg) { |
| if (failed) return; |
| if ('pong' === msg.type && 'probe' === msg.data) { |
| debug('probe transport "%s" pong', name); |
| self.upgrading = true; |
| self.emit('upgrading', transport); |
| if (!transport) return; |
| Socket.priorWebsocketSuccess = 'websocket' === transport.name; |
| |
| debug('pausing current transport "%s"', self.transport.name); |
| self.transport.pause(function () { |
| if (failed) return; |
| if ('closed' === self.readyState) return; |
| debug('changing transport and sending upgrade packet'); |
| |
| cleanup(); |
| |
| self.setTransport(transport); |
| transport.send([{ type: 'upgrade' }]); |
| self.emit('upgrade', transport); |
| transport = null; |
| self.upgrading = false; |
| self.flush(); |
| }); |
| } else { |
| debug('probe transport "%s" failed', name); |
| var err = new Error('probe error'); |
| err.transport = transport.name; |
| self.emit('upgradeError', err); |
| } |
| }); |
| } |
| |
| function freezeTransport () { |
| if (failed) return; |
| |
| // Any callback called by transport should be ignored since now |
| failed = true; |
| |
| cleanup(); |
| |
| transport.close(); |
| transport = null; |
| } |
| |
| // Handle any error that happens while probing |
| function onerror (err) { |
| var error = new Error('probe error: ' + err); |
| error.transport = transport.name; |
| |
| freezeTransport(); |
| |
| debug('probe transport "%s" failed because of error: %s', name, err); |
| |
| self.emit('upgradeError', error); |
| } |
| |
| function onTransportClose () { |
| onerror('transport closed'); |
| } |
| |
| // When the socket is closed while we're probing |
| function onclose () { |
| onerror('socket closed'); |
| } |
| |
| // When the socket is upgraded while we're probing |
| function onupgrade (to) { |
| if (transport && to.name !== transport.name) { |
| debug('"%s" works - aborting "%s"', to.name, transport.name); |
| freezeTransport(); |
| } |
| } |
| |
| // Remove all listeners on the transport and on self |
| function cleanup () { |
| transport.removeListener('open', onTransportOpen); |
| transport.removeListener('error', onerror); |
| transport.removeListener('close', onTransportClose); |
| self.removeListener('close', onclose); |
| self.removeListener('upgrading', onupgrade); |
| } |
| |
| transport.once('open', onTransportOpen); |
| transport.once('error', onerror); |
| transport.once('close', onTransportClose); |
| |
| this.once('close', onclose); |
| this.once('upgrading', onupgrade); |
| |
| transport.open(); |
| }; |
| |
| /** |
| * Called when connection is deemed open. |
| * |
| * @api public |
| */ |
| |
| Socket.prototype.onOpen = function () { |
| debug('socket open'); |
| this.readyState = 'open'; |
| Socket.priorWebsocketSuccess = 'websocket' === this.transport.name; |
| this.emit('open'); |
| this.flush(); |
| |
| // we check for `readyState` in case an `open` |
| // listener already closed the socket |
| if ('open' === this.readyState && this.upgrade && this.transport.pause) { |
| debug('starting upgrade probes'); |
| for (var i = 0, l = this.upgrades.length; i < l; i++) { |
| this.probe(this.upgrades[i]); |
| } |
| } |
| }; |
| |
| /** |
| * Handles a packet. |
| * |
| * @api private |
| */ |
| |
| Socket.prototype.onPacket = function (packet) { |
| if ('opening' === this.readyState || 'open' === this.readyState || |
| 'closing' === this.readyState) { |
| debug('socket receive: type "%s", data "%s"', packet.type, packet.data); |
| |
| this.emit('packet', packet); |
| |
| // Socket is live - any packet counts |
| this.emit('heartbeat'); |
| |
| switch (packet.type) { |
| case 'open': |
| this.onHandshake(JSON.parse(packet.data)); |
| break; |
| |
| case 'pong': |
| this.setPing(); |
| this.emit('pong'); |
| break; |
| |
| case 'error': |
| var err = new Error('server error'); |
| err.code = packet.data; |
| this.onError(err); |
| break; |
| |
| case 'message': |
| this.emit('data', packet.data); |
| this.emit('message', packet.data); |
| break; |
| } |
| } else { |
| debug('packet received with socket readyState "%s"', this.readyState); |
| } |
| }; |
| |
| /** |
| * Called upon handshake completion. |
| * |
| * @param {Object} handshake obj |
| * @api private |
| */ |
| |
| Socket.prototype.onHandshake = function (data) { |
| this.emit('handshake', data); |
| this.id = data.sid; |
| this.transport.query.sid = data.sid; |
| this.upgrades = this.filterUpgrades(data.upgrades); |
| this.pingInterval = data.pingInterval; |
| this.pingTimeout = data.pingTimeout; |
| this.onOpen(); |
| // In case open handler closes socket |
| if ('closed' === this.readyState) return; |
| this.setPing(); |
| |
| // Prolong liveness of socket on heartbeat |
| this.removeListener('heartbeat', this.onHeartbeat); |
| this.on('heartbeat', this.onHeartbeat); |
| }; |
| |
| /** |
| * Resets ping timeout. |
| * |
| * @api private |
| */ |
| |
| Socket.prototype.onHeartbeat = function (timeout) { |
| clearTimeout(this.pingTimeoutTimer); |
| var self = this; |
| self.pingTimeoutTimer = setTimeout(function () { |
| if ('closed' === self.readyState) return; |
| self.onClose('ping timeout'); |
| }, timeout || (self.pingInterval + self.pingTimeout)); |
| }; |
| |
| /** |
| * Pings server every `this.pingInterval` and expects response |
| * within `this.pingTimeout` or closes connection. |
| * |
| * @api private |
| */ |
| |
| Socket.prototype.setPing = function () { |
| var self = this; |
| clearTimeout(self.pingIntervalTimer); |
| self.pingIntervalTimer = setTimeout(function () { |
| debug('writing ping packet - expecting pong within %sms', self.pingTimeout); |
| self.ping(); |
| self.onHeartbeat(self.pingTimeout); |
| }, self.pingInterval); |
| }; |
| |
| /** |
| * Sends a ping packet. |
| * |
| * @api private |
| */ |
| |
| Socket.prototype.ping = function () { |
| var self = this; |
| this.sendPacket('ping', function () { |
| self.emit('ping'); |
| }); |
| }; |
| |
| /** |
| * Called on `drain` event |
| * |
| * @api private |
| */ |
| |
| Socket.prototype.onDrain = function () { |
| this.writeBuffer.splice(0, this.prevBufferLen); |
| |
| // setting prevBufferLen = 0 is very important |
| // for example, when upgrading, upgrade packet is sent over, |
| // and a nonzero prevBufferLen could cause problems on `drain` |
| this.prevBufferLen = 0; |
| |
| if (0 === this.writeBuffer.length) { |
| this.emit('drain'); |
| } else { |
| this.flush(); |
| } |
| }; |
| |
| /** |
| * Flush write buffers. |
| * |
| * @api private |
| */ |
| |
| Socket.prototype.flush = function () { |
| if ('closed' !== this.readyState && this.transport.writable && |
| !this.upgrading && this.writeBuffer.length) { |
| debug('flushing %d packets in socket', this.writeBuffer.length); |
| this.transport.send(this.writeBuffer); |
| // keep track of current length of writeBuffer |
| // splice writeBuffer and callbackBuffer on `drain` |
| this.prevBufferLen = this.writeBuffer.length; |
| this.emit('flush'); |
| } |
| }; |
| |
| /** |
| * Sends a message. |
| * |
| * @param {String} message. |
| * @param {Function} callback function. |
| * @param {Object} options. |
| * @return {Socket} for chaining. |
| * @api public |
| */ |
| |
| Socket.prototype.write = |
| Socket.prototype.send = function (msg, options, fn) { |
| this.sendPacket('message', msg, options, fn); |
| return this; |
| }; |
| |
| /** |
| * Sends a packet. |
| * |
| * @param {String} packet type. |
| * @param {String} data. |
| * @param {Object} options. |
| * @param {Function} callback function. |
| * @api private |
| */ |
| |
| Socket.prototype.sendPacket = function (type, data, options, fn) { |
| if ('function' === typeof data) { |
| fn = data; |
| data = undefined; |
| } |
| |
| if ('function' === typeof options) { |
| fn = options; |
| options = null; |
| } |
| |
| if ('closing' === this.readyState || 'closed' === this.readyState) { |
| return; |
| } |
| |
| options = options || {}; |
| options.compress = false !== options.compress; |
| |
| var packet = { |
| type: type, |
| data: data, |
| options: options |
| }; |
| this.emit('packetCreate', packet); |
| this.writeBuffer.push(packet); |
| if (fn) this.once('flush', fn); |
| this.flush(); |
| }; |
| |
| /** |
| * Closes the connection. |
| * |
| * @api private |
| */ |
| |
| Socket.prototype.close = function () { |
| if ('opening' === this.readyState || 'open' === this.readyState) { |
| this.readyState = 'closing'; |
| |
| var self = this; |
| |
| if (this.writeBuffer.length) { |
| this.once('drain', function () { |
| if (this.upgrading) { |
| waitForUpgrade(); |
| } else { |
| close(); |
| } |
| }); |
| } else if (this.upgrading) { |
| waitForUpgrade(); |
| } else { |
| close(); |
| } |
| } |
| |
| function close () { |
| self.onClose('forced close'); |
| debug('socket closing - telling transport to close'); |
| self.transport.close(); |
| } |
| |
| function cleanupAndClose () { |
| self.removeListener('upgrade', cleanupAndClose); |
| self.removeListener('upgradeError', cleanupAndClose); |
| close(); |
| } |
| |
| function waitForUpgrade () { |
| // wait for upgrade to finish since we can't send packets while pausing a transport |
| self.once('upgrade', cleanupAndClose); |
| self.once('upgradeError', cleanupAndClose); |
| } |
| |
| return this; |
| }; |
| |
| /** |
| * Called upon transport error |
| * |
| * @api private |
| */ |
| |
| Socket.prototype.onError = function (err) { |
| debug('socket error %j', err); |
| Socket.priorWebsocketSuccess = false; |
| this.emit('error', err); |
| this.onClose('transport error', err); |
| }; |
| |
| /** |
| * Called upon transport close. |
| * |
| * @api private |
| */ |
| |
| Socket.prototype.onClose = function (reason, desc) { |
| if ('opening' === this.readyState || 'open' === this.readyState || 'closing' === this.readyState) { |
| debug('socket close with reason: "%s"', reason); |
| var self = this; |
| |
| // clear timers |
| clearTimeout(this.pingIntervalTimer); |
| clearTimeout(this.pingTimeoutTimer); |
| |
| // stop event from firing again for transport |
| this.transport.removeAllListeners('close'); |
| |
| // ensure transport won't stay open |
| this.transport.close(); |
| |
| // ignore further transport communication |
| this.transport.removeAllListeners(); |
| |
| // set ready state |
| this.readyState = 'closed'; |
| |
| // clear session id |
| this.id = null; |
| |
| // emit close event |
| this.emit('close', reason, desc); |
| |
| // clean buffers after, so users can still |
| // grab the buffers on `close` event |
| self.writeBuffer = []; |
| self.prevBufferLen = 0; |
| } |
| }; |
| |
| /** |
| * Filters upgrades, returning only those matching client transports. |
| * |
| * @param {Array} server upgrades |
| * @api private |
| * |
| */ |
| |
| Socket.prototype.filterUpgrades = function (upgrades) { |
| var filteredUpgrades = []; |
| for (var i = 0, j = upgrades.length; i < j; i++) { |
| if (~index(this.transports, upgrades[i])) filteredUpgrades.push(upgrades[i]); |
| } |
| return filteredUpgrades; |
| }; |