import { SseService } from 'app/shared/services/sse/sse.service';
import { environment } from 'environments/environment';
import { Observable, Observer } from 'rxjs';

/**
 * Shape of the JSON payload that `SseConnection.on<T>()` emits to its subscribers.
 *
 * The object is produced by `JSON.parse` on the raw event data and is not validated
 * at runtime, so these fields describe the expected wire format only.
 */
export interface Message<T> {
  // NOTE(review): presumably a server-assigned message identifier — confirm against the backend.
  id: number;
  // NOTE(review): presumably the message/event kind — confirm against the backend.
  type: string;
  // NOTE(review): meaning not visible in this file (looks like a reference to a related entity) — confirm.
  ref: string;
  // Payload whose type is chosen by the caller through the type parameter `T`.
  data: T;
}

/**
 * Book-keeping for one active `SseConnection.on()` subscription.
 *
 * The listener is created once per subscription and reused for every underlying
 * `EventSource`, so it can be removed again and is never attached twice to the
 * same source.
 */
interface EventRegistration {
  event: string;
  observer: Observer<any>;
  listener: (message: MessageEvent) => void;
}

/**
 * Forwards named server-sent events from an `EventSource` to RxJS subscribers and
 * re-establishes the connection through `SseService.connect()` when the source
 * reports an error.
 *
 * Reconnect attempts are made every `staticReconnectInterval` milliseconds and are
 * abandoned after `maxConnectionTries` consecutive failures.
 */
export class SseConnection {
  /** Delay between two reconnect attempts, in milliseconds. */
  private static staticReconnectInterval = 15000;
  /** Number of consecutive failed attempts after which reconnecting is abandoned. */
  private static maxConnectionTries = 10;

  /** Every live subscription; several subscriptions may target the same event name. */
  private registrations: Set<EventRegistration>;
  private reconnectInterval: number | null = null;

  private connectionTries = 0;
  /** True while a reconnect attempt has been started and has not settled yet. */
  private reconnectPending = false;
  /** Set by `close()`; prevents any later reconnect from installing a new source. */
  private closed = false;

  /**
   * @param sseService Service used to open a replacement `EventSource` on reconnect.
   * @param topics Topics passed to `sseService.connect()` when reconnecting.
   * @param source The already created `EventSource` to listen on.
   */
  constructor(
    private sseService: SseService,
    private topics: string[],
    private source: EventSource,
  ) {
    this.registrations = new Set<EventRegistration>();
    this.subscribe();
  }

  /**
   * Returns a cold observable of the JSON-decoded messages of the given event.
   *
   * Each subscription registers its own listener; unsubscribing removes it again.
   * The observable completes when the connection is closed (immediately, if it
   * already is closed at subscription time).
   *
   * @param event Name of the server-sent event to listen for.
   * @returns Observable emitting the parsed event data as `Message<T>`.
   */
  on<T>(event: string): Observable<Message<T>> {
    return new Observable<Message<T>>(observer => {
      const registration: EventRegistration = {
        event,
        observer,
        listener: (message: MessageEvent): void => {
          // The payload is trusted to match Message<T>; it is not validated here.
          const data: Message<T> = JSON.parse(message.data);
          SseConnection.log('Received event', { event, data });
          observer.next(data);
        },
      };

      if (this.closed) {
        observer.complete();
      } else {
        this.registrations.add(registration);
        this.subscribeToEvent(registration);
      }

      // Teardown: runs on unsubscribe and after completion.
      return (): void => this.unregister(registration);
    });
  }

  /**
   * Closes the connection for good: stops reconnecting, closes the current source
   * and completes every subscriber. Calling it more than once is harmless.
   */
  close(): void {
    this.closed = true;
    this.stopReconnect();

    this.source.close();

    // Copy first: completing an observer runs its teardown, which mutates the set.
    const pending = [ ...this.registrations ];
    this.registrations.clear();
    for (const { observer } of pending) {
      observer.complete();
    }
  }

