import { Observable } from '../Observable';
import { AsyncSubject } from '../AsyncSubject';
import { ConnectableObservable } from '../observable/ConnectableObservable';
import { UnaryFunction } from '../types';

/**
 * Returns a connectable observable sequence that shares a single subscription to the
 * underlying sequence containing only the last notification.
 *
 * Similar to {@link publish}, but it waits until the source observable completes and stores
 * the last emitted value.
 * Similarly to {@link publishReplay} and {@link publishBehavior}, this keeps storing the last
 * value even if it has no more subscribers. If subsequent subscriptions happen, they will
 * immediately get that last stored value and complete.
 *
 * ## Example
 *
 * ```ts
 * import { ConnectableObservable, interval, publishLast, tap, take } from 'rxjs';
 *
 * const connectable = <ConnectableObservable<number>>interval(1000)
 *   .pipe(
 *     tap(x => console.log('side effect', x)),
 *     take(3),
 *     publishLast()
 *   );
 *
 * connectable.subscribe({
 *   next: x => console.log('Sub. A', x),
 *   error: err => console.log('Sub. A Error', err),
 *   complete: () => console.log('Sub. A Complete')
 * });
 *
 * connectable.subscribe({
 *   next: x => console.log('Sub. B', x),
 *   error: err => console.log('Sub. B Error', err),
 *   complete: () => console.log('Sub. B Complete')
 * });
 *
 * connectable.connect();
 *
 * // Results:
 * // 'side effect 0' - after one second
 * // 'side effect 1' - after two seconds
 * // 'side effect 2' - after three seconds
 * // 'Sub. A 2' - immediately after 'side effect 2'
 * // 'Sub. B 2'
 * // 'Sub. A Complete'
 * // 'Sub. B Complete'
 * ```
 *
 * @see {@link ConnectableObservable}
 * @see {@link publish}
 * @see {@link publishReplay}
 * @see {@link publishBehavior}
 *
 * @return A function that returns an Observable that emits elements of a
 * sequence produced by multicasting the source sequence.
 * @deprecated Will be removed in v8. To create a connectable observable with an
 * {@link AsyncSubject} under the hood, use {@link connectable}.
 * `source.pipe(publishLast())` is equivalent to
 * `connectable(source, { connector: () => new AsyncSubject(), resetOnDisconnect: false })`.
 * If you're using {@link refCount} after `publishLast`, use the {@link share} operator instead.
 * `source.pipe(publishLast(), refCount())` is equivalent to
 * `source.pipe(share({ connector: () => new AsyncSubject(), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false }))`.
 * Details: https://rxjs.dev/deprecations/multicasting
 */
export function publishLast<T>(): UnaryFunction<Observable<T>, ConnectableObservable<T>> {
  // Note that this has *never* supported a selector function like `publish` and `publishReplay`.
  return (source) => {
    // The subject is created once per application of the operator, and the factory handed to
    // ConnectableObservable always returns that same instance instead of building a new one
    // on each call.
    const subject = new AsyncSubject<T>();
    return new ConnectableObservable(source, () => subject);
  };
}