// Spaces:
// Runtime error
// Runtime error
// Dependencies: Node core modules plus the third-party `pend` package.
var fs = require('fs');
var util = require('util');
var stream = require('stream');
var Readable = stream.Readable;
var Writable = stream.Writable;
var PassThrough = stream.PassThrough;
var Pend = require('pend');
var EventEmitter = require('events').EventEmitter;

// Public API. The constructors and factories are function declarations
// further down in this file, so they are hoisted and already bound here.
exports.createFromBuffer = createFromBuffer;
exports.createFromFd = createFromFd;
exports.BufferSlicer = BufferSlicer;
exports.FdSlicer = FdSlicer;
util.inherits(FdSlicer, EventEmitter);

/**
 * Shares one file descriptor between several readers and writers.
 *
 * Every operation issued through this object is submitted to `this.pend`,
 * whose `max` is set to 1 (intended to let only one fd operation run at a
 * time). Streams created from the slicer call `ref()` / `unref()`; when the
 * count returns to zero and `autoClose` is set, the descriptor is closed.
 *
 * Events:
 *   'close' - emitted after a successful automatic `fs.close`.
 *   'error' - emitted when the automatic `fs.close` fails.
 *
 * @param {number} fd - file descriptor handed to fs.read / fs.write / fs.close.
 * @param {Object} [options]
 * @param {boolean} [options.autoClose] - close `fd` once the reference count
 *   drops back to zero. Coerced to a boolean; defaults to false.
 */
function FdSlicer(fd, options) {
  options = options || {};
  EventEmitter.call(this);
  this.fd = fd;
  this.pend = new Pend();
  this.pend.max = 1;
  this.refCount = 0;
  this.autoClose = !!options.autoClose;
}

/**
 * Queued equivalent of `fs.read` on the shared descriptor.
 *
 * @param {Buffer} buffer - destination buffer.
 * @param {number} offset - offset in `buffer` to start writing at.
 * @param {number} length - number of bytes to read.
 * @param {number} position - file position to read from.
 * @param {function(Error, number, Buffer)} callback - receives the arguments
 *   of the underlying `fs.read` callback unchanged.
 */
FdSlicer.prototype.read = function(buffer, offset, length, position, callback) {
  var self = this;
  self.pend.go(function(cb) {
    fs.read(self.fd, buffer, offset, length, position, function(err, bytesRead, buffer) {
      // Release the queue slot before handing control back to the caller.
      cb();
      callback(err, bytesRead, buffer);
    });
  });
};

/**
 * Queued equivalent of `fs.write` on the shared descriptor.
 *
 * @param {Buffer} buffer - source buffer.
 * @param {number} offset - offset in `buffer` to start reading from.
 * @param {number} length - number of bytes to write.
 * @param {number} position - file position to write at.
 * @param {function(Error, number, Buffer)} callback - receives the arguments
 *   of the underlying `fs.write` callback unchanged.
 */
FdSlicer.prototype.write = function(buffer, offset, length, position, callback) {
  var self = this;
  self.pend.go(function(cb) {
    fs.write(self.fd, buffer, offset, length, position, function(err, written, buffer) {
      // Release the queue slot before handing control back to the caller.
      cb();
      callback(err, written, buffer);
    });
  });
};

/**
 * Creates a readable stream over the shared descriptor.
 *
 * @param {Object} [options] - forwarded to the ReadStream constructor.
 * @returns {ReadStream}
 */
FdSlicer.prototype.createReadStream = function(options) {
  return new ReadStream(this, options);
};

/**
 * Creates a writable stream over the shared descriptor.
 *
 * @param {Object} [options] - forwarded to the WriteStream constructor.
 * @returns {WriteStream}
 */
FdSlicer.prototype.createWriteStream = function(options) {
  return new WriteStream(this, options);
};

/** Increments the reference count. */
FdSlicer.prototype.ref = function() {
  this.refCount += 1;
};

/**
 * Decrements the reference count. When it reaches zero and `autoClose` is
 * set, closes the descriptor and emits 'close' (or 'error' if closing fails).
 *
 * @throws {Error} "invalid unref" when the count would become negative.
 */
