| const debug = require("debug")("streamroller:RollingFileWriteStream"); |
| const _ = require("lodash"); |
| const async = require("async"); |
| const fs = require("fs-extra"); |
| const zlib = require("zlib"); |
| const path = require("path"); |
| const newNow = require("./now"); |
| const format = require("date-format"); |
| const { Writable } = require("stream"); |
| |
| const FILENAME_SEP = "."; |
| const ZIP_EXT = ".gz"; |
| |
| const moveAndMaybeCompressFile = ( |
| sourceFilePath, |
| targetFilePath, |
| needCompress, |
| done |
| ) => { |
| if (sourceFilePath === targetFilePath) { |
| debug( |
| `moveAndMaybeCompressFile: source and target are the same, not doing anything` |
| ); |
| return done(); |
| } |
| fs.access(sourceFilePath, fs.constants.W_OK | fs.constants.R_OK, e => { |
| if (e) { |
| debug( |
| `moveAndMaybeCompressFile: source file path does not exist. not moving. sourceFilePath=${sourceFilePath}` |
| ); |
| return done(); |
| } |
| |
| debug( |
| `moveAndMaybeCompressFile: moving file from ${sourceFilePath} to ${targetFilePath} ${ |
| needCompress ? "with" : "without" |
| } compress` |
| ); |
| if (needCompress) { |
| fs.createReadStream(sourceFilePath) |
| .pipe(zlib.createGzip()) |
| .pipe(fs.createWriteStream(targetFilePath)) |
| .on("finish", () => { |
| debug( |
| `moveAndMaybeCompressFile: finished compressing ${targetFilePath}, deleting ${sourceFilePath}` |
| ); |
| fs.unlink(sourceFilePath, done); |
| }); |
| } else { |
| debug( |
| `moveAndMaybeCompressFile: deleting file=${targetFilePath}, renaming ${sourceFilePath} to ${targetFilePath}` |
| ); |
| fs.unlink(targetFilePath, () => { |
| fs.rename(sourceFilePath, targetFilePath, done); |
| }); |
| } |
| }); |
| }; |
| |
| /** |
| * RollingFileWriteStream is mainly used when writing to a file rolling by date or size. |
| * RollingFileWriteStream inhebites from stream.Writable |
| */ |
| class RollingFileWriteStream extends Writable { |
| /** |
| * Create a RollingFileWriteStream |
| * @constructor |
| * @param {string} filePath - The file path to write. |
| * @param {object} options - The extra options |
| * @param {number} options.numToKeep - The max numbers of files to keep. |
| * @param {number} options.maxSize - The maxSize one file can reach. Unit is Byte. |
| * This should be more than 1024. The default is Number.MAX_SAFE_INTEGER. |
| * @param {string} options.mode - The mode of the files. The default is '0644'. Refer to stream.writable for more. |
| * @param {string} options.flags - The default is 'a'. Refer to stream.flags for more. |
| * @param {boolean} options.compress - Whether to compress backup files. |
| * @param {boolean} options.keepFileExt - Whether to keep the file extension. |
| * @param {string} options.pattern - The date string pattern in the file name. |
| * @param {boolean} options.alwaysIncludePattern - Whether to add date to the name of the first file. |
| */ |
| constructor(filePath, options) { |
| debug(`creating RollingFileWriteStream. path=${filePath}`); |
| super(options); |
| this.options = this._parseOption(options); |
| this.fileObject = path.parse(filePath); |
| if (this.fileObject.dir === "") { |
| this.fileObject = path.parse(path.join(process.cwd(), filePath)); |
| } |
| this.justTheFile = this._formatFileName({ isHotFile: true }); |
| this.filename = path.join(this.fileObject.dir, this.justTheFile); |
| this.state = { |
| currentSize: 0 |
| }; |
| |
| if (this.options.pattern) { |
| this.state.currentDate = format(this.options.pattern, newNow()); |
| } |
| |
| if (this.options.flags === "a") { |
| this._setExistingSizeAndDate(); |
| } |
| |
| debug( |
| `create new file with no hot file. name=${ |
| this.justTheFile |
| }, state=${JSON.stringify(this.state)}` |
| ); |
| this._renewWriteStream(); |
| } |
| |
| _setExistingSizeAndDate() { |
| try { |
| const stats = fs.statSync(this.filename); |
| this.state.currentSize = stats.size; |
| if (this.options.pattern) { |
| this.state.currentDate = format(this.options.pattern, stats.birthtime); |
| } |
| } catch (e) { |
| //file does not exist, that's fine - move along |
| return; |
| } |
| } |
| |
| _parseOption(rawOptions) { |
| const defaultOptions = { |
| maxSize: Number.MAX_SAFE_INTEGER, |
| numToKeep: Number.MAX_SAFE_INTEGER, |
| encoding: "utf8", |
| mode: parseInt("0644", 8), |
| flags: "a", |
| compress: false, |
| keepFileExt: false, |
| alwaysIncludePattern: false |
| }; |
| const options = _.defaults({}, rawOptions, defaultOptions); |
| if (options.maxSize <= 0) { |
| throw new Error(`options.maxSize (${options.maxSize}) should be > 0`); |
| } |
| if (options.numToKeep <= 0) { |
| throw new Error(`options.numToKeep (${options.numToKeep}) should be > 0`); |
| } |
| debug(`creating stream with option=${JSON.stringify(options)}`); |
| return options; |
| } |
| |
| _shouldRoll(callback) { |
| if ( |
| this.state.currentDate && |
| this.state.currentDate !== format(this.options.pattern, newNow()) |
| ) { |
| debug( |
| `_shouldRoll: rolling by date because ${ |
| this.state.currentDate |
| } !== ${format(this.options.pattern, newNow())}` |
| ); |
| this._roll({ isNextPeriod: true }, callback); |
| return; |
| } |
| if (this.state.currentSize >= this.options.maxSize) { |
| debug( |
| `_shouldRoll: rolling by size because ${this.state.currentSize} >= ${this.options.maxSize}` |
| ); |
| this._roll({ isNextPeriod: false }, callback); |
| return; |
| } |
| callback(); |
| } |
| |
| _write(chunk, encoding, callback) { |
| this._shouldRoll(() => { |
| debug( |
| `writing chunk. ` + |
| `file=${this.currentFileStream.path} ` + |
| `state=${JSON.stringify(this.state)} ` + |
| `chunk=${chunk}` |
| ); |
| this.currentFileStream.write(chunk, encoding, e => { |
| this.state.currentSize += chunk.length; |
| callback(e); |
| }); |
| }); |
| } |
| |
| // Sorted from the oldest to the latest |
| _getExistingFiles(cb) { |
| fs.readdir(this.fileObject.dir, (e, files) => { |
| debug(`_getExistingFiles: files=${files}`); |
| const existingFileDetails = _.compact( |
| _.map(files, n => { |
| const parseResult = this._parseFileName(n); |
| debug(`_getExistingFiles: parsed ${n} as `, parseResult); |
| if (!parseResult) { |
| return; |
| } |
| return _.assign({ fileName: n }, parseResult); |
| }) |
| ); |
| cb( |
| null, |
| _.sortBy( |
| existingFileDetails, |
| n => |
| (n.date |
| ? format.parse(this.options.pattern, n.date).valueOf() |
| : newNow().valueOf()) - n.index |
| ) |
| ); |
| }); |
| } |
| |
| // need file name instead of file abs path. |
| _parseFileName(fileName) { |
| let isCompressed = false; |
| if (fileName.endsWith(ZIP_EXT)) { |
| fileName = fileName.slice(0, -1 * ZIP_EXT.length); |
| isCompressed = true; |
| } |
| let metaStr; |
| if (this.options.keepFileExt) { |
| const prefix = this.fileObject.name + FILENAME_SEP; |
| const suffix = this.fileObject.ext; |
| if (!fileName.startsWith(prefix) || !fileName.endsWith(suffix)) { |
| return; |
| } |
| metaStr = fileName.slice(prefix.length, -1 * suffix.length); |
| debug( |
| `metaStr=${metaStr}, fileName=${fileName}, prefix=${prefix}, suffix=${suffix}` |
| ); |
| } else { |
| const prefix = this.fileObject.base; |
| if (!fileName.startsWith(prefix)) { |
| return; |
| } |
| metaStr = fileName.slice(prefix.length + 1); |
| debug(`metaStr=${metaStr}, fileName=${fileName}, prefix=${prefix}`); |
| } |
| if (!metaStr) { |
| return { |
| index: 0, |
| isCompressed |
| }; |
| } |
| if (this.options.pattern) { |
| const items = _.split(metaStr, FILENAME_SEP); |
| const indexStr = items[items.length - 1]; |
| debug("items: ", items, ", indexStr: ", indexStr); |
| if (indexStr !== undefined && indexStr.match(/^\d+$/)) { |
| const dateStr = metaStr.slice(0, -1 * (indexStr.length + 1)); |
| debug(`dateStr is ${dateStr}`); |
| if (dateStr) { |
| return { |
| index: parseInt(indexStr, 10), |
| date: dateStr, |
| isCompressed |
| }; |
| } |
| } |
| debug(`metaStr is ${metaStr}`); |
| return { |
| index: 0, |
| date: metaStr, |
| isCompressed |
| }; |
| } else { |
| if (metaStr.match(/^\d+$/)) { |
| return { |
| index: parseInt(metaStr, 10), |
| isCompressed |
| }; |
| } |
| } |
| return; |
| } |
| |
| _formatFileName({ date, index, isHotFile }) { |
| debug( |
| `_formatFileName: date=${date}, index=${index}, isHotFile=${isHotFile}` |
| ); |
| const dateStr = |
| date || |
| _.get(this, "state.currentDate") || |
| format(this.options.pattern, newNow()); |
| const indexOpt = index || _.get(this, "state.currentIndex"); |
| const oriFileName = this.fileObject.base; |
| if (isHotFile) { |
| debug( |
| `_formatFileName: includePattern? ${this.options.alwaysIncludePattern}, pattern: ${this.options.pattern}` |
| ); |
| if (this.options.alwaysIncludePattern && this.options.pattern) { |
| debug( |
| `_formatFileName: is hot file, and include pattern, so: ${oriFileName + |
| FILENAME_SEP + |
| dateStr}` |
| ); |
| return this.options.keepFileExt |
| ? this.fileObject.name + FILENAME_SEP + dateStr + this.fileObject.ext |
| : oriFileName + FILENAME_SEP + dateStr; |
| } |
| debug(`_formatFileName: is hot file so, filename: ${oriFileName}`); |
| return oriFileName; |
| } |
| let fileNameExtraItems = []; |
| if (this.options.pattern) { |
| fileNameExtraItems.push(dateStr); |
| } |
| if (indexOpt && this.options.maxSize < Number.MAX_SAFE_INTEGER) { |
| fileNameExtraItems.push(indexOpt); |
| } |
| let fileName; |
| if (this.options.keepFileExt) { |
| const baseFileName = |
| this.fileObject.name + |
| FILENAME_SEP + |
| fileNameExtraItems.join(FILENAME_SEP); |
| fileName = baseFileName + this.fileObject.ext; |
| } else { |
| fileName = |
| oriFileName + FILENAME_SEP + fileNameExtraItems.join(FILENAME_SEP); |
| } |
| if (this.options.compress) { |
| fileName += ZIP_EXT; |
| } |
| debug(`_formatFileName: ${fileName}`); |
| return fileName; |
| } |
| |
| _moveOldFiles(isNextPeriod, cb) { |
| const currentFilePath = this.currentFileStream.path; |
| debug(`numToKeep = ${this.options.numToKeep}`); |
| const finishedRolling = () => { |
| if (isNextPeriod) { |
| this.state.currentSize = 0; |
| this.state.currentDate = format(this.options.pattern, newNow()); |
| debug(`rolling for next period. state=${JSON.stringify(this.state)}`); |
| } else { |
| this.state.currentSize = 0; |
| debug( |
| `rolling during the same period. state=${JSON.stringify(this.state)}` |
| ); |
| } |
| this._renewWriteStream(); |
| // wait for the file to be open before cleaning up old ones, |
| // otherwise the daysToKeep calculations can be off |
| this.currentFileStream.write("", "utf8", () => this._clean(cb)); |
| }; |
| |
| this._getExistingFiles((e, files) => { |
| const filesToMove = []; |
| const todaysFiles = this.state.currentDate |
| ? files.filter(f => f.date === this.state.currentDate) |
| : files; |
| for (let i = todaysFiles.length; i >= 0; i--) { |
| debug(`i = ${i}`); |
| const sourceFilePath = |
| i === 0 |
| ? currentFilePath |
| : path.format({ |
| dir: this.fileObject.dir, |
| base: this._formatFileName({ |
| date: this.state.currentDate, |
| index: i |
| }) |
| }); |
| const targetFilePath = path.format({ |
| dir: this.fileObject.dir, |
| base: this._formatFileName({ |
| date: this.state.currentDate, |
| index: i + 1 |
| }) |
| }); |
| filesToMove.push({ sourceFilePath, targetFilePath }); |
| } |
| debug(`filesToMove = `, filesToMove); |
| async.eachOfSeries( |
| filesToMove, |
| (files, idx, cb1) => { |
| debug( |
| `src=${files.sourceFilePath}, tgt=${ |
| files.sourceFilePath |
| }, idx=${idx}, pos=${filesToMove.length - 1 - idx}` |
| ); |
| moveAndMaybeCompressFile( |
| files.sourceFilePath, |
| files.targetFilePath, |
| this.options.compress && filesToMove.length - 1 - idx === 0, |
| cb1 |
| ); |
| }, |
| finishedRolling |
| ); |
| }); |
| } |
| |
| _roll({ isNextPeriod }, cb) { |
| debug(`rolling, isNextPeriod ? ${isNextPeriod}`); |
| debug(`_roll: closing the current stream`); |
| this.currentFileStream.end("", this.options.encoding, () => { |
| this._moveOldFiles(isNextPeriod, cb); |
| }); |
| } |
| |
| _renewWriteStream() { |
| fs.ensureDirSync(this.fileObject.dir); |
| this.justTheFile = this._formatFileName({ |
| date: this.state.currentDate, |
| index: 0, |
| isHotFile: true |
| }); |
| const filePath = path.format({ |
| dir: this.fileObject.dir, |
| base: this.justTheFile |
| }); |
| const ops = _.pick(this.options, ["flags", "encoding", "mode"]); |
| this.currentFileStream = fs.createWriteStream(filePath, ops); |
| this.currentFileStream.on("error", e => { |
| this.emit("error", e); |
| }); |
| } |
| |
| _clean(cb) { |
| this._getExistingFiles((e, existingFileDetails) => { |
| debug( |
| `numToKeep = ${this.options.numToKeep}, existingFiles = ${existingFileDetails.length}` |
| ); |
| debug("existing files are: ", existingFileDetails); |
| if ( |
| this.options.numToKeep > 0 && |
| existingFileDetails.length > this.options.numToKeep |
| ) { |
| const fileNamesToRemove = _.slice( |
| existingFileDetails.map(f => f.fileName), |
| 0, |
| existingFileDetails.length - this.options.numToKeep - 1 |
| ); |
| this._deleteFiles(fileNamesToRemove, cb); |
| return; |
| } |
| cb(); |
| }); |
| } |
| |
| _deleteFiles(fileNames, done) { |
| debug(`files to delete: ${fileNames}`); |
| async.each( |
| _.map(fileNames, f => path.format({ dir: this.fileObject.dir, base: f })), |
| fs.unlink, |
| done |
| ); |
| return; |
| } |
| } |
| |
| module.exports = RollingFileWriteStream; |