import { Observable, Subject } from 'rxjs';
import { debounceTime, filter, switchMap } from 'rxjs/operators';

/**
 * Configuration for an `AsyncProcessor`.
 *
 * Type parameters:
 * - `T`: type of the input values handed to the processor.
 * - `U`: type of the values produced by `processingFunction`.
 * - `E`: type used to label each update (compared against `expiryEvents`).
 */
export type AsyncProcessorConfig<T, U, E> = {
	/** Debounce window, in milliseconds, applied to validated inputs before they are processed. */
	debounceTimeMs: number;
	/** Update types that make the processor emit `null` on its output before the new input is forwarded. */
	expiryEvents: Set<E>;
	/** Asynchronous function applied to each debounced input; a rejection is logged and reported as `null`. */
	processingFunction: (input: T) => Promise<U>;
	/** Optional predicate; inputs for which it returns a falsy value are dropped. When omitted, every input is accepted. */
	inputValidator?: (input: T) => boolean;
};

/**
 * Utility class that handles asynchronous processing of input data.
 *
 * All processed data is forwarded to an output observable, which you can obtain by calling the `getOutput` method.
 *
 * Any update whose event type is in the `expiryEvents` set causes the output observable to emit `null` immediately, while the new input is being processed.
 */
export class AsyncProcessor<T, U, E> {
	/** Raw inputs pushed through `updateInput`, before validation and debouncing. */
	private readonly inputSubject: Subject<T>;
	/** Backing subject of the observable returned by `getOutput`. */
	private readonly outputSubject: Subject<U | null>;
	/**
	 * Incremented on every expiry update. Each processing run remembers the
	 * value it started with, so a result computed before an expiry can be
	 * recognised as stale and dropped instead of overwriting the `null`.
	 */
	private expiryEpoch = 0;

	/**
	 * @param config Debounce time, expiry event set, processing function and
	 * optional input validator used by this processor.
	 */
	constructor(private readonly config: AsyncProcessorConfig<T, U, E>) {
		this.inputSubject = new Subject<T>();
		this.outputSubject = new Subject<U | null>();
		this.setupProcessingPipeline();
	}

	/**
	 * Submits a new input for processing.
	 *
	 * When `updateType` is one of the configured `expiryEvents`, `null` is
	 * emitted on the output right away and any processing that is still in
	 * flight is invalidated, before the input is queued.
	 *
	 * @param updateType Label of the update, checked against `expiryEvents`.
	 * @param input Value to validate, debounce and process.
	 */
	public updateInput(updateType: E, input: T): void {
		if (this.config.expiryEvents.has(updateType)) {
			this.handleExpiryUpdate();
		}
		this.inputSubject.next(input);
	}

	/**
	 * @returns An observable of processed results; `null` is emitted on expiry
	 * updates and when the processing function fails.
	 */
	public getOutput(): Observable<U | null> {
		return this.outputSubject.asObservable();
	}

	/** Invalidates in-flight processing and tells subscribers the current output has expired. */
	private handleExpiryUpdate(): void {
		// Bump the epoch first so a result that was already being computed
		// cannot be published after the `null` emitted below.
		this.expiryEpoch += 1;
		this.outputSubject.next(null);
	}

	/** Wires inputs through validation, debouncing and processing into the output subject. */
	private setupProcessingPipeline(): void {
		this.inputSubject
			.pipe(
				filter((input) => this.validateInput(input)),
				debounceTime(this.config.debounceTimeMs),
				// switchMap discards the result of a previous run once a newer
				// debounced input starts processing.
				switchMap((input) => this.processInput(input))
			)
			.subscribe(({ epoch, output }) => {
				// switchMap only cancels when the next input clears the debounce
				// window; the epoch check covers the gap between an expiry update
				// and that moment.
				if (epoch === this.expiryEpoch) {
					this.outputSubject.next(output);
				}
			});
	}

	/**
	 * @returns The validator's verdict, or `true` when no validator is configured.
	 */
	private validateInput(input: T): boolean {
		return this.config.inputValidator
			? this.config.inputValidator(input)
			: true;
	}

	/**
	 * Runs the configured processing function on `input`.
	 *
	 * @returns The result (or `null` if processing threw or rejected, after
	 * logging the error) together with the expiry epoch that was current when
	 * processing started.
	 */
	private async processInput(
		input: T
	): Promise<{ epoch: number; output: U | null }> {
		const epoch = this.expiryEpoch;
		try {
			return { epoch, output: await this.config.processingFunction(input) };
		} catch (error) {
			console.error('Error processing input:', error);
			return { epoch, output: null };
		}
	}
}
