import { Inject, Injectable, InjectionToken } from "@angular/core";
import { webSocket, WebSocketSubject } from "rxjs/webSocket";
import {
  combineLatest,
  filter,
  interval,
  map,
  NEVER,
  Observable,
  Subject,
  tap,
} from "rxjs";
import { Message } from "./rstypes/Message";
import { ChannelMessage } from "./rstypes/ChannelMessage";
import { WsMessage } from "./rstypes/WsMessage";
import { BroadcastMessage } from "./rstypes/BroadcastMessage";
import { DataMessageService } from "@solidev/data";
import { MessagePriority } from "./rstypes/MessagePriority";
import { ProgressPayload } from "./rstypes/MessagePayload";
import { RI } from "../../constants";

/** Injection token carrying the websocket URL that `WsService` connects to. */
export const LIVE_URL = new InjectionToken<string>("live.url");
/** Element type of the websocket subject used by `WsService`. */
export type Wsm = Message | WsMessage;

/**
 * Display settings associated with each message priority.
 *
 * NOTE(review): `timeout` is presumably a display duration in milliseconds,
 * with 0 meaning "no automatic timeout" — confirm against the consumer.
 */
export const message_display: Record<MessagePriority, { timeout: number }> = {
  critical: { timeout: 0 },
  error: { timeout: 0 },
  warning: { timeout: 0 },
  info: { timeout: 0 },
  verbose: { timeout: 0 },
  debug: { timeout: 5000 },
};

/**
 * Bookkeeping record attached to a received message.
 *
 * NOTE(review): not referenced elsewhere in this file; the field meanings
 * below are inferred from their names and types — confirm with consumers.
 */
export interface MessageStatus {
  /** The message this status refers to */
  message: Message;
  /** Presumably the date at which the message expires — TODO confirm */
  timeout: Date;
  /** Presumably whether the message has been acknowledged — TODO confirm */
  ack: boolean;
}

/**
 * Action attached to a progress session (the `actions` list passed to
 * `WsService.progress`, displayed in the progress bar).
 */
export interface ProgressAction {
  /** Label of the action */
  label: string;
  /** Optional icon, given as a key of the `RI` constants */
  icon?: keyof typeof RI;
  /** Optional route segments — presumably a router link target, TODO confirm */
  route?: (string | number)[];
  /** Optional callback, receiving the progress session the action belongs to */
  action?: (p: Progress) => void;
}

/**
 * State of one progress monitoring session tracked by `WsService`,
 * keyed by its websocket channel name.
 */
export interface Progress {
  /** Websocket channel the progress messages are received on */
  channel: string;
  /** Name of the progress, displayed in the progress bar */
  name: string;
  /** Total number of steps (forced to 100 when the payload carries a percentage) */
  total: number;
  /** Slot type (global or specific) used to select which sessions an observer gets */
  slot: "global" | string;
  /** Number of steps done so far (or the percentage when the payload carries one) */
  current: number;
  /** Current step, copied from the last progress payload */
  step: string;
  /** Text of the last progress message (empty string when the message has none) */
  message: string;
  /** Result, copied from the last progress payload */
  result?: string;
  /** Link, copied from the last progress payload */
  link?: string;
  /** Whether the last progress payload flagged the session as done */
  done: boolean;
  /** Set to true when the session is removed by the periodic cleanup */
  closed: boolean;
  /** Date after which the periodic cleanup unsubscribes and removes the session */
  timeout: Date;
  /** Actions to display in the progress bar */
  actions: ProgressAction[];
}

/**
 * Application-wide websocket client.
 *
 * Keeps a websocket connection to `LIVE_URL` open (re-checked every 5 seconds),
 * republishes every received message on an internal subject, and tracks the
 * state of progress sessions keyed by channel name.
 */
@Injectable({
  providedIn: "root",
})
export class WsService {
  private ws?: WebSocketSubject<Wsm>;
  /** Subject for **all** received messages */
  private _messages = new Subject<Message>();
  /** Progress sessions currently tracked, keyed by channel name */
  private _progress = new Map<string, Progress>();
  /** True while the current websocket subscription has neither errored nor completed */
  private _alive = false;

