| import { Observable } from '../Observable'; | 
| import { innerFrom } from '../observable/innerFrom'; | 
| import { Subject } from '../Subject'; | 
| import { Subscription } from '../Subscription'; | 
|   | 
| import { MonoTypeOperatorFunction, ObservableInput } from '../types'; | 
| import { operate } from '../util/lift'; | 
| import { createOperatorSubscriber } from './OperatorSubscriber'; | 
|   | 
| /** | 
|  * Returns an Observable that mirrors the source Observable with the exception of an `error`. If the source Observable | 
|  * calls `error`, this method will emit the Throwable that caused the error to the `ObservableInput` returned from `notifier`. | 
|  * If that Observable calls `complete` or `error` then this method will call `complete` or `error` on the child | 
|  * subscription. Otherwise this method will resubscribe to the source Observable. | 
|  * | 
|  *  | 
|  * | 
|  * Retry an observable sequence on error based on custom criteria. | 
|  * | 
|  * ## Example | 
|  * | 
|  * ```ts | 
|  * import { interval, map, retryWhen, tap, delayWhen, timer } from 'rxjs'; | 
|  * | 
|  * const source = interval(1000); | 
|  * const result = source.pipe( | 
|  *   map(value => { | 
|  *     if (value > 5) { | 
|  *       // error will be picked up by retryWhen | 
|  *       throw value; | 
|  *     } | 
|  *     return value; | 
|  *   }), | 
|  *   retryWhen(errors => | 
|  *     errors.pipe( | 
|  *       // log error message | 
|  *       tap(value => console.log(`Value ${ value } was too high!`)), | 
|  *       // restart in 5 seconds | 
|  *       delayWhen(value => timer(value * 1000)) | 
|  *     ) | 
|  *   ) | 
|  * ); | 
|  * | 
|  * result.subscribe(value => console.log(value)); | 
|  * | 
|  * // results: | 
|  * // 0 | 
|  * // 1 | 
|  * // 2 | 
|  * // 3 | 
|  * // 4 | 
|  * // 5 | 
|  * // 'Value 6 was too high!' | 
|  * // - Wait 5 seconds then repeat | 
|  * ``` | 
|  * | 
|  * @see {@link retry} | 
|  * | 
|  * @param notifier Function that receives an Observable of notifications with which a | 
|  * user can `complete` or `error`, aborting the retry. | 
|  * @return A function that returns an `ObservableInput` that mirrors the source | 
|  * Observable with the exception of an `error`. | 
|  * @deprecated Will be removed in v9 or v10, use {@link retry}'s `delay` option instead. | 
|  * Will be removed in v9 or v10. Use {@link retry}'s {@link RetryConfig#delay delay} option instead. | 
|  * Instead of `retryWhen(() => notify$)`, use: `retry({ delay: () => notify$ })`. | 
|  */ | 
| export function retryWhen<T>(notifier: (errors: Observable<any>) => ObservableInput<any>): MonoTypeOperatorFunction<T> { | 
|   return operate((source, subscriber) => { | 
|     let innerSub: Subscription | null; | 
|     let syncResub = false; | 
|     let errors$: Subject<any>; | 
|   | 
|     const subscribeForRetryWhen = () => { | 
|       innerSub = source.subscribe( | 
|         createOperatorSubscriber(subscriber, undefined, undefined, (err) => { | 
|           if (!errors$) { | 
|             errors$ = new Subject(); | 
|             innerFrom(notifier(errors$)).subscribe( | 
|               createOperatorSubscriber(subscriber, () => | 
|                 // If we have an innerSub, this was an asynchronous call, kick off the retry. | 
|                 // Otherwise, if we don't have an innerSub yet, that's because the inner subscription | 
|                 // call hasn't even returned yet. We've arrived here synchronously. | 
|                 // So we flag that we want to resub, such that we can ensure finalization | 
|                 // happens before we resubscribe. | 
|                 innerSub ? subscribeForRetryWhen() : (syncResub = true) | 
|               ) | 
|             ); | 
|           } | 
|           if (errors$) { | 
|             // We have set up the notifier without error. | 
|             errors$.next(err); | 
|           } | 
|         }) | 
|       ); | 
|   | 
|       if (syncResub) { | 
|         // Ensure that the inner subscription is torn down before | 
|         // moving on to the next subscription in the synchronous case. | 
|         // If we don't do this here, all inner subscriptions will not be | 
|         // torn down until the entire observable is done. | 
|         innerSub.unsubscribe(); | 
|         innerSub = null; | 
|         // We may need to do this multiple times, so reset the flag. | 
|         syncResub = false; | 
|         // Resubscribe | 
|         subscribeForRetryWhen(); | 
|       } | 
|     }; | 
|   | 
|     // Start the subscription | 
|     subscribeForRetryWhen(); | 
|   }); | 
| } |