import uuid from 'uuid';
import { ProtectedTypedEvents } from 'src/lib/events';
import { Logger, createLog } from 'src/lib/logging';
import { encryptPayload, importKey, decryptPayload } from './crypto';
import { chunkify, ChunkAssembler } from './chunking';
import {
  IDataChannelOpts,
  IDataChannel,
  DataChannelEventMap,
  Data,
} from './data-channel';
import { ReadyState } from './ready-state';
import {
  SessionServiceClient,
  EventMap as SessionServiceEventMap,
  ISessionServiceClient,
  ISessionServiceClientFactory,
} from './session-service-client';
import {
  IRelayMessage,
  IStatusMessage,
  IAckMessage,
  IBindNotification,
  ISessionClosedMessage,
} from './session-service-messages';
import { getBindUrl } from './binding-info';
import { PriorityQueue } from './priority-queue';
import { SessionServiceStatusCode } from './session-service-status-code';

const defaultChunkSize = 8025;
const defaultSendPriority = 3;
const defaultChunksPerAck = 20;
const defaultMaxUnacknowledgedChunks = defaultChunksPerAck * 4;

interface IChunkProgress {
  totalBytes: number;
  chunkSize: number;
}

export interface ISessionServiceDataChannelOpts extends IDataChannelOpts {
  /** Prefix used for all generated message IDs */
  messagePrefix: string;

  /**
   * Max number of chunks that can be sent at a time before
   * being acknowledged by peer.
   */
  maxUnacknowledgedChunks?: number;

  /** Alternative SessionServiceClient factory */
  sessionServiceClientFactory?: ISessionServiceClientFactory;
}

