blob: 901f80ed3525552b9a28a3a1910c6c16eb8eb083 [file] [log] [blame]
import { Subject } from '../Subject';
import { async } from '../scheduler/async';
import { Subscriber } from '../Subscriber';
import { isNumeric } from '../util/isNumeric';
import { isScheduler } from '../util/isScheduler';
export function windowTime(windowTimeSpan) {
let scheduler = async;
let windowCreationInterval = null;
let maxWindowSize = Number.POSITIVE_INFINITY;
if (isScheduler(arguments[3])) {
scheduler = arguments[3];
}
if (isScheduler(arguments[2])) {
scheduler = arguments[2];
}
else if (isNumeric(arguments[2])) {
maxWindowSize = arguments[2];
}
if (isScheduler(arguments[1])) {
scheduler = arguments[1];
}
else if (isNumeric(arguments[1])) {
windowCreationInterval = arguments[1];
}
return function windowTimeOperatorFunction(source) {
return source.lift(new WindowTimeOperator(windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler));
};
}
class WindowTimeOperator {
constructor(windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler) {
this.windowTimeSpan = windowTimeSpan;
this.windowCreationInterval = windowCreationInterval;
this.maxWindowSize = maxWindowSize;
this.scheduler = scheduler;
}
call(subscriber, source) {
return source.subscribe(new WindowTimeSubscriber(subscriber, this.windowTimeSpan, this.windowCreationInterval, this.maxWindowSize, this.scheduler));
}
}
class CountedSubject extends Subject {
constructor() {
super(...arguments);
this._numberOfNextedValues = 0;
}
next(value) {
this._numberOfNextedValues++;
super.next(value);
}
get numberOfNextedValues() {
return this._numberOfNextedValues;
}
}
class WindowTimeSubscriber extends Subscriber {
constructor(destination, windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler) {
super(destination);
this.destination = destination;
this.windowTimeSpan = windowTimeSpan;
this.windowCreationInterval = windowCreationInterval;
this.maxWindowSize = maxWindowSize;
this.scheduler = scheduler;
this.windows = [];
const window = this.openWindow();
if (windowCreationInterval !== null && windowCreationInterval >= 0) {
const closeState = { subscriber: this, window, context: null };
const creationState = { windowTimeSpan, windowCreationInterval, subscriber: this, scheduler };
this.add(scheduler.schedule(dispatchWindowClose, windowTimeSpan, closeState));
this.add(scheduler.schedule(dispatchWindowCreation, windowCreationInterval, creationState));
}
else {
const timeSpanOnlyState = { subscriber: this, window, windowTimeSpan };
this.add(scheduler.schedule(dispatchWindowTimeSpanOnly, windowTimeSpan, timeSpanOnlyState));
}
}
_next(value) {
const windows = this.windows;
const len = windows.length;
for (let i = 0; i < len; i++) {
const window = windows[i];
if (!window.closed) {
window.next(value);
if (window.numberOfNextedValues >= this.maxWindowSize) {
this.closeWindow(window);
}
}
}
}
_error(err) {
const windows = this.windows;
while (windows.length > 0) {
windows.shift().error(err);
}
this.destination.error(err);
}
_complete() {
const windows = this.windows;
while (windows.length > 0) {
const window = windows.shift();
if (!window.closed) {
window.complete();
}
}
this.destination.complete();
}
openWindow() {
const window = new CountedSubject();
this.windows.push(window);
const destination = this.destination;
destination.next(window);
return window;
}
closeWindow(window) {
window.complete();
const windows = this.windows;
windows.splice(windows.indexOf(window), 1);
}
}
function dispatchWindowTimeSpanOnly(state) {
const { subscriber, windowTimeSpan, window } = state;
if (window) {
subscriber.closeWindow(window);
}
state.window = subscriber.openWindow();
this.schedule(state, windowTimeSpan);
}
function dispatchWindowCreation(state) {
const { windowTimeSpan, subscriber, scheduler, windowCreationInterval } = state;
const window = subscriber.openWindow();
const action = this;
let context = { action, subscription: null };
const timeSpanState = { subscriber, window, context };
context.subscription = scheduler.schedule(dispatchWindowClose, windowTimeSpan, timeSpanState);
action.add(context.subscription);
action.schedule(state, windowCreationInterval);
}
function dispatchWindowClose(state) {
const { subscriber, window, context } = state;
if (context && context.action && context.subscription) {
context.action.remove(context.subscription);
}
subscriber.closeWindow(window);
}
//# sourceMappingURL=windowTime.js.map