| var fs = require('fs'); |
| var util = require('util'); |
| var stream = require('stream'); |
| var Readable = stream.Readable; |
| var Writable = stream.Writable; |
| var PassThrough = stream.PassThrough; |
| var Pend = require('pend'); |
| var EventEmitter = require('events').EventEmitter; |
| |
| exports.createFromBuffer = createFromBuffer; |
| exports.createFromFd = createFromFd; |
| exports.BufferSlicer = BufferSlicer; |
| exports.FdSlicer = FdSlicer; |
| |
| util.inherits(FdSlicer, EventEmitter); |
| function FdSlicer(fd, options) { |
| options = options || {}; |
| EventEmitter.call(this); |
| |
| this.fd = fd; |
| this.pend = new Pend(); |
| this.pend.max = 1; |
| this.refCount = 0; |
| this.autoClose = !!options.autoClose; |
| } |
| |
| FdSlicer.prototype.read = function(buffer, offset, length, position, callback) { |
| var self = this; |
| self.pend.go(function(cb) { |
| fs.read(self.fd, buffer, offset, length, position, function(err, bytesRead, buffer) { |
| cb(); |
| callback(err, bytesRead, buffer); |
| }); |
| }); |
| }; |
| |
| FdSlicer.prototype.write = function(buffer, offset, length, position, callback) { |
| var self = this; |
| self.pend.go(function(cb) { |
| fs.write(self.fd, buffer, offset, length, position, function(err, written, buffer) { |
| cb(); |
| callback(err, written, buffer); |
| }); |
| }); |
| }; |
| |
| FdSlicer.prototype.createReadStream = function(options) { |
| return new ReadStream(this, options); |
| }; |
| |
| FdSlicer.prototype.createWriteStream = function(options) { |
| return new WriteStream(this, options); |
| }; |
| |
| FdSlicer.prototype.ref = function() { |
| this.refCount += 1; |
| }; |
| |
| FdSlicer.prototype.unref = function() { |
| var self = this; |
| self.refCount -= 1; |
| |
| if (self.refCount > 0) return; |
| if (self.refCount < 0) throw new Error("invalid unref"); |
| |
| if (self.autoClose) { |
| fs.close(self.fd, onCloseDone); |
| } |
| |
| function onCloseDone(err) { |
| if (err) { |
| self.emit('error', err); |
| } else { |
| self.emit('close'); |
| } |
| } |
| }; |
| |
| util.inherits(ReadStream, Readable); |
| function ReadStream(context, options) { |
| options = options || {}; |
| Readable.call(this, options); |
| |
| this.context = context; |
| this.context.ref(); |
| |
| this.start = options.start || 0; |
| this.endOffset = options.end; |
| this.pos = this.start; |
| this.destroyed = false; |
| } |
| |
| ReadStream.prototype._read = function(n) { |
| var self = this; |
| if (self.destroyed) return; |
| |
| var toRead = Math.min(self._readableState.highWaterMark, n); |
| if (self.endOffset != null) { |
| toRead = Math.min(toRead, self.endOffset - self.pos); |
| } |
| if (toRead <= 0) { |
| self.destroyed = true; |
| self.push(null); |
| self.context.unref(); |
| return; |
| } |
| self.context.pend.go(function(cb) { |
| if (self.destroyed) return cb(); |
| var buffer = new Buffer(toRead); |
| fs.read(self.context.fd, buffer, 0, toRead, self.pos, function(err, bytesRead) { |
| if (err) { |
| self.destroy(err); |
| } else if (bytesRead === 0) { |
| self.destroyed = true; |
| self.push(null); |
| self.context.unref(); |
| } else { |
| self.pos += bytesRead; |
| self.push(buffer.slice(0, bytesRead)); |
| } |
| cb(); |
| }); |
| }); |
| }; |
| |
| ReadStream.prototype.destroy = function(err) { |
| if (this.destroyed) return; |
| err = err || new Error("stream destroyed"); |
| this.destroyed = true; |
| this.emit('error', err); |
| this.context.unref(); |
| }; |
| |
| util.inherits(WriteStream, Writable); |
| function WriteStream(context, options) { |
| options = options || {}; |
| Writable.call(this, options); |
| |
| this.context = context; |
| this.context.ref(); |
| |
| this.start = options.start || 0; |
| this.endOffset = (options.end == null) ? Infinity : +options.end; |
| this.bytesWritten = 0; |
| this.pos = this.start; |
| this.destroyed = false; |
| |
| this.on('finish', this.destroy.bind(this)); |
| } |
| |
| WriteStream.prototype._write = function(buffer, encoding, callback) { |
| var self = this; |
| if (self.destroyed) return; |
| |
| if (self.pos + buffer.length > self.endOffset) { |
| var err = new Error("maximum file length exceeded"); |
| err.code = 'ETOOBIG'; |
| self.destroy(); |
| callback(err); |
| return; |
| } |
| self.context.pend.go(function(cb) { |
| if (self.destroyed) return cb(); |
| fs.write(self.context.fd, buffer, 0, buffer.length, self.pos, function(err, bytes) { |
| if (err) { |
| self.destroy(); |
| cb(); |
| callback(err); |
| } else { |
| self.bytesWritten += bytes; |
| self.pos += bytes; |
| self.emit('progress'); |
| cb(); |
| callback(); |
| } |
| }); |
| }); |
| }; |
| |
| WriteStream.prototype.destroy = function() { |
| if (this.destroyed) return; |
| this.destroyed = true; |
| this.context.unref(); |
| }; |
| |
| util.inherits(BufferSlicer, EventEmitter); |
| function BufferSlicer(buffer) { |
| EventEmitter.call(this); |
| |
| this.refCount = 0; |
| this.buffer = buffer; |
| } |
| |
| BufferSlicer.prototype.read = function(buffer, offset, length, position, callback) { |
| var end = position + length; |
| var delta = end - this.buffer.length; |
| var written = (delta > 0) ? delta : length; |
| this.buffer.copy(buffer, offset, position, end); |
| setImmediate(function() { |
| callback(null, written); |
| }); |
| }; |
| |
| BufferSlicer.prototype.write = function(buffer, offset, length, position, callback) { |
| buffer.copy(this.buffer, position, offset, offset + length); |
| setImmediate(function() { |
| callback(null, length, buffer); |
| }); |
| }; |
| |
| BufferSlicer.prototype.createReadStream = function(options) { |
| options = options || {}; |
| var readStream = new PassThrough(options); |
| readStream.start = options.start || 0; |
| readStream.endOffset = options.end; |
| readStream.pos = readStream.endOffset || this.buffer.length; // yep, we're already done |
| readStream.destroyed = false; |
| readStream.write(this.buffer.slice(readStream.start, readStream.pos)); |
| readStream.end(); |
| readStream.destroy = function() { |
| readStream.destroyed = true; |
| }; |
| return readStream; |
| }; |
| |
| BufferSlicer.prototype.createWriteStream = function(options) { |
| var bufferSlicer = this; |
| options = options || {}; |
| var writeStream = new Writable(options); |
| writeStream.start = options.start || 0; |
| writeStream.endOffset = (options.end == null) ? this.buffer.length : +options.end; |
| writeStream.bytesWritten = 0; |
| writeStream.pos = writeStream.start; |
| writeStream.destroyed = false; |
| writeStream._write = function(buffer, encoding, callback) { |
| if (writeStream.destroyed) return; |
| |
| var end = writeStream.pos + buffer.length; |
| if (end > writeStream.endOffset) { |
| var err = new Error("maximum file length exceeded"); |
| err.code = 'ETOOBIG'; |
| writeStream.destroyed = true; |
| callback(err); |
| return; |
| } |
| buffer.copy(bufferSlicer.buffer, writeStream.pos, 0, buffer.length); |
| |
| writeStream.bytesWritten += buffer.length; |
| writeStream.pos = end; |
| writeStream.emit('progress'); |
| callback(); |
| }; |
| writeStream.destroy = function() { |
| writeStream.destroyed = true; |
| }; |
| return writeStream; |
| }; |
| |
| BufferSlicer.prototype.ref = function() { |
| this.refCount += 1; |
| }; |
| |
| BufferSlicer.prototype.unref = function() { |
| this.refCount -= 1; |
| |
| if (this.refCount < 0) { |
| throw new Error("invalid unref"); |
| } |
| }; |
| |
| function createFromBuffer(buffer) { |
| return new BufferSlicer(buffer); |
| } |
| |
| function createFromFd(fd, options) { |
| return new FdSlicer(fd, options); |
| } |