import { BehaviorSubject, Observable, Subject, Subscriber } from 'rxjs';
import {
  WebSocketSubject,
  WebSocketSubjectConfig,
} from 'rxjs/observable/dom/WebSocketSubject';

import {
  BaseApiError,
  ConnectionStatus,
  IBaseApi,
  ITransferProgress,
} from './base-api-interfaces';
import { arrayBufferEncoding } from './binary-encodings';
import SawCrypto from './crypto';
import * as Base64 from './base64';
import {
  IChunkMessage,
  IEncryptedMessage,
  IMessage,
  IPayloadMessage,
  IStatusMessage,
  IBindNotification,
  IWakeupMessage,
  IPingMessage,
  IAckMessage,
} from './message-interfaces';
import { uuid } from './uuid';

/**
 * Bookkeeping record for an outbound payload message whose chunks have not
 * all been written to the websocket yet.
 */
interface IPendingRequest {
  // the original (unencrypted) payload message that was passed to send()
  source: IPayloadMessage;
  // chunks still waiting to be written to the socket; consumed from the front
  chunksRemaining: IMessage[];
  // number of chunks the message was split into when it was queued
  totalChunks: number;
}

/**
 * Default configuration options.
 * Values passed to the BaseApi constructor are merged over these defaults.
 */
const kDefaultOpts = {
  // ctor function used for creating new websocket instances
  webSocketCtor: WebSocket,

  // encoder/decoder used to convert between utf8 and binary
  binaryEncoding: arrayBufferEncoding(),

  // how long (ms) connect() waits for the 'connected' status before failing
  // with CONNECTION_TIMEOUT
  timeout: 15000,

  // how long (ms) to wait for a response to a sent message
  defaultResponseTimeout: 35000,

  // how often (ms) 'ping' messages are sent; the check runs every quarter
  // interval, so the actual time varies between 100% and 125% of this value
  defaultPollingInterval: 30000,

  // maximum amount of send requests that can be waiting for a response
  // NOTE(review): not read by the BaseApi code in this file — confirm it is still used
  maxConcurrentSendRequests: 5,

  // number of automatic send retry attempts
  // NOTE(review): the key name is misspelled ("Attampts") but is part of the public
  // options type, so it cannot be renamed without breaking callers; it is also not
  // read by the BaseApi code in this file
  sendRetryAttampts: 1,

  // true if diagnostic logging should be enabled
  loggingEnabled: false,

  // prefix prepended to generated message ids (useful for debugging messages on the server)
  messageIdPrefix: 'AS',

  // default chunk size (this opt is used for testing, normally obtained from master)
  defaultChunkSize: 16384,

  // how many chunks to send before receiving an Ack
  uploadChunksPerAck: 20,
};

// Options type accepted by BaseApi; derived from `kDefaultOpts` so the type and the defaults cannot drift apart
export type IBaseApiOptions = typeof kDefaultOpts;

/**
 * Constructor signature for websocket implementations: takes the url and an
 * optional protocol (or list of protocols) and yields a WebSocket.
 * This is the shape expected for the constructor handed to Observable.webSocket().
 */
export type WebSocketCtor = new (
  url: string,
  protocol?: string | string[],
) => WebSocket;

/**
 * Dictionary (plain object keyed by message id) used to collect message
 * chunks until they can be assembled back into IPayloadMessages.
 */
interface IChunkMessagesById {
  // chunks received so far for the message with this id, in arrival order
  [id: string]: IChunkMessage[];
}

/**
 * Main implementation of IBaseApi. Uses websocket to connect to backend service.
 * Features message queuing mechanism and ability to retry certain failed requests.
 */
export class BaseApi implements IBaseApi {
  // --------------------------------------------------------------------------
  // Properties

  // IBaseApi.status$ — read-only stream of connection status changes.
  // Backed by a BehaviorSubject, so new subscribers immediately get the current status.
  public get status$() {
    return this.statusSubject.asObservable();
  }
  private statusSubject = new BehaviorSubject<ConnectionStatus>('disconnected');

  // IBaseApi.status — current connection status (synchronous snapshot)
  public get status(): ConnectionStatus {
    return this.statusSubject.value;
  }

  // IBaseApi.errors$ — read-only stream of connection errors.
  // Starts with `null` and emits `null` again when connect() clears a previous error,
  // so consumers have to filter out the empty values.
  public get errors$() {
    return this.errorsSubject.asObservable();
  }
  private errorsSubject = new BehaviorSubject<BaseApiError>(null);

  // IBaseApi.lastError — most recent error, or null if there is none
  public get lastError(): BaseApiError {
    return this.errorsSubject.value;
  }

