| 'use strict'; |
| |
| const EventEmitter = require('events'); |
| const crypto = require('crypto'); |
| const https = require('https'); |
| const http = require('http'); |
| const net = require('net'); |
| const tls = require('tls'); |
| const url = require('url'); |
| |
| const PerMessageDeflate = require('./permessage-deflate'); |
| const EventTarget = require('./event-target'); |
| const extension = require('./extension'); |
| const Receiver = require('./receiver'); |
| const Sender = require('./sender'); |
| const { |
| BINARY_TYPES, |
| EMPTY_BUFFER, |
| GUID, |
| kStatusCode, |
| kWebSocket, |
| NOOP |
| } = require('./constants'); |
| |
// Names of the ready states. The index of a name is its numeric value.
const readyStates = [
  'CONNECTING',
  'OPEN',
  'CLOSING',
  'CLOSED'
];

// Accepted values for the `protocolVersion` option.
const protocolVersions = [
  8,
  13
];

// Time in milliseconds allowed for the closing handshake to complete.
const closeTimeout = 30000;
| |
/**
 * Class representing a WebSocket.
 *
 * @extends EventEmitter
 */
class WebSocket extends EventEmitter {
  /**
   * Create a new `WebSocket`.
   *
   * When `address` is `null` the instance is flagged as a server side socket
   * and no client handshake is started.
   *
   * @param {(String|url.Url|url.URL)} address The URL to which to connect
   * @param {(String|String[])} protocols The subprotocols
   * @param {Object} options Connection options
   */
  constructor(address, protocols, options) {
    super();

    this.readyState = WebSocket.CONNECTING;
    this.protocol = '';

    this._binaryType = BINARY_TYPES[0];
    this._closeFrameReceived = false;
    this._closeFrameSent = false;
    this._closeMessage = '';
    this._closeTimer = null;
    this._closeCode = 1006;
    this._extensions = {};
    this._receiver = null;
    this._sender = null;
    this._socket = null;

    if (address === null) {
      this._isServer = true;
      return;
    }

    this._isServer = false;
    this._redirects = 0;

    let subprotocols = protocols;
    let clientOptions = options;

    if (Array.isArray(protocols)) {
      subprotocols = protocols.join(', ');
    } else if (protocols !== null && typeof protocols === 'object') {
      //
      // The second argument is the options object, no subprotocols were given.
      //
      clientOptions = protocols;
      subprotocols = undefined;
    }

    initAsClient(this, address, subprotocols, clientOptions);
  }

  /**
   * @type {Number}
   */
  get CONNECTING() {
    return WebSocket.CONNECTING;
  }

  /**
   * @type {Number}
   */
  get CLOSING() {
    return WebSocket.CLOSING;
  }

  /**
   * @type {Number}
   */
  get CLOSED() {
    return WebSocket.CLOSED;
  }

  /**
   * @type {Number}
   */
  get OPEN() {
    return WebSocket.OPEN;
  }

  /**
   * This deviates from the WHATWG interface since ws doesn't support the
   * required default "blob" type (instead we define a custom "nodebuffer"
   * type).
   *
   * @type {String}
   */
  get binaryType() {
    return this._binaryType;
  }

  set binaryType(type) {
    // Values that are not listed in `BINARY_TYPES` are silently ignored.
    if (BINARY_TYPES.includes(type)) {
      this._binaryType = type;

      //
      // Allow to change `binaryType` on the fly.
      //
      if (this._receiver) this._receiver._binaryType = type;
    }
  }

  /**
   * @type {Number}
   */
  get bufferedAmount() {
    const socket = this._socket;

    if (!socket) return 0;

    //
    // `socket.bufferSize` is `undefined` if the socket is closed.
    //
    const socketBytes = socket.bufferSize || 0;

    return socketBytes + this._sender._bufferedBytes;
  }

  /**
   * @type {String}
   */
  get extensions() {
    const names = Object.keys(this._extensions);

    return names.join(',');
  }

