import { eventChannel, END } from 'redux-saga';
import { put, putResolve, take, takeMaybe, call, race, delay } from 'redux-saga/effects';
import 'paho-mqtt';
import { sign } from 'aws4';
import * as defaults from '../constants/defaults';
import apiActions, { types as apiTypes } from '../actions/api';
import iotNotificationsActions, { types as iotNotificationsTypes } from '../actions/iotNotifications';

// TODO: Replace ./iotNotifcations.js with this saga and implement topic notification filtering logic

let lastRetry;

// Force a reconnect every 10 minutes
const CHANNEL_EXPIRATION = 10 * 60 * 1000;

// Prevent reconnecting sooner than every minute
const CHANNEL_RETRY_MIN = 1 * 60 * 1000;

// ensure IoT channel creds are valid for at least 5 second
const BUFFER = 5000;

function validateChannelData (data) {
  if (data) {
    if (new Date() - BUFFER <= new Date(data.expires)) {
      return true;
    }
  }
  return false;
}

function * getChannelData (owner) {
  if (lastRetry) {
    const timeElapsed = Date.now() - lastRetry;
    if (timeElapsed < CHANNEL_RETRY_MIN) {
      yield delay(CHANNEL_RETRY_MIN - timeElapsed);
    }
  }

  lastRetry = Date.now();

  yield putResolve(apiActions.getIotNotificationsChannelInfo(owner));

  const channelInfoAction = yield take([
    apiTypes.GET_IOT_NOTIFICATIONS_CHANNEL_INFO.SUCCESS,
    apiTypes.GET_IOT_NOTIFICATIONS_CHANNEL_INFO.FAILURE
  ]);

  if (channelInfoAction.type === apiTypes.GET_IOT_NOTIFICATIONS_CHANNEL_INFO.FAILURE) {
    return { error: channelInfoAction.error };
  }

  return { data: channelInfoAction.data };
}

function createChannel ({ topic, accessKeyId, secretAccessKey, sessionToken, region, iotEndpointAddress, owner }) {
  return eventChannel((emitter) => {
    const clientId = Math.floor(Math.random() * 1000000000).toString();

    const signingOptions = {
      service: 'iotdevicegateway',
      region,
      headers: {
        host: iotEndpointAddress
      },
      path: '/mqtt?X-Amz-Expires=86400',
      signQuery: true
    };

    const req = sign(signingOptions, { accessKeyId, secretAccessKey });
    const url = `wss://${req.hostname}${req.path}&X-Amz-Security-Token=${encodeURIComponent(sessionToken)}`;

    const client = new window.Paho.MQTT.Client(url, clientId);

    client.onConnectionLost = err => {
      emitter(put({ type: iotNotificationsTypes.DISCONNECT, err }));

      /* Delay 10 seconds as this could occur when it's a new account and the
       * AWS IoT endpoint is still provisioning. */
      emitter(delay(10000));
      emitter(END);
    };

    client.onMessageArrived = message => {
      let payload;
      try {
        payload = JSON.parse(message.payloadString);
      } catch (e) {
        console.warn(
          `Failed to parse notification message: ${message.payloadString}`
        );
        return;
      }

      const {
        type,
        ...notification
      } = payload;

      emitter(put(iotNotificationsActions.notification({
        ...notification,
        owner,
        // Re-assign type if it's an ephemeral deployment
        eventType: (payload.ephemeralId && payload.ephemeralId !== defaults.EPHEMERAL_ID_DEFAULT) ? 'ephemeral' : type
      })));
    };

    const options = {
      useSSL: true,
      mqttVersion: 4,
      keepAliveInterval: 20 * 60, // Maximum allowed by AWS IoT for websockets
      pingInterval: 10, // Ping service every 10 seconds to see if we're still alive
      onFailure: err => {
        emitter(put({ type: iotNotificationsTypes.CONNECT.FAILURE, owner, error: err }));
        emitter(END);
      },
      onSuccess: () => {
        let subOptions = {
          qos: 1,
          onSuccess: () => {
            emitter(put({ type: iotNotificationsTypes.CONNECT.SUCCESS, owner }));
          },
          onFailure: err => {
            emitter(put({ type: iotNotificationsTypes.CONNECT.FAILURE, owner, error: err }));

            /* Delay 10 seconds as this shouldn't fail and we don't want to
             * go into an infinite loop */
            emitter(delay(10000));
            emitter(END);
          }
        };

        client.subscribe(topic, subOptions);
      }
    };

    client.connect(options);

    return () => {
      if (client.isConnected()) client.disconnect();
    };
  });
}

function * connect (owner) {
  let channel;
  let oldChannel;
  let data;
  let error;

  try {
    while (true) {
      if (!validateChannelData(data)) {
        ({ data, error } = yield call(getChannelData, owner));
        if (error) {
          yield put({ type: iotNotificationsTypes.CONNECT.FAILURE });
          yield delay(10000);
          continue;
        }
      }

      const expiration = Date.now() + CHANNEL_EXPIRATION;

      channel = yield call(createChannel, { ...data, owner });

      /* An old channel may exist because it expired. Close it now that we have
       * reconnected with a new channel. */
      if (oldChannel) {
        oldChannel.close();
        oldChannel = undefined;
      }

      while (true) {
        /* Wait until the channel expires. This is to work around a temporary
         * AWS IoT service issue. It appears our notification channels only
         * stay up for 15 minutes. */
        const waitUntil = expiration - Date.now();
        const { message } = yield race({
          message: takeMaybe(channel),
          timeout: delay(waitUntil)
        });

        /* When we expire out, force a reconnect, but don't shut this channel
         * down just yet in case messages might come before we are reconnected.
         */
        if (!message) {
          oldChannel = channel;
          channel = undefined;
          break;
        }

        if (message.type === END.type) {
          channel = undefined;
          break;
        }

        yield message;
      }
    }
  } finally {
    if (channel) {
      channel.close();
      yield put({ type: iotNotificationsTypes.DISCONNECT });
    }
  }
}

export default function * iotNotifications (action) {
  while (true) {
    const { owner } = yield take(iotNotificationsTypes.START_IOT_NOTIFICATIONS);
    yield race({
      task: call(connect, owner),
      cancel: take(iotNotificationsTypes.STOP_IOT_NOTIFICATIONS)
    });
  }
}
