| 'use strict'; |
| |
| var _Object$setPrototypeO; |
| |
| function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; } |
| |
| var finished = require('./end-of-stream'); |
| |
| var kLastResolve = Symbol('lastResolve'); |
| var kLastReject = Symbol('lastReject'); |
| var kError = Symbol('error'); |
| var kEnded = Symbol('ended'); |
| var kLastPromise = Symbol('lastPromise'); |
| var kHandlePromise = Symbol('handlePromise'); |
| var kStream = Symbol('stream'); |
| |
| function createIterResult(value, done) { |
| return { |
| value: value, |
| done: done |
| }; |
| } |
| |
| function readAndResolve(iter) { |
| var resolve = iter[kLastResolve]; |
| |
| if (resolve !== null) { |
| var data = iter[kStream].read(); // we defer if data is null |
| // we can be expecting either 'end' or |
| // 'error' |
| |
| if (data !== null) { |
| iter[kLastPromise] = null; |
| iter[kLastResolve] = null; |
| iter[kLastReject] = null; |
| resolve(createIterResult(data, false)); |
| } |
| } |
| } |
| |
| function onReadable(iter) { |
| // we wait for the next tick, because it might |
| // emit an error with process.nextTick |
| process.nextTick(readAndResolve, iter); |
| } |
| |
| function wrapForNext(lastPromise, iter) { |
| return function (resolve, reject) { |
| lastPromise.then(function () { |
| if (iter[kEnded]) { |
| resolve(createIterResult(undefined, true)); |
| return; |
| } |
| |
| iter[kHandlePromise](resolve, reject); |
| }, reject); |
| }; |
| } |
| |
| var AsyncIteratorPrototype = Object.getPrototypeOf(function () {}); |
| var ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf((_Object$setPrototypeO = { |
| get stream() { |
| return this[kStream]; |
| }, |
| |
| next: function next() { |
| var _this = this; |
| |
| // if we have detected an error in the meanwhile |
| // reject straight away |
| var error = this[kError]; |
| |
| if (error !== null) { |
| return Promise.reject(error); |
| } |
| |
| if (this[kEnded]) { |
| return Promise.resolve(createIterResult(undefined, true)); |
| } |
| |
| if (this[kStream].destroyed) { |
| // We need to defer via nextTick because if .destroy(err) is |
| // called, the error will be emitted via nextTick, and |
| // we cannot guarantee that there is no error lingering around |
| // waiting to be emitted. |
| return new Promise(function (resolve, reject) { |
| process.nextTick(function () { |
| if (_this[kError]) { |
| reject(_this[kError]); |
| } else { |
| resolve(createIterResult(undefined, true)); |
| } |
| }); |
| }); |
| } // if we have multiple next() calls |
| // we will wait for the previous Promise to finish |
| // this logic is optimized to support for await loops, |
| // where next() is only called once at a time |
| |
| |
| var lastPromise = this[kLastPromise]; |
| var promise; |
| |
| if (lastPromise) { |
| promise = new Promise(wrapForNext(lastPromise, this)); |
| } else { |
| // fast path needed to support multiple this.push() |
| // without triggering the next() queue |
| var data = this[kStream].read(); |
| |
| if (data !== null) { |
| return Promise.resolve(createIterResult(data, false)); |
| } |
| |
| promise = new Promise(this[kHandlePromise]); |
| } |
| |
| this[kLastPromise] = promise; |
| return promise; |
| } |
| }, _defineProperty(_Object$setPrototypeO, Symbol.asyncIterator, function () { |
| return this; |
| }), _defineProperty(_Object$setPrototypeO, "return", function _return() { |
| var _this2 = this; |
| |
| // destroy(err, cb) is a private API |
| // we can guarantee we have that here, because we control the |
| // Readable class this is attached to |
| return new Promise(function (resolve, reject) { |
| _this2[kStream].destroy(null, function (err) { |
| if (err) { |
| reject(err); |
| return; |
| } |
| |
| resolve(createIterResult(undefined, true)); |
| }); |
| }); |
| }), _Object$setPrototypeO), AsyncIteratorPrototype); |
| |
| var createReadableStreamAsyncIterator = function createReadableStreamAsyncIterator(stream) { |
| var _Object$create; |
| |
| var iterator = Object.create(ReadableStreamAsyncIteratorPrototype, (_Object$create = {}, _defineProperty(_Object$create, kStream, { |
| value: stream, |
| writable: true |
| }), _defineProperty(_Object$create, kLastResolve, { |
| value: null, |
| writable: true |
| }), _defineProperty(_Object$create, kLastReject, { |
| value: null, |
| writable: true |
| }), _defineProperty(_Object$create, kError, { |
| value: null, |
| writable: true |
| }), _defineProperty(_Object$create, kEnded, { |
| value: stream._readableState.endEmitted, |
| writable: true |
| }), _defineProperty(_Object$create, kHandlePromise, { |
| value: function value(resolve, reject) { |
| var data = iterator[kStream].read(); |
| |
| if (data) { |
| iterator[kLastPromise] = null; |
| iterator[kLastResolve] = null; |
| iterator[kLastReject] = null; |
| resolve(createIterResult(data, false)); |
| } else { |
| iterator[kLastResolve] = resolve; |
| iterator[kLastReject] = reject; |
| } |
| }, |
| writable: true |
| }), _Object$create)); |
| iterator[kLastPromise] = null; |
| finished(stream, function (err) { |
| if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { |
| var reject = iterator[kLastReject]; // reject if we are waiting for data in the Promise |
| // returned by next() and store the error |
| |
| if (reject !== null) { |
| iterator[kLastPromise] = null; |
| iterator[kLastResolve] = null; |
| iterator[kLastReject] = null; |
| reject(err); |
| } |
| |
| iterator[kError] = err; |
| return; |
| } |
| |
| var resolve = iterator[kLastResolve]; |
| |
| if (resolve !== null) { |
| iterator[kLastPromise] = null; |
| iterator[kLastResolve] = null; |
| iterator[kLastReject] = null; |
| resolve(createIterResult(undefined, true)); |
| } |
| |
| iterator[kEnded] = true; |
| }); |
| stream.on('readable', onReadable.bind(null, iterator)); |
| return iterator; |
| }; |
| |
| module.exports = createReadableStreamAsyncIterator; |