  constructor(
    @Inject(LIVE_URL) private liveUrl: string,
    private _msgs: DataMessageService,
  ) {
    // Apply progress payloads exactly once per message, whatever the number of
    // progress$ observers. This subscription is registered first on the subject,
    // so the state is already up to date when progress$ observers are notified
    // of the same message.
    this._messages
      .pipe(
        filter(
          (m: Message) =>
            m.type === "channel" &&
            this._progress.has(m.channel) &&
            m.payload.type === "progress",
        ),
        map((m) => m as ChannelMessage),
      )
      .subscribe((m) => this.apply_progress(m));

    // Connect right away: otherwise `ws` stays undefined until the first tick
    // below, and progress()/subscribe$() calls made meanwhile are silently lost.
    this.connect(this.liveUrl);

    // Reconnection watchdog: connect() is a no-op while the connection is alive
    interval(5000).subscribe(() => {
      this.connect(this.liveUrl);
    });

    // Clean progress every 2 minutes
    interval(120000)
      .pipe(tap(() => this.clean_progress()))
      .subscribe();
  }

  /**
   * Connect to the websocket server, and subscribe to all messages.
   *
   * Does nothing while the current connection is alive. Otherwise any previous
   * websocket subject is completed and a new one is created.
   * @param url websocket server url
   */
  public connect(url: string) {
    if (this._alive) {
      return;
    }
    if (this.ws) {
      try {
        this.ws.complete();
      } catch (e) {
        console.log("Websocket error while closing websocket", e);
      }
    }
    try {
      console.log("Websocket connecting to", url);
      this.ws = webSocket(url);
      this._alive = true;
    } catch (e) {
      console.info("Websocket connection failed", e);
      return;
    }
    try {
      this.ws.subscribe({
        next: (msg) => {
          // Called whenever there is a message from the server.
          this._messages.next(msg as Message);
        },
        error: (err: unknown) => {
          // Called if at any point WebSocket API signals some kind of error.
          if (err instanceof CloseEvent) {
            console.log("Websocket connection closed", err);
          } else {
            console.error("Websocket uncatched error", err);
            // TODO: see if we need to reconnect (check ping/pong ??)
          }
          // In both cases, let the watchdog open a new connection
          this._alive = false;
        },
        complete: () => {
          // Called when connection is closed (for whatever reason).
          this._alive = false;
          console.log("Websocket connection completed");
        },
      });
    } catch (e) {
      console.info("Websocket connection error", e);
      this._alive = false;
    }
  }

  /** Starts a progress monitoring session, using a channel name, a slot name, a total number of steps and a name.
   *
   * Does nothing when no websocket subject has been created.
   * @param channel channel name
   * @param slot slot type (global or specific)
   * @param total total number of steps
   * @param name name of the progress, displayed in the progress bar
   * @param actions list of actions to display in the progress bar
   */
  public progress(
    channel: string,
    slot: "global" | string,
    total: number = 100,
    name: string = "",
    actions: ProgressAction[] = [],
  ): void {
    if (this.ws) {
      this.ws.next({ type: "subscribe", channel: channel });
      this._progress.set(channel, {
        channel: channel,
        total: total,
        name: name,
        slot: slot,
        current: 0,
        step: "",
        closed: false,
        message: "",
        done: false,
        // default timeout : now + 1h
        timeout: new Date(new Date().getTime() + 60 * 60 * 1000),
        actions,
      });
    }
  }

