import {
  catchError,
  distinctUntilChanged,
  filter,
  interval,
  map,
  Observable,
  Observer,
  of,
  share,
  Subject,
  SubscriptionLike,
  takeUntil,
  takeWhile,
} from 'rxjs';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket';
import { Inject, Injectable } from '@angular/core';
import { IWsMessage, MarketplaceWebsocketEventData, TMarketplaceWsTopic, TMarketplaceWsTopicAction } from './websocket.interfaces';
import { APP_ENV, TnAppEnvironment } from '@transport/ui-interfaces';

/**
 * Application-wide WebSocket client for the notifications endpoint.
 *
 * Responsibilities:
 * - opens a single `WebSocketSubject` connection (optionally authenticated via a `token` query parameter);
 * - republishes every incoming message through an internal subject that `on()` / `onTopic()` filter;
 * - exposes the connection state through `status`;
 * - retries the connection a limited number of times when it is lost while a token is set.
 *
 * NOTE(review): once all reconnect attempts are exhausted the internal message and status streams are
 * completed, so subscribers obtained from `on()` / `onTopic()` / `status` complete as well and later
 * `connect()` calls no longer deliver messages. This mirrors the original behaviour — confirm it is intended
 * for a root-scoped singleton.
 */
@Injectable({
  providedIn: 'root',
})
export class WebsocketService {
  /** Base socket configuration; per-connection values (url, observers) are added in `connect()`. */
  private readonly config: WebSocketSubjectConfig<IWsMessage<any>>;

  private websocketSub: SubscriptionLike;
  private statusSub: SubscriptionLike;

  /** Active reconnect timer, or `null` when no reconnect cycle is running. */
  private reconnection$: Observable<number> | null = null;
  /** Current socket, or `null` when there is no open/opening connection. */
  private websocket$: WebSocketSubject<IWsMessage<any>> | null = null;
  /** Observer feeding `status`; assigned when `status` gets its first subscriber (done in the constructor). */
  private connection$: Observer<boolean> | undefined;
  /** Every message received from the socket is re-emitted here. */
  private wsMessages$: Subject<IWsMessage<any>>;

  private reconnectInterval: number;
  private readonly reconnectAttempts: number;
  private isConnected = false;

  private token: string | undefined;

  /** Emits `true` when the socket opens and `false` when it closes (consecutive duplicates are dropped). */
  public status: Observable<boolean>;

  /** Emits on `disconnect()` to tear down the socket subscription and any running reconnect timer. */
  private readonly unsubscribe$ = new Subject<void>();

  /** Notifications endpoint URL without query parameters. */
  get getWsUrl() {
    return this.wsProtocol() + this.env.apiDomain + '/ws/notifications/';
  }

  constructor(@Inject(APP_ENV) private env: TnAppEnvironment) {
    this.wsMessages$ = new Subject<IWsMessage<any>>();
    this.reconnectInterval = 5000; // pause between connection attempts, ms
    this.reconnectAttempts = 5; // number of connection attempts
    this.config = {
      url: this.getWsUrl,
    };

    // connection status
    this.status = new Observable<boolean>(observer => {
      this.connection$ = observer;
    }).pipe(share(), distinctUntilChanged());

    // start reconnecting when the connection is reported as lost
    this.statusSub = this.status.subscribe(isConnected => {
      this.isConnected = isConnected;

      if (!this.reconnection$ && !isConnected) {
        this.reconnect();
      }
    });

    this.websocketSub = this.wsMessages$.subscribe({
      error: (error: ErrorEvent) => console.error('WebSocket error!', error),
    });
  }

