import {Injectable, Injector, OnDestroy} from '@angular/core';
import {SECONDS_IN_MINUTE} from '@app/core/constants/common';
import {GridsterProviders} from '@app/core/models/gridster';
import {betterThrottle} from '@app/core/utils/better-throttle';
import {IDateRange} from '@app/trading-board/components/orders-filter/orders-filter';
import {ICreateOrderResponse} from '@app/trading-board/enum/b2trader/create-order-response';
import {ICreatedStopOrderResponse} from '@app/trading-board/enum/b2trader/created-stop-order-response';
import {EEventHandlersRequest} from '@app/trading-board/event-handlers/event-handlers.enum';
import {IChartHistoryRequest} from '@app/trading-board/interfaces/b2trader/chart-history-request';
import {IHistoryOrdersRequest} from '@app/trading-board/interfaces/b2trader/history-orders-request';
import {IOrderCreateRequest} from '@app/trading-board/interfaces/b2trader/order-create-request';
import {IPublicLoginDialogData} from '@app/trading-board/interfaces/b2trader/public-login-dialog-data';
import {IServerTimeInfo} from '@app/trading-board/interfaces/b2trader/server-time-info';
import {ISubscribeChartRequest} from '@app/trading-board/interfaces/b2trader/subscribe-chart-request';
import {IDataEvent} from '@app/trading-board/interfaces/data-event.ts';
import {IFacadeSubscription} from '@app/trading-board/interfaces/facade-subscription';
import {IReplaySubjectWithCount} from '@app/trading-board/interfaces/replay-subject-with-count';
import {ISingleRequestMessage} from '@app/trading-board/interfaces/single-request-message';
import {IStreamWithRefCount} from '@app/trading-board/interfaces/stream-with-ref-count';
import {Asset} from '@app/trading-board/models/b2trader/asset';
import {B2TraderOrder} from '@app/trading-board/models/b2trader/b2-trader-order';
import {B2traderStopOrder} from '@app/trading-board/models/b2trader/b2trader-stop-order';
import {Balance} from '@app/trading-board/models/b2trader/balance';
import {ChartRequest} from '@app/trading-board/models/b2trader/chart-request';
import {MarketFee} from '@app/trading-board/models/b2trader/market-fee';
import {B2TraderInstrument} from '@app/trading-board/models/instrument';
import {Level1Collector} from '@app/trading-board/models/level1-collector';
import {Trade} from '@app/trading-board/models/trade';
import {ApiLibraryApiOrderInfoRequest, ApiLibraryApiOrderInfoResponse} from '@b2broker/b2trader-trade-models';
import {ResolutionString} from '@b2broker/trading.view.charting.library/charting_library/datafeed-api';
import {plainToClass} from 'class-transformer';
import {nanoid} from 'nanoid';
import {BehaviorSubject, Observable, ReplaySubject, Subject, firstValueFrom} from 'rxjs';
import {bufferCount, filter, finalize, map, switchMap, take, takeUntil} from 'rxjs/operators';

import {EConnectionLevel} from '../enum/connection-level';
import {
  EB2TraderEventHandlersReceived,
  EB2TraderEventHandlersRequest,
} from '../event-handlers/b2trader/b2trader-event-handlers.enum';
import {IHistoryOrders} from '../interfaces/b2trader/history-orders';
import {IInstrument} from '../interfaces/instrument';
import {IBarsHistoryRequest} from '../interfaces/moex/bars-history-request.interface';
import {Bar} from '../models/bar';
import {Level2} from '../models/level2';
import {WorkerConnectionService} from '../services/worker-connection.service';
import {ATradeDatafeed} from './trade-datafeed.abstract';

import StatusEnum = ApiLibraryApiOrderInfoResponse.StatusEnum;

