import EventEmitter from 'events';
import * as ProjectAPI from 'api/project-api';
import LiveUpdateWebsocket, { messageTypes, isMessageType, } from 'api/live-update-websocket';
/**
 * Expands a message-type pattern into a list of message types.
 *
 * - `'*'` yields the imported `messageTypes` value.
 * - Any other string yields a one-element array holding that string.
 * - A non-string value is handed back untouched.
 *
 * @param {*} pattern - `'*'`, a single type name, or an already-built list.
 * @returns {*} The list of message types the pattern stands for.
 */
const messageTypesFromPattern = (pattern) => {
    // Anything that is not a string is treated as an already-expanded list.
    if (typeof pattern !== 'string') {
        return pattern;
    }
    return pattern === '*' ? messageTypes : [pattern];
};
/**
 * Normalizes a subsystem pattern.
 *
 * - `'*'` is kept as the literal string `'*'`.
 * - Any other string is wrapped in a one-element array.
 * - A non-string value is handed back untouched.
 *
 * @param {*} pattern - `'*'`, a single subsystem name, or an already-built list.
 * @returns {*} `'*'`, or the list of subsystems the pattern stands for.
 */
const subsystemTypesFromPattern = (pattern) => {
    const isSingleName = typeof pattern === 'string' && pattern !== '*';
    if (isSingleName) {
        return [pattern];
    }
    // Either the wildcard string or a caller-supplied list: pass through.
    return pattern;
};
/**
 * Builds a short label describing the id range held by a message buffer,
 * used in log lines.
 *
 * @param {Array<{id: *}>} buffer - Messages in buffer order.
 * @returns {string} `'[empty]'` for no messages, `'[id]'` for one message,
 *     otherwise `'[firstId-lastId]'`.
 */
const reportBufferRange = (buffer) => {
    switch (buffer.length) {
        case 0:
            return '[empty]';
        case 1:
            return `[${buffer[0].id}]`;
        default: {
            const head = buffer[0];
            const tail = buffer[buffer.length - 1];
            return `[${head.id}-${tail.id}]`;
        }
    }
};
/**
 * A message stream attached to a live-update websocket for one project.
 *
 * Messages are delivered through the per-type emitters handed to the
 * constructor. The first live message seen after construction (or after a
 * socket revive attempt) triggers a history request through
 * `ProjectAPI.getProjectMessages`; live messages that arrive while that
 * request is pending are queued, and once the history is loaded the history
 * and the queue are emitted in that order. After that, live messages are
 * emitted directly.
 */
export class ConnectedMessageStream {
    /**
     * @param {*} connection - Passed through to `LiveUpdateWebsocket` and
     *     `ProjectAPI.getProjectMessages`.
     * @param {*} projectId - Project whose messages are streamed.
     * @param {Object} emitters - Emitters keyed by message type; each must
     *     provide `emit` and `listeners`.
     * @param {Array<{id: number}>} [messageBuffer=[]] - Already-loaded
     *     messages; they are copied and emitted on the first flush.
     */
    constructor(connection, projectId, emitters, messageBuffer = []) {
        // `null` means "no live message handled yet for this socket session".
        this.clientMessageBuffer = null;
        /**
         * Re-arms the stream on a socket revive attempt: history will be
         * requested again starting right after the last id the socket saw
         * (or from the beginning when the socket reports -1).
         */
        this.handleSocketRevive = () => {
            this.fromMessageId = this.socket.lastMessageId === -1
                ? null
                : this.socket.lastMessageId + 1;
            this.clientMessageBuffer = null;
        };
        /**
         * Handles one live message from the socket.
         *
         * @param {{id: number, type: string, subsystem: string}} message
         * @returns {Promise<void>} Resolves once the message was emitted or
         *     queued; a failed history request is logged, not rethrown.
         */
        this.handleSocketMessage = async (message) => {
            if (this.clientMessageBuffer === null) {
                // First live message from websocket just arrived.
                // Start buffering live messages; Fetch and process all messages from history.
                this.processBufferedMessages();
                const liveBuffer = [message];
                this.clientMessageBuffer = liveBuffer;
                let history;
                try {
                    history = await ProjectAPI.getProjectMessages(this.connection, this.projectId, this.fromMessageId === null ? undefined : String(this.fromMessageId), String(message.id));
                }
                catch (error) {
                    // Without this the rejection would go unhandled and the
                    // non-empty live buffer would make every later message
                    // queue up forever without being emitted.
                    global.logger.error(`msgstream: failed to load history: ${error}`);
                    // Re-arm so the next live message retries the history
                    // request from the same `fromMessageId`. The queued live
                    // messages are dropped here.
                    // NOTE(review): assumes the retried history range covers
                    // the dropped messages — confirm against the API.
                    // Skip the reset when a revive or a newer fetch cycle has
                    // already replaced the buffer.
                    if (this.clientMessageBuffer === liveBuffer) {
                        this.clientMessageBuffer = null;
                    }
                    return;
                }
                this.serverMessageBuffer = history;
                global.logger.info(`msgstream: loaded history ${reportBufferRange(this.serverMessageBuffer)}`);
                this.processBufferedMessages();
                return;
            }
            if (this.clientMessageBuffer.length > 0) {
                // Buffered messages not yet digested. Add the new message to queue.
                this.clientMessageBuffer.push(message);
                return;
            }
            // All buffers are already processed, emit directly.
            this.emitMessage(message);
        };
        /** Logs a socket error; no recovery is attempted here. */
        this.handleSocketError = () => {
            global.logger.error('Unknown websocket error');
        };
        this.connection = connection;
        this.projectId = projectId;
        // Copy so later changes to the caller's array do not affect the stream.
        this.serverMessageBuffer = [...messageBuffer];
        this.fromMessageId = messageBuffer.length === 0
            ? null
            : messageBuffer[messageBuffer.length - 1].id + 1;
        this.emitters = emitters;
        this.socket = new LiveUpdateWebsocket(connection, `&projectId=${projectId}&usermessages=false`);
        this.socket.onReady = () => { };
        this.socket.onError = this.handleSocketError;
        this.socket.onBroken = () => { };
        this.socket.onGiveup = () => { };
        this.socket.onMessage = this.handleSocketMessage;
        this.socket.onReviveAttempt = this.handleSocketRevive;
        global.logger.info(`msgstream: created with buffer ${reportBufferRange(this.serverMessageBuffer)}`);
    }
    /**
     * Closes the underlying socket.
     *
     * @returns {Promise<*>} Whatever `socket.close()` yields.
     */
    async close() {
        return this.socket.close();
    }
    /**
     * Dispatches one message to the emitter registered for its type.
     *
     * NOTE(review): listeners registered for a specific subsystem receive
     * `(message)`, while listeners registered under `'*'` receive
     * `(subsystem, message)` — confirm callers expect this difference.
     *
     * @param {{type: string, subsystem: string}} message
     */
    emitMessage(message) {
        const { subsystem, type } = message;
        if (!isMessageType(type)) {
            // NOTE: Message types such as ChainCommand and Panic are ignored.
            return;
        }
        this.emitters[type].emit(subsystem, message);
        this.emitters[type].listeners('*').forEach((listener) => listener(subsystem, message));
    }
    /**
     * Emits and clears the history buffer, then (once live buffering has
     * started) emits and clears the queued live messages.
     * History messages are emitted as copies tagged with `source: 'buffer'`.
     */
    processBufferedMessages() {
        this.serverMessageBuffer.forEach((message) => this.emitMessage(Object.assign({}, message, { source: 'buffer' })));
        global.logger.info(`msgstream: flushed server buffer ${reportBufferRange(this.serverMessageBuffer)}`);
        this.serverMessageBuffer = [];
        if (this.clientMessageBuffer === null) {
            return;
        }
        // Process all messages even if some appear at the end during processing
        for (let i = 0; i < this.clientMessageBuffer.length; i += 1) {
            const message = this.clientMessageBuffer[i];
            this.emitMessage(message);
        }
        global.logger.info(`msgstream: flushed client buffer ${reportBufferRange(this.clientMessageBuffer)}`);
        this.clientMessageBuffer = [];
    }
}
/**
 * Holds one event emitter per message type for a project and lets callers
 * attach or detach listeners before (or after) a live connection is opened
 * with `connect()`.
 */
