import { EMPTY } from '../observable/empty';
|
import { MonoTypeOperatorFunction } from '../types';
|
import { operate } from '../util/lift';
|
import { createOperatorSubscriber } from './OperatorSubscriber';
|
|
/**
 * Waits for the source to complete, then emits the last N values from the source,
 * as specified by the `count` argument.
 *
 * `takeLast` results in an observable that will hold up to `count` values in memory,
 * until the source completes. It then pushes all values in memory to the consumer, in the
 * order they were received from the source, then notifies the consumer that it is
 * complete.
 *
 * If for some reason the source completes before the `count` supplied to `takeLast` is reached,
 * all values received until that point are emitted, and then completion is notified.
 *
 * If `count` is zero or negative, the source is ignored entirely and the operator
 * returns `EMPTY` instead.
 *
 * **Warning**: Using `takeLast` with an observable that never completes will result
 * in an observable that never emits a value.
 *
 * ## Example
 *
 * Take the last 3 values of an Observable with many values
 *
 * ```ts
 * import { range, takeLast } from 'rxjs';
 *
 * const many = range(1, 100);
 * const lastThree = many.pipe(takeLast(3));
 * lastThree.subscribe(x => console.log(x));
 * ```
 *
 * @see {@link take}
 * @see {@link takeUntil}
 * @see {@link takeWhile}
 * @see {@link skip}
 *
 * @param count The maximum number of values to emit from the end of
 * the sequence of values emitted by the source Observable.
 * @return A function that returns an Observable that emits at most the last
 * `count` values emitted by the source Observable.
 */
export function takeLast<T>(count: number): MonoTypeOperatorFunction<T> {
  // A non-positive count can never yield a value, so there is no reason to
  // look at the source at all: hand back EMPTY regardless of the input.
  if (count <= 0) {
    return () => EMPTY;
  }

  return operate((source, subscriber) => {
    // This buffer will hold the values we are going to emit
    // when the source completes. Since we only want to take the
    // last N values, we can't emit until we're sure we're not getting
    // any more values.
    let buffer: T[] = [];

    source.subscribe(
      createOperatorSubscriber(
        subscriber,
        (value) => {
          // Add the most recent value onto the end of our buffer.
          buffer.push(value);
          // If our buffer is now larger than the number of values we
          // want to take, we remove the oldest value from the buffer.
          if (buffer.length > count) {
            buffer.shift();
          }
        },
        () => {
          // The source completed, so we now know what our last values
          // are; emit them in the order they were received.
          for (const value of buffer) {
            subscriber.next(value);
          }
          subscriber.complete();
        },
        // Errors are passed through to the consumer
        undefined,
        () => {
          // During finalization release the values in our buffer.
          buffer = null!;
        }
      )
    );
  });
}
|