  /**
   * Set up the socket and the internal resources.
   *
   * @param {net.Socket} socket The network socket between the server and client
   * @param {Buffer} head The first packet of the upgraded stream
   * @param {Number} maxPayload The maximum allowed message size
   * @private
   */
  setSocket(socket, head, maxPayload) {
    const receiver = new Receiver(
      this._binaryType,
      this._extensions,
      maxPayload
    );

    this._sender = new Sender(socket, this._extensions);
    this._receiver = receiver;
    this._socket = socket;

    //
    // Let the listeners below find this instance from their `this` value.
    //
    receiver[kWebSocket] = this;
    socket[kWebSocket] = this;

    const receiverListeners = [
      ['conclude', receiverOnConclude],
      ['drain', receiverOnDrain],
      ['error', receiverOnError],
      ['message', receiverOnMessage],
      ['ping', receiverOnPing],
      ['pong', receiverOnPong]
    ];

    for (const pair of receiverListeners) {
      receiver.on(pair[0], pair[1]);
    }

    socket.setTimeout(0);
    socket.setNoDelay();

    //
    // Put the bytes received along with the upgrade back into the stream.
    //
    if (head.length > 0) socket.unshift(head);

    const socketListeners = [
      ['close', socketOnClose],
      ['data', socketOnData],
      ['end', socketOnEnd],
      ['error', socketOnError]
    ];

    for (const pair of socketListeners) {
      socket.on(pair[0], pair[1]);
    }

    this.readyState = WebSocket.OPEN;
    this.emit('open');
  }

  /**
   * Emit the `'close'` event.
   *
   * @private
   */
  emitClose() {
    this.readyState = WebSocket.CLOSED;

    //
    // Resources are released only if a socket was ever attached.
    //
    if (this._socket) {
      const deflate = this._extensions[PerMessageDeflate.extensionName];

      if (deflate) deflate.cleanup();

      this._receiver.removeAllListeners();
    }

    this.emit('close', this._closeCode, this._closeMessage);
  }

  /**
   * Start a closing handshake.
   *
   *          +----------+   +-----------+   +----------+
   *     - - -|ws.close()|-->|close frame|-->|ws.close()|- - -
   *    |     +----------+   +-----------+   +----------+     |
   *          +----------+   +-----------+         |
   * CLOSING  |ws.close()|<--|close frame|<--+-----+       CLOSING
   *          +----------+   +-----------+   |
   *    |           |                        |   +---+        |
   *                +------------------------+-->|fin| - - - -
   *    |         +---+                      |   +---+
   *     - - - - -|fin|<---------------------+
   *              +---+
   *
   * @param {Number} code Status code explaining why the connection is closing
   * @param {String} data A string explaining why the connection is closing
   * @public
   */
  close(code, data) {
    switch (this.readyState) {
      case WebSocket.CLOSED:
        return;
      case WebSocket.CONNECTING: {
        const msg = 'WebSocket was closed before the connection was established';
        return abortHandshake(this, this._req, msg);
      }
      case WebSocket.CLOSING:
        //
        // Half-close the socket once close frames went both ways.
        //
        if (this._closeFrameSent && this._closeFrameReceived) {
          this._socket.end();
        }
        return;
      default:
    }

    this.readyState = WebSocket.CLOSING;
    this._sender.close(code, data, !this._isServer, (err) => {
      //
      // This error is handled by the `'error'` listener on the socket. We only
      // want to know if the close frame has been sent here.
      //
      if (!err) {
        this._closeFrameSent = true;
        if (this._closeFrameReceived) this._socket.end();
      }
    });

    //
    // Specify a timeout for the closing handshake to complete.
    //
    const socket = this._socket;

    this._closeTimer = setTimeout(socket.destroy.bind(socket), closeTimeout);
  }

  /**
   * Send a ping.
   *
   * @param {*} data The data to send
   * @param {Boolean} mask Indicates whether or not to mask `data`
   * @param {Function} cb Callback which is executed when the ping is sent
   * @public
   */
  ping(data, mask, cb) {
    let payload = data;
    let masked = mask;
    let done = cb;

    //
    // Support the `ping(cb)` and `ping(data, cb)` call forms.
    //
    if (typeof payload === 'function') {
      done = payload;
      payload = undefined;
      masked = undefined;
    } else if (typeof masked === 'function') {
      done = masked;
      masked = undefined;
    }

    const state = this.readyState;

    if (state !== WebSocket.OPEN) {
      const err = new Error(
        `WebSocket is not open: readyState ${state} (${readyStates[state]})`
      );

      if (done) return done(err);
      throw err;
    }

    if (typeof payload === 'number') payload = payload.toString();
    if (masked === undefined) masked = !this._isServer;
    this._sender.ping(payload || EMPTY_BUFFER, masked, done);
  }