export class MessageStream {
    /**
     * @param {*} connection - Passed through to `ProjectAPI` and to the
     *     connected stream.
     * @param {*} projectId - Project whose messages are streamed.
     */
    constructor(connection, projectId) {
        this.connection = connection;
        this.projectId = projectId;
        const emitters = {};
        ['Started', 'Progress', 'Completed', 'Failed', 'ChainStatus'].forEach((type) => {
            emitters[type] = new EventEmitter();
        });
        this.emitters = emitters;
    }
    /**
     * Loads the project's messages and creates a connected stream that
     * shares this instance's emitters.
     *
     * @returns {Promise<ConnectedMessageStream>}
     */
    async connect() {
        const history = await ProjectAPI.getProjectMessages(this.connection, this.projectId);
        const { connection, projectId, emitters } = this;
        return new ConnectedMessageStream(connection, projectId, emitters, history);
    }
    /**
     * Registers `listener` for every subsystem/type combination matched by
     * the two patterns. The subsystem wildcard is registered under the
     * literal event name `'*'`.
     */
    addMessageListener(subsystemPattern, typesPattern, listener) {
        const resolved = subsystemTypesFromPattern(subsystemPattern);
        const targets = resolved === '*' ? ['*'] : resolved;
        targets.forEach((target) => this.addSubsystemListener(target, typesPattern, listener));
    }
    /**
     * Detaches `listener` from every subsystem/type combination matched by
     * the two patterns; mirrors `addMessageListener`.
     */
    removeMessageListener(subsystemPattern, typesPattern, listener) {
        const resolved = subsystemTypesFromPattern(subsystemPattern);
        const targets = resolved === '*' ? ['*'] : resolved;
        targets.forEach((target) => this.removeSubsystemListener(target, typesPattern, listener));
    }
    /**
     * Attaches `listener` to the `subsystem` event on the emitter of each
     * message type matched by `types`.
     */
    addSubsystemListener(subsystem, types, listener) {
        const matchedTypes = messageTypesFromPattern(types);
        matchedTypes.forEach((type) => {
            const emitter = this.emitters[type];
            emitter.on(subsystem, listener);
        });
    }
    /**
     * Detaches `listener` from the `subsystem` event on the emitter of each
     * message type matched by `types`.
     */
    removeSubsystemListener(subsystem, types, listener) {
        const matchedTypes = messageTypesFromPattern(types);
        matchedTypes.forEach((type) => {
            const emitter = this.emitters[type];
            emitter.off(subsystem, listener);
        });
    }
}