  // IBaseApi.inbound$ — stream of inbound messages after chunk assembly
  // (built in the constructor)
  public get inbound$() {
    return this._inbound$;
  }
  private _inbound$: Observable<IMessage>;

  /**
   * IBaseApi.progress$ — transfer progress events: 'upload' events are emitted
   * for every received ack, 'download' events for every received chunk.
   */
  public get progress$() {
    return this._progress$;
  }
  private _progress$ = new Subject<ITransferProgress>();

  // Number of requests in upload queue
  // (requests that still have chunks waiting to be written to the socket)
  public get pendingRequestQueueLength() {
    return this.pendingRequests.length;
  }

  // binary messages from websocket (fed by the socket subscription in establishSession)
  private blobReceivedSubject = new Subject<Blob>();

  // decoded (unassembled) binary messages from websocket;
  // this placeholder is replaced with the real pipeline in the constructor
  private messages$ = new Observable<IMessage>();

  // --------------------------------------------------------------------------
  // State

  // configuration options (defaults merged with the values given to the constructor)
  public opts: IBaseApiOptions;

  // current endpoint (url given to the most recent connect() call)
  private endpoint: string;

  // maximum number of attempts allowed for establishing a connection;
  // overwritten (and kept) when connect() is called with an `attempts` value
  private maxConnectionAttempts: number = 2;

  // maximum size of a message chunk; starts as opts.defaultChunkSize and is
  // replaced by the bind notification's maxMessageSize in connectData()
  private maxChunkSize: number;

  // web socket subject, created and initialized when connecting
  private socket: WebSocketSubject<any>;

  // dict of message chunks that are pending assembly into full payload messages
  private incomingChunks: IChunkMessagesById = {};

  // key to be used for encryption (decoded in connect(); null when no key was supplied)
  private symmetricKey: CryptoKey;

  // polling interval id returned by calling setInterval() to enable server polling
  private pollingIntervalId: any;

  // timestamp (ms) of the last socket activity; the next ping is due one polling interval after it
  private pollingLastTime: number;

  // queue of outbound requests that still have chunks waiting to be sent
  private pendingRequests: IPendingRequest[] = [];

  /**
   * Creates BaseApi instance and wires up the inbound message pipeline.
   * No connection is opened here; call connect() / connectData() for that.
   *
   * @param {IBaseApiOptions} opts - configuration options; any option that is
   *   omitted falls back to the value in `kDefaultOpts`
   */
  constructor(opts: Partial<IBaseApiOptions> = kDefaultOpts) {
    // configuration
    this.opts = Object.assign({}, kDefaultOpts, opts);
    this.maxChunkSize = this.opts.defaultChunkSize;

    // decode websocket binary messages into IMessages
    // (shared so that decoding happens once regardless of subscriber count)
    this.messages$ = this.blobReceivedSubject
      .flatMap((blob: Blob) => this.receiveMessage(blob)) // convert blob into IMessage
      .share();

    // assemble messages: 'message' items carrying a `sequence` are chunks and go
    // through processChunk(), everything else is passed through untouched
    this._inbound$ = this.messages$
      .flatMap(msg =>
        msg.type === 'message' && (msg as IChunkMessage).sequence
          ? this.processChunk(msg as IChunkMessage) // assemble message fragment
          : Observable.of(msg),
      )
      // processChunk() emits null while a message is still incomplete
      .filter(msg => !!msg)
      .do(msg => this.log(msg))
      .share();

    // Handle ACKS: report upload progress, then send the next batch of chunks.
    // NOTE(review): this subscription is never unsubscribed and has no error handler.
    this._inbound$
      .filter(msg => msg.type === 'ack')
      .subscribe((ack: IAckMessage) => {
        // the request may already have left the queue once its last chunks were sent
        const pending = this.pendingRequests.find(
          req => req.source.messageId === ack.inResponseTo,
        );
        this._progress$.next({
          type: 'upload',
          messageId: ack.inResponseTo,
          current: ack.lastChunkIndexReceived,
          total: pending ? pending.totalChunks : ack.lastChunkIndexReceived,
        });
        this.processPendingQueue();
      });

    // log status changes if logging is enabled
    this.status$.do(s => this.log('ConnectionStatus:', s)).subscribe();
  }