FdSlicer.prototype.unref = function() {
  var self = this;
  self.refCount -= 1;

  if (self.refCount > 0) return;
  if (self.refCount < 0) throw new Error("invalid unref");

  if (self.autoClose) {
    fs.close(self.fd, onCloseDone);
  }

  function onCloseDone(err) {
    if (err) {
      self.emit('error', err);
    } else {
      self.emit('close');
    }
  }
};
util.inherits(ReadStream, Readable);

/**
 * Readable stream over a byte range of a slicer's file descriptor.
 *
 * Takes a reference on `context` at construction and releases it exactly once:
 * at end of data, on read error, or when `destroy()` is called.
 *
 * @param {Object} context - the owning slicer; must provide `fd`, `pend`,
 *   `ref()` and `unref()`.
 * @param {Object} [options] - also passed to the Readable constructor.
 * @param {number} [options.start=0] - file offset of the first byte to read.
 * @param {number} [options.end] - exclusive end offset; when omitted
 *   (null/undefined) the stream reads until `fs.read` reports 0 bytes.
 */
function ReadStream(context, options) {
  options = options || {};
  Readable.call(this, options);

  this.context = context;
  this.context.ref();

  this.start = options.start || 0;
  this.endOffset = options.end;
  this.pos = this.start;
  this.destroyed = false;
}

/**
 * Readable implementation hook: reads the next chunk through the context's
 * queue and pushes it.
 *
 * @param {number} n - advisory number of bytes requested by the stream.
 */
ReadStream.prototype._read = function(n) {
  var self = this;
  if (self.destroyed) return;

  var toRead = Math.min(self._readableState.highWaterMark, n);
  if (self.endOffset != null) {
    toRead = Math.min(toRead, self.endOffset - self.pos);
  }
  if (toRead <= 0) {
    // Reached the configured end offset: signal EOF and drop our reference.
    self.destroyed = true;
    self.push(null);
    self.context.unref();
    return;
  }
  self.context.pend.go(function(cb) {
    // The stream may have been destroyed while this read sat in the queue.
    if (self.destroyed) return cb();
    // allocUnsafe is fine here: only the first `bytesRead` bytes, which
    // fs.read has overwritten, are ever exposed to the consumer. It replaces
    // the deprecated `new Buffer(size)` constructor.
    var buffer = Buffer.allocUnsafe(toRead);
    fs.read(self.context.fd, buffer, 0, toRead, self.pos, function(err, bytesRead) {
      if (err) {
        self.destroy(err);
      } else if (bytesRead === 0) {
        // End of file.
        self.destroyed = true;
        self.push(null);
        self.context.unref();
      } else {
        self.pos += bytesRead;
        self.push(buffer.slice(0, bytesRead));
      }
      cb();
    });
  });
};

/**
 * Stops the stream, emits 'error' and releases the context reference.
 * Does nothing if the stream is already destroyed.
 *
 * @param {Error} [err] - error to emit; defaults to a new
 *   Error("stream destroyed"), so 'error' is emitted even without a cause.
 */
ReadStream.prototype.destroy = function(err) {
  if (this.destroyed) return;
  err = err || new Error("stream destroyed");
  this.destroyed = true;
  this.emit('error', err);
  this.context.unref();
};
util.inherits(WriteStream, Writable);

/**
 * Writable stream over a byte range of a slicer's file descriptor.
 *
 * Takes a reference on `context` at construction and releases it once, when
 * the stream finishes or is destroyed. Emits 'progress' after every
 * successful write.
 *
 * @param {Object} context - the owning slicer; must provide `fd`, `pend`,
 *   `ref()` and `unref()`.
 * @param {Object} [options] - also passed to the Writable constructor.
 * @param {number} [options.start=0] - file offset of the first byte written.
 * @param {number} [options.end] - exclusive upper bound on the write position;
 *   unlimited (Infinity) when omitted.
 */