/**
 * Trade datafeed for the B2Trader provider.
 *
 * All traffic goes through {@link WorkerConnectionService}: requests are sent with `sendMessage`
 * and replies are picked out of `messages$` by message type and, for one-off requests, by a generated id.
 * The public subjects of this class are not written to here (apart from being reset in `terminate`);
 * presumably they are populated by event handlers outside this file.
 */
@Injectable()
export class B2TraderDatafeedService extends ATradeDatafeed implements OnDestroy {
  /** Emits once in `ngOnDestroy`; used to tear down internal subscriptions. */
  private readonly destroyer$ = new Subject<void>();

  /**
   * Returns the values of `orders` sorted by `time`, newest first.
   *
   * Equal timestamps compare as 0 so that the comparator is consistent; a comparator that never
   * returns 0 leaves the resulting order implementation-defined.
   */
  private readonly sortOrdersByTimeFn = <T extends {time: Date}>(orders: Map<number, T>): T[] =>
    Array.from(orders.values()).sort((a, b) => {
      if (a.time < b.time) {
        return 1;
      }

      return a.time > b.time ? -1 : 0;
    });

  /** Provider identifier of this datafeed. */
  public get provider(): GridsterProviders {
    return GridsterProviders.b2trader;
  }

  /** `symbolWithSeparator` -> lower-cased instrument id of every active level 2 subscription. */
  private readonly level2Ids = new Map<string, string>();

  protected static sharedWorkerPath = '/assets/b2trader.shared-worker.js';
  protected static workerPath = '/assets/b2trader.web-worker.js';

  public balances$ = new BehaviorSubject<Balance[]>([]);

  public instruments$ = new BehaviorSubject<B2TraderInstrument[]>([]);
  /** Level 2 streams with their subscriber counters, keyed by `symbolWithSeparator`. */
  public level2: Map<string, IStreamWithRefCount<Level2>> = new Map();

  public readonly level1$ = new BehaviorSubject<Level1Collector>(new Level1Collector());
  public readonly orders$ = new BehaviorSubject<Map<number, B2TraderOrder>>(new Map());
  public readonly stopOrders$ = new BehaviorSubject<Map<number, B2traderStopOrder>>(new Map());
  /** `orders$` flattened to an array, newest first. */
  public readonly ordersValues$ = this.orders$.pipe(map(orders => this.sortOrdersByTimeFn(orders)));
  /** `stopOrders$` flattened to an array, newest first. */
  public readonly stopOrdersValues$ = this.stopOrders$.pipe(map(orders => this.sortOrdersByTimeFn(orders)));
  /** Trade streams with their subscriber counters, keyed by symbol. */
  public readonly trades = new Map<string, IStreamWithRefCount<Trade[], BehaviorSubject<Trade[]>>>();
  public readonly assets$ = new BehaviorSubject<Asset[]>([]);
  public readonly serverTimeInfo$ = new BehaviorSubject<IServerTimeInfo | null>(null);

  /** Set by `initialize`, reset by `terminate`. */
  public isInitialized = false;

  /** Live chart topics with their subscriber counters, keyed by symbol. */
  public chartDataTopics = new Map<string, IReplaySubjectWithCount<Bar>>();
  public tiers$ = new BehaviorSubject(new Map<string, MarketFee>());

  constructor(private readonly injector: Injector, private readonly workerConnectionService: WorkerConnectionService) {
    super();
  }

  /** @returns `true` once `initialize` has been called and `terminate` has not been called since. */
  public hasConnect(): boolean {
    return this.isInitialized;
  }

  /**
   * Initializes the worker connection and emits `finishedInitialize$` on the first `Connected` message.
   *
   * @param level connection level forwarded to the worker connection service.
   */
  public initialize(level: EConnectionLevel = EConnectionLevel.Main): void {
    this.workerConnectionService.initialize({
      level,
      webWorkerPath: B2TraderDatafeedService.workerPath,
      sharedWorkerPath: B2TraderDatafeedService.sharedWorkerPath,
    });

    this.isInitialized = true;
    this.workerConnectionService.messages$
      .pipe(
        filter(m => m.type === EB2TraderEventHandlersReceived.Connected),
        take(1),
        takeUntil(this.destroyer$),
      )
      .subscribe(() => {
        this.finishedInitialize$.next();
      });
  }