  /**
   * Send a pong.
   *
   * @param {*} data The data to send
   * @param {Boolean} mask Indicates whether or not to mask `data`
   * @param {Function} cb Callback which is executed when the pong is sent
   * @public
   */
  pong(data, mask, cb) {
    let payload = data;
    let masked = mask;
    let done = cb;

    //
    // Support the `pong(cb)` and `pong(data, cb)` call forms.
    //
    if (typeof payload === 'function') {
      done = payload;
      payload = undefined;
      masked = undefined;
    } else if (typeof masked === 'function') {
      done = masked;
      masked = undefined;
    }

    const state = this.readyState;

    if (state !== WebSocket.OPEN) {
      const err = new Error(
        `WebSocket is not open: readyState ${state} (${readyStates[state]})`
      );

      if (done) return done(err);
      throw err;
    }

    if (typeof payload === 'number') payload = payload.toString();
    if (masked === undefined) masked = !this._isServer;
    this._sender.pong(payload || EMPTY_BUFFER, masked, done);
  }

  /**
   * Send a data message.
   *
   * @param {*} data The message to send
   * @param {Object} options Options object
   * @param {Boolean} options.compress Specifies whether or not to compress `data`
   * @param {Boolean} options.binary Specifies whether `data` is binary or text
   * @param {Boolean} options.fin Specifies whether the fragment is the last one
   * @param {Boolean} options.mask Specifies whether or not to mask `data`
   * @param {Function} cb Callback which is executed when data is written out
   * @public
   */
  send(data, options, cb) {
    let settings = options;
    let done = cb;

    //
    // Support the `send(data, cb)` call form.
    //
    if (typeof settings === 'function') {
      done = settings;
      settings = {};
    }

    const state = this.readyState;

    if (state !== WebSocket.OPEN) {
      const err = new Error(
        `WebSocket is not open: readyState ${state} (${readyStates[state]})`
      );

      if (done) return done(err);
      throw err;
    }

    const payload = typeof data === 'number' ? data.toString() : data;

    const opts = Object.assign(
      {
        binary: typeof payload !== 'string',
        mask: !this._isServer,
        compress: true,
        fin: true
      },
      settings
    );

    //
    // Compression is only possible if permessage-deflate was negotiated.
    //
    if (!this._extensions[PerMessageDeflate.extensionName]) {
      opts.compress = false;
    }

    this._sender.send(payload || EMPTY_BUFFER, opts, done);
  }

  /**
   * Forcibly close the connection.
   *
   * @public
   */
  terminate() {
    const state = this.readyState;

    if (state === WebSocket.CLOSED) return;

    if (state === WebSocket.CONNECTING) {
      const msg = 'WebSocket was closed before the connection was established';
      return abortHandshake(this, this._req, msg);
    }

    const socket = this._socket;

    if (!socket) return;

    this.readyState = WebSocket.CLOSING;
    socket.destroy();
  }
}
| |
//
// Expose every ready state as a static numeric constant on the class, for
// example `WebSocket.OPEN`.
//
for (let value = 0; value < readyStates.length; value++) {
  WebSocket[readyStates[value]] = value;
}
| |
//
// Define the `onopen`, `onerror`, `onclose`, and `onmessage` accessors.
// See https://html.spec.whatwg.org/multipage/comms.html#the-websocket-interface
//
for (const type of ['open', 'error', 'close', 'message']) {
  Object.defineProperty(WebSocket.prototype, `on${type}`, {
    /**
     * Return the listener of the event.
     *
     * @return {(Function|undefined)} The event listener or `undefined`
     * @public
     */
    get() {
      const wrapper = this.listeners(type).find((fn) => fn._listener);

      return wrapper ? wrapper._listener : undefined;
    },
    /**
     * Add a listener for the event.
     *
     * @param {Function} listener The listener to add
     * @public
     */
    set(listener) {
      //
      // Remove only the listeners added via `addEventListener`.
      //
      this.listeners(type)
        .filter((fn) => fn._listener)
        .forEach((fn) => {
          this.removeListener(type, fn);
        });

      this.addEventListener(type, listener);
    }
  });
}
| |
//
// Borrow the `addEventListener` and `removeEventListener` implementations from
// the `event-target` module.
//
Object.assign(WebSocket.prototype, {
  addEventListener: EventTarget.addEventListener,
  removeEventListener: EventTarget.removeEventListener
});