  /**
   * Opens the websocket connection.
   * Emits on successful connection then completes.
   *
   * All work is deferred until subscription. The returned observable errors with
   *  - BaseApiError('ALREADY_CONNECTED') unless the current status is
   *    'disconnected' or 'error',
   *  - the last recorded error when the status becomes 'error',
   *  - BaseApiError('CONNECTION_TIMEOUT') when 'connected' is not reached
   *    within `opts.timeout` milliseconds.
   *
   * @param endpoint - websocket url to connect to
   * @param symmetricKey - optional base64 encoded key; decoded and stored for
   *   payload encryption/decryption
   * @param attempts - optional maximum number of connection attempts; a falsy
   *   value keeps the previously configured maximum
   */
  public connect(
    endpoint: string,
    symmetricKey?: string,
    attempts?: number,
  ): Observable<void> {
    return Observable.defer(() => {
      const connectionStatus = this.statusSubject.getValue();

      // thrown inside defer(), so it surfaces as an error notification
      if (connectionStatus !== 'disconnected' && connectionStatus !== 'error') {
        throw new BaseApiError('ALREADY_CONNECTED');
      }

      this.endpoint = endpoint;
      // the override is stored on the instance and therefore also applies to later calls
      this.maxConnectionAttempts = attempts || this.maxConnectionAttempts;

      // clear the error of a previous session before starting a new one
      if (this.lastError) {
        this.errorsSubject.next(null);
      }
      this.statusSubject.next('connecting');

      // create connection (retry connecting a few times if necessary)
      this.establishSession(this.maxConnectionAttempts - 1);

      // Observable for decoding key and saving it on the instance:
      const decodedKeyPromise = Promise.resolve(
        symmetricKey ? SawCrypto.base64ToKey(symmetricKey) : null,
      );
      const setKey$ = Observable.fromPromise(decodedKeyPromise).do(
        key => (this.symmetricKey = key),
      );

      // Observable for waiting for status to become connected:
      const waitForConnectionStatus$ = this.status$
        .flatMap(
          (status): Observable<string> => {
            if (status === 'error') {
              return Observable.throw(this.lastError);
            } else {
              return Observable.of(status);
            }
          },
        )
        .filter(status => status === 'connected');

      // Full sequence
      // NOTE(review): nothing in this method closes the socket or resets the status when
      // the timeout fires — confirm callers are expected to call disconnect() afterwards.
      return setKey$
        .flatMapTo(waitForConnectionStatus$)
        .take(1)
        .map(() => undefined)
        .timeoutWith(
          this.opts.timeout,
          Observable.throw(new BaseApiError('CONNECTION_TIMEOUT')),
        );
    });
  }

  /**
   * Connects to the data channel: opens the websocket via connect(), then waits
   * for the binding notification and adopts its `maxMessageSize` as the chunk
   * size used for outgoing messages.
   *
   * @param endpoint - websocket url to connect to
   * @param symmetricKey - optional base64 encoded encryption key
   * @param attempts - optional maximum number of connection attempts
   * @returns observable emitting the binding notification
   */
  public connectData(
    endpoint: string,
    symmetricKey?: string,
    attempts?: number,
  ): Observable<IBindNotification> {
    const connected$ = this.connect(endpoint, symmetricKey, attempts);
    const bound$ = connected$.flatMap(this.waitForBindingNotification);

    return bound$.do(notification => {
      this.maxChunkSize = notification.maxMessageSize;
    });
  }

  /**
   * Waits for and then emits the binding notification, then completes.
   *
   * While waiting:
   *  - a status message with status 453 fails the stream with
   *    BaseApiError('INVALID_BINDING_TOKEN'),
   *  - any other status message is ignored and waiting continues,
   *  - any message that is neither a status nor a binding notification fails
   *    the stream with BaseApiError('UNEXPECTED').
   */
  public waitForBindingNotification = (): Observable<IBindNotification> => {
    return this.inbound$
      .map(msg => {
        if (msg.type === 'bindingNotification') {
          return msg as IBindNotification;
        }

        if (msg.type === 'status') {
          const statusMsg = msg as IStatusMessage;
          if (statusMsg.status === 453) {
            throw new BaseApiError('INVALID_BINDING_TOKEN', msg);
          }
          // unrelated status message: nothing to emit yet
          return undefined;
        }

        throw new BaseApiError('UNEXPECTED', msg);
      })
      // drop the placeholders produced for ignored status messages so that
      // take(1) can only ever emit a real binding notification
      .filter(notification => !!notification)
      .take(1);
  };

