import { Subscription } from '../Subscription'; 
 | 
import { OperatorFunction, ObservableInput } from '../types'; 
 | 
import { operate } from '../util/lift'; 
 | 
import { innerFrom } from '../observable/innerFrom'; 
 | 
import { createOperatorSubscriber } from './OperatorSubscriber'; 
 | 
import { noop } from '../util/noop'; 
 | 
import { arrRemove } from '../util/arrRemove'; 
 | 
  
 | 
/** 
 | 
 * Buffers the source Observable values starting from an emission from 
 | 
 * `openings` and ending when the output of `closingSelector` emits. 
 | 
 * 
 | 
 * <span class="informal">Collects values from the past as an array. Starts 
 | 
 * collecting only when `opening` emits, and calls the `closingSelector` 
 | 
 * function to get an Observable that tells when to close the buffer.</span> 
 | 
 * 
 | 
 *  
 | 
 * 
 | 
 * Buffers values from the source by opening the buffer via signals from an 
 | 
 * Observable provided to `openings`, and closing and sending the buffers when 
 | 
 * a Subscribable or Promise returned by the `closingSelector` function emits. 
 | 
 * 
 | 
 * ## Example 
 | 
 * 
 | 
 * Every other second, emit the click events from the next 500ms 
 | 
 * 
 | 
 * ```ts 
 | 
 * import { fromEvent, interval, bufferToggle, EMPTY } from 'rxjs'; 
 | 
 * 
 | 
 * const clicks = fromEvent(document, 'click'); 
 | 
 * const openings = interval(1000); 
 | 
 * const buffered = clicks.pipe(bufferToggle(openings, i => 
 | 
 *   i % 2 ? interval(500) : EMPTY 
 | 
 * )); 
 | 
 * buffered.subscribe(x => console.log(x)); 
 | 
 * ``` 
 | 
 * 
 | 
 * @see {@link buffer} 
 | 
 * @see {@link bufferCount} 
 | 
 * @see {@link bufferTime} 
 | 
 * @see {@link bufferWhen} 
 | 
 * @see {@link windowToggle} 
 | 
 * 
 | 
 * @param openings A Subscribable or Promise of notifications to start new 
 | 
 * buffers. 
 | 
 * @param closingSelector A function that takes 
 | 
 * the value emitted by the `openings` observable and returns a Subscribable or Promise, 
 | 
 * which, when it emits, signals that the associated buffer should be emitted 
 | 
 * and cleared. 
 | 
 * @return A function that returns an Observable of arrays of buffered values. 
 | 
 */ 
 | 
export function bufferToggle<T, O>( 
 | 
  openings: ObservableInput<O>, 
 | 
  closingSelector: (value: O) => ObservableInput<any> 
 | 
): OperatorFunction<T, T[]> { 
 | 
  return operate((source, subscriber) => { 
 | 
    const buffers: T[][] = []; 
 | 
  
 | 
    // Subscribe to the openings notifier first 
 | 
    innerFrom(openings).subscribe( 
 | 
      createOperatorSubscriber( 
 | 
        subscriber, 
 | 
        (openValue) => { 
 | 
          const buffer: T[] = []; 
 | 
          buffers.push(buffer); 
 | 
          // We use this composite subscription, so that 
 | 
          // when the closing notifier emits, we can tear it down. 
 | 
          const closingSubscription = new Subscription(); 
 | 
  
 | 
          const emitBuffer = () => { 
 | 
            arrRemove(buffers, buffer); 
 | 
            subscriber.next(buffer); 
 | 
            closingSubscription.unsubscribe(); 
 | 
          }; 
 | 
  
 | 
          // The line below will add the subscription to the parent subscriber *and* the closing subscription. 
 | 
          closingSubscription.add(innerFrom(closingSelector(openValue)).subscribe(createOperatorSubscriber(subscriber, emitBuffer, noop))); 
 | 
        }, 
 | 
        noop 
 | 
      ) 
 | 
    ); 
 | 
  
 | 
    source.subscribe( 
 | 
      createOperatorSubscriber( 
 | 
        subscriber, 
 | 
        (value) => { 
 | 
          // Value from our source. Add it to all pending buffers. 
 | 
          for (const buffer of buffers) { 
 | 
            buffer.push(value); 
 | 
          } 
 | 
        }, 
 | 
        () => { 
 | 
          // Source complete. Emit all pending buffers. 
 | 
          while (buffers.length > 0) { 
 | 
            subscriber.next(buffers.shift()!); 
 | 
          } 
 | 
          subscriber.complete(); 
 | 
        } 
 | 
      ) 
 | 
    ); 
 | 
  }); 
 | 
} 
 |