module.exports = WebSocket;
| |
/**
 * Initialize a WebSocket client.
 *
 * Builds the request options from `address` and `options`, issues the HTTP(S)
 * upgrade request and wires the `'timeout'`, `'error'`, `'response'` and
 * `'upgrade'` listeners that either complete the opening handshake or abort
 * it.
 *
 * @param {WebSocket} websocket The client to initialize
 * @param {(String|url.Url|url.URL)} address The URL to which to connect
 * @param {String} protocols The subprotocols
 * @param {Object} options Connection options
 * @param {(Boolean|Object)} options.perMessageDeflate Enable/disable
 *     permessage-deflate
 * @param {Number} options.handshakeTimeout Timeout in milliseconds for the
 *     handshake request
 * @param {Number} options.protocolVersion Value of the `Sec-WebSocket-Version`
 *     header
 * @param {String} options.origin Value of the `Origin` or
 *     `Sec-WebSocket-Origin` header
 * @param {Number} options.maxPayload The maximum allowed message size
 * @param {Boolean} options.followRedirects Whether or not to follow redirects
 * @param {Number} options.maxRedirects The maximum number of redirects allowed
 * @throws {RangeError} If `options.protocolVersion` is not supported
 * @throws {Error} If the URL has no host and is not a `ws+unix:` URL with a
 *     path
 * @private
 */
