import { MonoTypeOperatorFunction, timer, Observable } from 'rxjs';
import { delay as delayOperator, retryWhen, scan, tap, last, first } from 'rxjs/operators';
import { notEmpty } from './utility-functions';

export function retryWithDelay<T>(delay: number, count = 1): MonoTypeOperatorFunction<T> {
  return input =>
    input.pipe(
      retryWhen(errors =>
        errors.pipe(
          scan((acc, error) => ({ count: acc.count + 1, error }), {
            count: 0,
            error: undefined as any
          }),
          tap(current => {
            if (current.count > count) {
              throw current.error;
            }
          }),
          delayOperator(delay)
        )
      )
    );
}

export function poll<T>(delayMs: number, maxAttempts = 5, isComplete = (res: T) => notEmpty(res), emitOnlyLast = true, ignoreErrors = true): MonoTypeOperatorFunction<T> {
  return source$ => {
    const result = new Observable<T>(observer => {
      let retries = 0;
      let complete = false;

      const resub = () => {
        source$.pipe(first()).subscribe({
          next: val => {
            observer.next(val);
            complete = !!isComplete(val);
          },
          error: err => {
            if (!ignoreErrors) {
              observer.error(err);
            }
          }
        });

        if (retries < maxAttempts && !complete) {
          retries++;
          timer(delayMs).subscribe(resub);
        } else if (retries === maxAttempts) {
          observer.complete();
        }
      };
      resub();
    });

    return emitOnlyLast ? result.pipe(last()) : result;
  };
}