  /** Re-sends a `Book` subscription for every level 2 id that is currently tracked. */
  public afterConnect(): void {
    if (this.level2Ids.size) {
      this.workerConnectionService.sendMessage<IFacadeSubscription>({
        type: EB2TraderEventHandlersRequest.Book,
        payload: {symbolsToAdd: Array.from(this.level2Ids.values()), symbolsToRemove: []},
      });
    }
  }

  /**
   * Terminates the worker connection and resets the state held by this service to its initial values.
   *
   * `level2` and `level2Ids` are not touched here; they are cleared only in `ngOnDestroy`.
   */
  public terminate(): void {
    this.workerConnectionService.terminate();

    B2TraderDatafeedService.clearMapSubject(this.trades);

    this.isInitialized = false;
    this.serverTimeInfo$.next(null);
    this.instruments$.next([]);
    this.balances$.next([]);
    this.assets$.next([]);

    this.level1$.next(new Level1Collector());
    this.orders$.next(new Map());
    this.stopOrders$.next(new Map());
    this.chartDataTopics.clear();
    this.tiers$.next(new Map());
  }

  /**
   * Returns the throttled level 2 stream of an instrument, subscribing to its book on first use.
   *
   * The subscriber counter is incremented on every call and decremented whenever a subscription to
   * the returned observable ends; when it drops below one the stream is removed and the book is
   * unsubscribed.
   *
   * @returns level 2 updates throttled by `ATradeDatafeed.throttleTime`.
   */
  public getLevel2({id, symbolWithSeparator, priceScale, amountScale}: IInstrument): Observable<Level2> {
    let obj = this.level2.get(symbolWithSeparator);

    if (!obj) {
      // Seed the stream with an empty book so subscribers get a value immediately.
      const level2: Level2 = plainToClass(Level2, {
        asks: [],
        bids: [],
        symbolWithSeparator,
        priceScale,
        amountScale,
      });

      obj = {subscribers: 0, stream: new BehaviorSubject(level2)};

      this.level2.set(symbolWithSeparator, obj);
      this.level2Ids.set(symbolWithSeparator, id.toLowerCase());
      this.workerConnectionService.sendMessage<IFacadeSubscription>({
        type: EB2TraderEventHandlersRequest.Book,
        payload: {symbolsToAdd: [id.toLowerCase()], symbolsToRemove: []},
      });
    }

    obj.subscribers++;

    return obj.stream.pipe(
      betterThrottle(ATradeDatafeed.throttleTime),
      finalize(() => {
        obj.subscribers--;

        if (obj.subscribers < 1) {
          this.level2.delete(symbolWithSeparator);
          this.level2Ids.delete(symbolWithSeparator);

          this.workerConnectionService.sendMessage<IFacadeSubscription>({
            type: EB2TraderEventHandlersRequest.Book,
            payload: {symbolsToRemove: [id.toLowerCase()], symbolsToAdd: []},
          });
        }
      }),
    );
  }

  /**
   * Returns the live bar stream of a symbol, sending `SubscribeChart` when no topic exists yet.
   *
   * Topics are keyed by symbol only, so `resolution` is used solely by the call that creates the
   * topic; later callers share the existing topic whatever resolution they pass.
   * The counter is incremented on every call and decremented whenever a subscription to the
   * returned observable ends; at zero the chart is unsubscribed and the topic removed.
   */
  public getBar$(symbol: string, resolution: ResolutionString): Observable<Bar> {
    if (!this.chartDataTopics.get(symbol)) {
      this.chartDataTopics.set(symbol, {
        count: 0,
        subject: new ReplaySubject<Bar>(),
      });

      this.workerConnectionService.sendMessage<ISubscribeChartRequest>({
        type: EB2TraderEventHandlersRequest.SubscribeChart,
        payload: {
          symbol,
          resolution,
        },
      });
    }

    const subscription = this.chartDataTopics.get(symbol);

    subscription.count++;

    return subscription.subject.pipe(
      filter(v => !!v),
      finalize(() => {
        subscription.count--;

        if (!subscription.count) {
          this.workerConnectionService.sendMessage({
            type: EB2TraderEventHandlersRequest.UnsubscribeChart,
            payload: symbol,
          });
          // `complete()` rather than `unsubscribe()`: an unsubscribed subject throws
          // ObjectUnsubscribedError on any late `next()`, a completed one just ignores it.
          subscription.subject.complete();
          this.chartDataTopics.delete(symbol);
        }
      }),
    );
  }

