import { decompressSync } from "fflate";
import { wsMachine, WSMachine } from "ts-ws-machine";
import { concat, filter, forEach, fromValue, makeSubject, onEnd, onStart, pipe, skipWhile, take } from "wonka";
import { hash } from "../../utils/objects";
import { SubscriptionType as SubscriptionTypeDto, UnsubscribeAction as UnsubscribeActionDto } from "../client";

export { SubscriptionType } from "../client";

/**
 * Base URL of the websocket endpoint, read from the `NEXT_PUBLIC_WS_BASE_URI`
 * environment variable. It is `undefined` when the variable is not set, in
 * which case the URL built from it will start with the literal text "undefined".
 */
const WS_BASE_URL = process.env.NEXT_PUBLIC_WS_BASE_URI;

/**
 * Connection states published on `ReclaimWS.status$`.
 *
 * The values are compared against, and cast from, the `tag` of the
 * `ts-ws-machine` state inside the `onStateChange` handler, so they are
 * expected to match the tags that library emits.
 */
export enum WebsocketStatus {
  Initial = "INITIAL",
  Connecting = "CONNECTING",
  Open = "OPEN",
  Closed = "CLOSED",
  Reconnecting = "RECONNECTING",
}

/**
 * Lifecycle state of a single entry in `ReclaimWS.subscriptions`.
 */
export enum SubscriptionStatus {
  /** A subscribe message has been queued/sent but not yet confirmed. */
  Pending = "PENDING",
  /** A `NewSubscription` envelope confirmed the subscription. */
  Subscribed = "SUBSCRIBED",
  /** Checked by `subscribe`, but never assigned anywhere in this file. */
  Unsubscribed = "UNSUBSCRIBED",
  /** The socket transitioned to `CLOSED` while the subscription was registered. */
  Disconnected = "DISCONNECTED",
}

/**
 * Base shape of a subscription request. Concrete subscriptions extend this
 * with extra fields; the whole object is hashed together with
 * `subscriptionType` to derive the subscription id.
 */
export type Subscription<T extends SubscriptionTypeDto> = {
  subscriptionType: T;
};

/**
 * Message sent over the socket to start a subscription.
 * `subscriptionId` is optional in the type; `ReclaimWS.subscribe` always sets it.
 */
export type WebSocketSubscribeAction<S extends Subscription<any>> = {
  action: "subscribe";
  subscription: S;
  subscriptionId?: string;
};

/**
 * Message sent over the socket to stop a subscription: the generated
 * unsubscribe DTO narrowed to a fixed `action` and a required `subscriptionId`.
 */
export type WebSocketUnsubscribeAction = UnsubscribeActionDto & {
  action: "unsubscribe";
  subscriptionId: string;
};

/**
 * Subscription request for the `AssistPlanned` subscription type.
 *
 * NOTE(review): `startTime`/`endTime` presumably bound the time window the
 * server should report on — confirm against the server contract.
 */
export interface AssistPlannedSubscription extends Subscription<SubscriptionTypeDto.AssistPlanned> {
  startTime: Date;
  endTime: Date;
}

/**
 * Wrapper around every message published on `ReclaimWS.source$`.
 */
export interface Envelope<D = any> {
  /**
   * Message payload. When `compressed` is true it arrives as a base64 string
   * of compressed JSON and is replaced by the parsed value before the
   * envelope is published.
   */
  data: D; // TODO (ma): type this properly
  /** Kind of message; `NewSubscription` marks a subscription confirmation. */
  type: SubscriptionTypeDto;
  /**
   * @deprecated DO NOT USE: Deprecated, only remaining usage is in Tasks and will be replaced soon
   */
  delete?: boolean;
  /** Whether `data` was sent compressed (see `data`). */
  compressed: boolean;
  /** Id of the subscription this message belongs to. */
  subscriptionId: string;
  /** Not read anywhere in this file. */
  notificationKeys: string[];
}

