import {Observable, OperatorFunction} from 'rxjs';
import {finalize, map} from 'rxjs/operators';

/**
 * A buffered value paired with the moment it was recorded.
 *
 * @typeParam T - Type of the buffered value.
 */
interface ITimedItem<T> {
  /** Epoch time in milliseconds (as returned by `Date.now()`) at which the value was recorded. */
  timestamp: number;
  /** The buffered value itself. */
  item: T;
}

/**
 * Buffers source values in a floating (sliding) time window.
 *
 * On every source emission the operator records the value together with the current
 * `Date.now()` timestamp, drops recorded values older than `timeWindowMs`, and emits the
 * remaining values in arrival order, i.e. the values received in the inclusive interval
 * [now - timeWindowMs; now]. The window is re-evaluated only when the source emits; the
 * passage of time alone does not produce an emission.
 *
 * Optionally, a notifier can be provided: each of its emissions clears the buffered values.
 *
 * The buffer and the notifier subscription are created per subscription to the resulting
 * observable and are released when that subscription ends (completion, error or unsubscribe).
 * Nothing is subscribed until the result itself is subscribed to, and concurrent or repeated
 * subscriptions never share buffered values.
 *
 * @param timeWindowMs - Size of the window in ms.
 * @param resetNotifier$ - Stream whose emissions clear the buffered values.
 * @returns Operator function mapping each source value to the list of values currently in the window.
 */
export function bufferFloatingWindow<T>(
  timeWindowMs: number,
  resetNotifier$?: Observable<unknown>,
): OperatorFunction<T, T[]> {
  return (source: Observable<T>) =>
    // All state lives inside the subscribe callback so that every subscription gets its own
    // buffer and its own notifier subscription.
    new Observable<T[]>(subscriber => {
      let items: ITimedItem<T>[] = [];

      // Subscribed before the source so that `resetSub` is already assigned if the source
      // completes synchronously and `finalize` fires right away.
      const resetSub = resetNotifier$?.subscribe(() => {
        items = [];
      });

      // The returned subscription doubles as the teardown logic of the outer observable.
      return source
        .pipe(
          map(value => {
            const timestamp = Date.now();
            const oldestAllowed = timestamp - timeWindowMs;

            items.push({item: value, timestamp});
            // Inclusive lower bound: an item recorded exactly `timeWindowMs` ago is kept.
            items = items.filter(timedItem => timedItem.timestamp >= oldestAllowed);
            return items.map(timedItem => timedItem.item);
          }),
          finalize(() => resetSub?.unsubscribe()),
        )
        .subscribe(subscriber);
    });
}
