| import { Subscription } from '../Subscription'; | 
| import { OperatorFunction, SchedulerLike } from '../types'; | 
| import { operate } from '../util/lift'; | 
| import { createOperatorSubscriber } from './OperatorSubscriber'; | 
| import { arrRemove } from '../util/arrRemove'; | 
| import { asyncScheduler } from '../scheduler/async'; | 
| import { popScheduler } from '../util/args'; | 
| import { executeSchedule } from '../util/executeSchedule'; | 
|   | 
| /* tslint:disable:max-line-length */ | 
| export function bufferTime<T>(bufferTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction<T, T[]>; | 
| export function bufferTime<T>( | 
|   bufferTimeSpan: number, | 
|   bufferCreationInterval: number | null | undefined, | 
|   scheduler?: SchedulerLike | 
| ): OperatorFunction<T, T[]>; | 
| export function bufferTime<T>( | 
|   bufferTimeSpan: number, | 
|   bufferCreationInterval: number | null | undefined, | 
|   maxBufferSize: number, | 
|   scheduler?: SchedulerLike | 
| ): OperatorFunction<T, T[]>; | 
| /* tslint:enable:max-line-length */ | 
|   | 
| /** | 
|  * Buffers the source Observable values for a specific time period. | 
|  * | 
|  * <span class="informal">Collects values from the past as an array, and emits | 
|  * those arrays periodically in time.</span> | 
|  * | 
|  *  | 
|  * | 
|  * Buffers values from the source for a specific time duration `bufferTimeSpan`. | 
|  * Unless the optional argument `bufferCreationInterval` is given, it emits and | 
|  * resets the buffer every `bufferTimeSpan` milliseconds. If | 
|  * `bufferCreationInterval` is given, this operator opens the buffer every | 
|  * `bufferCreationInterval` milliseconds and closes (emits and resets) the | 
|  * buffer every `bufferTimeSpan` milliseconds. When the optional argument | 
|  * `maxBufferSize` is specified, the buffer will be closed either after | 
|  * `bufferTimeSpan` milliseconds or when it contains `maxBufferSize` elements. | 
|  * | 
|  * ## Examples | 
|  * | 
|  * Every second, emit an array of the recent click events | 
|  * | 
|  * ```ts | 
|  * import { fromEvent, bufferTime } from 'rxjs'; | 
|  * | 
|  * const clicks = fromEvent(document, 'click'); | 
|  * const buffered = clicks.pipe(bufferTime(1000)); | 
|  * buffered.subscribe(x => console.log(x)); | 
|  * ``` | 
|  * | 
|  * Every 5 seconds, emit the click events from the next 2 seconds | 
|  * | 
|  * ```ts | 
|  * import { fromEvent, bufferTime } from 'rxjs'; | 
|  * | 
|  * const clicks = fromEvent(document, 'click'); | 
|  * const buffered = clicks.pipe(bufferTime(2000, 5000)); | 
|  * buffered.subscribe(x => console.log(x)); | 
|  * ``` | 
|  * | 
|  * @see {@link buffer} | 
|  * @see {@link bufferCount} | 
|  * @see {@link bufferToggle} | 
|  * @see {@link bufferWhen} | 
|  * @see {@link windowTime} | 
|  * | 
|  * @param {number} bufferTimeSpan The amount of time to fill each buffer array. | 
|  * @param {number} [bufferCreationInterval] The interval at which to start new | 
|  * buffers. | 
|  * @param {number} [maxBufferSize] The maximum buffer size. | 
|  * @param {SchedulerLike} [scheduler=async] The scheduler on which to schedule the | 
|  * intervals that determine buffer boundaries. | 
|  * @return A function that returns an Observable of arrays of buffered values. | 
|  */ | 
| export function bufferTime<T>(bufferTimeSpan: number, ...otherArgs: any[]): OperatorFunction<T, T[]> { | 
|   const scheduler = popScheduler(otherArgs) ?? asyncScheduler; | 
|   const bufferCreationInterval = (otherArgs[0] as number) ?? null; | 
|   const maxBufferSize = (otherArgs[1] as number) || Infinity; | 
|   | 
|   return operate((source, subscriber) => { | 
|     // The active buffers, their related subscriptions, and removal functions. | 
|     let bufferRecords: { buffer: T[]; subs: Subscription }[] | null = []; | 
|     // If true, it means that every time we emit a buffer, we want to start a new buffer | 
|     // this is only really used for when *just* the buffer time span is passed. | 
|     let restartOnEmit = false; | 
|   | 
|     /** | 
|      * Does the work of emitting the buffer from the record, ensuring that the | 
|      * record is removed before the emission so reentrant code (from some custom scheduling, perhaps) | 
|      * does not alter the buffer. Also checks to see if a new buffer needs to be started | 
|      * after the emit. | 
|      */ | 
|     const emit = (record: { buffer: T[]; subs: Subscription }) => { | 
|       const { buffer, subs } = record; | 
|       subs.unsubscribe(); | 
|       arrRemove(bufferRecords, record); | 
|       subscriber.next(buffer); | 
|       restartOnEmit && startBuffer(); | 
|     }; | 
|   | 
|     /** | 
|      * Called every time we start a new buffer. This does | 
|      * the work of scheduling a job at the requested bufferTimeSpan | 
|      * that will emit the buffer (if it's not unsubscribed before then). | 
|      */ | 
|     const startBuffer = () => { | 
|       if (bufferRecords) { | 
|         const subs = new Subscription(); | 
|         subscriber.add(subs); | 
|         const buffer: T[] = []; | 
|         const record = { | 
|           buffer, | 
|           subs, | 
|         }; | 
|         bufferRecords.push(record); | 
|         executeSchedule(subs, scheduler, () => emit(record), bufferTimeSpan); | 
|       } | 
|     }; | 
|   | 
|     if (bufferCreationInterval !== null && bufferCreationInterval >= 0) { | 
|       // The user passed both a bufferTimeSpan (required), and a creation interval | 
|       // That means we need to start new buffers on the interval, and those buffers need | 
|       // to wait the required time span before emitting. | 
|       executeSchedule(subscriber, scheduler, startBuffer, bufferCreationInterval, true); | 
|     } else { | 
|       restartOnEmit = true; | 
|     } | 
|   | 
|     startBuffer(); | 
|   | 
|     const bufferTimeSubscriber = createOperatorSubscriber( | 
|       subscriber, | 
|       (value: T) => { | 
|         // Copy the records, so if we need to remove one we | 
|         // don't mutate the array. It's hard, but not impossible to | 
|         // set up a buffer time that could mutate the array and | 
|         // cause issues here. | 
|         const recordsCopy = bufferRecords!.slice(); | 
|         for (const record of recordsCopy) { | 
|           // Loop over all buffers and | 
|           const { buffer } = record; | 
|           buffer.push(value); | 
|           // If the buffer is over the max size, we need to emit it. | 
|           maxBufferSize <= buffer.length && emit(record); | 
|         } | 
|       }, | 
|       () => { | 
|         // The source completed, emit all of the active | 
|         // buffers we have before we complete. | 
|         while (bufferRecords?.length) { | 
|           subscriber.next(bufferRecords.shift()!.buffer); | 
|         } | 
|         bufferTimeSubscriber?.unsubscribe(); | 
|         subscriber.complete(); | 
|         subscriber.unsubscribe(); | 
|       }, | 
|       // Pass all errors through to consumer. | 
|       undefined, | 
|       // Clean up | 
|       () => (bufferRecords = null) | 
|     ); | 
|   | 
|     source.subscribe(bufferTimeSubscriber); | 
|   }); | 
| } |