  /**
   * Requests historical bars for a range, split into windows of at most 960 bars each.
   *
   * The requests are sent immediately; the returned observable emits one merged array after as
   * many `ChartHistory` replies with this request's id as windows were requested, then completes.
   * When the range yields no window at all it emits an empty array and completes.
   *
   * NOTE(review): the window size is computed in seconds, so the range dates are presumably
   * second-based timestamps — confirm against the caller.
   */
  public getBarsData$(request: IBarsHistoryRequest): Observable<Bar[]> {
    const id = nanoid();
    const {symbol, resolution, rangeStartDate, rangeEndDate} = request;

    const resolutionInMinutes = ChartRequest.getResolutionInMinutes(resolution);
    const maxBarsCount = 960;
    const maxTimeRangeForSingleRequest = SECONDS_IN_MINUTE * resolutionInMinutes * maxBarsCount;
    const intervalsCount = Math.ceil((rangeEndDate - rangeStartDate) / maxTimeRangeForSingleRequest);

    // Nothing to request: an empty/inverted range would leave `bufferCount(0)` waiting forever,
    // and a non-finite count would never let the request loop finish.
    if (!Number.isFinite(intervalsCount) || intervalsCount <= 0) {
      return new Observable<Bar[]>(subscriber => {
        subscriber.next([]);
        subscriber.complete();
      });
    }

    // Walk backwards from the end of the range, one window per request.
    for (let i = 0; i < intervalsCount; i++) {
      this.workerConnectionService.sendMessage<IChartHistoryRequest>({
        type: EB2TraderEventHandlersRequest.ChartHistory,
        payload: {
          id,
          resolution,
          symbol,
          rangeEndDate: rangeEndDate - maxTimeRangeForSingleRequest * i,
          rangeStartDate: Math.max(rangeStartDate, rangeEndDate - maxTimeRangeForSingleRequest * (i + 1) + 1),
        },
      });
    }

    return this.workerConnectionService.messages$.pipe(
      filter(
        (message: IDataEvent<ISingleRequestMessage<Bar[]>>) =>
          message.type === EB2TraderEventHandlersReceived.ChartHistory && message.payload.id === id,
      ),
      bufferCount(intervalsCount),
      take(1),
      map((response: IDataEvent<ISingleRequestMessage<Bar[]>>[]) =>
        response.reduce((collector: Bar[], arr: IDataEvent<ISingleRequestMessage<Bar[]>>) => {
          // Replies may arrive in any order: a chunk ending after the collected bars begin is
          // appended, every other chunk (including the first one) is prepended.
          if (collector[0]?.time < arr.payload.data[arr.payload.data.length - 1]?.time) {
            collector.push(...arr.payload.data);
          } else {
            collector.unshift(...arr.payload.data);
          }

          return collector;
        }, []),
      ),
    );
  }

  /**
   * Requests cancellation of an order.
   *
   * NOTE(review): the request is sent as `EEventHandlersRequest.CloseOrder` while the reply is
   * matched against `EB2TraderEventHandlersRequest.CloseOrder`; presumably both hold the same
   * value — confirm.
   *
   * @returns promise resolving with the `data` flag of the matching reply.
   */
  public cancelOrder(orderId: number): Promise<boolean> {
    const id = nanoid();

    this.workerConnectionService.sendMessage({type: EEventHandlersRequest.CloseOrder, payload: {id, data: orderId}});

    return firstValueFrom(this.takeSingleResponse$<boolean>(EB2TraderEventHandlersRequest.CloseOrder, id));
  }