  /**
   * Closes the websocket connection.
   *
   * Emits immediately when already disconnected. From the 'error' state the
   * status is simply reset to 'disconnected'; otherwise the socket subject is
   * unsubscribed. The returned observable emits once the status is
   * 'disconnected' or 'error' and then completes.
   */
  public disconnect(): Observable<void> {
    const current = this.statusSubject.getValue();

    if (current === 'disconnected') {
      return Observable.of(undefined);
    }

    if (current !== 'error') {
      this.socket.unsubscribe();
    } else {
      this.statusSubject.next('disconnected');
    }

    const settled$ = this.status$.filter(
      status => status === 'disconnected' || status === 'error',
    );
    return settled$.mapTo(undefined).take(1);
  }

  /**
   * IBaseApi.generateMessageId()
   * Builds a new message id: the configured `messageIdPrefix` followed by a fresh uuid.
   */
  public generateMessageId() {
    const prefix = this.opts.messageIdPrefix;
    return prefix + uuid();
  }

  /**
   * Sends a message via websocket and waits for its response.
   *
   * All work is deferred until subscription. The returned observable emits the
   * first non-ack inbound message whose `inResponseTo` matches the sent
   * message id, then completes. It errors with
   *  - BaseApiError('SEND_DISCONNECTED') when the status is not 'connected',
   *  - BaseApiError('SEND_DELIVERY_ERROR') on a status response of 550,
   *  - BaseApiError('MESSAGE_TRUNCATED_ERROR') on a status response of 551,
   *  - BaseApiError('SEND_STATUS_ERROR') on any other non-200 status response,
   *  - the connection error when one is (or already was) reported on errors$,
   *  - BaseApiError('RESPONSE_TIMEOUT') when the timeout elapses.
   *
   * @param msg - message to send; a message id is generated and written onto
   *   the object when it has none
   * @param timeout - response timeout in ms; defaults to
   *   `opts.defaultResponseTimeout`, a value <= 0 disables the timeout
   */
  public send(msg: IMessage, timeout?: number): Observable<IMessage> {
    return Observable.defer(() => {
      if (this.status !== 'connected') {
        return Observable.throw(new BaseApiError('SEND_DISCONNECTED'));
      }

      if (!msg.messageId) {
        msg.messageId = this.generateMessageId();
      }

      timeout =
        typeof timeout === 'undefined'
          ? this.opts.defaultResponseTimeout
          : timeout;

      this.log('sending message: ', msg);

      // Send sequence will be different depending on message type:
      // payload messages are encrypted and chunked, everything else is sent as is
      const send$ =
        msg.type === 'message'
          ? this.sendAppMessage(msg as IPayloadMessage)
          : this.sendToWebsocket([msg]);

      // Response sequence emits a successful response message, or throws on error status
      const response$ = this.inbound$
        .filter(m => m.type !== 'ack' && m.inResponseTo === msg.messageId)
        .map(res => {
          if (res.type === 'status') {
            const status = res as IStatusMessage;
            if (status.status === 550) {
              throw new BaseApiError('SEND_DELIVERY_ERROR', res);
            } else if (status.status === 551) {
              throw new BaseApiError('MESSAGE_TRUNCATED_ERROR', res);
            } else if (status.status !== 200) {
              throw new BaseApiError('SEND_STATUS_ERROR', res);
            }
          }
          return res;
        });

      // Sequence that throws any connection errors
      // (errors$ starts with / may hold null, hence the filter)
      const connectionErrors$ = this.errors$
        .filter(e => !!e)
        .flatMap(err => Observable.throw(err));

      // Timeout sequence: never emits a value, only errors once `timeout` ms pass
      // without a matching inbound item.
      // NOTE(review): the filter matches on `messageId`, not `inResponseTo` — confirm
      // which inbound messages are expected to reset the timer.
      const timeout$ =
        timeout <= 0
          ? Observable.empty() // empty sequence
          : this.inbound$
              .filter(item => item.messageId === msg.messageId)
              .timeoutWith(
                timeout,
                Observable.throw(new BaseApiError('RESPONSE_TIMEOUT', timeout)),
              )
              .ignoreElements();

      // Ensure request is removed from pending requests
      const cleanUp = () => {
        this.pendingRequests = this.pendingRequests.filter(
          pendingReq => pendingReq.source.messageId !== msg.messageId,
        );
      };

      // Full sequence: the response is only listened for once the send sequence has
      // emitted; the first value or error from any branch wins
      return send$
        .flatMapTo(response$)
        .merge(connectionErrors$)
        .merge(timeout$)
        .take(1)
        .finally(cleanUp);
    });
  }

  /**
   * Sends a data (payload) message via websocket. Session Service will relay
   * these messages to the device.
   * Thin typed wrapper around send(): same behaviour, response typed as a
   * payload message.
   *
   * @param msg - payload message to send
   * @param timeout - optional response timeout in ms, forwarded to send()
   */
  public sendData(
    msg: IPayloadMessage,
    timeout?: number,
  ): Observable<IPayloadMessage> {
    const response$ = this.send(msg, timeout);
    return response$ as Observable<IPayloadMessage>;
  }