function WriteStream(context, options) {
  options = options || {};
  Writable.call(this, options);

  this.context = context;
  this.context.ref();

  this.start = options.start || 0;
  this.endOffset = (options.end == null) ? Infinity : +options.end;
  this.bytesWritten = 0;
  this.pos = this.start;
  this.destroyed = false;

  // Release the context reference as soon as all data has been flushed.
  this.on('finish', this.destroy.bind(this));
}

/**
 * Writable implementation hook: writes one chunk at the current position
 * through the context's queue.
 *
 * @param {Buffer} buffer - chunk to write.
 * @param {string} encoding - unused.
 * @param {function(Error=)} callback - called with an error whose `code` is
 *   'ETOOBIG' when the chunk would pass `endOffset`, with the `fs.write`
 *   error on failure, or with no arguments on success.
 */
WriteStream.prototype._write = function(buffer, encoding, callback) {
  var self = this;
  if (self.destroyed) return;

  if (self.pos + buffer.length > self.endOffset) {
    var err = new Error("maximum file length exceeded");
    err.code = 'ETOOBIG';
    self.destroy();
    callback(err);
    return;
  }
  self.context.pend.go(function(cb) {
    // The stream may have been destroyed while this write sat in the queue.
    if (self.destroyed) return cb();
    fs.write(self.context.fd, buffer, 0, buffer.length, self.pos, function(err, bytes) {
      if (err) {
        self.destroy();
        cb();
        callback(err);
      } else {
        self.bytesWritten += bytes;
        self.pos += bytes;
        self.emit('progress');
        cb();
        callback();
      }
    });
  });
};

/**
 * Marks the stream destroyed and releases the context reference.
 * Does nothing if the stream is already destroyed.
 */
WriteStream.prototype.destroy = function() {
  if (this.destroyed) return;
  this.destroyed = true;
  this.context.unref();
};
util.inherits(BufferSlicer, EventEmitter);

/**
 * In-memory counterpart of FdSlicer: exposes the same read / write / stream
 * API on top of a Buffer instead of a file descriptor.
 *
 * @param {Buffer} buffer - backing storage; reads come from it and writes go
 *   into it (it is never resized).
 * @param {Object} [options]
 * @param {number} [options.maxChunkSize] - largest chunk emitted by streams
 *   from `createReadStream`; defaults to Number.MAX_SAFE_INTEGER.
 */
function BufferSlicer(buffer, options) {
  EventEmitter.call(this);

  options = options || {};
  this.refCount = 0;
  this.buffer = buffer;
  this.maxChunkSize = options.maxChunkSize || Number.MAX_SAFE_INTEGER;
}

/**
 * Copies bytes out of the backing buffer.
 *
 * @param {Buffer} buffer - destination buffer.
 * @param {number} offset - offset in `buffer` to start writing at.
 * @param {number} length - number of bytes requested.
 * @param {number} position - offset in the backing buffer to read from.
 * @param {function(Error, number)} callback - called asynchronously with
 *   (null, bytesRead). `bytesRead` is smaller than `length` when the request
 *   runs past the end of the backing buffer.
 */
BufferSlicer.prototype.read = function(buffer, offset, length, position, callback) {
  // Buffer#copy clamps the range and returns how many bytes it really copied,
  // which is the correct count for short reads near the end of the buffer.
  var bytesRead = this.buffer.copy(buffer, offset, position, position + length);
  setImmediate(function() {
    callback(null, bytesRead);
  });
};

/**
 * Copies bytes into the backing buffer.
 *
 * @param {Buffer} buffer - source buffer.
 * @param {number} offset - offset in `buffer` to start reading from.
 * @param {number} length - number of bytes to write.
 * @param {number} position - offset in the backing buffer to write at.
 * @param {function(Error, number, Buffer)} callback - called asynchronously
 *   with (null, bytesWritten, buffer). `bytesWritten` is smaller than `length`
 *   when the data does not fit in the backing buffer.
 */
BufferSlicer.prototype.write = function(buffer, offset, length, position, callback) {
  // Report what was actually stored rather than what was requested.
  var written = buffer.copy(this.buffer, position, offset, offset + length);
  setImmediate(function() {
    callback(null, written, buffer);
  });
};

