// blob: 85bd81c5b3771ad111aa73f7d573211c5fde34eb [file] [log] [blame]
import { Observable } from '../Observable';
import { SchedulerAction, SchedulerLike } from '../types';
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
/**
 * Convert an object into an Observable of `[key, value]` pairs.
 *
 * <span class="informal">Turn entries of an object into a stream.</span>
 *
 * <img src="./img/pairs.png" width="100%">
 *
 * `pairs` takes an arbitrary object and returns an Observable that emits arrays. Each
 * emitted array has exactly two elements - the first is a key from the object
 * and the second is a value corresponding to that key. Keys are extracted from
 * an object via `Object.keys` function, which means that they will be only
 * enumerable keys that are present on an object directly - not ones inherited
 * via prototype chain.
 *
 * The list of keys is captured each time the Observable is subscribed to, while
 * each value is read from the object at the moment its pair is emitted.
 * Objects without a prototype (`Object.create(null)`) and objects that define
 * their own `hasOwnProperty` key are supported.
 *
 * By default these arrays are emitted synchronously. To change that you can
 * pass a {@link SchedulerLike} as a second argument to `pairs`.
 *
 * @example <caption>Converts a javascript object to an Observable</caption>
 * ```ts
 * import { pairs } from 'rxjs';
 *
 * const obj = {
 *   foo: 42,
 *   bar: 56,
 *   baz: 78
 * };
 *
 * pairs(obj)
 *   .subscribe(
 *     value => console.log(value),
 *     err => {},
 *     () => console.log('the end!')
 *   );
 *
 * // Logs:
 * // ["foo", 42],
 * // ["bar", 56],
 * // ["baz", 78],
 * // "the end!"
 * ```
 *
 * @param {Object} obj The object to inspect and turn into an
 * Observable sequence.
 * @param {SchedulerLike} [scheduler] An optional {@link SchedulerLike} to schedule
 * when resulting Observable will emit values.
 * @returns {Observable<[string, T]>} An observable sequence of
 * [key, value] pairs from the object.
 */
export function pairs<T>(obj: Object, scheduler?: SchedulerLike): Observable<[string, T]> {
  if (!scheduler) {
    // Synchronous variant: every pair is pushed during the subscribe call itself.
    return new Observable<[string, T]>(subscriber => {
      const keys = Object.keys(obj);
      // Stop early once the subscriber has been closed (e.g. unsubscribed mid-stream).
      for (let i = 0; i < keys.length && !subscriber.closed; i++) {
        const key = keys[i];
        // Skip keys that a consumer removed from `obj` while earlier pairs were
        // being emitted. The check is borrowed from Object.prototype rather than
        // called on `obj`, because `obj` may have no prototype or may carry its
        // own `hasOwnProperty` entry; calling `obj.hasOwnProperty` would then
        // throw or give a wrong answer.
        if (Object.prototype.hasOwnProperty.call(obj, key)) {
          subscriber.next([key, obj[key]]);
        }
      }
      subscriber.complete();
    });
  }

  // Scheduled variant: hand the iteration state to `dispatch`, which emits one
  // pair per scheduled run and re-schedules itself until the keys run out.
  return new Observable<[string, T]>(subscriber => {
    const keys = Object.keys(obj);
    const subscription = new Subscription();
    subscription.add(
      scheduler.schedule<{ keys: string[], index: number, subscriber: Subscriber<[string, T]>, subscription: Subscription, obj: Object }>(
        dispatch, 0, { keys, index: 0, subscriber, subscription, obj }));
    // Returning the subscription lets an unsubscribe cancel the pending work.
    return subscription;
  });
}
/**
 * Work function that `pairs` hands to the scheduler. Each run handles the key
 * at `state.index`: it emits that `[key, value]` pair and re-schedules itself
 * with the index advanced by one, or completes the subscriber once the index
 * has moved past the last key. A closed subscriber turns the run into a no-op.
 *
 * @param state Iteration state: the captured keys, the position to emit next,
 * the destination subscriber, the subscription collecting scheduled work and
 * the object the values are read from.
 * @internal
 */
export function dispatch<T>(this: SchedulerAction<any>,
                            state: { keys: string[], index: number, subscriber: Subscriber<[string, T]>, subscription: Subscription, obj: Object }) {
  const { keys, index: cursor, subscriber: sink, subscription, obj } = state;

  // Nothing left to do for a consumer that has already gone away.
  if (sink.closed) {
    return;
  }

  if (cursor < keys.length) {
    const currentKey = keys[cursor];
    sink.next([currentKey, obj[currentKey]]);
    // Queue the next step and register it so that unsubscribing cancels it.
    const nextState = { keys, index: cursor + 1, subscriber: sink, subscription, obj };
    subscription.add(this.schedule(nextState));
    return;
  }

  // Every key has been visited.
  sink.complete();
}