import { Subscriber } from '../Subscriber';
|
|
/**
|
* Creates an instance of an `OperatorSubscriber`.
|
* @param destination The downstream subscriber.
|
* @param onNext Handles next values, only called if this subscriber is not stopped or closed. Any
|
* error that occurs in this function is caught and sent to the `error` method of this subscriber.
|
* @param onError Handles errors from the subscription, any errors that occur in this handler are caught
|
* and send to the `destination` error handler.
|
* @param onComplete Handles completion notification from the subscription. Any errors that occur in
|
* this handler are sent to the `destination` error handler.
|
* @param onFinalize Additional teardown logic here. This will only be called on teardown if the
|
* subscriber itself is not already closed. This is called after all other teardown logic is executed.
|
*/
|
export function createOperatorSubscriber<T>(
|
destination: Subscriber<any>,
|
onNext?: (value: T) => void,
|
onComplete?: () => void,
|
onError?: (err: any) => void,
|
onFinalize?: () => void
|
): Subscriber<T> {
|
return new OperatorSubscriber(destination, onNext, onComplete, onError, onFinalize);
|
}
|
|
/**
|
* A generic helper for allowing operators to be created with a Subscriber and
|
* use closures to capture necessary state from the operator function itself.
|
*/
|
export class OperatorSubscriber<T> extends Subscriber<T> {
|
/**
|
* Creates an instance of an `OperatorSubscriber`.
|
* @param destination The downstream subscriber.
|
* @param onNext Handles next values, only called if this subscriber is not stopped or closed. Any
|
* error that occurs in this function is caught and sent to the `error` method of this subscriber.
|
* @param onError Handles errors from the subscription, any errors that occur in this handler are caught
|
* and send to the `destination` error handler.
|
* @param onComplete Handles completion notification from the subscription. Any errors that occur in
|
* this handler are sent to the `destination` error handler.
|
* @param onFinalize Additional finalization logic here. This will only be called on finalization if the
|
* subscriber itself is not already closed. This is called after all other finalization logic is executed.
|
* @param shouldUnsubscribe An optional check to see if an unsubscribe call should truly unsubscribe.
|
* NOTE: This currently **ONLY** exists to support the strange behavior of {@link groupBy}, where unsubscription
|
* to the resulting observable does not actually disconnect from the source if there are active subscriptions
|
* to any grouped observable. (DO NOT EXPOSE OR USE EXTERNALLY!!!)
|
*/
|
constructor(
|
destination: Subscriber<any>,
|
onNext?: (value: T) => void,
|
onComplete?: () => void,
|
onError?: (err: any) => void,
|
private onFinalize?: () => void,
|
private shouldUnsubscribe?: () => boolean
|
) {
|
// It's important - for performance reasons - that all of this class's
|
// members are initialized and that they are always initialized in the same
|
// order. This will ensure that all OperatorSubscriber instances have the
|
// same hidden class in V8. This, in turn, will help keep the number of
|
// hidden classes involved in property accesses within the base class as
|
// low as possible. If the number of hidden classes involved exceeds four,
|
// the property accesses will become megamorphic and performance penalties
|
// will be incurred - i.e. inline caches won't be used.
|
//
|
// The reasons for ensuring all instances have the same hidden class are
|
// further discussed in this blog post from Benedikt Meurer:
|
// https://benediktmeurer.de/2018/03/23/impact-of-polymorphism-on-component-based-frameworks-like-react/
|
super(destination);
|
this._next = onNext
|
? function (this: OperatorSubscriber<T>, value: T) {
|
try {
|
onNext(value);
|
} catch (err) {
|
destination.error(err);
|
}
|
}
|
: super._next;
|
this._error = onError
|
? function (this: OperatorSubscriber<T>, err: any) {
|
try {
|
onError(err);
|
} catch (err) {
|
// Send any errors that occur down stream.
|
destination.error(err);
|
} finally {
|
// Ensure finalization.
|
this.unsubscribe();
|
}
|
}
|
: super._error;
|
this._complete = onComplete
|
? function (this: OperatorSubscriber<T>) {
|
try {
|
onComplete();
|
} catch (err) {
|
// Send any errors that occur down stream.
|
destination.error(err);
|
} finally {
|
// Ensure finalization.
|
this.unsubscribe();
|
}
|
}
|
: super._complete;
|
}
|
|
unsubscribe() {
|
if (!this.shouldUnsubscribe || this.shouldUnsubscribe()) {
|
const { closed } = this;
|
super.unsubscribe();
|
// Execute additional teardown if we have any and we didn't already do so.
|
!closed && this.onFinalize?.();
|
}
|
}
|
}
|