import { Injectable, OnDestroy, Inject } from '@angular/core';
import {
  Observable,
  SubscriptionLike,
  Subject,
  Observer,
  interval
} from 'rxjs';
import {
  filter,
  map,
  share,
  distinctUntilChanged,
  takeWhile
} from 'rxjs/operators';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket';

import {
  IWebsocketService,
  IWsMessage,
  WebSocketConfig
} from './websocket.interfaces';
import { config } from './websocket.config';
import { AuthService } from 'src/app/core/services/auth.service';
/**
 * Сервис веб-сокета
 */
@Injectable({
  providedIn: 'root'
})
export class WebsocketService implements IWebsocketService, OnDestroy {
  public status: Observable<boolean>;

  private config: WebSocketSubjectConfig<IWsMessage<any>>;

  private websocketSub: SubscriptionLike;
  private statusSub: SubscriptionLike;

  private reconnection$: Observable<number>;
  private websocket$: WebSocketSubject<IWsMessage<any>>;
  private connection$: Observer<boolean>;
  private wsMessages$: Subject<IWsMessage<any>>;

  private reconnectInterval: number;
  private reconnectAfterError: number;
  private reconnectAttempts: number;
  private isConnected: boolean;

  constructor(
    @Inject(config) private wsConfig: WebSocketConfig,
    private auth: AuthService
  ) {
    this.wsMessages$ = new Subject<IWsMessage<any>>();

    this.reconnectInterval = wsConfig.reconnectInterval || 5000; // pause between connections
    this.reconnectAfterError = 60000; // pause after error
    this.reconnectAttempts = wsConfig.reconnectAttempts || 10; // number of connection attempts

    this.config = {
      url: wsConfig.url + '?token=' + this.auth.getAccessToken(),
      closeObserver: {
        next: () => {
          this.websocket$ = null;
          setTimeout(function() {
            this.connection$.next(false);
          }, this.reconnectAfterError);
        }
      },
      openObserver: {
        next: () => {
          this.connection$.next(true);
        }
      }
    };

    // connection status
    this.status = new Observable<boolean>(observer => {
      this.connection$ = observer;
    }).pipe(share(), distinctUntilChanged());

    // run reconnect if not connection
    this.statusSub = this.status.subscribe(isConnected => {
      this.isConnected = isConnected;

      if (
        !this.reconnection$ &&
        typeof isConnected === 'boolean' &&
        !isConnected
      ) {
        // console.log(isConnected, this.reconnection$)
        this.reconnect();
      }
    });

    this.websocketSub = this.wsMessages$.subscribe(null, (error: ErrorEvent) =>
      console.error('WebSocket error!', error)
    );
  }

  /**
   * Хук дестроя
   *
   * @description отписывается от подписок
   */
  ngOnDestroy() {
    this.websocketSub.unsubscribe();
    this.statusSub.unsubscribe();
  }

  /**
   * Получение наблюдателя за событием получения сообщения
   *
   * @param events событие, за которым устанавливается наблдение
   * @returns наблюдателя за событием получения сообщения
   */
  public on<T>(events: string[]): Observable<T> {
    return this.wsMessages$.pipe(
      filter((message: IWsMessage<T>) =>
        events.some(event => message.event === event)
      ),
      map((message: IWsMessage<T>) => message.data)
    );
  }

  /**
   * Отсылка сообщения серверу
   *
   * @param event событие
   * @param data данные
   */
  public send(event: string, data: any = {}): void {
    if (!event || !this.isConnected) {
      console.error('Send error!');
      return;
    }
    this.websocket$.next({ event, data } as any);
  }

  /**
   * connect to WebSocked
   */
  public connect(): void {
    this.websocket$ = new WebSocketSubject(this.config);
    this.websocket$.subscribe(
      message => this.wsMessages$.next(message),
      (error: Event) => {
        console.error(error);
        if (!this.websocket$) {
          this.reconnect();
        }
      }
    );
  }

  /**
   * reconnect if not connecting or errors
   */
  private reconnect(): void {
    this.reconnection$ = interval(this.reconnectInterval).pipe(
      takeWhile(
        (v, index) => index < this.reconnectAttempts && !this.websocket$
      )
    );

    this.reconnection$.subscribe(
      () => this.connect(),
      null,
      () => {
        // Subject complete if reconnect attemts ending
        this.reconnection$ = null;

        if (!this.websocket$) {
          this.wsMessages$.complete();
          this.connection$.complete();
        }
      }
    );
  }
}