  /** Installs the open/error handlers and all registered listeners on the current source. */
  private subscribe(): void {
    this.source.onopen = (): void => this.onOpen();
    this.source.onerror = (err): void => this.onError(err);

    for (const registration of this.registrations) {
      this.subscribeToEvent(registration);
    }
  }

  /** Attaches a registration's listener to the current source. */
  private subscribeToEvent(registration: EventRegistration): void {
    this.source.addEventListener(registration.event, registration.listener);
  }

  /** Forgets a registration and detaches its listener from the current source. */
  private unregister(registration: EventRegistration): void {
    this.registrations.delete(registration);
    this.source.removeEventListener(registration.event, registration.listener);
  }

  /** Cancels the reconnect timer (if any) and resets the failure counter. */
  private stopReconnect(): void {
    if (this.reconnectInterval !== null) {
      clearInterval(this.reconnectInterval);
      this.reconnectInterval = null;
    }
    this.connectionTries = 0;
  }

  /**
   * Starts the periodic reconnect loop. The first attempt happens after one full
   * interval. Does nothing once the connection has been closed.
   */
  private reconnect(): void {
    this.stopReconnect();

    if (this.closed) {
      return;
    }

    this.reconnectInterval = setInterval(async () => {
      // Never run two attempts at once: a slow attempt must not race the next tick.
      if (this.reconnectPending) {
        return;
      }

      this.reconnectPending = true;
      const connected = await this.tryReconnect();
      this.reconnectPending = false;

      if (this.closed) {
        return;
      }

      if (connected) {
        this.stopReconnect();
        return;
      }

      this.connectionTries++;
      if (this.connectionTries >= SseConnection.maxConnectionTries) {
        this.stopReconnect();
        // eslint-disable-next-line no-console
        console.error('SSE connection lost. Please reload the windows to reconnect');
      }
      // The cast keeps the timer id assignable to the `number` field regardless of
      // which `setInterval` typing is in effect; the id is only passed to clearInterval.
    }, SseConnection.staticReconnectInterval) as unknown as number;
  }

  /**
   * Makes a single reconnect attempt.
   *
   * The candidate source only replaces `this.source` once it has actually opened;
   * a candidate that errors, or that arrives after `close()`, is closed and dropped.
   *
   * @returns `true` if a new source was opened and adopted, `false` otherwise.
   */
  private async tryReconnect(): Promise<boolean> {
    SseConnection.log('Trying to reconnect');

    let candidate: EventSource;
    try {
      candidate = await this.sseService.connect(this.topics);
    } catch (error) {
      SseConnection.log('Reconnect failed', { error });
      return false;
    }

    if (this.closed) {
      candidate.close();
      return false;
    }

    return new Promise<boolean>(resolve => {
      candidate.onopen = (): void => {
        if (this.closed) {
          candidate.close();
          resolve(false);
          return;
        }

        this.source = candidate;
        // Replaces the temporary handlers above and re-attaches every listener.
        this.subscribe();
        this.onOpen();

        resolve(true);
      };

      candidate.onerror = (): void => {
        // Close this attempt's own source so the browser does not retry it by itself.
        candidate.close();
        resolve(false);
      };
    });
  }

  /** Logs that the current source is connected. */
  private onOpen(): void {
    SseConnection.log('Connected', { url: this.source.url });
  }

  /** Handles an error on the live source: closes it and starts the reconnect loop. */
  private onError(event: Event): void {
    SseConnection.log('Disconnected', { url: this.source.url, event });
    this.source.close();

    this.reconnect();
  }

  /**
   * Writes a collapsed console group with one line per entry of `data`.
   * Does nothing in production builds.
   */
  private static log(group: string, data: Record<string, unknown> = {}): void {
    if (!environment.production) {
      /* eslint-disable no-console */
      console.groupCollapsed(`[SSE] ${group}`);
      for (const [ key, value ] of Object.entries(data)) {
        console.log(`${key}:`, value);
      }
      console.groupEnd();
      /* eslint-enable no-console */
    }
  }
}
