import invariant from "tiny-invariant";

/**
 * A stream of messages. Buffers all messages received and re-sends them to a new listeners.
 */
export class MessageStream<T> {
    private readonly buffer: T[] = [];
    private listener: ((message: T) => void) | undefined = undefined;

    private publishBuffer() {
        queueMicrotask(() => {
            const { buffer } = this;
            let i = 0;
            for (; i < buffer.length; i++) {
                if (!this.callListener(buffer[i])) {
                    break;
                }
            }
            buffer.splice(0, i);
        });
    }

    private callListener(message: T) {
        try {
            // Might have been unsubscribed while handling a message
            if (!this.listener) {
                return false;
            }
            this.listener(message);
            return true;
        } catch (e) {
            console.error("Error in message listener", e);
            // Errors count as handled messages
            return true;
        }
    }

    subscribe(listener: (message: T) => void) {
        invariant(!this.listener, "Listener already subscribed");
        this.listener = listener;
        this.publishBuffer();
        return () => {
            this.listener = undefined;
        };
    }

    publish(message: T) {
        queueMicrotask(() => {
            if (!this.callListener(message)) {
                this.buffer.push(message);
            }
        });
    }
}