  /**
   * Creates a new websocket subject for the current endpoint and subscribes to
   * it, which initiates the connection. Nothing is returned: the outcome is
   * reported through the open/close/error handlers and the status subject.
   *
   * @param attemptsRemaining - number of further attempts the error handler
   *   may make if this one fails while connecting
   */
  private establishSession(attemptsRemaining: number) {
    // observers notified by the socket subject about open and close events
    const openObserver = Subscriber.create<Event>(ev =>
      this.onWebSocketOpen(ev),
    );
    const closeObserver = Subscriber.create<CloseEvent>(ev =>
      // does not get called on this.socket.unsubscribe
      this.onWebSocketClose(ev),
    );

    // configure the connection
    const config: WebSocketSubjectConfig = {
      url: this.endpoint,
      openObserver,
      closeObserver,
      resultSelector: (e: MessageEvent) => e.data,
      WebSocketCtor: this.opts.webSocketCtor,
    };

    // create new websocket instance
    this.socket = Observable.webSocket(config);

    const forwardBlob = (blob: any) => this.blobReceivedSubject.next(blob);
    const handleError = (err: any) =>
      this.onWebSocketError(err, attemptsRemaining);
    const handleComplete = () => {
      // allow any inbound messages to be emitted before signalling disconnect
      setTimeout(() => {
        this.statusSubject.next('disconnected');
      });
    };

    // subscribe to the socket to initiate the connection
    this.socket.subscribe(forwardBlob, handleError, handleComplete);
  }

  /**
   * (Re)starts the keep-alive timer. Every quarter of the polling interval it
   * checks whether a full polling interval has passed since the last recorded
   * socket activity and, if so, sends a 'ping' message.
   */
  private startPolling() {
    this.stopPolling();

    const checkEveryMs = this.opts.defaultPollingInterval / 4;

    this.pollingIntervalId = setInterval(async () => {
      const pingDueAt = this.pollingLastTime + this.opts.defaultPollingInterval;
      const isDue = Date.now() >= pingDueAt;
      if (!isDue) {
        return;
      }

      // Send ping message without any timeout/retry handling
      const ping = {
        type: 'ping',
      } as IPingMessage;
      this.sendToWebsocket([ping]).subscribe();
    }, checkEveryMs);
  }

  /**
   * Records "now" as the time of the last socket activity, which postpones
   * the next keep-alive ping by a full polling interval.
   */
  private refreshPolling() {
    const now = Date.now();
    this.pollingLastTime = now;
  }

  /**
   * Stops the keep-alive timer (if any), forgets its id and resets the
   * last-activity timestamp.
   */
  private stopPolling() {
    clearInterval(this.pollingIntervalId);
    this.pollingIntervalId = undefined;

    this.refreshPolling();
  }

  /**
   * Handles successful open of websocket: applies the binary type required by
   * the configured encoding to the underlying socket, publishes the
   * 'connected' status and starts keep-alive polling.
   */
  private onWebSocketOpen(ev: Event) {
    const { binaryType } = this.opts.binaryEncoding;
    this.socket.socket.binaryType = binaryType;

    this.statusSubject.next('connected');
    this.startPolling();
  }

  /**
   * Handles close of websocket: stops keep-alive polling and marks closes with
   * a code of 4000 or above as unclean.
   */
  private onWebSocketClose(ev: CloseEvent) {
    this.stopPolling();

    const isCustomCloseCode = ev.code >= 4000;
    if (!isCustomCloseCode) {
      return;
    }

    // Safari and the Node websocket client both report 4000+ codes as unclean,
    // but Chrome reports them as clean. For consistency, we'll treat them as unclean always.
    //
    // HACK: Force the `wasClean` property of the event to be `false`, which will ensure that
    // the web socket observable will subsequently raise an error instead of completing cleanly.
    // For more info, see the `socket.onclose` handler in rxjs `WebSocketSubject.js`
    Object.defineProperty(ev, 'wasClean', { value: false });
  }

