import { Observable } from '../Observable';
|
import { SchedulerLike } from '../types';
|
import { iterator as Symbol_iterator } from '../symbol/iterator';
|
import { isFunction } from '../util/isFunction';
|
import { executeSchedule } from '../util/executeSchedule';
|
|
/**
 * Used in {@link scheduled} to create an observable from an Iterable.
 *
 * Nothing is read from `input` at subscription time: obtaining the iterator
 * and every subsequent `next()` call happen inside callbacks handed to
 * `executeSchedule`. Any exception thrown while obtaining the iterator or
 * while pulling a value from it is delivered to the subscriber as an error
 * notification.
 *
 * @param input The iterable to create an observable from
 * @param scheduler The scheduler to use
 * @returns An observable that emits the values pulled from the iterable and
 * completes once the iterator reports `done`.
 */
export function scheduleIterable<T>(input: Iterable<T>, scheduler: SchedulerLike) {
  return new Observable<T>((subscriber) => {
    // Assigned lazily inside the first scheduled job; it is still unassigned
    // if the subscription is torn down before that job runs.
    let iterator: Iterator<T, T>;

    // Schedule the initial creation of the iterator from
    // the iterable. This is so the code in the iterable is
    // not called until the scheduled job fires.
    executeSchedule(subscriber, scheduler, () => {
      try {
        // Create the iterator.
        iterator = (input as any)[Symbol_iterator]();
      } catch (err) {
        // Obtaining the iterator failed. Report it the same way a failing
        // `next()` is reported below, and do not schedule any pulls.
        subscriber.error(err);
        return;
      }

      // NOTE(review): the trailing `0, true` arguments are presumably the
      // delay and a "repeat" flag of `executeSchedule` -- that helper is
      // defined elsewhere, confirm against its signature.
      executeSchedule(
        subscriber,
        scheduler,
        () => {
          let value: T;
          let done: boolean | undefined;
          try {
            // Pull the value out of the iterator
            ({ value, done } = iterator.next());
          } catch (err) {
            // We got an error while pulling from the iterator
            subscriber.error(err);
            return;
          }

          if (done) {
            // If it is "done" we just complete. This mimics the
            // behavior of JavaScript's `for..of` consumption of
            // iterables, which will not emit the value from an iterator
            // result of `{ done: true, value: 'here' }`.
            subscriber.complete();
          } else {
            // The iterable is not done, emit the value.
            subscriber.next(value);
          }
        },
        0,
        true
      );
    });

    // During finalization, if we see this iterator has a `return` method,
    // then we know it is a Generator, and not just an Iterator. So we call
    // the `return()` function. This will ensure that any `finally { }` blocks
    // inside of the generator we can hit will be hit properly.
    // The optional chain covers the case where the iterator was never created.
    return () => isFunction(iterator?.return) && iterator.return();
  });
}
|