| import { Operator } from '../Operator'; |
| import { Observable } from '../Observable'; |
| import { Subscriber } from '../Subscriber'; |
| import { Subscription } from '../Subscription'; |
| import { subscribeToResult } from '../util/subscribeToResult'; |
| import { OuterSubscriber } from '../OuterSubscriber'; |
| import { InnerSubscriber } from '../InnerSubscriber'; |
| import { ObservableInput, OperatorFunction } from '../types'; |
| |
/**
 * Runs an accumulator over the source Observable where the accumulator itself
 * returns an Observable (or any other `ObservableInput`); every value produced
 * by those inner Observables is merged into the output and becomes the
 * accumulated state handed to the next accumulator call.
 *
 * <span class="informal">It's like {@link scan}, but the Observables returned
 * by the accumulator are merged into the outer Observable.</span>
 *
 * If the source completes and no inner Observable ever emitted, the `seed` is
 * emitted once before the output completes.
 *
 * ## Example
 * Count the number of click events
 * ```ts
 * import { fromEvent, of } from 'rxjs';
 * import { mapTo, mergeScan } from 'rxjs/operators';
 *
 * const click$ = fromEvent(document, 'click');
 * const one$ = click$.pipe(mapTo(1));
 * const seed = 0;
 * const count$ = one$.pipe(
 *   mergeScan((acc, one) => of(acc + one), seed),
 * );
 * count$.subscribe(x => console.log(x));
 *
 * // Results:
 * // 1
 * // 2
 * // 3
 * // 4
 * // ...and so on for each click
 * ```
 *
 * @param {function(acc: R, value: T, index: number): ObservableInput<R>} accumulator
 * The accumulator function called on each source value. It receives the
 * current accumulated value, the source value, and the zero-based count of
 * accumulator invocations so far.
 * @param seed The initial accumulation value.
 * @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of
 * input Observables being subscribed to concurrently. Source values arriving
 * while the limit is reached are queued until an inner Observable completes.
 * @return {Observable<R>} An observable of the accumulated values.
 * @method mergeScan
 * @owner Observable
 */
export function mergeScan<T, R>(accumulator: (acc: R, value: T, index: number) => ObservableInput<R>,
                                seed: R,
                                concurrent: number = Number.POSITIVE_INFINITY): OperatorFunction<T, R> {
  return (source: Observable<T>) => {
    // A fresh operator instance is built each time the returned function is applied.
    const operator = new MergeScanOperator(accumulator, seed, concurrent);
    return source.lift(operator);
  };
}
| |
/**
 * Operator object installed by `mergeScan` via `lift`. It only stores the
 * operator configuration and, when called, subscribes a new
 * `MergeScanSubscriber` carrying that configuration to the source.
 */
export class MergeScanOperator<T, R> implements Operator<T, R> {
  constructor(private accumulator: (acc: R, value: T, index: number) => ObservableInput<R>,
              private seed: R,
              private concurrent: number) {
  }