  /**
   * Handles a websocket error.
   * While connecting, another connection attempt is started as long as
   * attempts remain; otherwise the failure is recorded as an error
   * (CONNECTION_FAILED while connecting, CONNECTION_LOST for a close event,
   * UNEXPECTED for anything else).
   *
   * @param err - event delivered by the socket subject's error channel
   * @param attemptsRemaining - number of connection attempts still allowed
   */
  private onWebSocketError(err: CloseEvent, attemptsRemaining: number) {
    if (this.status !== 'connecting') {
      if (err.type === 'close') {
        // Connection was terminated remotely
        this.setError(new BaseApiError('CONNECTION_LOST', null, err));
      } else {
        this.setError(new BaseApiError('UNEXPECTED', null, err));
      }
      return;
    }

    // Error attempting to connect
    if (attemptsRemaining > 0) {
      return this.establishSession(attemptsRemaining - 1);
    }

    this.setError(new BaseApiError('CONNECTION_FAILED', null, err));
  }

  /**
   * Attempts to send a `message` message which will be relayed by the Session
   * Service to the master. The payload is encrypted, the encrypted message is
   * split into chunks, and the chunks are queued as a pending request before
   * the queue is processed.
   *
   * @param msg - payload message to send
   * @throws Error synchronously when `msg.type` is not 'message'
   */
  private sendAppMessage(msg: IPayloadMessage): Observable<void> {
    if (msg.type !== 'message') {
      throw new Error('Invalid message type for `sendAppMessage`');
    }

    // Encrypt and then chunkify message for sending
    const encryptedPayload$ = this.encryptPayload(msg.message);
    const chunks$ = encryptedPayload$.map(encryptedPayload =>
      this.chunkifyMessage({
        ...(msg as any),
        message: encryptedPayload,
      }),
    );

    return chunks$.flatMap(messageChunks => {
      // Retain this request until it has been fully acknowledged
      const pending = {
        source: msg,
        chunksRemaining: messageChunks,
        totalChunks: messageChunks.length,
        lastAckChunk: 0,
      } as IPendingRequest;
      this.pendingRequests.push(pending);

      return this.processPendingQueue();
    });
  }

  /**
   * Sends the next batch of chunks of the request at the head of the queue.
   *
   * At most `opts.uploadChunksPerAck` chunks are taken from that request and
   * written to the websocket; requests that have no chunks left afterwards are
   * dropped from the queue.
   *
   * @returns an observable that emits once. It is always an observable, also
   *   when the queue is empty, so the result is safe to return from `flatMap`.
   */
  private processPendingQueue(): Observable<void> {
    const nextRequest = this.pendingRequests[0];
    if (!nextRequest) {
      // nothing queued: still hand back an observable rather than undefined
      return Observable.of(undefined);
    }

    // splice() removes the batch from the request's remaining chunks in place
    const chunksToSend = nextRequest.chunksRemaining.splice(
      0,
      this.opts.uploadChunksPerAck,
    );

    // forget requests whose chunks have all been handed to the socket
    this.pendingRequests = this.pendingRequests.filter(
      req => req.chunksRemaining.length,
    );

    this.log(
      'Process',
      nextRequest.source.messageId,
      nextRequest.chunksRemaining.length,
      nextRequest.source.message.type +
        '/' +
        (nextRequest.source.message as any).operation,
    );
    return this.sendToWebsocket(chunksToSend);
  }

  /**
   * Sends messages to Session Service via websocket.
   *
   * The messages are serialized and written when this method is called, not
   * when the returned observable is subscribed to. Messages without an id get
   * a generated uuid written onto them. A message that cannot be encoded is
   * logged and skipped instead of being written to the socket.
   *
   * @param messages - messages to send, in order
   * @returns an observable that emits once and completes
   */
  private sendToWebsocket(messages: IMessage[]): Observable<void> {
    // Extend time until next Session Service ping
    this.refreshPolling();

    messages.forEach(message => {
      if (!message.messageId) {
        message.messageId = uuid();
      }

      const jsonString = JSON.stringify(message);
      this.opts.binaryEncoding.encode(jsonString, (err: any, binary?: any) => {
        if (err) {
          // never push an undefined frame onto the socket when encoding failed
          this.log('failed to encode message: ', message.messageId, err);
          return;
        }
        this.socket.next(binary);
      });
    });

    return Observable.of(undefined);
  }

  /**
   * Sends a wakeup push notification to the device via the Session Service.
   * Emits (without a value) once the wakeup message has been answered.
   */
  public wakeUpMaster(): Observable<void> {
    const wakeup: IWakeupMessage = {
      type: 'wakeup',
      payload: '{}',
    };

    const response$ = this.send(wakeup);
    return response$.map(() => undefined);
  }

