import { uniq, difference } from 'lodash'
import { ThunkAction } from 'redux-thunk'
import { Subject } from 'rxjs'
import { bufferTime, map } from 'rxjs/operators'

import { notificationsCacheDefinition } from '../../lib/asyncSelector/notificationsCacheDefinition'
import { portcallsCacheDefinition } from '../../lib/asyncSelector/portcallsCacheDefinition'
import { pushToQueue } from '../../lib/asyncSelector/queueActions'
import { visitUpdatedEventIds, visitDeletedEventIds } from '../../lib/websocket/websocket.actions'
import { AppAction } from '../../modules/App/App.actions'
import { IAppState } from '../../modules/App/interfaces/IAppState'
import { expire } from '../../modules/CacheQueue/IExpire'
import { SECOND } from '../../shared/constants'

import { createBatch } from './IMessageBatch'
import { Message } from './Message'

const websocketQueue = new Subject<Message>()

export function pushWebsocketQueue(message: Message): void {
  websocketQueue.next(message)
}

export const handleWebsocketQueueMessages: ThunkAction<void, IAppState, void, AppAction> = dispatch => {
  websocketQueue.pipe(bufferTime(30 * SECOND), map(createBatch)).subscribe(batch => {
    if (batch.notificationIdsToUpdateOrRemove.length > 0) {
      dispatch(
        pushToQueue(
          notificationsCacheDefinition.cacheId,
          expire(notificationsCacheDefinition.cacheId, uniq(batch.notificationIdsToUpdateOrRemove))
        )
      )
    }

    // Only process updates for portcalls which have *not* been removed already.
    const portcallIdsToUpdate = difference(batch.portcallIdsToUpdate, batch.portcallIdsToRemove)
    if (portcallIdsToUpdate.length > 0) {
      const uniquePortcallIdsToUpdate = uniq(portcallIdsToUpdate)
      dispatch(
        pushToQueue(
          portcallsCacheDefinition.cacheId,
          expire(portcallsCacheDefinition.cacheId, uniquePortcallIdsToUpdate)
        )
      )
      dispatch(visitUpdatedEventIds(uniquePortcallIdsToUpdate, Date.now()))
    }

    if (batch.portcallIdsToRemove.length > 0) {
      const uniquePortcallIdsToRemove = uniq(batch.portcallIdsToRemove)
      // First, remove the portcall id's from the relevant places:
      dispatch(visitDeletedEventIds(uniquePortcallIdsToRemove, Date.now()))

      // Then expire the existing portcall, as people shouldn't be using that anymore:
      dispatch(
        pushToQueue(
          portcallsCacheDefinition.cacheId,
          expire(portcallsCacheDefinition.cacheId, uniquePortcallIdsToRemove)
        )
      )
    }
  })
}