/**
 * Websocket client that multiplexes reference-counted subscriptions over a
 * single `ts-ws-machine` connection.
 *
 * - Incoming messages are parsed (and decompressed when flagged) and published
 *   on {@link ReclaimWS.source$}.
 * - Connection state changes are published on {@link ReclaimWS.status$}.
 * - Each subscription id is tracked in {@link ReclaimWS.subscriptions} with a
 *   per-watcher count; the unsubscribe message is only sent once the last
 *   watcher has gone.
 * - After a reconnect, subscriptions that were marked disconnected are sent
 *   again without touching the watcher counts.
 */
export class ReclaimWS {
  // Stays undefined when `wsMachine(...)` throws in the constructor.
  private client?: WSMachine;
  // True from the moment the socket leaves OPEN until it is OPEN again.
  private reconnecting: boolean = false;
  // Timestamp (ms) of the first CLOSED transition of the current outage, 0 when connected.
  private disconnectStart = 0;

  /**
   * Registered subscriptions keyed by subscription id. `watchers` maps each
   * subscription object (by identity) to the number of active watchers.
   */
  public subscriptions: Map<
    string,
    { watchers: Map<Subscription<any>, number>; type: SubscriptionTypeDto; status: SubscriptionStatus }
  > = new Map();

  private messageSubject = makeSubject<Envelope>();
  private statusSubject = makeSubject<WebsocketStatus>();

  /**
   * Creates the websocket state machine and immediately starts connecting.
   * Errors while creating/connecting are logged, not thrown.
   *
   * @param types Optional subscription types appended to the URL as repeated
   *   `type=` query parameters.
   */
  constructor(types?: Array<SubscriptionTypeDto>) {
    try {
      // Only append a query string when there is at least one type
      const query = types?.length ? `?${types.map((t) => `type=${encodeURIComponent(t)}`).join("&")}` : "";

      const client = wsMachine({
        url: `${WS_BASE_URL}${query}`,
        pingTimeoutMillis: 1000 * 10, // ping WS every 10 seconds to keep connection open & determine if a disconnect occurred
        pongTimeoutMillis: 1000 * 10,
        pingMsg: '{"action": "$default"}',
        pongMsg: "",
        backoffFn: (attempt) => (attempt === 0 ? 0 : 1000 * 5), // if disconnected, try to immediately reconnect, otherwise try every 5 seconds
        onMessage: (msg) => {
          try {
            // If there is no data, this is likely a ping pong message, so just throw it away
            if (!msg.data) {
              return;
            }

            const envelope = JSON.parse(msg.data) as Envelope;

            // Decompress (base64, gzip)
            if (envelope?.compressed) {
              try {
                const compressed = Uint8Array.from(Buffer.from(envelope.data, "base64"));
                const decompressed = decompressSync(compressed);
                const data = new TextDecoder().decode(decompressed);

                envelope.data = JSON.parse(data);
              } catch (err) {
                console.error("Failed to decompress ws data", err);
                throw err;
              }
            }

            this.messageSubject.next(envelope);
          } catch (e) {
            console.error("Unhandled websocket message", msg, e);
          }
        },
        onStateChange: ({ previous, current }) => {
          if (previous.tag !== current.tag) console.log(`Websocket state changed: ${previous.tag} -> ${current.tag}`);

          // reflect disconnected status
          if (WebsocketStatus.Closed !== previous.tag && WebsocketStatus.Closed === current.tag) {
            console.info("Websocket disconnected");
            this.subscriptions.forEach((subscription) => {
              subscription.status = SubscriptionStatus.Disconnected;
            });

            // Start a timer to see how long we are disconnected for
            if (!this.disconnectStart) {
              this.disconnectStart = Date.now();
            }
          }

          // resubscribe after reconnect
          if (WebsocketStatus.Open === previous.tag && WebsocketStatus.Open !== current.tag) this.reconnecting = true;
          else if (this.reconnecting && WebsocketStatus.Open === current.tag) {
            console.info("Websocket reconnected");
            this.reconnecting = false;

            // Report and reset the outage timer so the next outage is measured from scratch
            if (this.disconnectStart) {
              console.info(`Websocket was disconnected for ${Date.now() - this.disconnectStart}ms`);
              this.disconnectStart = 0;
            }

            this.resubscribe();
          }

          this.statusSubject.next(current.tag as WebsocketStatus);
        },
      });

      this.client = client;
      client.connect();
    } catch (e) {
      console.error("Error connecting websocket", e);
    }

    if (this.client) {
      // watch for subscription confirmations
      pipe(
        this.source$,
        filter((envelope) => envelope.type === SubscriptionTypeDto.NewSubscription),
        forEach((envelope) => {
          const subscriptionId = envelope.subscriptionId;
          const type = envelope.data;
          const record = this.subscriptions.get(subscriptionId);

          if (!record) {
            console.warn("Unexpected new subscription, maybe from another session?", subscriptionId, type, record);
            return;
          }

          if (type !== record.type) {
            console.warn("New subscription type mismatch", type, record.type);
          }

          this.subscriptions.set(subscriptionId, { ...record, status: SubscriptionStatus.Subscribed });
        })
      );
    }
  }