function initAsClient(websocket, address, protocols, options) {
  const opts = Object.assign(
    {
      protocolVersion: protocolVersions[1],
      maxPayload: 100 * 1024 * 1024,
      perMessageDeflate: true,
      followRedirects: false,
      maxRedirects: 10
    },
    options,
    //
    // User supplied values for the following keys are discarded. The ones that
    // are needed are set below.
    //
    {
      createConnection: undefined,
      socketPath: undefined,
      hostname: undefined,
      protocol: undefined,
      timeout: undefined,
      method: undefined,
      auth: undefined,
      host: undefined,
      path: undefined,
      port: undefined
    }
  );

  if (!protocolVersions.includes(opts.protocolVersion)) {
    throw new RangeError(
      `Unsupported protocol version: ${opts.protocolVersion} ` +
        `(supported versions: ${protocolVersions.join(', ')})`
    );
  }

  let parsedUrl;

  if (typeof address === 'object' && address.href !== undefined) {
    parsedUrl = address;
    websocket.url = address.href;
  } else {
    //
    // The WHATWG URL constructor is not available on Node.js < 6.13.0
    //
    parsedUrl = url.URL ? new url.URL(address) : url.parse(address);
    websocket.url = address;
  }

  const isUnixSocket = parsedUrl.protocol === 'ws+unix:';

  if (!parsedUrl.host && (!isUnixSocket || !parsedUrl.pathname)) {
    throw new Error(`Invalid URL: ${websocket.url}`);
  }

  const isSecure =
    parsedUrl.protocol === 'wss:' || parsedUrl.protocol === 'https:';
  const defaultPort = isSecure ? 443 : 80;
  const key = crypto.randomBytes(16).toString('base64');
  const get = isSecure ? https.get : http.get;
  const path = parsedUrl.search
    ? `${parsedUrl.pathname || '/'}${parsedUrl.search}`
    : parsedUrl.pathname || '/';
  let perMessageDeflate;

  opts.createConnection = isSecure ? tlsConnect : netConnect;
  opts.defaultPort = opts.defaultPort || defaultPort;
  opts.port = parsedUrl.port || defaultPort;
  //
  // Strip the square brackets from a bracketed (IPv6 literal) hostname.
  //
  opts.host = parsedUrl.hostname.startsWith('[')
    ? parsedUrl.hostname.slice(1, -1)
    : parsedUrl.hostname;
  opts.headers = Object.assign(
    {
      'Sec-WebSocket-Version': opts.protocolVersion,
      'Sec-WebSocket-Key': key,
      Connection: 'Upgrade',
      Upgrade: 'websocket'
    },
    opts.headers
  );
  opts.path = path;
  opts.timeout = opts.handshakeTimeout;

  if (opts.perMessageDeflate) {
    perMessageDeflate = new PerMessageDeflate(
      opts.perMessageDeflate !== true ? opts.perMessageDeflate : {},
      false,
      opts.maxPayload
    );
    opts.headers['Sec-WebSocket-Extensions'] = extension.format({
      [PerMessageDeflate.extensionName]: perMessageDeflate.offer()
    });
  }
  if (protocols) {
    opts.headers['Sec-WebSocket-Protocol'] = protocols;
  }
  if (opts.origin) {
    if (opts.protocolVersion < 13) {
      opts.headers['Sec-WebSocket-Origin'] = opts.origin;
    } else {
      opts.headers.Origin = opts.origin;
    }
  }
  if (parsedUrl.auth) {
    opts.auth = parsedUrl.auth;
  } else if (parsedUrl.username || parsedUrl.password) {
    opts.auth = `${parsedUrl.username}:${parsedUrl.password}`;
  }

  if (isUnixSocket) {
    //
    // The text before the first `:` is the socket path, the text after it is
    // the request path.
    //
    const parts = path.split(':');

    opts.socketPath = parts[0];
    opts.path = parts[1];
  }

  let req = (websocket._req = get(opts));

  if (opts.timeout) {
    req.on('timeout', () => {
      abortHandshake(websocket, req, 'Opening handshake has timed out');
    });
  }

  req.on('error', (err) => {
    //
    // Check the request this listener belongs to and not `websocket._req`.
    // The latter is `null` once the upgrade completed or a previous error was
    // handled, and it refers to a different request after a redirect.
    //
    if (req === null || req.aborted) return;

    req = websocket._req = null;
    websocket.readyState = WebSocket.CLOSING;
    websocket.emit('error', err);
    websocket.emitClose();
  });

  req.on('response', (res) => {
    const location = res.headers.location;
    const statusCode = res.statusCode;

    if (
      location &&
      opts.followRedirects &&
      statusCode >= 300 &&
      statusCode < 400
    ) {
      if (++websocket._redirects > opts.maxRedirects) {
        abortHandshake(websocket, req, 'Maximum redirects exceeded');
        return;
      }

      req.abort();

      //
      // Resolve the `Location` header against the address that was requested
      // and start over with the original options.
      //
      const addr = url.URL
        ? new url.URL(location, address)
        : url.resolve(address, location);

      initAsClient(websocket, addr, protocols, options);
    } else if (!websocket.emit('unexpected-response', req, res)) {
      //
      // Nobody listens for `'unexpected-response'`, abort the handshake.
      //
      abortHandshake(
        websocket,
        req,
        `Unexpected server response: ${res.statusCode}`
      );
    }
  });

  req.on('upgrade', (res, socket, head) => {
    websocket.emit('upgrade', res);

    //
    // The user may have closed the connection from a listener of the `upgrade`
    // event.
    //
    if (websocket.readyState !== WebSocket.CONNECTING) return;

    req = websocket._req = null;

    //
    // The `Sec-WebSocket-Accept` header must match the base64 encoded SHA-1
    // hash of the key that was sent concatenated with the GUID.
    //
    const digest = crypto
      .createHash('sha1')
      .update(key + GUID)
      .digest('base64');

    if (res.headers['sec-websocket-accept'] !== digest) {
      abortHandshake(websocket, socket, 'Invalid Sec-WebSocket-Accept header');
      return;
    }

    const serverProt = res.headers['sec-websocket-protocol'];
    const protList = (protocols || '').split(/, */);
    let protError;

    if (!protocols && serverProt) {
      protError = 'Server sent a subprotocol but none was requested';
    } else if (protocols && !serverProt) {
      protError = 'Server sent no subprotocol';
    } else if (serverProt && !protList.includes(serverProt)) {
      protError = 'Server sent an invalid subprotocol';
    }

    if (protError) {
      abortHandshake(websocket, socket, protError);
      return;
    }

    if (serverProt) websocket.protocol = serverProt;

    if (perMessageDeflate) {
      try {
        const extensions = extension.parse(
          res.headers['sec-websocket-extensions']
        );

        if (extensions[PerMessageDeflate.extensionName]) {
          perMessageDeflate.accept(extensions[PerMessageDeflate.extensionName]);
          websocket._extensions[
            PerMessageDeflate.extensionName
          ] = perMessageDeflate;
        }
      } catch (err) {
        abortHandshake(
          websocket,
          socket,
          'Invalid Sec-WebSocket-Extensions header'
        );
        return;
      }
    }

    websocket.setSocket(socket, head, opts.maxPayload);
  });
}
| |
/**
 * Open a plain `net.Socket` connection for the handshake request.
 *
 * @param {Object} options Connection options
 * @return {net.Socket} The newly created socket used to start the connection
 * @private
 */
