blob: 140e003894dafe5a4acc6ce5ff1d455663d53165 [file] [log] [blame]
var fs = require('fs');
var util = require('util');
var stream = require('stream');
var Readable = stream.Readable;
var Writable = stream.Writable;
var PassThrough = stream.PassThrough;
var Pend = require('pend');
var EventEmitter = require('events').EventEmitter;
exports.createFromBuffer = createFromBuffer;
exports.createFromFd = createFromFd;
exports.BufferSlicer = BufferSlicer;
exports.FdSlicer = FdSlicer;
util.inherits(FdSlicer, EventEmitter);
function FdSlicer(fd, options) {
options = options || {};
EventEmitter.call(this);
this.fd = fd;
this.pend = new Pend();
this.pend.max = 1;
this.refCount = 0;
this.autoClose = !!options.autoClose;
}
FdSlicer.prototype.read = function(buffer, offset, length, position, callback) {
var self = this;
self.pend.go(function(cb) {
fs.read(self.fd, buffer, offset, length, position, function(err, bytesRead, buffer) {
cb();
callback(err, bytesRead, buffer);
});
});
};
FdSlicer.prototype.write = function(buffer, offset, length, position, callback) {
var self = this;
self.pend.go(function(cb) {
fs.write(self.fd, buffer, offset, length, position, function(err, written, buffer) {
cb();
callback(err, written, buffer);
});
});
};
FdSlicer.prototype.createReadStream = function(options) {
return new ReadStream(this, options);
};
FdSlicer.prototype.createWriteStream = function(options) {
return new WriteStream(this, options);
};
FdSlicer.prototype.ref = function() {
this.refCount += 1;
};
FdSlicer.prototype.unref = function() {
var self = this;
self.refCount -= 1;
if (self.refCount > 0) return;
if (self.refCount < 0) throw new Error("invalid unref");
if (self.autoClose) {
fs.close(self.fd, onCloseDone);
}
function onCloseDone(err) {
if (err) {
self.emit('error', err);
} else {
self.emit('close');
}
}
};
util.inherits(ReadStream, Readable);
function ReadStream(context, options) {
options = options || {};
Readable.call(this, options);
this.context = context;
this.context.ref();
this.start = options.start || 0;
this.endOffset = options.end;
this.pos = this.start;
this.destroyed = false;
}
ReadStream.prototype._read = function(n) {
var self = this;
if (self.destroyed) return;
var toRead = Math.min(self._readableState.highWaterMark, n);
if (self.endOffset != null) {
toRead = Math.min(toRead, self.endOffset - self.pos);
}
if (toRead <= 0) {
self.destroyed = true;
self.push(null);
self.context.unref();
return;
}
self.context.pend.go(function(cb) {
if (self.destroyed) return cb();
var buffer = new Buffer(toRead);
fs.read(self.context.fd, buffer, 0, toRead, self.pos, function(err, bytesRead) {
if (err) {
self.destroy(err);
} else if (bytesRead === 0) {
self.destroyed = true;
self.push(null);
self.context.unref();
} else {
self.pos += bytesRead;
self.push(buffer.slice(0, bytesRead));
}
cb();
});
});
};
ReadStream.prototype.destroy = function(err) {
if (this.destroyed) return;
err = err || new Error("stream destroyed");
this.destroyed = true;
this.emit('error', err);
this.context.unref();
};
util.inherits(WriteStream, Writable);
function WriteStream(context, options) {
options = options || {};
Writable.call(this, options);
this.context = context;
this.context.ref();
this.start = options.start || 0;
this.endOffset = (options.end == null) ? Infinity : +options.end;
this.bytesWritten = 0;
this.pos = this.start;
this.destroyed = false;
this.on('finish', this.destroy.bind(this));
}
WriteStream.prototype._write = function(buffer, encoding, callback) {
var self = this;
if (self.destroyed) return;
if (self.pos + buffer.length > self.endOffset) {
var err = new Error("maximum file length exceeded");
err.code = 'ETOOBIG';
self.destroy();
callback(err);
return;
}
self.context.pend.go(function(cb) {
if (self.destroyed) return cb();
fs.write(self.context.fd, buffer, 0, buffer.length, self.pos, function(err, bytes) {
if (err) {
self.destroy();
cb();
callback(err);
} else {
self.bytesWritten += bytes;
self.pos += bytes;
self.emit('progress');
cb();
callback();
}
});
});
};
WriteStream.prototype.destroy = function() {
if (this.destroyed) return;
this.destroyed = true;
this.context.unref();
};
util.inherits(BufferSlicer, EventEmitter);
function BufferSlicer(buffer) {
EventEmitter.call(this);
this.refCount = 0;
this.buffer = buffer;
}
BufferSlicer.prototype.read = function(buffer, offset, length, position, callback) {
var end = position + length;
var delta = end - this.buffer.length;
var written = (delta > 0) ? delta : length;
this.buffer.copy(buffer, offset, position, end);
setImmediate(function() {
callback(null, written);
});
};
BufferSlicer.prototype.write = function(buffer, offset, length, position, callback) {
buffer.copy(this.buffer, position, offset, offset + length);
setImmediate(function() {
callback(null, length, buffer);
});
};
BufferSlicer.prototype.createReadStream = function(options) {
options = options || {};
var readStream = new PassThrough(options);
readStream.start = options.start || 0;
readStream.endOffset = options.end;
readStream.pos = readStream.endOffset || this.buffer.length; // yep, we're already done
readStream.destroyed = false;
readStream.write(this.buffer.slice(readStream.start, readStream.pos));
readStream.end();
readStream.destroy = function() {
readStream.destroyed = true;
};
return readStream;
};
BufferSlicer.prototype.createWriteStream = function(options) {
var bufferSlicer = this;
options = options || {};
var writeStream = new Writable(options);
writeStream.start = options.start || 0;
writeStream.endOffset = (options.end == null) ? this.buffer.length : +options.end;
writeStream.bytesWritten = 0;
writeStream.pos = writeStream.start;
writeStream.destroyed = false;
writeStream._write = function(buffer, encoding, callback) {
if (writeStream.destroyed) return;
var end = writeStream.pos + buffer.length;
if (end > writeStream.endOffset) {
var err = new Error("maximum file length exceeded");
err.code = 'ETOOBIG';
writeStream.destroyed = true;
callback(err);
return;
}
buffer.copy(bufferSlicer.buffer, writeStream.pos, 0, buffer.length);
writeStream.bytesWritten += buffer.length;
writeStream.pos = end;
writeStream.emit('progress');
callback();
};
writeStream.destroy = function() {
writeStream.destroyed = true;
};
return writeStream;
};
BufferSlicer.prototype.ref = function() {
this.refCount += 1;
};
BufferSlicer.prototype.unref = function() {
this.refCount -= 1;
if (this.refCount < 0) {
throw new Error("invalid unref");
}
};
function createFromBuffer(buffer) {
return new BufferSlicer(buffer);
}
function createFromFd(fd, options) {
return new FdSlicer(fd, options);
}