import {EventEmitter, Injectable} from "@angular/core";
import {BehaviorSubject, combineLatest, forkJoin, Observable, of} from "rxjs";
import {PageInterface} from "./interfaces/data/page.interface";
import {PaginationInterface} from "./interfaces/data/pagination.interface";
import {FilterInterface} from "./interfaces/data/filter.interface";
import {SortInterface} from "./interfaces/data/sort.interface";
import {RestPaginationModel} from "./rest-pagination.model";
import {RestFilterModel} from "./rest-filter.model";
import {RestSortModel} from "./rest-sort.model";
import {
  concatMap,
  debounceTime,
  distinctUntilChanged,
  filter,
  map,
  skip,
  switchMap,
  tap,
  withLatestFrom,
} from "rxjs/operators";
import {BaseRestService} from "./base-rest.service";
import {RestPage} from "./rest-page";


@Injectable()
export class DataSourceService<T> {

  // Work with data
  data:             Observable<T[]>;

  pages:            BehaviorSubject<PageInterface<T>[]>;

  currentPage:      Observable<number>;
  totalPages:       BehaviorSubject<number | null>;
  // fetchPage:     EventEmitter<number>;
  fetchNextPage:    EventEmitter<void>;

  loading:          Observable<boolean>;

  hasNextPage:      Observable<boolean>;


  // Items which force send request to server, and invalidate current data (filter, sort)
  pagination:       PaginationInterface;
  filter:           BehaviorSubject<FilterInterface>;
  sort:             BehaviorSubject<SortInterface>;


  get service() {
    return this._service;
  }

  constructor(
    protected _service: BaseRestService<T, T, T, T, T>
  ) {
    this.pagination = new RestPaginationModel();
    this.filter     = new BehaviorSubject<FilterInterface>(new RestFilterModel({}));
    this.sort       = new BehaviorSubject<SortInterface>(new RestSortModel());

    this.pages  = new BehaviorSubject<PageInterface<T>[]>([]);

    this.fetchNextPage = new EventEmitter();
    this.hasNextPage   = new BehaviorSubject<boolean>(true);
    this.totalPages    = new BehaviorSubject<number | null>(null);

    this.service.dataChanged.subscribe(() => {
      this.pages.next([]);
      this.fetchNextPage.emit();
    });


    // Calculate next page
    this.currentPage = this.pages.pipe(
      switchMap((pages) => {
        return of(pages.reduce<number>((acc, p) => p.pagination.page > acc ? p.pagination.page : acc, 0));
      })
    );

    // Calculate if present any pending request
    this.loading = this.pages.pipe(
      switchMap(pages => combineLatest(pages.map(p => p.loading))),
      map(loadings => loadings.some(i => i === true))
    );

    // Check last page observable if next page exists
    this.hasNextPage = this.pages.pipe(
      switchMap((pages) => {
        return combineLatest([
          this.currentPage,
          pages.length ? pages[pages.length - 1].totalPages : of(null)
        ]).pipe(
          map(([current, totalPagesLast]) => {
            return totalPagesLast === null || totalPagesLast >= current + 1;
          })
        )
      }),
      distinctUntilChanged()
    );

    // Store request in pages observable
    this.fetchNextPage.pipe(
      withLatestFrom(this.hasNextPage),
      // filter(([,hasNexPage]) => hasNexPage),
      withLatestFrom(this.currentPage, this.filter, this.sort),
      concatMap(
        ([, page, filter, sort]) => {
          return of(
            new RestPage<T>(
              this._service.list.bind(this._service),
              new RestPaginationModel(page+1, this.pagination.perPage, this.pagination.pageParam, this.pagination.perPageParam),
              filter,
              sort
            )
          );
        }
      ),
      withLatestFrom(this.pages)
    ).subscribe(([page, pages]) => {
      page.loading.next(true);
      this.pages.next([...pages, page]);
    });

    // Transfer pages requests into data, perform http request
    this.data = this.pages.pipe(
      switchMap(
        (pages) => {
          return forkJoin(
            pages.reduce<Observable<T[]>[]>((acc, page) => [...acc, page.data], [])
          ).pipe(
            tap(() => pages.forEach(p => p.loading.next(false))),
          )
        }
      ),
      switchMap(d => of(d.reduce<T[]>((acc, i) => [...acc, ...i], []))),
      // share()
    );

    // Drop request data if filter accepted
    this.filter.pipe(
      debounceTime(300),
      skip(1),
      // distinctUntilChanged()
    ).subscribe(() => {
      this.service.dataChanged.emit()
    });

    // this.fetchNextPage.emit();

    this.currentPage.pipe(
      debounceTime(300)
    ).subscribe(
      p => p === 0 ? this.fetchNextPage.emit() : null
    )

  }
}