function netConnect(options) {
  //
  // Override `options.path` only if `options` is a copy of the original options
  // object. This is always true on Node.js >= 8 but not on Node.js 6 where
  // `options.socketPath` might be `undefined` even if the `socketPath` option
  // was originally set.
  //
  if (options.protocolVersion) {
    options.path = options.socketPath;
  }

  const socket = net.connect(options);

  return socket;
}
| |
/**
 * Open a `tls.TLSSocket` connection for the handshake request.
 *
 * `options.path` is cleared and `options.servername` falls back to
 * `options.host` when it is not set.
 *
 * @param {Object} options Connection options
 * @return {tls.TLSSocket} The newly created socket used to start the connection
 * @private
 */
function tlsConnect(options) {
  options.path = undefined;

  if (!options.servername) {
    options.servername = options.host;
  }

  const socket = tls.connect(options);

  return socket;
}
| |
/**
 * Abort the handshake and emit an error.
 *
 * A stream with a `setHeader` method is treated as a request and aborted,
 * anything else is treated as a socket and destroyed.
 *
 * @param {WebSocket} websocket The WebSocket instance
 * @param {(http.ClientRequest|net.Socket)} stream The request to abort or the
 *     socket to destroy
 * @param {String} message The error message
 * @private
 */
function abortHandshake(websocket, stream, message) {
  websocket.readyState = WebSocket.CLOSING;

  const error = new Error(message);

  //
  // Keep this function out of the captured stack trace.
  //
  Error.captureStackTrace(error, abortHandshake);

  const isRequest = Boolean(stream.setHeader);

  if (!isRequest) {
    stream.destroy(error);
    stream.once('error', websocket.emit.bind(websocket, 'error'));
    stream.once('close', websocket.emitClose.bind(websocket));
    return;
  }

  stream.abort();
  stream.once('abort', websocket.emitClose.bind(websocket));
  websocket.emit('error', error);
}
| |
/**
 * The listener of the `Receiver` `'conclude'` event.
 *
 * Stops feeding the receiver, records the received close code and reason and
 * calls `close()` on the WebSocket. When `code` is 1005, `close()` is called
 * without arguments.
 *
 * @param {Number} code The status code
 * @param {String} reason The reason for closing
 * @private
 */
function receiverOnConclude(code, reason) {
  const ws = this[kWebSocket];
  const socket = ws._socket;

  socket.removeListener('data', socketOnData);
  socket.resume();

  ws._closeFrameReceived = true;
  ws._closeMessage = reason;
  ws._closeCode = code;

  if (code !== 1005) {
    ws.close(code, reason);
  } else {
    ws.close();
  }
}
| |
/**
 * The listener of the `Receiver` `'drain'` event. Resumes the socket of the
 * WebSocket that owns the receiver.
 *
 * @private
 */
function receiverOnDrain() {
  const socket = this[kWebSocket]._socket;

  socket.resume();
}
| |
/**
 * The listener of the `Receiver` `'error'` event.
 *
 * Stops feeding the receiver, stores the status code carried by the error,
 * emits `'error'` on the WebSocket and destroys the socket.
 *
 * @param {(RangeError|Error)} err The emitted error
 * @private
 */
