import { Subscription, Subject, filter } from 'rxjs';
import { Err, Result } from 'ts-results';
import {
  SubscribeAction,
  parseWSNotification,
  WSNotification,
} from '@agoy/messages';
import AppConfig from '_shared/services/config';
import { getContext } from 'utils/AgoyAppClient/contextHolder';
import {
  NotificationErrors,
  NotificationFilter,
  NotificationService,
} from './types';

/**
 * Util function to create a comparator for a notification filter
 */
const equals =
  (a: NotificationFilter) =>
  (b: NotificationFilter): boolean =>
    a.clientId === b.clientId && a.topic === b.topic;

/**
 * Filters out the unique notification filters
 *
 * @param items
 * @returns unique filters
 */
const unique = (
  items: IterableIterator<NotificationFilter>
): NotificationFilter[] => {
  const result: NotificationFilter[] = [];
  Array.from(items).forEach((item) => {
    if (!result.find(equals(item))) {
      result.push(item);
    }
  });
  return result;
};

export default class NotificationServiceImpl implements NotificationService {
  ws: WebSocket | undefined;

  open = false;

  state: 'connecting' | 'connected' | 'not_connected' = 'not_connected';

  /**
   * All subscriptions made so far, needed to resubscribe on a reconnect.
   */
  subscriptions: Map<Subscription, NotificationFilter> = new Map();

  notification: Subject<Result<WSNotification, NotificationErrors>> =
    new Subject();

  /**
   * Time to wait before retrying the connection
   */
  waitForReconnect = 1000;

  constructor() {
    if (AppConfig.wsEndpoint) {
      this.init();
    } else {
      // eslint-disable-next-line no-console
      console.warn('The WS endpoint is not configured');
    }
  }

  private init(): void {
    this.state = 'connecting';
    try {
      const ws = new WebSocket(AppConfig.wsEndpoint);
      this.ws = ws;

      ws.onopen = this.onOpen.bind(this);
      ws.onerror = this.onError.bind(this);
      ws.onclose = this.onClose.bind(this);
      ws.onmessage = this.onMessage.bind(this);
    } catch (err) {
      // eslint-disable-next-line no-console
      console.error(err);
    }
  }

  onOpen(): void {
    this.open = true;
    this.state = 'connected';
    unique(this.subscriptions.values()).forEach((notificationFilter) => {
      this.sendSubscribeMessage(notificationFilter);
    });
  }

  onClose(): void {
    this.state = 'not_connected';
    setTimeout(() => this.init(), this.waitForReconnect);
  }

  onError(event: Event): void {
    // eslint-disable-next-line no-console
    console.warn(event);
    if (this.state === 'connecting') {
      this.notification.next(Err('WS_NO_CONNECTION'));
    } else {
      this.notification.next(Err('WS_UNKNOWN_ERROR'));
    }
  }

  onMessage(event: MessageEvent): void {
    const message = parseWSNotification(event.data.toString());
    if (message.ok) {
      this.notification.next(message);
    } else {
      // eslint-disable-next-line no-console
      console.warn('Received an invalid message', message.val);
    }
  }

  async sendSubscribeMessage(notificationFilter: NotificationFilter) {
    if (this.state === 'connected') {
      const headers: Record<string, string> = await getContext().headers();
      const { token, 'agoy-jwt': agoyJwt } = headers;
      if (token || agoyJwt) {
        const action: SubscribeAction = {
          action: 'subscribe',
          clientId: notificationFilter.clientId,
          topic: notificationFilter.topic,
          token: token || agoyJwt,
        };
        this.ws?.send(JSON.stringify(action));
      }
    }
  }

  subscribe(
    notificationFilter: NotificationFilter,
    handler: (message: Result<WSNotification, NotificationErrors>) => void
  ): Subscription {
    const sub = this.notification
      .pipe(
        filter(
          (msg) =>
            msg.err ||
            (msg.val.clientId === notificationFilter.clientId &&
              msg.val.topic === notificationFilter.topic)
        )
      )
      .subscribe(handler);

    if (
      !Array.from(this.subscriptions.values()).find(equals(notificationFilter))
    ) {
      this.sendSubscribeMessage(notificationFilter);
    }
    this.subscriptions.set(sub, notificationFilter);

    return sub;
  }

  unsubscribe(subscription: Subscription): void {
    subscription.unsubscribe();
    this.subscriptions.delete(subscription);
    // TODO Unsubscribe not implemented in lambdas
  }
}