  /** Stream of every envelope received from the socket or injected via `pushMessage`. */
  get source$() {
    return this.messageSubject.source;
  }

  /** Stream of connection status changes. */
  get status$() {
    return this.statusSubject.source;
  }

  /**
   * Returns a stream of the envelopes belonging to `subscription`, excluding
   * the subscription confirmation itself. The subscription is registered via
   * wonka's `onStart` and released via wonka's `onEnd`.
   *
   * @param subscription Subscription to watch; the same object is later used
   *   as the watcher key when unsubscribing.
   */
  subscription$$<T extends SubscriptionTypeDto, S extends Subscription<T>>(subscription: S) {
    const subscriptionId = this.getUid(subscription);

    return pipe(
      this.messageSubject.source,
      filter(
        (envelope) =>
          envelope.subscriptionId === subscriptionId && envelope.type !== SubscriptionTypeDto.NewSubscription
      ),

      onStart(() => {
        this.subscribe(subscription);
      }),
      onEnd(() => {
        this.unsubscribe(subscription);
      })
    );
  }

  /**
   * Registers a watcher for `subscription` and sends the subscribe message
   * unless the subscription is already pending or confirmed.
   *
   * @param subscription Subscription request; also the watcher key (by identity).
   * @param subscriptionId Id to register under; defaults to an id derived from the subscription.
   */
  subscribe<T extends SubscriptionTypeDto, S extends Subscription<T>>(
    subscription: S,
    subscriptionId: string = this.getUid(subscription)
  ) {
    const record = this.subscriptions.get(subscriptionId);

    console.info("subscribe", subscriptionId, subscription);

    // If an active subscription already exists, register a new watcher
    if (record && record.status !== SubscriptionStatus.Unsubscribed) {
      const count = record.watchers.get(subscription) ?? 0;
      record.watchers.set(subscription, count + 1);

      // If the subscription is pending or connected, return
      if (record.status === SubscriptionStatus.Pending || record.status === SubscriptionStatus.Subscribed) return;
    }

    // If subscription got disconnected, resend the subscription
    if (record?.status === SubscriptionStatus.Disconnected)
      console.info("Subscription disconnected, resubscribing", subscriptionId);

    this.requestSubscription(subscription, subscriptionId, record?.watchers ?? new Map([[subscription, 1]]));
  }