function receiverOnError(err) {
  const ws = this[kWebSocket];
  const socket = ws._socket;

  socket.removeListener('data', socketOnData);

  ws.readyState = WebSocket.CLOSING;
  ws._closeCode = err[kStatusCode];
  ws.emit('error', err);
  ws._socket.destroy();
}
| |
/**
 * The listener of the `Receiver` `'finish'` event. It is also registered for
 * the `'error'` event by `socketOnClose`.
 *
 * @private
 */
function receiverOnFinish() {
  const ws = this[kWebSocket];

  ws.emitClose();
}
| |
/**
 * The listener of the `Receiver` `'message'` event. Forwards the message as a
 * `'message'` event on the WebSocket.
 *
 * @param {(String|Buffer|ArrayBuffer|Buffer[])} data The message
 * @private
 */
function receiverOnMessage(data) {
  const ws = this[kWebSocket];

  ws.emit('message', data);
}
| |
/**
 * The listener of the `Receiver` `'ping'` event. Replies with a pong carrying
 * the same data, then emits `'ping'` on the WebSocket.
 *
 * @param {Buffer} data The data included in the ping frame
 * @private
 */
function receiverOnPing(data) {
  const ws = this[kWebSocket];
  const mask = !ws._isServer;

  ws.pong(data, mask, NOOP);
  ws.emit('ping', data);
}
| |
/**
 * The listener of the `Receiver` `'pong'` event. Forwards the data as a
 * `'pong'` event on the WebSocket.
 *
 * @param {Buffer} data The data included in the pong frame
 * @private
 */
function receiverOnPong(data) {
  const ws = this[kWebSocket];

  ws.emit('pong', data);
}
| |
/**
 * The listener of the `net.Socket` `'close'` event.
 *
 * Detaches the socket listeners, flushes what is left into the receiver and
 * emits `'close'` on the WebSocket as soon as the receiver is done.
 *
 * @private
 */
function socketOnClose() {
  const ws = this[kWebSocket];

  this.removeListener('close', socketOnClose);
  this.removeListener('end', socketOnEnd);

  ws.readyState = WebSocket.CLOSING;

  //
  // The close frame might not have been received or the `'end'` event emitted,
  // for example, if the socket was destroyed due to an error. Ensure that the
  // `receiver` stream is closed after writing any remaining buffered data to
  // it. If the readable side of the socket is in flowing mode then there is no
  // buffered data as everything has been already written and `readable.read()`
  // will return `null`. If instead, the socket is paused, any possible buffered
  // data will be read as a single chunk and emitted synchronously in a single
  // `'data'` event.
  //
  ws._socket.read();
  ws._receiver.end();

  this.removeListener('data', socketOnData);
  this[kWebSocket] = undefined;

  clearTimeout(ws._closeTimer);

  const receiver = ws._receiver;
  const state = receiver._writableState;

  if (state.finished || state.errorEmitted) {
    ws.emitClose();
    return;
  }

  //
  // The receiver is still processing data, wait for it to finish or fail.
  //
  receiver.on('error', receiverOnFinish);
  receiver.on('finish', receiverOnFinish);
}
| |
/**
 * The listener of the `net.Socket` `'data'` event. Writes the chunk to the
 * receiver and pauses the socket when `write()` returns `false`.
 *
 * @param {Buffer} chunk A chunk of data
 * @private
 */
function socketOnData(chunk) {
  const receiver = this[kWebSocket]._receiver;
  const accepted = receiver.write(chunk);

  if (!accepted) this.pause();
}
| |
/**
 * The listener of the `net.Socket` `'end'` event. Ends the receiver and then
 * the socket itself.
 *
 * @private
 */
function socketOnEnd() {
  const ws = this[kWebSocket];

  ws.readyState = WebSocket.CLOSING;

  const receiver = ws._receiver;

  receiver.end();
  this.end();
}
| |
/**
 * The listener of the `net.Socket` `'error'` event. Replaces itself with a
 * no-op listener and destroys the socket.
 *
 * @private
 */
function socketOnError() {
  const ws = this[kWebSocket];

  //
  // Swap this listener for a no-op one.
  //
  this.removeListener('error', socketOnError);
  this.on('error', NOOP);

  ws.readyState = WebSocket.CLOSING;
  this.destroy();
}