export class SessionServiceDataChannel
  extends ProtectedTypedEvents<DataChannelEventMap>
  implements IDataChannel {
  /** IDataChannel.ready */
  public readonly ready: Promise<IBindNotification['peerDescription']>;

  /** Log */
  private logger: Logger;

  /** Client used for data transport */
  private ssClient: ISessionServiceClient;

  /** Max chunk size for chunking */
  private chunkSize = defaultChunkSize;

  /** Helper to assemble chunked messages */
  private chunkAssembler = new ChunkAssembler();

  /** Key used for symmetric message encryption/decryption */
  private key: CryptoKey;

  /** How many chunks we need to send before expecting an 'ack' message */
  private chunksPerAck = defaultChunksPerAck;

  /** Messages that haven't been fully acknowledged */
  private chunkProgress: { [messageId: string]: IChunkProgress } = {};

  /** Number of chunks sent that we haven't received an acknowledgement for yet */
  private numOfUnacknowledgedChunks = 0;

  /** Chunks waiting to be sent */
  private sendQueue = new PriorityQueue<any>(defaultSendPriority);

  /** Ctor */
  public constructor(private opts: ISessionServiceDataChannelOpts) {
    super();

    opts.maxUnacknowledgedChunks =
      opts.maxUnacknowledgedChunks || defaultMaxUnacknowledgedChunks;
    opts.sessionServiceClientFactory =
      opts.sessionServiceClientFactory ||
      ((url, clientOpts) => new SessionServiceClient(url, clientOpts));

    this.logger = createLog('SSDataChannel', this.opts.logLevel);

    this.ssClient = opts.sessionServiceClientFactory(
      getBindUrl(opts.bindingInfo),
      {
        messagePrefix: opts.messagePrefix,
        logLevel: opts.logLevel,
      },
    );
    this.ssClient.addListener('readyState', this.ssClientReadyState);
    this.ssClient.addListener('message', this.ssClientMessage);

    this.ready = this.asyncInit();
  }

  /** Async initialization */
  private async asyncInit() {
    await this.ssClient.ready;
    const bindNotification = await this.ssClient.waitForNotificationMessage(
      'bindingNotification',
    );
    this.chunkSize = bindNotification.maxMessageSize || defaultChunkSize;

    this.key = await importKey(this.opts.bindingInfo.symmetricKey);

    return bindNotification.peerDescription;
  }

  /** IDataChannel.readyState */
  public get readyState() {
    return this.ssClient.readyState;
  }

  /** IDataChannel.generateMessageId */
  public generateMessageId(): string {
    return this.opts.messagePrefix + uuid.v4();
  }

  /** IDataChannel.sendRequest */
  public async sendData(
    id: string,
    request: Data,
    opts: { priority?: number } = {},
  ) {
    this.logger.debug('--> Sending', request);

    const encryptedPayload = await encryptPayload(this.key, request);
    const { totalBytes, chunks } = chunkify(encryptedPayload, this.chunkSize);

    const messagesToSend = chunks.map(chunk => ({
      type: 'message',
      messageId: id,
      sequence: chunk.sequence,
      message: chunk.data,
    }));

    this.chunkProgress[id] = {
      chunkSize: this.chunkSize,
      totalBytes,
    };

    messagesToSend.forEach(message =>
      this.sendQueue.enqueue(message, opts.priority),
    );

    this.processSendQueue();
  }

  /** Sends data from queue if possible according to `maxUnacknowledgedChunks` */
  private processSendQueue() {
    const chunksToSend = Math.min(
      this.opts.maxUnacknowledgedChunks - this.numOfUnacknowledgedChunks,
      this.sendQueue.length,
    );

    for (let i = 0; i < chunksToSend; i++) {
      const next = this.sendQueue.dequeue();
      this.ssClient.send(next);
      this.numOfUnacknowledgedChunks++;
    }
    this.logger.debug('SENT', chunksToSend, this.numOfUnacknowledgedChunks);
  }

  /** IDataChannel.close */
  public close(): void {
    this.emit('readyState', { readyState: ReadyState.Closed });
    this.ssClient.removeListener('message', this.ssClientMessage);
    this.ssClient.removeListener('readyState', this.ssClientReadyState);
    this.ssClient.close();
  }

  /** SessionServiceClient `readyState` event handler */
  private ssClientReadyState = (
    event: SessionServiceEventMap['readyState'],
  ) => {
    // Raise all session service client readyState events as our own
    this.emit('readyState', event);
  };

  /** SessionServiceClient `message` event handler */
  private ssClientMessage = (event: SessionServiceEventMap['message']) => {
    const { message } = event;
    this.logger.debug('--> Reveiving', message);

    switch (message.type) {
      case 'ack':
        this.receiveAckMessage(message as IAckMessage);
        return;
      case 'status':
        this.receiveStatusMessage(message as IStatusMessage);
        return;
      case 'message':
        this.receiveRelayMessage(message as IRelayMessage);
        return;
      case 'sessionClosed':
        this.receiveSessionClosedMessage(message as ISessionClosedMessage);
        return;
      default:
    }
  };

  /**
   * Processes a status message from Session Service
   */
  private receiveAckMessage(ackMessage: IAckMessage) {
    // Determine how many chunks have been acknowledged by
    // looking at lastChunkIndexReceived in relation to
    // chunksPerAck.
    const chunksReceived =
      ackMessage.lastChunkIndexReceived % this.chunksPerAck ||
      this.chunksPerAck;
    this.numOfUnacknowledgedChunks -= chunksReceived;
    this.logger.debug('ACK', chunksReceived, this.numOfUnacknowledgedChunks);

    const messageId = ackMessage.inResponseTo;
    const info = this.chunkProgress[messageId];
    if (!info) {
      return;
    }

    const { totalBytes, chunkSize } = info;
    const receivedBytes = Math.min(
      totalBytes,
      ackMessage.lastChunkIndexReceived * chunkSize,
    );
    if (receivedBytes === totalBytes) {
      delete this.chunkProgress[messageId];
    }

    this.emit('progress', {
      messageId,
      transferredBytes: receivedBytes,
      totalBytes,
    });

    this.processSendQueue();
  }

  /**
   * Processes a status message from Session Service
   */
  private receiveStatusMessage(statusMessage: IStatusMessage) {
    // Don't emit connection unavailable statuses
    if (statusMessage.status === SessionServiceStatusCode.PeerDisconnected) {
      return;
    }

    this.emit('message', {
      inResponseTo: statusMessage.inResponseTo,
      status: statusMessage.status,
    });
  }

  /**
   * Processes a message from peer.
   * Message might be a chunk of a larger message.
   * Message might be encrypted.
   */
  private async receiveRelayMessage(relayMessage: IRelayMessage) {
    let fullMessage: IRelayMessage;
    if (relayMessage.sequence) {
      let payload: any;
      try {
        payload = this.chunkAssembler.receive(
          relayMessage.message,
          relayMessage.sequence,
          relayMessage.messageId,
        );
      } catch {
        this.logger.warn(
          `Error assembling message chunk ${relayMessage.messageId}`,
        );
        return;
      }

      if (!payload) {
        return;
      }

      fullMessage = {
        type: 'message',
        messageId: relayMessage.messageId,
        inResponseTo: relayMessage.inResponseTo,
        message: payload,
      };
    } else {
      fullMessage = relayMessage;
    }

    if (fullMessage.message.encrypted) {
      fullMessage.message = await decryptPayload(this.key, fullMessage.message);
    }

    this.emit('message', {
      data: fullMessage.message,
      inResponseTo: fullMessage.inResponseTo,
      status: 200,
    });
  }

  private receiveSessionClosedMessage(_message: ISessionClosedMessage) {
    this.emit('sessionClosed', {});
  }

  public async wakeUp() {
    await this.ssClient.sendRequest({
      messageId: this.generateMessageId(),
      type: 'wakeup',
      payload: '{}',
    });
  }
}
