import { Subscriber } from '../Subscriber'; 
 | 
import { ObservableInput, OperatorFunction } from '../types'; 
 | 
import { operate } from '../util/lift'; 
 | 
import { noop } from '../util/noop'; 
 | 
import { createOperatorSubscriber } from './OperatorSubscriber'; 
 | 
import { innerFrom } from '../observable/innerFrom'; 
 | 
  
 | 
/** 
 | 
 * Buffers the source Observable values, using a factory function of closing 
 | 
 * Observables to determine when to close, emit, and reset the buffer. 
 | 
 * 
 | 
 * <span class="informal">Collects values from the past as an array. When it 
 | 
 * starts collecting values, it calls a function that returns an Observable that 
 | 
 * tells when to close the buffer and restart collecting.</span> 
 | 
 * 
 | 
 *  
 | 
 * 
 | 
 * Opens a buffer immediately, then closes the buffer when the observable 
 | 
 * returned by calling `closingSelector` function emits a value. When it closes 
 | 
 * the buffer, it immediately opens a new buffer and repeats the process. 
 | 
 * 
 | 
 * ## Example 
 | 
 * 
 | 
 * Emit an array of the last clicks every [1-5] random seconds 
 | 
 * 
 | 
 * ```ts 
 | 
 * import { fromEvent, bufferWhen, interval } from 'rxjs'; 
 | 
 * 
 | 
 * const clicks = fromEvent(document, 'click'); 
 | 
 * const buffered = clicks.pipe( 
 | 
 *   bufferWhen(() => interval(1000 + Math.random() * 4000)) 
 | 
 * ); 
 | 
 * buffered.subscribe(x => console.log(x)); 
 | 
 * ``` 
 | 
 * 
 | 
 * @see {@link buffer} 
 | 
 * @see {@link bufferCount} 
 | 
 * @see {@link bufferTime} 
 | 
 * @see {@link bufferToggle} 
 | 
 * @see {@link windowWhen} 
 | 
 * 
 | 
 * @param {function(): Observable} closingSelector A function that takes no 
 | 
 * arguments and returns an Observable that signals buffer closure. 
 | 
 * @return A function that returns an Observable of arrays of buffered values. 
 | 
 */ 
 | 
export function bufferWhen<T>(closingSelector: () => ObservableInput<any>): OperatorFunction<T, T[]> { 
 | 
  return operate((source, subscriber) => { 
 | 
    // The buffer we keep and emit. 
 | 
    let buffer: T[] | null = null; 
 | 
    // A reference to the subscriber used to subscribe to 
 | 
    // the closing notifier. We need to hold this so we can 
 | 
    // end the subscription after the first notification. 
 | 
    let closingSubscriber: Subscriber<T> | null = null; 
 | 
  
 | 
    // Ends the previous closing notifier subscription, so it 
 | 
    // terminates after the first emission, then emits 
 | 
    // the current buffer  if there is one, starts a new buffer, and starts a 
 | 
    // new closing notifier. 
 | 
    const openBuffer = () => { 
 | 
      // Make sure to finalize the closing subscription, we only cared 
 | 
      // about one notification. 
 | 
      closingSubscriber?.unsubscribe(); 
 | 
      // emit the buffer if we have one, and start a new buffer. 
 | 
      const b = buffer; 
 | 
      buffer = []; 
 | 
      b && subscriber.next(b); 
 | 
  
 | 
      // Get a new closing notifier and subscribe to it. 
 | 
      innerFrom(closingSelector()).subscribe((closingSubscriber = createOperatorSubscriber(subscriber, openBuffer, noop))); 
 | 
    }; 
 | 
  
 | 
    // Start the first buffer. 
 | 
    openBuffer(); 
 | 
  
 | 
    // Subscribe to our source. 
 | 
    source.subscribe( 
 | 
      createOperatorSubscriber( 
 | 
        subscriber, 
 | 
        // Add every new value to the current buffer. 
 | 
        (value) => buffer?.push(value), 
 | 
        // When we complete, emit the buffer if we have one, 
 | 
        // then complete the result. 
 | 
        () => { 
 | 
          buffer && subscriber.next(buffer); 
 | 
          subscriber.complete(); 
 | 
        }, 
 | 
        // Pass all errors through to consumer. 
 | 
        undefined, 
 | 
        // Release memory on finalization 
 | 
        () => (buffer = closingSubscriber = null!) 
 | 
      ) 
 | 
    ); 
 | 
  }); 
 | 
} 
 |