  /**
   * Requests cancellation of a stop order.
   *
   * @returns promise resolving with the `data` flag of the matching reply.
   */
  public cancelStopOrder(orderId: number): Promise<boolean> {
    const id = nanoid();

    this.workerConnectionService.sendMessage({
      type: EB2TraderEventHandlersRequest.CloseStopOrder,
      payload: {id, data: orderId},
    });

    return firstValueFrom(this.takeSingleResponse$<boolean>(EB2TraderEventHandlersRequest.CloseStopOrder, id));
  }

  /**
   * Requests a page of cancelled orders and streams the matching orders from `ordersValues$`.
   *
   * @returns stream of `{orders, lastPage}` that starts once the request has been answered.
   */
  public getCanceledOrders(params: Omit<IHistoryOrdersRequest, 'status'>): Observable<IHistoryOrders> {
    return this.requestHistoryOrders$(params, StatusEnum.Cancelled, order => order.isCanceled());
  }

  /**
   * Requests a page of completed orders and streams the matching orders from `ordersValues$`.
   *
   * @returns stream of `{orders, lastPage}` that starts once the request has been answered.
   */
  public getFilledOrders(params: Omit<IHistoryOrdersRequest, 'status'>): Observable<IHistoryOrders> {
    return this.requestHistoryOrders$(params, StatusEnum.Completed, order => order.isFilled());
  }

  /** Streams the stop orders that are waiting for activation and match `range`. */
  public getStopOrders(range: IDateRange): Observable<B2traderStopOrder[]> {
    return this.stopOrdersValues$.pipe(
      map(orders => orders.filter(order => order.isWaitingForActivation() && B2traderStopOrder.compare(range, order))),
    );
  }

  /** Streams the active orders that match `range`. */
  public getOpenOrders(range: IDateRange | undefined): Observable<B2TraderOrder[]> {
    return this.ordersValues$.pipe(map(orders => orders.filter(o => o.isActive() && B2TraderOrder.compare(range, o))));
  }

  /**
   * Returns the trades stream of a symbol, subscribing to its trades on first use.
   *
   * The subscriber counter is incremented on every call and decremented whenever a subscription to
   * the returned observable ends; when it drops below one the stream is removed and the trades
   * are unsubscribed.
   */
  public getTrades$(symbol: string): Observable<Trade[]> {
    let obj = this.trades.get(symbol);

    if (!obj) {
      obj = {subscribers: 0, stream: new BehaviorSubject<Trade[]>([])};

      this.trades.set(symbol, obj);
      this.workerConnectionService.sendMessage<IFacadeSubscription>({
        type: EB2TraderEventHandlersRequest.Trades,
        payload: {symbolsToAdd: [symbol], symbolsToRemove: []},
      });
    }

    obj.subscribers++;

    return obj.stream.pipe(
      finalize(() => {
        obj.subscribers--;
        if (obj.subscribers < 1) {
          this.trades.delete(symbol);
          this.workerConnectionService.sendMessage<IFacadeSubscription>({
            type: EB2TraderEventHandlersRequest.Trades,
            payload: {symbolsToAdd: [], symbolsToRemove: [symbol]},
          });
        }
      }),
    );
  }

  /**
   * Sends a create-order request.
   *
   * @returns promise resolving with the `data` of the matching reply.
   */
  public createOrder(orderParams: IOrderCreateRequest): Promise<ICreateOrderResponse> {
    const id = nanoid();
    this.workerConnectionService.sendMessage<ISingleRequestMessage<IOrderCreateRequest>>({
      type: EB2TraderEventHandlersRequest.CreateOrder,
      // Passed as is: the former single-argument `Object.assign` returned this very object as `any`.
      payload: {id, data: orderParams},
    });

    return firstValueFrom(
      this.takeSingleResponse$<ICreateOrderResponse>(EB2TraderEventHandlersRequest.CreateOrder, id),
    );
  }

