import { Subscription } from '../Subscription';
|
import { OperatorFunction, SchedulerLike } from '../types';
|
import { operate } from '../util/lift';
|
import { createOperatorSubscriber } from './OperatorSubscriber';
|
import { arrRemove } from '../util/arrRemove';
|
import { asyncScheduler } from '../scheduler/async';
|
import { popScheduler } from '../util/args';
|
import { executeSchedule } from '../util/executeSchedule';
|
|
/* tslint:disable:max-line-length */
|
export function bufferTime<T>(bufferTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction<T, T[]>;
|
export function bufferTime<T>(
|
bufferTimeSpan: number,
|
bufferCreationInterval: number | null | undefined,
|
scheduler?: SchedulerLike
|
): OperatorFunction<T, T[]>;
|
export function bufferTime<T>(
|
bufferTimeSpan: number,
|
bufferCreationInterval: number | null | undefined,
|
maxBufferSize: number,
|
scheduler?: SchedulerLike
|
): OperatorFunction<T, T[]>;
|
/* tslint:enable:max-line-length */
|
|
/**
|
* Buffers the source Observable values for a specific time period.
|
*
|
* <span class="informal">Collects values from the past as an array, and emits
|
* those arrays periodically in time.</span>
|
*
|
* 
|
*
|
* Buffers values from the source for a specific time duration `bufferTimeSpan`.
|
* Unless the optional argument `bufferCreationInterval` is given, it emits and
|
* resets the buffer every `bufferTimeSpan` milliseconds. If
|
* `bufferCreationInterval` is given, this operator opens the buffer every
|
* `bufferCreationInterval` milliseconds and closes (emits and resets) the
|
* buffer every `bufferTimeSpan` milliseconds. When the optional argument
|
* `maxBufferSize` is specified, the buffer will be closed either after
|
* `bufferTimeSpan` milliseconds or when it contains `maxBufferSize` elements.
|
*
|
* ## Examples
|
*
|
* Every second, emit an array of the recent click events
|
*
|
* ```ts
|
* import { fromEvent, bufferTime } from 'rxjs';
|
*
|
* const clicks = fromEvent(document, 'click');
|
* const buffered = clicks.pipe(bufferTime(1000));
|
* buffered.subscribe(x => console.log(x));
|
* ```
|
*
|
* Every 5 seconds, emit the click events from the next 2 seconds
|
*
|
* ```ts
|
* import { fromEvent, bufferTime } from 'rxjs';
|
*
|
* const clicks = fromEvent(document, 'click');
|
* const buffered = clicks.pipe(bufferTime(2000, 5000));
|
* buffered.subscribe(x => console.log(x));
|
* ```
|
*
|
* @see {@link buffer}
|
* @see {@link bufferCount}
|
* @see {@link bufferToggle}
|
* @see {@link bufferWhen}
|
* @see {@link windowTime}
|
*
|
* @param {number} bufferTimeSpan The amount of time to fill each buffer array.
|
* @param {number} [bufferCreationInterval] The interval at which to start new
|
* buffers.
|
* @param {number} [maxBufferSize] The maximum buffer size.
|
* @param {SchedulerLike} [scheduler=async] The scheduler on which to schedule the
|
* intervals that determine buffer boundaries.
|
* @return A function that returns an Observable of arrays of buffered values.
|
*/
|
export function bufferTime<T>(bufferTimeSpan: number, ...otherArgs: any[]): OperatorFunction<T, T[]> {
|
const scheduler = popScheduler(otherArgs) ?? asyncScheduler;
|
const bufferCreationInterval = (otherArgs[0] as number) ?? null;
|
const maxBufferSize = (otherArgs[1] as number) || Infinity;
|
|
return operate((source, subscriber) => {
|
// The active buffers, their related subscriptions, and removal functions.
|
let bufferRecords: { buffer: T[]; subs: Subscription }[] | null = [];
|
// If true, it means that every time we emit a buffer, we want to start a new buffer
|
// this is only really used for when *just* the buffer time span is passed.
|
let restartOnEmit = false;
|
|
/**
|
* Does the work of emitting the buffer from the record, ensuring that the
|
* record is removed before the emission so reentrant code (from some custom scheduling, perhaps)
|
* does not alter the buffer. Also checks to see if a new buffer needs to be started
|
* after the emit.
|
*/
|
const emit = (record: { buffer: T[]; subs: Subscription }) => {
|
const { buffer, subs } = record;
|
subs.unsubscribe();
|
arrRemove(bufferRecords, record);
|
subscriber.next(buffer);
|
restartOnEmit && startBuffer();
|
};
|
|
/**
|
* Called every time we start a new buffer. This does
|
* the work of scheduling a job at the requested bufferTimeSpan
|
* that will emit the buffer (if it's not unsubscribed before then).
|
*/
|
const startBuffer = () => {
|
if (bufferRecords) {
|
const subs = new Subscription();
|
subscriber.add(subs);
|
const buffer: T[] = [];
|
const record = {
|
buffer,
|
subs,
|
};
|
bufferRecords.push(record);
|
executeSchedule(subs, scheduler, () => emit(record), bufferTimeSpan);
|
}
|
};
|
|
if (bufferCreationInterval !== null && bufferCreationInterval >= 0) {
|
// The user passed both a bufferTimeSpan (required), and a creation interval
|
// That means we need to start new buffers on the interval, and those buffers need
|
// to wait the required time span before emitting.
|
executeSchedule(subscriber, scheduler, startBuffer, bufferCreationInterval, true);
|
} else {
|
restartOnEmit = true;
|
}
|
|
startBuffer();
|
|
const bufferTimeSubscriber = createOperatorSubscriber(
|
subscriber,
|
(value: T) => {
|
// Copy the records, so if we need to remove one we
|
// don't mutate the array. It's hard, but not impossible to
|
// set up a buffer time that could mutate the array and
|
// cause issues here.
|
const recordsCopy = bufferRecords!.slice();
|
for (const record of recordsCopy) {
|
// Loop over all buffers and
|
const { buffer } = record;
|
buffer.push(value);
|
// If the buffer is over the max size, we need to emit it.
|
maxBufferSize <= buffer.length && emit(record);
|
}
|
},
|
() => {
|
// The source completed, emit all of the active
|
// buffers we have before we complete.
|
while (bufferRecords?.length) {
|
subscriber.next(bufferRecords.shift()!.buffer);
|
}
|
bufferTimeSubscriber?.unsubscribe();
|
subscriber.complete();
|
subscriber.unsubscribe();
|
},
|
// Pass all errors through to consumer.
|
undefined,
|
// Clean up
|
() => (bufferRecords = null)
|
);
|
|
source.subscribe(bufferTimeSubscriber);
|
});
|
}
|