/**
 * Creates a stream that yields a range of the backing buffer.
 *
 * All data is written into a PassThrough before this function returns, cut
 * into pieces of at most `maxChunkSize` bytes.
 *
 * @param {Object} [options] - also passed to the PassThrough constructor.
 * @param {number} [options.start=0] - offset of the first byte.
 * @param {number} [options.end] - exclusive end offset; the end of the buffer
 *   when omitted (null/undefined). An explicit 0 yields an empty stream.
 * @returns {PassThrough}
 */
BufferSlicer.prototype.createReadStream = function(options) {
  options = options || {};
  var readStream = new PassThrough(options);
  readStream.destroyed = false;
  readStream.start = options.start || 0;
  readStream.endOffset = options.end;
  // by the time this function returns, we'll be done.
  // `== null` rather than `||`, so that an explicit end of 0 is honored.
  readStream.pos = (readStream.endOffset == null) ? this.buffer.length : readStream.endOffset;

  // respect the maxChunkSize option to slice up the chunk into smaller pieces.
  var entireSlice = this.buffer.slice(readStream.start, readStream.pos);
  var chunkSize = this.maxChunkSize;
  for (var offset = 0; offset < entireSlice.length; offset += chunkSize) {
    // slice() clamps the end index, so the last chunk may be shorter.
    readStream.write(entireSlice.slice(offset, offset + chunkSize));
  }

  readStream.end();
  readStream.destroy = function() {
    readStream.destroyed = true;
  };
  return readStream;
};

/**
 * Creates a stream that writes into a range of the backing buffer.
 * Emits 'progress' after every chunk stored.
 *
 * @param {Object} [options] - also passed to the Writable constructor.
 * @param {number} [options.start=0] - offset of the first byte written.
 * @param {number} [options.end] - exclusive upper bound on the write position;
 *   the backing buffer's length when omitted. Writing past it fails with an
 *   error whose `code` is 'ETOOBIG'.
 * @returns {Writable}
 */
BufferSlicer.prototype.createWriteStream = function(options) {
  var bufferSlicer = this;
  options = options || {};
  var writeStream = new Writable(options);
  writeStream.start = options.start || 0;
  writeStream.endOffset = (options.end == null) ? this.buffer.length : +options.end;
  writeStream.bytesWritten = 0;
  writeStream.pos = writeStream.start;
  writeStream.destroyed = false;
  writeStream._write = function(buffer, encoding, callback) {
    if (writeStream.destroyed) return;

    var end = writeStream.pos + buffer.length;
    if (end > writeStream.endOffset) {
      var err = new Error("maximum file length exceeded");
      err.code = 'ETOOBIG';
      writeStream.destroyed = true;
      callback(err);
      return;
    }
    buffer.copy(bufferSlicer.buffer, writeStream.pos, 0, buffer.length);

    writeStream.bytesWritten += buffer.length;
    writeStream.pos = end;
    writeStream.emit('progress');
    callback();
  };
  writeStream.destroy = function() {
    writeStream.destroyed = true;
  };
  return writeStream;
};

/** Increments the reference count. */
BufferSlicer.prototype.ref = function() {
  this.refCount += 1;
};

/**
 * Decrements the reference count.
 *
 * @throws {Error} "invalid unref" when the count becomes negative.
 */
BufferSlicer.prototype.unref = function() {
  this.refCount -= 1;

  if (this.refCount < 0) {
    throw new Error("invalid unref");
  }
};
/**
 * Convenience factory: equivalent to `new BufferSlicer(buffer, options)`.
 *
 * @param {Buffer} buffer - backing buffer for the slicer.
 * @param {Object} [options] - forwarded to the BufferSlicer constructor.
 * @returns {BufferSlicer}
 */
function createFromBuffer(buffer, options) {
  return new BufferSlicer(buffer, options);
}
/**
 * Convenience factory: equivalent to `new FdSlicer(fd, options)`.
 *
 * @param {number} fd - file descriptor to share.
 * @param {Object} [options] - forwarded to the FdSlicer constructor.
 * @returns {FdSlicer}
 */
function createFromFd(fd, options) {
  return new FdSlicer(fd, options);
}