'use client';

import { WrappedEvent } from '@drift-labs/sdk';
import { MarketId, MarketKey } from '@drift/common';
import { useContext, useEffect, useMemo, useRef } from 'react';
import { useInterval } from 'react-use';
import { Subscription } from 'rxjs/internal/Subscription';
import { dlog } from '../../dev';
import Env from '../../environmentVariables/EnvironmentVariables';
import {
	DriftBlockchainEventSubjectContext,
	MERGE_USER_AND_GLOBAL_EVENT_SUBJECTS,
} from '../../providers/driftEvents/driftEventSubjectProvider';
import DLOB_SERVER_WEBSOCKET_UTILS, {
	DlobServerChannel,
} from '../../providers/websockets/dlobServerWebsocketUtils';
import { WebsocketSubjectContext } from '../../providers/websockets/websocketSubscriptionProvider';
import useMarketStateStore, {
	DlobListeningSelection,
} from '../../stores/useMarketStateStore';
import { ResultSlotIncrementer } from '../../utils/resultSlotIncrementer';
import useInfoForCurrentlySelectedMarket from '../useInfoForCurrentlySelectedMarket';
import { deserializeTradeResponse } from './dlobServerUtils';
import { useCurrentUserForMarketOrderEventsRef } from '../../components/MarketOrderToasts/MarketOrderToastEventEmitterProvider';
import useSubscriptionStateStore from '../../stores/useSubscriptionStateStore';

// # See README.md in src/hooks/driftEvents for more information on the subscription state

/** A trade received over the websocket, held until the next update tick flushes it. */
type BufferedTradeUpdate = {
	marketId: MarketId;
	data: WrappedEvent<'OrderActionRecord'>;
};

// Dev-only switch: when true, roughly half of the incoming heartbeat messages are ignored so that the fallback path can be exercised.
const DEV_TRIGGER_FALLBACKS = false;

const websocketUrl = Env.dlobWsServerToUse;

// Used to discard websocket results whose slot is not increasing.
const resultIncrementer = new ResultSlotIncrementer();

// The tick rate is deliberately fast because the purpose of the websocket is to be highly responsive.
// A tick is still used (rather than reacting to every message) to prevent UI lag during cascades of state changes.
const UPDATE_TICK_MS = 100;

// Trigger the fallback if no heartbeat has been received within 10 seconds.
const HEARTBEAT_TIMEOUT_MS = 10_000;
// Check for heartbeats every second.
const HEARTBEAT_CHECK_INTERVAL_MS = 1_000;
// On startup the websocket gets a grace period before it is expected to be sending heartbeats.
const HEARTBEAT_GRACE_PERIOD_MS = 15_000;

// Module-level queue of received trades waiting to be flushed.
const UPDATES_BUFFER: BufferedTradeUpdate[] = [];

// Limit on subscribe retries before falling back.
const WS_RECONNECT_TIMES = 3;

// TODO :: This hook is in a weird limbo state: some of its variables are set up as if it should support websockets for multiple markets at the same time, but the critical parts ONLY use currentMarketId. We should refactor it to make it clear that it only works for the current market.

/**
 * Subscribes to the DLOB server websocket's trades channel for the currently
 * selected market and pipes the received trades into the drift event subjects.
 *
 * The channel is only used while the subscription state store reports
 * `'trade-publisher'` as the global events subscription type. Heartbeat
 * messages are monitored, and the store's global-events fallback is triggered
 * when they stop arriving or when the websocket reports an error.
 *
 * The hook takes no arguments and returns nothing.
 */
