| import { Observable } from '../Observable'; |
| import { SchedulerAction, SchedulerLike } from '../types'; |
| import { Subscriber } from '../Subscriber'; |
| import { Subscription } from '../Subscription'; |
| |
| /** |
| * Convert an object into an Observable of `[key, value]` pairs. |
| * |
| * <span class="informal">Turn entries of an object into a stream.</span> |
| * |
| * <img src="./img/pairs.png" width="100%"> |
| * |
| * `pairs` takes an arbitrary object and returns an Observable that emits arrays. Each |
| * emitted array has exactly two elements - the first is a key from the object |
| * and the second is a value corresponding to that key. Keys are extracted from |
| * an object via `Object.keys` function, which means that they will be only |
| * enumerable keys that are present on an object directly - not ones inherited |
| * via prototype chain. |
| * |
| * By default these arrays are emitted synchronously. To change that you can |
| * pass a {@link SchedulerLike} as a second argument to `pairs`. |
| * |
| * @example <caption>Converts a javascript object to an Observable</caption> |
| * ```ts |
| * import { pairs } from 'rxjs'; |
| * |
| * const obj = { |
| * foo: 42, |
| * bar: 56, |
| * baz: 78 |
| * }; |
| * |
| * pairs(obj) |
| * .subscribe( |
| * value => console.log(value), |
| * err => {}, |
| * () => console.log('the end!') |
| * ); |
| * |
| * // Logs: |
| * // ["foo", 42], |
| * // ["bar", 56], |
| * // ["baz", 78], |
| * // "the end!" |
| * ``` |
| * |
| * @param {Object} obj The object to inspect and turn into an |
| * Observable sequence. |
| * @param {Scheduler} [scheduler] An optional IScheduler to schedule |
| * when resulting Observable will emit values. |
| * @returns {(Observable<Array<string|T>>)} An observable sequence of |
| * [key, value] pairs from the object. |
| */ |
| export function pairs<T>(obj: Object, scheduler?: SchedulerLike): Observable<[string, T]> { |
| if (!scheduler) { |
| return new Observable<[string, T]>(subscriber => { |
| const keys = Object.keys(obj); |
| for (let i = 0; i < keys.length && !subscriber.closed; i++) { |
| const key = keys[i]; |
| if (obj.hasOwnProperty(key)) { |
| subscriber.next([key, obj[key]]); |
| } |
| } |
| subscriber.complete(); |
| }); |
| } else { |
| return new Observable<[string, T]>(subscriber => { |
| const keys = Object.keys(obj); |
| const subscription = new Subscription(); |
| subscription.add( |
| scheduler.schedule<{ keys: string[], index: number, subscriber: Subscriber<[string, T]>, subscription: Subscription, obj: Object }> |
| (dispatch, 0, { keys, index: 0, subscriber, subscription, obj })); |
| return subscription; |
| }); |
| } |
| } |
| |
| /** @internal */ |
| export function dispatch<T>(this: SchedulerAction<any>, |
| state: { keys: string[], index: number, subscriber: Subscriber<[string, T]>, subscription: Subscription, obj: Object }) { |
| const { keys, index, subscriber, subscription, obj } = state; |
| if (!subscriber.closed) { |
| if (index < keys.length) { |
| const key = keys[index]; |
| subscriber.next([key, obj[key]]); |
| subscription.add(this.schedule({ keys, index: index + 1, subscriber, subscription, obj })); |
| } else { |
| subscriber.complete(); |
| } |
| } |
| } |