  /**
   * Sends a create-stop-order request.
   *
   * @returns promise resolving with the `data` of the matching reply.
   */
  public createStopOrder(orderParams: unknown): Promise<ICreatedStopOrderResponse> {
    const id = nanoid();
    this.workerConnectionService.sendMessage<ISingleRequestMessage<ApiLibraryApiOrderInfoRequest>>({
      type: EB2TraderEventHandlersRequest.CreateStopOrder,
      payload: {id, data: orderParams},
    });

    return firstValueFrom(
      this.takeSingleResponse$<ICreatedStopOrderResponse>(EB2TraderEventHandlersRequest.CreateStopOrder, id),
    );
  }

  /** Sends a public login request; no reply is awaited here. */
  public publicLogin(email: string, password: string): void {
    const id = nanoid();
    this.workerConnectionService.sendMessage<ISingleRequestMessage<IPublicLoginDialogData>>({
      type: EB2TraderEventHandlersRequest.PublicLogin,
      payload: {
        id,
        data: {email, password},
      },
    });
  }

  /** Sends a public logout request; no reply is awaited here. */
  public publicLogout(): void {
    const id = nanoid();
    this.workerConnectionService.sendMessage<ISingleRequestMessage>({
      type: EB2TraderEventHandlersRequest.PublicLogout,
      payload: {
        id,
        data: null,
      },
    });
  }

  /** Not supported by this datafeed: always throws. */
  public updateQuotesSubscriptions(): void {
    throw new Error('Method not implemented.');
  }

  /** Terminates the connection, drops the level 2 state and stops internal subscriptions. */
  public ngOnDestroy(): void {
    this.terminate();
    B2TraderDatafeedService.clearMapSubject(this.level2);
    this.level2Ids.clear();
    this.destroyer$.next();
    this.destroyer$.complete();
  }

  /**
   * Emits the `data` of the first message that has the given type and request id, then completes.
   */
  private takeSingleResponse$<T>(type: EB2TraderEventHandlersRequest, id: string): Observable<T> {
    return this.workerConnectionService.messages$.pipe(
      filter(
        (message: IDataEvent<ISingleRequestMessage<T>>) => message.type === type && message.payload.id === id,
      ),
      take(1),
      map((message: IDataEvent<ISingleRequestMessage<T>>) => message.payload.data),
    );
  }

  /**
   * Sends a `HistoryOrder` request for `status` and, once it is answered, streams the orders from
   * `ordersValues$` that match `params` and `hasStatus`, cut to `ordersOnPage * page` items.
   * The reply's `data` flag is passed through as `lastPage`.
   */
  private requestHistoryOrders$(
    params: Omit<IHistoryOrdersRequest, 'status'>,
    status: typeof StatusEnum.Cancelled | typeof StatusEnum.Completed,
    hasStatus: (order: B2TraderOrder) => boolean,
  ): Observable<IHistoryOrders> {
    const id = nanoid();

    this.workerConnectionService.sendMessage({
      type: EB2TraderEventHandlersRequest.HistoryOrder,
      payload: {id, data: {...params, status}},
    });

    return this.takeSingleResponse$<boolean>(EB2TraderEventHandlersRequest.HistoryOrder, id).pipe(
      switchMap(isLastPage =>
        this.ordersValues$.pipe(
          map(orders => ({
            orders: orders
              .filter(order => B2TraderOrder.compare(params, order) && hasStatus(order))
              .slice(0, params.ordersOnPage * params.page),
            // eslint-disable-next-line @typescript-eslint/naming-convention
            lastPage: isLastPage,
          })),
        ),
      ),
    );
  }
}
