import { Inject, Injectable, OnDestroy } from '@angular/core';
import { ENVIRONMENT, Environment } from '@yeekatee/shared-util-environment';
import { Auth } from 'aws-amplify';
import { Socket } from 'phoenix';
import { Observable, from, switchMap } from 'rxjs';

@Injectable({ providedIn: 'root' })
export class RtHubService implements OnDestroy {
  private socket?: Socket;
  private params = { token: '' };

  constructor(@Inject(ENVIRONMENT) private readonly environment: Environment) {}

  ngOnDestroy(): void {
    this.socket?.disconnect();
  }

  channel<T>(topic: string): Observable<T> {
    return from(this.connectIfNeeded()).pipe(
      switchMap(
        () =>
          new Observable<T>((subscriber) => {
            const ch = this.socket?.channel(topic);
            ch
              ?.join()
              // .receive('ok', (response) => subscriber.next(response))
              .receive('error', (response) => subscriber.error(response))
              .receive('timeout', (response) => subscriber.error(response));

            ch?.on('rthub_msg', (response) => subscriber.next(response));

            return () => {
              ch?.leave();
            };
          }),
      ),
    );
  }

  private async connectIfNeeded(): Promise<void> {
    if (!this.environment.featuresConfig.enableRthub) {
      throw new Error('Rthub not enabled');
    }

    await this.fetchJWT();
    if (!this.socket || !this.socket.isConnected) {
      this.socket = new Socket(`${this.environment.rthubURL}/socket`, {
        params: this.params,
      });
      this.connect();
    }
  }

  private connect() {
    this.socket?.onError(async () => {
      this.socket?.disconnect(); // cancel auto connection recovery
      await this.fetchJWT();
      this.socket?.connect();
    });
    this.socket?.connect();
  }

  private async fetchJWT() {
    this.params.token = (await Auth.currentSession())
      .getAccessToken()
      .getJwtToken();
  }
}