  /** Returns an observable of progress sessions for a given slot. The observable emits the array of tracked
   * sessions on each progress message received for the slot, and on each second (for timeout management).
   * The first emission happens on the first one-second tick, even if no progress message was received yet.
   *
   * If a channel name is provided, only the progress session of this channel is emitted.
   *
   * The emitted `Progress` objects are the live, tracked instances: they are mutated in place
   * when new progress messages arrive.
   *
   * @param slot slot type (global or specific)
   * @param channel channel name (optional)
   * @returns observable of progress sessions
   */
  public progress$(
    slot: "global" | string,
    channel: string | null = null,
  ): Observable<Progress[]> {
    // Progress messages concerning this slot: only used as a "state changed" trigger,
    // the state itself is updated once by the subscription set up in the constructor.
    const slotMessages = this._messages.asObservable().pipe(
      filter(
        (m: Message) =>
          m.type === "channel" &&
          this._progress.get(m.channel)?.slot === slot &&
          m.payload.type === "progress",
      ),
    );
    const changes = new Observable<void>((subscriber) => {
      // Seed one emission, so that combineLatest does not wait for a first progress
      // message before emitting the sessions that are already tracked.
      subscriber.next();
      return slotMessages.subscribe({
        next: () => subscriber.next(),
        error: (e: unknown) => subscriber.error(e),
        complete: () => subscriber.complete(),
      });
    });
    return combineLatest([changes, interval(1000)]).pipe(
      map(() =>
        Array.from(this._progress.values()).filter(
          (p) => p.slot === slot && (!channel || p.channel === channel),
        ),
      ),
    );
  }

  /**
   * Subscribes to a channel and returns the messages received on it.
   * @param channel channel name
   * @returns observable of the channel messages, or `NEVER` when no websocket subject has been created
   */
  public subscribe$(channel: string): Observable<ChannelMessage> {
    if (this.ws) {
      this.ws.next({ type: "subscribe", channel: channel });
      return this._messages.asObservable().pipe(
        filter((m) => m.type === "channel" && m.channel === channel),
        map((message: Message) => message as ChannelMessage),
      );
    }
    return NEVER;
  }

  /**
   * Returns the broadcast messages. As a side effect, each broadcast message is also
   * reported through `DataMessageService.info` (once per subscription to the returned observable).
   * @returns observable of broadcast messages
   */
  public broadcasts$(): Observable<BroadcastMessage> {
    return this._messages.asObservable().pipe(
      filter((m) => m.type === "broadcast"),
      map((message: Message) => message as BroadcastMessage),
      tap((m: BroadcastMessage) => {
        this._msgs.info(m.title ?? "", m.text ?? "");
      }),
    );
  }

  /**
   * Sends an unsubscribe request for a channel (no-op when no websocket subject has been created).
   * @param channel channel name
   */
  public unsubscribe(channel: string): void {
    if (this.ws) {
      this.ws.next({ type: "unsubscribe", channel: channel });
    }
  }

  /**
   * @returns observable of **all** received messages
   */
  public messages$(): Observable<Message> {
    return this._messages.asObservable();
  }

  /**
   * Stops a progress monitoring session: unsubscribes from its channel and forgets its state.
   * @param channel channel name
   */
  public unprogress(channel: string) {
    this.ws?.next({ type: "unsubscribe", channel });
    this._progress.delete(channel);
  }

  /**
   * Updates the tracked progress session of a channel from a progress message.
   * The caller guarantees that the payload is a progress payload; messages for
   * untracked channels are ignored.
   * @param m channel message carrying a progress payload
   */
  private apply_progress(m: ChannelMessage): void {
    const p = this._progress.get(m.channel);
    if (!p) {
      return;
    }
    const pl = m.payload as ProgressPayload;
    p.current += pl.progress;
    if (p.total < p.current) {
      // Never display more than 100%: grow the total with the current value
      p.total = p.current;
    }
    if (pl.percent !== undefined) {
      // An explicit percentage overrides the step counting
      p.current = pl.percent;
      p.total = 100;
    }
    p.done = pl.done;
    p.step = pl.step;
    p.link = pl.link;
    p.result = pl.result;
    p.message = m.text ?? "";
    if (pl.done) {
      p.current = p.total;
      // On done, timeout is now + 120s
      p.timeout = new Date(new Date().getTime() + 120 * 1000);
    }
  }

  /** Unsubscribes from and removes every progress session whose timeout is in the past. */
  private clean_progress() {
    const now = new Date();
    this._progress.forEach((p) => {
      if (p.timeout < now) {
        p.closed = true;
        this.ws?.next({ type: "unsubscribe", channel: p.channel });
        this._progress.delete(p.channel);
      }
    });
  }
}
