zhangjian
2023-08-07 6b009b0f6d3ef3aee97c362cebcd679d1b9088a3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import { MonoTypeOperatorFunction, ObservableInput } from '../types';
import { operate } from '../util/lift';
import { createOperatorSubscriber } from './OperatorSubscriber';
import { innerFrom } from '../observable/innerFrom';
import { noop } from '../util/noop';
 
/**
 * Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.
 *
 * The `skipUntil` operator causes the observable stream to skip the emission of values until the passed in observable
 * emits the first value. This can be particularly useful in combination with user interactions, responses of HTTP
 * requests or waiting for specific times to pass by.
 *
 * ![](skipUntil.png)
 *
 * Internally, the `skipUntil` operator subscribes to the passed in `notifier` `ObservableInput` (which gets converted
 * to an Observable) in order to recognize the emission of its first value. When `notifier` emits next, the operator
 * unsubscribes from it and starts emitting the values of the *source* observable until it completes or errors. It
 * will never let the *source* observable emit any values if the `notifier` completes or throws an error without
 * emitting a value before.
 *
 * ## Example
 *
 * In the following example, all emitted values of the interval observable are skipped until the user clicks anywhere
 * within the page
 *
 * ```ts
 * import { interval, fromEvent, skipUntil } from 'rxjs';
 *
 * const intervalObservable = interval(1000);
 * const click = fromEvent(document, 'click');
 *
 * const emitAfterClick = intervalObservable.pipe(
 *   skipUntil(click)
 * );
 * // clicked at 4.6s. output: 5...6...7...8........ or
 * // clicked at 7.3s. output: 8...9...10..11.......
 * emitAfterClick.subscribe(value => console.log(value));
 * ```
 *
 * @see {@link last}
 * @see {@link skip}
 * @see {@link skipWhile}
 * @see {@link skipLast}
 *
 * @param notifier An `ObservableInput` that has to emit an item before the source Observable elements begin to
 * be mirrored by the resulting Observable.
 * @return A function that returns an Observable that skips items from the
 * source Observable until the `notifier` Observable emits an item, then emits the
 * remaining items.
 */
export function skipUntil<T>(notifier: ObservableInput<any>): MonoTypeOperatorFunction<T> {
  return operate((source, subscriber) => {
    let taking = false;
 
    const skipSubscriber = createOperatorSubscriber(
      subscriber,
      () => {
        skipSubscriber?.unsubscribe();
        taking = true;
      },
      noop
    );
 
    innerFrom(notifier).subscribe(skipSubscriber);
 
    source.subscribe(createOperatorSubscriber(subscriber, (value) => taking && subscriber.next(value)));
  });
}