import { ofType } from 'redux-observable';
import {
  mergeMap,
  map,
  catchError,
  switchMap,
  mapTo, tap, takeUntil, combineLatest,
} from 'rxjs/operators';

import { interval, of, Subject, timer } from 'rxjs';

import { webSocket } from 'rxjs/webSocket';

import {
  APP_LOAD,
} from '../../constants/actionTypes';

import { WS_PORT, PING_INTERVAL } from '../../constants/websocketTypes';
import {
  WS_CLOSED, WS_CONNECT,
  WS_CONNECTED, WS_DISCONNECTED,
  WS_MESSAGE_SENT,
  WS_PING_SERVER, WS_RETRY_EXHAUSTED,
  WS_SEND_MESSAGE,
} from '../../constants/actionTypes/ws';

const url = /s/.test(window.location.protocol) ? `wss://${window.location.hostname}` : `ws://${window.location.hostname}:${WS_PORT}`;

let webSocketSubject;
let onOpenSubject = new Subject();
let onCloseSubject = new Subject();

const connectSocket = (websocketUrl) => {
  onOpenSubject = new Subject();
  onCloseSubject = new Subject();
  webSocketSubject = webSocket({
    url: websocketUrl,
    openObserver: onOpenSubject,
    closeObserver: onCloseSubject,
  });
  return webSocketSubject;
};

export const makeConnection = action$ =>
  action$.pipe(
    ofType(APP_LOAD),
    mapTo({ type: WS_CONNECT }),
  );

export const connectTheSocket = action$ =>
  action$.pipe(
    ofType(WS_CONNECT),
    switchMap(({ value }) =>
      connectSocket(url).pipe(
        map(data => data),
        catchError(e => of({
          type: WS_DISCONNECTED,
          value,
        })),
      )),
  );

export const pingWebsocketServer = action$ =>
  action$.pipe(
    ofType(WS_CONNECTED),
    mergeMap(() =>
      interval(PING_INTERVAL + 1000).pipe(
        tap(() => webSocketSubject.next({ ping: true })),
        mapTo({ type: WS_PING_SERVER }),
        takeUntil(action$.pipe(ofType(WS_DISCONNECTED))),
      ),
    ),
  );

export const connection = action$ =>
  action$.pipe(
    ofType(WS_CONNECT),
    switchMap(() =>
      onOpenSubject.pipe(
        map(() => {
          onCloseSubject.pipe(
            map(() => ({
              type: WS_CLOSED,
            })),
          );
          return {
            type: WS_CONNECTED,
          };
        }),
      ),
    ),
  );

export const sendMessage = action$ =>
  action$.pipe(
    ofType(WS_SEND_MESSAGE),
    combineLatest(action$.pipe(
      ofType(WS_CONNECTED),
    ), action => action),
    map((action) => {
      webSocketSubject.next(action.value);
      return {
        type: WS_MESSAGE_SENT,
      };
    }),
  );

export const retryWSConnection = action$ => action$.pipe(
  ofType(WS_DISCONNECTED),
  mergeMap(({ value: previousCount }) => {
    const count = previousCount || 1;
    let delay;
    const scalingAttempt1 = 300; // first 5 min
    const scalingAttempt2 = 600;
    switch (true) {
      case count < scalingAttempt1:
        delay = 1000;
        break;
      case count >= scalingAttempt1 && count < scalingAttempt2:
        delay = 5000;
        break;
      default:
        delay = 15000;
    }
    if (count < 960) { // 2 hrs
      return timer(delay).pipe(
        mapTo({ type: WS_CONNECT, value: count + 1 }),
      );
    }
    onCloseSubject.complete();
    onOpenSubject.complete();
    return of({ type: WS_RETRY_EXHAUSTED });
  },
  ),
);