  /**
   * Releases one watcher of `subscription`. When the last watcher is
   * released, the unsubscribe message is sent and the record is removed.
   *
   * @param subscription The same object that was passed to `subscribe`.
   * @param subscriptionId Id the subscription was registered under; defaults
   *   to an id derived from the subscription.
   */
  unsubscribe<T extends SubscriptionTypeDto, S extends Subscription<T>>(
    subscription: S,
    subscriptionId: string = this.getUid(subscription)
  ): void {
    const record = this.subscriptions.get(subscriptionId);

    console.info("unsubscribe", subscriptionId, subscription);

    if (!record) {
      console.warn("No matching subscription to unsubscribe", subscriptionId, Array.from(this.subscriptions.values()));
      return;
    }
    if (!record.watchers.has(subscription)) {
      console.warn("No matching watcher to unsubscribe", subscription, Array.from(record.watchers.values()));
      return;
    }

    // Decrement the watcher count and remove when there are no more watchers
    const count = record.watchers.get(subscription) ?? 0;
    if (count > 1) record.watchers.set(subscription, count - 1);
    else record.watchers.delete(subscription);

    // Return if there are more watchers
    if (record.watchers.size > 0) return;

    // When all watchers unsubscribe, send an unsubscribe message
    const payload: WebSocketUnsubscribeAction = {
      action: "unsubscribe",
      subscriptionId,
    };

    this.send(payload);
    this.subscriptions.delete(subscriptionId);
  }

  /**
   * Publishes a locally created, uncompressed envelope on `source$` as if it
   * had been received from the socket.
   */
  pushMessage(data: Object, type: SubscriptionTypeDto, subscriptionId: string) {
    this.messageSubject.next({
      data,
      type,
      compressed: false,
      subscriptionId,
      notificationKeys: [],
    });
  }

  /**
   * Serializes `payload` as JSON and sends it once the socket is open:
   * immediately when it is already open, otherwise on the next `OPEN` status.
   * Logs and drops the message when the client was never created.
   */
  private send(payload: object) {
    const client = this.client;
    if (!client) {
      console.error("Cannot send websocket message, websocket client was not created", payload);
      return;
    }

    // Wait for connection to open before sending
    // TODO (IW): This should really be smarter
    pipe(
      concat([fromValue(client.currentState().tag), this.status$]),
      skipWhile((status) => status !== "OPEN"),
      take(1),
      forEach(() => client.send(JSON.stringify(payload)))
    );
  }

  /**
   * Marks the subscription as pending (keeping the given watcher map as-is)
   * and sends the subscribe message.
   */
  private requestSubscription<S extends Subscription<any>>(
    subscription: S,
    subscriptionId: string,
    watchers: Map<Subscription<any>, number>
  ) {
    this.subscriptions.set(subscriptionId, {
      watchers,
      type: subscription.subscriptionType,
      status: SubscriptionStatus.Pending,
    });

    const payload: WebSocketSubscribeAction<S> = {
      action: "subscribe",
      subscriptionId,
      subscription,
    };

    this.send(payload);
  }

  /**
   * Re-sends the subscribe message for every registered subscription that is
   * not already pending or confirmed. Watcher counts are left untouched: a
   * reconnect must not look like additional watchers, otherwise the counts
   * would never drop back to zero on unsubscribe.
   */
  private resubscribe() {
    // Snapshot the entries because requestSubscription replaces map values
    for (const [subscriptionId, record] of Array.from(this.subscriptions.entries())) {
      // Pending entries already have a subscribe message queued; confirmed ones need nothing
      if (record.status === SubscriptionStatus.Pending || record.status === SubscriptionStatus.Subscribed) continue;

      // Send once per subscription id, using the first registered watcher as the request
      const [subscription] = Array.from(record.watchers.keys());
      if (!subscription) continue;

      if (record.status === SubscriptionStatus.Disconnected)
        console.info("Subscription disconnected, resubscribing", subscriptionId);

      this.requestSubscription(subscription, subscriptionId, record.watchers);
    }
  }

  /** Derives the subscription id: the subscription type joined with a hash of the whole subscription. */
  private getUid<T extends SubscriptionTypeDto, S extends Subscription<T>>(subscription: S) {
    return [subscription.subscriptionType, hash(subscription)].join("_");
  }
}