  /**
   * Encrypts a message payload using pre-negotiated symmetric key.
   * The payload is JSON-serialized and encrypted with a freshly generated iv;
   * encryption starts as soon as this method is called.
   *
   * @param payload - payload to encrypt
   * @returns observable emitting the iv together with the encrypted data
   */
  public encryptPayload(
    payload: IPayloadMessage['message'],
  ): Observable<IEncryptedMessage['message']> {
    const iv = SawCrypto.generateIv();
    const serializedPayload = JSON.stringify(payload);

    const encrypted$ = Observable.fromPromise(
      SawCrypto.encrypt(iv, this.symmetricKey, serializedPayload),
    );

    return encrypted$.map(encrypted => {
      return { iv, encrypted };
    });
  }

  /**
   * Splits an IPayloadMessage into chunks.
   *
   * The message body is JSON-serialized and base64 encoded, then cut into
   * pieces of at most `maxChunkSize` characters. Every chunk carries the
   * message id / inResponseTo of the source message and a 1-based
   * "index/total" sequence marker.
   *
   * The chunk size normally comes from a remote message, so it is validated:
   * an unusable value (missing, NaN, below 1) falls back to
   * `opts.defaultChunkSize` instead of looping forever.
   *
   * @param msg - message (plain or encrypted) to split
   * @returns the chunks in sending order
   * @throws Error when neither the negotiated nor the default chunk size is usable
   */
  public chunkifyMessage(
    msg: IPayloadMessage | IEncryptedMessage,
  ): IChunkMessage[] {
    const isUsableSize = (size: any) => typeof size === 'number' && size >= 1;

    // whole characters only; a fractional size below 1 would never advance the cursor
    const chunkSize = Math.floor(
      isUsableSize(this.maxChunkSize)
        ? this.maxChunkSize
        : this.opts.defaultChunkSize,
    );
    if (!isUsableSize(chunkSize)) {
      throw new Error('Invalid chunk size for `chunkifyMessage`');
    }

    const encodedPayload = Base64.encode(JSON.stringify(msg.message));
    const allChunkData: string[] = [];

    for (
      let cursor = 0;
      cursor < encodedPayload.length;
      cursor += chunkSize
    ) {
      allChunkData.push(encodedPayload.substring(cursor, cursor + chunkSize));
    }

    return allChunkData.map(
      (chunkData, i) =>
        ({
          type: 'message',
          messageId: msg.messageId,
          inResponseTo: msg.inResponseTo,
          message: chunkData,
          sequence: `${i + 1}/${allChunkData.length}`,
        } as IChunkMessage),
    );
  }

  /**
   * Deserializes a raw message blob and emits that IMessage.
   *
   * The returned observable emits exactly one message and then completes, or
   * errors with BaseApiError('BAD_MESSAGE') when the binary data cannot be
   * decoded or the decoded text is not valid JSON. Completing matters because
   * the result is merged with `flatMap`, which holds on to every inner
   * subscription until it completes.
   *
   * @param binaryData - raw data received from the websocket
   */
  private receiveMessage(binaryData: any): Observable<IMessage> {
    // any inbound traffic counts as activity and postpones the next ping
    this.refreshPolling();

    return Observable.create((observer: Subscriber<IMessage>) => {
      this.opts.binaryEncoding.decode(binaryData, (err: any, data?: string) => {
        if (err) {
          observer.error(new BaseApiError('BAD_MESSAGE', 'Binary', err));
          return;
        }

        let obj: IMessage = null;
        try {
          obj = JSON.parse(data);
        } catch (parseErr) {
          observer.error(new BaseApiError('BAD_MESSAGE', 'JSON', parseErr));
          return;
        }

        observer.next(obj);
        // single-value stream: release the inner subscription held by flatMap
        observer.complete();
      });
    });
  }