const useSyncRecentTradesWithServerWebsocket = () => {
	// Ref to the current user; each trade's taker and maker are compared against it.
	const currentUserRef = useCurrentUserForMarketOrderEventsRef();

	// Timestamp of the most recent subscription setup; the heartbeat grace period is measured from it.
	const subscriptionSetupTimeRef = useRef<number>(Date.now());
	// Timestamp of the most recent heartbeat message (also reset whenever subscriptions are set up).
	const lastHeartbeatRef = useRef<number>(Date.now());
	// Timestamp recorded whenever a fallback is triggered.
	const lastFallbackTimeRef = useRef<number>(Date.now());
	// Set once a fallback has happened; after that the heartbeat grace period no longer applies.
	const hasFallenBackBeforeRef = useRef<boolean>(false);

	const currentMarketId = useInfoForCurrentlySelectedMarket()?.info?.marketId;

	// Listening selection recorded in the market state store for the current market (undefined if there is none).
	const currentDataSource = useMarketStateStore(
		(s) => s.marketSubscriptionState?.[currentMarketId?.key]?.listeningSelection
	);

	// True only while global events are configured to come from the trade publisher.
	const isEnabled = useSubscriptionStateStore(
		(s) => s.globalEventsState.subscriptionType === 'trade-publisher'
	);

	// Market keys that should have a websocket subscription: only the current market,
	// and none at all when no market is selected or its listening selection is BLOCKCHAIN.
	const marketsToSubscribe = useMemo(() => {
		if (!currentMarketId) return [];

		return currentDataSource === DlobListeningSelection.BLOCKCHAIN
			? []
			: [currentMarketId.key];
	}, [currentMarketId, currentDataSource]);

	// Provides the event subjects (globalFillEventSubject / userEventSubject) that received trades are pushed into.
	const driftEventSubjectContext = useContext(
		DriftBlockchainEventSubjectContext
	);

	// Provider used to open subscriptions on the DLOB server websocket.
	const websocketProvider = useContext(WebsocketSubjectContext);

	/** Tears down and forgets the websocket subscription held for the given market, if there is one. */
	const unsubscribeFromMarket = (marketId: MarketId) => {
		const { key } = marketId;
		const existing = subscriptions.current.get(key);

		if (!existing) return;

		existing.unsubscribe();
		subscriptions.current.delete(key);
	};

	/**
	 * Switches away from the websocket for the given market: records that (and when)
	 * a fallback happened, asks the subscription state store to handle the global
	 * events fallback, and then drops the market's websocket subscription.
	 */
	const handleDatasourceFallback = (marketId: MarketId) => {
		dlog(
			`optimised_event_subscriptions`,
			`recent_market_trades_websocket_fallback_triggered`
		);

		const fallbackTime = Date.now();
		hasFallenBackBeforeRef.current = true;
		lastFallbackTimeRef.current = fallbackTime;

		const subscriptionState = useSubscriptionStateStore.getState();
		subscriptionState.handleGlobalEventsFallback();

		// The websocket is no longer the data source for this market, so stop listening to it
		unsubscribeFromMarket(marketId);
	};

	// Live websocket subscriptions, keyed by market key. Each entry keeps the subscription
	// returned by the websocket provider alongside the `unsubscribe` callback that tears it down.
	const subscriptions = useRef<
		Map<MarketKey, { subscription: Subscription; unsubscribe: () => void }>
	>(new Map());

	/**
	 * Queues a trade received from the websocket. Nothing is published here; the
	 * queued entry is picked up by the next buffer flush.
	 */
	const handleNewTrade = (
		marketId: MarketId,
		data: WrappedEvent<'OrderActionRecord'>
	) => {
		dlog(
			`optimised_event_subscriptions`,
			`received::event_from_websocket_recent_trades`
		);

		const bufferedUpdate = { marketId, data };
		UPDATES_BUFFER.push(bufferedUpdate);
	};

	// Re-entrancy guard for runUpdates: true while a buffer flush is in progress.
	const bufferUpdateMutex = useRef(false);
	// Count of errors caught while subscribing; compared against WS_RECONNECT_TIMES to choose between retrying and falling back.
	const wsErrorCounter = useRef(0);

	/**
	 * Publishes a single trade record to the global fill event subject and, when
	 * user/global subjects are merged and the current user is the taker or maker,
	 * to the user event subject as well. Does nothing if the global fill subject
	 * is not available.
	 */
	const appendTradeRecord = (
		newOrderActionRecord: WrappedEvent<'OrderActionRecord'>
	) => {
		const { globalFillEventSubject } = driftEventSubjectContext;
		const currentUser = currentUserRef.current;

		const eventIsForCurrentUser =
			currentUser &&
			[newOrderActionRecord.taker, newOrderActionRecord.maker].some(
				(participant) => participant?.equals(currentUser)
			);

		if (!globalFillEventSubject) return;

		dlog(
			`optimised_event_subscriptions`,
			`piping::websocket_recent_trade=>global_fill_subject`
		);
		globalFillEventSubject.next(newOrderActionRecord);

		if (!MERGE_USER_AND_GLOBAL_EVENT_SUBJECTS || !eventIsForCurrentUser) {
			return;
		}

		dlog(
			`optimised_event_subscriptions`,
			`piping::websocket_recent_trade=>user_event_subject`
		);
		driftEventSubjectContext.userEventSubject?.next(newOrderActionRecord);
	};

	/**
	 * Flushes every trade currently queued in UPDATES_BUFFER through
	 * appendTradeRecord. A flush that is already in progress is never re-entered.
	 *
	 * The guard flag is released in a `finally` block: if publishing a record
	 * throws, the error still propagates to the caller, but later ticks are able
	 * to flush again instead of being skipped forever.
	 */
	const runUpdates = () => {
		if (bufferUpdateMutex.current) return;

		bufferUpdateMutex.current = true;

		try {
			// Take everything queued so far and empty the buffer in a single step
			const updatesToHandle = UPDATES_BUFFER.splice(0, UPDATES_BUFFER.length);

			updatesToHandle.forEach((update) => appendTradeRecord(update.data));
		} finally {
			bufferUpdateMutex.current = false;
		}
	};

	// Flush the buffered trades at the desired tick rate instead of reacting to every websocket message individually. This prevents cascades of store state changes for each individual websocket response.
	useInterval(runUpdates, UPDATE_TICK_MS);

	// Heartbeat watchdog: periodically checks how long ago the last heartbeat arrived and triggers the fallback when the websocket has gone quiet for too long.
	useInterval(() => {
		// Nothing to monitor while the websocket channel is disabled
		if (!isEnabled) return;

		// The startup grace period only applies while no fallback has happened yet
		const stillInGracePeriod =
			!hasFallenBackBeforeRef.current &&
			Date.now() - subscriptionSetupTimeRef.current < HEARTBEAT_GRACE_PERIOD_MS;

		if (stillInGracePeriod) return;

		// Only monitor when a market is selected and at least one subscription is live
		if (!currentMarketId || subscriptions.current.size === 0) return;

		const msSinceHeartbeat = Date.now() - lastHeartbeatRef.current;

		dlog(
			`websocket_debugging`,
			`${currentMarketId.key} HEARTBEAT TIME: ${msSinceHeartbeat}ms`
		);

		if (msSinceHeartbeat > HEARTBEAT_TIMEOUT_MS) {
			dlog(
				`websocket_debugging`,
				`${currentMarketId.key} NO HEARTBEAT RECEIVED FOR ${msSinceHeartbeat}ms: TRIGGERING FALLBACK`
			);
			handleDatasourceFallback(currentMarketId);
		}
	}, HEARTBEAT_CHECK_INTERVAL_MS);

	/**
	 * Reconciles the live websocket subscriptions with `marketsToSubscribe`.
	 *
	 * - Resets the setup and heartbeat timestamps.
	 * - Does nothing further when there is no current market or the channel is disabled.
	 * - Drops subscriptions for markets that are no longer wanted.
	 * - Subscribes to the trades channel of the current market, but only when that
	 *   market is in `marketsToSubscribe` and is not already subscribed.
	 *
	 * If subscribing throws, the subscription is retried up to WS_RECONNECT_TIMES
	 * consecutive times before the datasource fallback is triggered.
	 */
	const setUpSubscriptions = () => {
		// Restart the clocks used by the heartbeat monitor
		subscriptionSetupTimeRef.current = Date.now();
		lastHeartbeatRef.current = Date.now();

		if (!currentMarketId || !isEnabled) {
			dlog(
				`optimised_event_subscriptions`,
				`websocket_recent_trades_channel_disabled`
			);
			return;
		}

		dlog(
			`optimised_event_subscriptions`,
			`using_trades_publisher_for_global_events`
		);

		// # Handle any necessary unsubscriptions
		const newSubscriptionsSet = new Set<MarketKey>(marketsToSubscribe);

		// Snapshot of the keys, so entries can be deleted from the map while looping
		const oldSubscriptionsSet = new Set<MarketKey>(
			subscriptions.current.keys()
		);

		// Unsubscribe from old subscriptions which are no longer needed
		for (const oldSubscriptionKey of oldSubscriptionsSet) {
			if (newSubscriptionsSet.has(oldSubscriptionKey)) continue;

			const subscription = subscriptions.current.get(oldSubscriptionKey);
			if (subscription) {
				dlog(
					`optimised_event_subscriptions`,
					`unsubscribing_websocket .. ${oldSubscriptionKey}`
				);
				subscription.unsubscribe();
				subscriptions.current.delete(oldSubscriptionKey);
			}
		}

		if (!newSubscriptionsSet.has(currentMarketId.key)) {
			// The current market is not supposed to use the websocket (e.g. its
			// listening selection is BLOCKCHAIN), so don't open a subscription for it
			return;
		}

		if (subscriptions.current.has(currentMarketId.key)) {
			// Don't double subscribe
			return;
		}

		try {
			const onErrorCallback = () => {
				dlog(
					`websocket_debugging`,
					`${currentMarketId.key} CAUGHT ERROR IN WEBSOCKET: SWITCHING FROM DLOB-SERVER WEBSOCKET TO POLLING`
				);
				handleDatasourceFallback(currentMarketId);
			};

			const { subscription: marketSubscription, unsubscribe } =
				websocketProvider.subscribe(
					websocketUrl,
					`recent_trades_${currentMarketId.key}`,
					{
						onSubMessage: () => {
							return JSON.stringify(
								DLOB_SERVER_WEBSOCKET_UTILS.getSubscriptionProps({
									type: 'trades',
									market: currentMarketId,
								})
							);
						},
						onUnSubMessage: () => {
							return JSON.stringify(
								DLOB_SERVER_WEBSOCKET_UTILS.getUnsubscriptionProps({
									type: 'trades',
									market: currentMarketId,
								})
							);
						},
						onMessageCallback: (message: {
							channel: DlobServerChannel;
							data: string;
						}) => {
							if (message.channel === 'heartbeat') {
								if (DEV_TRIGGER_FALLBACKS) {
									// Dev only: randomly drop heartbeats to exercise the fallback path
									if (Math.random() > 0.5) {
										return;
									}
								}

								dlog(
									`websocket_debugging`,
									`recent_trades_websocket_heartbeat_received`
								);
								lastHeartbeatRef.current = Date.now();
								return;
							}

							const orderActionRecord = deserializeTradeResponse(
								JSON.parse(message.data)
							);

							const resultKey = `${message.channel}_${currentMarketId.key}`;

							const lastSlotReceived = orderActionRecord.slot;

							const validResult = resultIncrementer.handleResult(
								resultKey,
								lastSlotReceived
							);

							if (!validResult) {
								return; // Skip results which aren't slot-increasing
							}

							handleNewTrade(currentMarketId, orderActionRecord);
						},
						messageFilter: (message: { channel: string; data: string }) => {
							return DLOB_SERVER_WEBSOCKET_UTILS.getMessageFilter({
								type: 'trades',
								market: currentMarketId,
							})(message);
						},
						onErrorCallback: onErrorCallback,
						errorMessageFilter: (message?: any) => {
							// `message` is optional, so guard the property access
							if (message?.error) {
								console.log(`ERROR IN ERROR MESSAGE FILTER`, message);
								return true;
							}

							return false;
						},
					}
				);

			subscriptions.current.set(currentMarketId.key, {
				subscription: marketSubscription,
				unsubscribe,
			});

			// A successful subscribe ends the run of consecutive failures
			wsErrorCounter.current = 0;
		} catch (e) {
			const failedSubscription = subscriptions.current.get(currentMarketId.key);
			if (failedSubscription) {
				failedSubscription.unsubscribe();
				subscriptions.current.delete(currentMarketId.key);
			}

			// try to resubscribe to websocket unless the error is hitting repeatedly
			if (wsErrorCounter.current >= WS_RECONNECT_TIMES) {
				dlog(
					`websocket_debugging`,
					`${currentMarketId.key} CAUGHT ERROR SUBSCRIBING TO WEBSOCKET: SWITCHING FROM DLOB-SERVER WEBSOCKET TO POLLING`
				);
				wsErrorCounter.current = 0;
				handleDatasourceFallback(currentMarketId);
			} else {
				dlog(
					`websocket_debugging`,
					`${currentMarketId.key} CAUGHT ERROR SUBSCRIBING TO WEBSOCKET: RETRYING`
				);
				wsErrorCounter.current++;
				setUpSubscriptions();
			}
		}
	};

	// Handle changes in the subscription state
	useEffect(() => {
		if (!isEnabled) {
			// The websocket channel is disabled: tear down every live subscription so
			// that it stops feeding trades into the buffer
			subscriptions.current.forEach((subscription) =>
				subscription.unsubscribe()
			);
			subscriptions.current.clear();
			return;
		}

		if (!currentMarketId) return;

		dlog(
			`recent_trades_subscriptions`,
			`expected_websocket_subscriptions .. ${currentMarketId.key}`
		);

		setUpSubscriptions();
	}, [marketsToSubscribe, isEnabled]);

	// Release every websocket subscription when the hook unmounts, so none is leaked
	useEffect(() => {
		// The Map instance is stable for the lifetime of the ref, so capture it once
		const liveSubscriptions = subscriptions.current;

		return () => {
			liveSubscriptions.forEach((subscription) => subscription.unsubscribe());
			liveSubscriptions.clear();
		};
	}, []);
};

export default useSyncRecentTradesWithServerWebsocket;