  /**
   * Connects to the WebSocket. Does nothing when a connection already exists.
   *
   * @param token optional auth token; when given it is remembered and reused for reconnects.
   */
  public connect(token?: string): void {
    if (this.websocket$) return;
    if (token) this.token = token;

    // Build the URL for this connection only, so a token from a previous session is never reused,
    // and encode the token so reserved characters cannot corrupt the query string.
    const url = this.token ? `${this.getWsUrl}?token=${encodeURIComponent(this.token)}` : this.getWsUrl;

    const socket: WebSocketSubject<IWsMessage<any>> = new WebSocketSubject<IWsMessage<any>>({
      ...this.config,
      url,
      openObserver: {
        next: () => {
          // ignore events from a socket that has already been replaced or released
          if (this.websocket$ === socket) this.connection$?.next(true);
        },
      },
      closeObserver: {
        next: () => {
          // A close event may arrive after disconnect()/connect() created a newer socket;
          // only the current socket is allowed to change the service state.
          if (this.websocket$ !== socket) return;

          this.websocket$ = null;
          this.connection$?.next(false);
        },
      },
    });

    this.websocket$ = socket;

    socket
      .pipe(
        catchError((err: unknown) => {
          console.error(err);
          if (!this.websocket$) this.reconnect();

          // `of()` without arguments completes immediately without emitting,
          // so the error is not forwarded to subscribers as if it were a message.
          return of();
        }),
        takeUntil(this.unsubscribe$),
      )
      .subscribe(message => {
        this.wsMessages$.next(message);
      });
  }

  /**
   * Closes the current connection, forgets the token and cancels any running reconnect cycle.
   */
  public disconnect(): void {
    // Clear the token first: the teardown below must not trigger a reconnect.
    this.token = undefined;

    this.unsubscribe$.next();

    // The underlying close event is asynchronous; release the socket right away so that
    // an immediate connect() is not ignored.
    if (this.websocket$) {
      this.websocket$ = null;
      this.connection$?.next(false);
    }
  }

  /**
   * Starts a reconnect cycle: tries to connect every `reconnectInterval` ms, at most
   * `reconnectAttempts` times, stopping early once a socket exists.
   * No-op without a token or while a cycle is already running.
   */
  private reconnect(): void {
    if (!this.token || this.reconnection$) return;

    this.reconnection$ = interval(this.reconnectInterval).pipe(
      takeWhile((_, index) => index < this.reconnectAttempts && !this.websocket$),
      takeUntil(this.unsubscribe$),
    );

    this.reconnection$.subscribe({
      next: () => {
        if (this.token) this.connect(this.token);
      },
      complete: () => {
        this.reconnection$ = null;

        // Complete the streams only when the attempts were really exhausted;
        // a cycle cancelled by disconnect() (token cleared) must leave the service usable.
        if (!this.websocket$ && this.token) {
          this.wsMessages$.complete();
          this.connection$?.complete();
        }
      },
    });
  }

  /**
   * Stream of payloads for messages of the given topic whose action matches.
   *
   * @param topic topic to listen to.
   * @param action a single action, or a list of actions any of which is accepted.
   */
  public on<T>(topic: TMarketplaceWsTopic, action: TMarketplaceWsTopicAction | TMarketplaceWsTopicAction[]) {
    return this.onTopic<T>(topic).pipe(
      filter((eventData: MarketplaceWebsocketEventData<T>) =>
        Array.isArray(action) && eventData.action ? action.includes(eventData.action) : eventData.action === action,
      ),
      map((eventData: MarketplaceWebsocketEventData<T>) => eventData.payload),
    );
  }

  /**
   * Stream of all messages of the given topic, each wrapped in a `MarketplaceWebsocketEventData`.
   *
   * @param topic topic to listen to.
   */
  public onTopic<T>(topic: TMarketplaceWsTopic): Observable<MarketplaceWebsocketEventData<T>> {
    return this.wsMessages$.pipe(
      filter((message: MarketplaceWebsocketEventData) => {
        return message.topic === topic;
      }),
      map((message: MarketplaceWebsocketEventData) => new MarketplaceWebsocketEventData<T>(message)),
    );
  }

  /**
   * Sends a message to the server. Logs an error when `event` is empty or the socket is not connected.
   *
   * NOTE(review): the payload is stringified here and no custom `serializer` is configured, so
   * WebSocketSubject's default serializer (JSON.stringify) is applied on top of it — confirm the backend
   * expects this double-encoded form before changing it.
   *
   * @param event event name.
   * @param data event payload.
   */
  public send(event: string, data: any = {}): void {
    if (event && this.isConnected) {
      this.websocket$?.next(<any>JSON.stringify({ event, data }));
    } else {
      console.error('Send error!');
    }
  }

  /**
   * Returns websocket protocol.
   */
  public wsProtocol(): string {
    return this.getWindowProtocol() === 'http:' ? 'ws://' : 'wss://';
  }

  /**
   * Gets protocol from window.location.
   */
  public getWindowProtocol(): string {
    return window.location.protocol;
  }
}