  /**
   * Assembles IMessageChunks into their original IPayloadMessage form.
   * Chunks are retained per message id until the chunk whose sequence number
   * equals the total arrives; the collected data is then base64 decoded,
   * parsed, decrypted when it carries an `encrypted` field, and emitted.
   * A 'download' progress event is published for every chunk.
   *
   * @param msgChunk - chunk that was just received
   * @returns observable emitting `null` while the message is incomplete, or
   *   the assembled payload message
   * @throws BaseApiError('BAD_MESSAGE') synchronously when the assembled data
   *   cannot be base64 decoded, is not valid JSON, or is empty
   */
  public processChunk(msgChunk: IChunkMessage): Observable<IPayloadMessage> {
    const [chunkNumber, chunkCount] = msgChunk.sequence
      .split('/')
      .map(part => parseInt(part, 10));
    const msgId = msgChunk.messageId;

    // fetch (or lazily create) the list of chunks collected for this message
    let received = this.incomingChunks[msgId];
    if (!received) {
      received = this.incomingChunks[msgId] = [];
    }
    received.push(msgChunk);

    this._progress$.next({
      type: 'download',
      current: chunkNumber,
      total: chunkCount,
      messageId: msgChunk.inResponseTo,
    });

    if (chunkNumber !== chunkCount) {
      // not fully assembled
      return Observable.of(null);
    }

    // message has arrived completely
    delete this.incomingChunks[msgId];

    let rawPayloadData = '';
    for (const chunk of received) {
      rawPayloadData += chunk.message;
    }

    let payloadJson: string;
    try {
      payloadJson = Base64.decode(rawPayloadData);
    } catch (err) {
      throw new BaseApiError('BAD_MESSAGE', ['Base64', 'Payload'], err);
    }

    let possiblyEncryptedPayload:
      | IPayloadMessage['message']
      | IEncryptedMessage['message'];
    try {
      possiblyEncryptedPayload = JSON.parse(payloadJson);
    } catch (err) {
      throw new BaseApiError(
        'BAD_MESSAGE',
        ['JSON', 'PossiblyEncryptedPayload'],
        err,
      );
    }

    if (!possiblyEncryptedPayload) {
      throw new BaseApiError('BAD_MESSAGE', ['Empty', 'Payload']);
    }

    // wraps a plain payload back into a payload message
    const toPayloadMessage = (payload: any) =>
      ({
        type: 'message',
        messageId: msgId,
        inResponseTo: msgChunk.inResponseTo,
        message: payload,
      } as IPayloadMessage);

    const encryptedCandidate = possiblyEncryptedPayload as IEncryptedMessage['message'];
    if (encryptedCandidate.encrypted) {
      return this.decryptPayload(encryptedCandidate).map(payload =>
        toPayloadMessage(payload),
      );
    }

    return Observable.of(possiblyEncryptedPayload).map(payload =>
      toPayloadMessage(payload),
    );
  }

  /**
   * Decrypts a message payload using pre-negotiated symmetric key and parses
   * the decrypted text as JSON.
   *
   * The two failure modes are reported separately:
   *  - BaseApiError('BAD_MESSAGE', ['DecryptionFailed', 'Payload']) when
   *    decryption fails,
   *  - BaseApiError('BAD_MESSAGE', ['JSON', 'DecryptedPayload']) when the
   *    decrypted text is not valid JSON.
   *
   * @param payload - encrypted payload (iv plus encrypted data)
   * @returns observable emitting the decrypted, parsed payload
   */
  public decryptPayload(
    payload: IEncryptedMessage['message'],
  ): Observable<IPayloadMessage['message']> {
    const decryptedPayloadPromise = SawCrypto.decrypt(
      payload.iv,
      this.symmetricKey,
      payload.encrypted,
    );

    return Observable.fromPromise(decryptedPayloadPromise)
      .catch(
        (err: any): Observable<string> => {
          throw new BaseApiError(
            'BAD_MESSAGE',
            ['DecryptionFailed', 'Payload'],
            err,
          );
        },
      )
      .map(
        (decryptedPayload): IPayloadMessage['message'] => {
          // Parse failures are handled right here instead of in a trailing
          // catch(): a trailing catch would also intercept the decryption
          // error above and relabel it as a JSON error.
          try {
            return JSON.parse(decryptedPayload);
          } catch (err) {
            throw new BaseApiError(
              'BAD_MESSAGE',
              ['JSON', 'DecryptedPayload'],
              err,
            );
          }
        },
      );
  }

  /**
   * Sets and notifies an error: records it as the last error and switches the
   * connection status to 'error'.
   *
   * @param error - error to publish on errors$
   */
  private setError(error: BaseApiError) {
    // Order is important here: subscribers reacting to the 'error' status
    // (e.g. the status watcher in connect()) read `lastError`, so the error
    // has to be stored before the status change is published.
    this.errorsSubject.next(error);
    this.statusSubject.next('error');
  }

  /**
   * Logging helper. Writes the arguments to the console, prefixed with the
   * current local time and the 'BASEAPI' tag, when `opts.loggingEnabled` is
   * set; does nothing otherwise.
   *
   * @param args - values to log; each one is passed to console.log as its own
   *   argument, so arrays are logged as arrays
   */
  private log(...args: any[]) {
    if (!this.opts.loggingEnabled) {
      return;
    }

    // Spread instead of concat(): concat would flatten array arguments into
    // their individual elements.
    // tslint:disable-next-line:no-console
    console.log(new Date().toLocaleTimeString(), 'BASEAPI', ...args);
  }
}