  /**
   * Subscribes to `source` with a new `MergeScanSubscriber` that forwards to `subscriber`.
   *
   * @param subscriber The downstream subscriber that receives accumulated values.
   * @param source The upstream object to subscribe to.
   * @return Whatever `source.subscribe` returns.
   */
  call(subscriber: Subscriber<R>, source: any): any {
    const { accumulator, seed, concurrent } = this;
    return source.subscribe(
      new MergeScanSubscriber(subscriber, accumulator, seed, concurrent)
    );
  }
}
| |
/**
 * Subscriber behind `mergeScan`. For every source value it calls the
 * accumulator with the latest accumulated value, subscribes to the result, and
 * forwards each inner emission downstream while storing it as the new
 * accumulated value. At most `concurrent` inner subscriptions are active at
 * once; further source values are queued in arrival order.
 *
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
export class MergeScanSubscriber<T, R> extends OuterSubscriber<T, R> {
  /** Set once any inner Observable has emitted; controls the seed fallback on completion. */
  private hasValue: boolean = false;
  /** Set once the source has signalled completion. */
  private hasCompleted: boolean = false;
  /** Source values waiting for a free concurrency slot (FIFO). */
  private buffer: T[] = [];
  /** Number of inner subscriptions that have been started and have not yet completed. */
  private active: number = 0;
  /** Number of accumulator invocations so far; passed to the accumulator as `index`. */
  protected index: number = 0;

  /**
   * @param destination Downstream subscriber receiving the accumulated values.
   * @param accumulator Function producing an inner `ObservableInput` from the
   * current accumulation, the source value and the invocation index.
   * @param acc Initial accumulated value (the seed); overwritten by every inner emission.
   * @param concurrent Maximum number of simultaneously active inner subscriptions.
   */
  constructor(destination: Subscriber<R>,
              private accumulator: (acc: R, value: T, index: number) => ObservableInput<R>,
              private acc: R,
              private concurrent: number) {
    super(destination);
  }

  /**
   * Handles a source value: starts an inner subscription if a concurrency slot
   * is free, otherwise queues the value. An exception thrown by the
   * accumulator is forwarded to the destination as an error.
   */
  protected _next(value: any): void {
    // Written as a negated `<` so the branch taken is the same as the original
    // `if (active < concurrent) … else …` for every numeric input.
    if (!(this.active < this.concurrent)) {
      this.buffer.push(value);
      return;
    }

    const index = this.index++;
    const destination = this.destination;
    let ish: ObservableInput<R>;
    try {
      // Detach the function first so it is invoked without `this` bound to the subscriber.
      const { accumulator } = this;
      ish = accumulator(this.acc, value, index);
    } catch (e) {
      destination.error(e);
      return;
    }
    // Only count the slot as taken once the accumulator has succeeded.
    this.active++;
    this._innerSub(ish, value, index);
  }

  /**
   * Subscribes to the accumulator's result and ties the inner subscription to
   * the destination's subscription.
   */
  private _innerSub(ish: ObservableInput<R>, value: T, index: number): void {
    const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
    const destination = this.destination as Subscription;
    destination.add(innerSubscriber);
    const innerSubscription = subscribeToResult<T, R>(this, ish, value, index, innerSubscriber);
    // The returned subscription is normally the subscriber that was passed in.
    // When it is a different object (e.g. a wrapper created for an observable
    // from another library), it must be added too so that unsubscription
    // reaches it. A void/undefined result is presumably ignored by `add` —
    // confirm against Subscription.add.
    if (innerSubscription !== innerSubscriber) {
      destination.add(innerSubscription);
    }
  }

  /**
   * Handles source completion. If nothing is active or queued the output is
   * finished right away (emitting the seed first when no inner value was ever
   * produced); otherwise `notifyComplete` finishes it later.
   */
  protected _complete(): void {
    this.hasCompleted = true;
    if (this.active === 0 && this.buffer.length === 0) {
      if (this.hasValue === false) {
        this.destination.next(this.acc);
      }
      this.destination.complete();
    }
    // Inner subscribers were added to the destination rather than to this
    // subscriber, so presumably they are not cancelled by this call.
    this.unsubscribe();
  }

  /**
   * Called for every value emitted by an inner Observable: the value becomes
   * the new accumulation and is forwarded downstream. The remaining parameters
   * are unused.
   */
  notifyNext(outerValue: T, innerValue: R,
             outerIndex: number, innerIndex: number,
             innerSub: InnerSubscriber<T, R>): void {
    const { destination } = this;
    this.acc = innerValue;
    this.hasValue = true;
    destination.next(innerValue);
  }

  /**
   * Called when an inner Observable completes: frees its concurrency slot,
   * then either starts the next queued source value or, if the source is done
   * and nothing is active, finishes the output.
   */
  notifyComplete(innerSub: Subscription): void {
    const buffer = this.buffer;
    const destination = this.destination as Subscription;
    destination.remove(innerSub);
    this.active--;
    if (buffer.length > 0) {
      this._next(buffer.shift());
    } else if (this.active === 0 && this.hasCompleted) {
      if (this.hasValue === false) {
        this.destination.next(this.acc);
      }
      this.destination.complete();
    }
  }
}