import { Connection, PublicKey } from '@solana/web3.js';
import { Observable, BehaviorSubject } from 'rxjs';
import { DEFAULT_COMMITMENT_LEVEL } from 'src/constants/constants';
import { dlog } from 'src/dev';

/**
 * Fetches the `uiAmount` balance of a token account.
 *
 * @param tokenAccountAddress - Address of the token account to query.
 * @param connection - RPC connection used to perform the lookup.
 * @returns The account's `uiAmount`; `0` when the amount is missing (nullish)
 * or when its string form uses exponent notation; `-1` when the lookup throws
 * for any reason. Callers in this file treat `-1` as "account could not be
 * read", so it must never be confused with a real balance.
 */
const getTokenAccountBalance = async (
	tokenAccountAddress: PublicKey,
	connection: Connection
): Promise<number> => {
	try {
		const { value } = await connection.getTokenAccountBalance(
			tokenAccountAddress
		);
		const uiAmount = value.uiAmount;

		// A nullish amount would otherwise leak out of a function that promises
		// a number; report it as an empty balance instead.
		if (uiAmount == null) {
			return 0;
		}

		// Numbers are stringified with an exponent when they are extremely small
		// (below 1e-6) or extremely large (1e21 and above). Such values are
		// deliberately reported as 0 rather than passed on to consumers.
		return uiAmount.toString().includes('e') ? 0 : uiAmount;
	} catch {
		// Best-effort lookup: any failure is signalled with the -1 sentinel.
		return -1;
	}
};

/**
 * Creates an observable that emits the balance of a token account and keeps
 * emitting as the account changes on chain.
 *
 * Behaviour per subscription:
 * 1. The subscriber is attached to a `BehaviorSubject` shared by all
 *    subscriptions of the returned observable, so it immediately receives the
 *    subject's current value (`undefined` until the first fetch completes).
 * 2. The current balance is fetched once; if the account cannot be read, `0`
 *    is emitted and no on-chain listener is registered.
 * 3. Otherwise an account-change listener is registered which re-fetches the
 *    balance on every change notification.
 * 4. Unsubscribing removes the listener owned by that subscription, including
 *    when the unsubscribe happens while the initial fetch is still in flight.
 *
 * @param tokenAccountAddress - Base-58 address of the token account to watch.
 * @param connection - RPC connection used for fetching and subscribing.
 * @returns An observable of the account's balance.
 */
export const createTokenBalanceObservable = (
	tokenAccountAddress: string,
	connection: Connection
): Observable<number> => {
	// Shared across subscriptions so late subscribers get the latest value.
	const tokenBalanceSubject = new BehaviorSubject<number | undefined>(
		undefined
	);

	const observable = new Observable<number>((subscriber) => {
		// Listener state is per subscription: keeping it outside this callback
		// would let a second subscriber overwrite the first one's listener id,
		// leaving the first listener impossible to remove.
		let accountChangeListener: number | undefined;
		let isTornDown = false;

		// Subscribe to the BehaviorSubject
		const subscription = tokenBalanceSubject.subscribe(subscriber);

		const startTracking = async (): Promise<void> => {
			let tokenAccount: PublicKey | undefined;
			let accountExists = false;

			// Get the initial token balance and push to the subject
			try {
				// Constructed inside the try so an invalid address is handled like
				// any other fetch failure instead of surfacing as an unhandled
				// promise rejection.
				tokenAccount = new PublicKey(tokenAccountAddress);
				const balance = await getTokenAccountBalance(tokenAccount, connection);
				accountExists = balance !== -1;
				// `?? 0` guards against a nullish balance reaching subscribers.
				tokenBalanceSubject.next(accountExists ? balance ?? 0 : 0);
			} catch (e) {
				accountExists = false;
				tokenBalanceSubject.next(0);
				dlog(
					`spl_subscription`,
					`caught_error_fetching_initial_balance_for_spl_account`,
					e
				);
			}

			if (!accountExists || tokenAccount === undefined) {
				dlog(
					`spl_subscription`,
					`skipping_account_change_listener_for_non_existent_spl_account `
				);
				return;
			}

			// The subscriber may have unsubscribed while the fetch was in flight;
			// registering a listener now would leak it because teardown has
			// already run.
			if (isTornDown) {
				return;
			}

			const trackedAccount = tokenAccount;

			dlog(
				`spl_subscription`,
				`creating_account_change_listener_for_spl_account :: ${tokenAccountAddress}`
			);
			// Subscribe to updates and push to the subject
			accountChangeListener = connection.onAccountChange(
				trackedAccount,
				(_accountData) => {
					void getTokenAccountBalance(trackedAccount, connection).then(
						(balance) => {
							// -1 is the "could not read account" sentinel, not a balance;
							// report it as 0, consistent with the initial fetch.
							tokenBalanceSubject.next(balance === -1 ? 0 : (balance ?? 0));
						}
					);
				},
				DEFAULT_COMMITMENT_LEVEL
			);
		};

		startTracking().catch((e) => {
			dlog(
				`spl_subscription`,
				`caught_error_creating_account_change_listener_for_spl_account`,
				e
			);
		});

		// Return the teardown logic
		return () => {
			isTornDown = true;
			subscription.unsubscribe();
			if (accountChangeListener !== undefined) {
				void connection.removeAccountChangeListener(accountChangeListener);
				accountChangeListener = undefined;
			}
		};
	});

	return observable;
};
