import { parse, Allow } from 'partial-json';

import type { StreamAdapterEvent, StreamVersions } from './streamTypes';
import { Emitter } from '../lib/Emitter';

export type StreamStateStatus = 'start' | 'streaming' | 'end' | 'error';

export interface TextEventState {
  id: string;
  delta: string;
  content: string;
  status: StreamStateStatus;
}

export interface ToolCallState {
  id: string;
  index: number;
  toolCallId: string;
  name: string;
  args: string;
  deltaArgs: string;
  parsedArgs: Record<string, unknown>;
  status: StreamStateStatus;
  content?: unknown;
  artifact?: unknown;
}

/**
 * Holds current stream state. This is reset on stream start.
 * It is only used internally to manage concatenation of text and tool calls.
 */
export interface ProcessorState {
  id?: string;
  isComplete: boolean;
  texts: Map<string, TextEventState>;
  toolCalls: Map<number, ToolCallState>;
}

/**
 * Event handlers for the StreamProcessor.
 */
export interface StreamProcessorEvents {
  init: (event: {
    id: string;
    messageId: string;
    version: StreamVersions;
  }) => void;
  text: (event: TextEventState) => void;
  toolCall: (event: ToolCallState) => void;
  error: (error: unknown) => void;
  end: () => void;
}

/**
 * Base StreamProcessor that handles the core logic of processing
 * stream events and maintaining state.
 */
export class StreamProcessor extends Emitter<StreamProcessorEvents> {
  protected state: ProcessorState = this.#getInitialState();
  protected currentOpenTextId: string | null = null;

  /**
   * Process a stream event and update internal state
   */
  // eslint-disable-next-line sonarjs/cognitive-complexity
  #processEvent(event: StreamAdapterEvent): void {
    try {
      switch (event.type) {
        /**
         * Handle generic events
         */
        case 'init':
          {
            // Make sure we reset the state
            this.state = this.#getInitialState();
            this.emit('init', {
              id: event.id,
              messageId: event.messageId,
              version: event.version,
            });
          }
          break;

        case 'end':
          this.emit('end');
          break;

        case 'error':
          this.emit('error', event.error);
          break;

        /**
         * Handle text streams
         */
        case 'text-start':
          {
            // Create new text state
            this.currentOpenTextId = event.id;
            const text: TextEventState = {
              id: event.id,
              delta: '',
              content: '',
              status: 'start',
            };

            this.state.texts.set(event.id, text);
            this.emit('text', text);
          }
          break;

        case 'text-chunk':
          {
            if (!this.currentOpenTextId) {
              throw new Error(
                'No text is currently opened to stream. The text-start event was missed or not streamed.',
              );
            }

            const text = this.state.texts.get(this.currentOpenTextId);

            if (!text) {
              throw new Error(
                `Text with id: ${this.currentOpenTextId} not found in the stream state.`,
              );
            }

            text.content += event.content;
            text.status = 'streaming';

            this.state.texts.set(this.currentOpenTextId, text);
            this.emit('text', {
              ...text,
              delta: event.content,
            });
          }
          break;

        case 'text-end':
          {
            if (!this.currentOpenTextId) {
              throw new Error(
                'Cannot close text stream properly. There is no currently open text stream.',
              );
            }

            const text = this.state.texts.get(this.currentOpenTextId);

            if (!text) {
              throw new Error(
                `Text with id: ${this.currentOpenTextId} not found in the stream state.`,
              );
            }

            text.status = 'end';

            this.state.texts.set(this.currentOpenTextId, text);
            this.emit('text', text);
          }
          break;

        /**
         * Handle tool calls
         */
        case 'tool-call-start':
          {
            // Create new tool call state
            const toolCall: ToolCallState = {
              id: event.id,
              index: event.index,
              toolCallId: event.toolCallId,
              name: event.name,
              args: '',
              deltaArgs: '',
              parsedArgs: {},
              status: 'start',
            };

            this.state.toolCalls.set(event.index, toolCall);
            this.emit('toolCall', toolCall);
          }
          break;

        case 'tool-call-chunk':
          {
            const toolCall = this.state.toolCalls.get(event.index);

            if (!toolCall) {
              throw new Error(
                `Tool call with index: ${event.index} not found in the stream state.`,
              );
            }

            toolCall.args += event.args;
            toolCall.status = 'streaming';
            toolCall.parsedArgs = this.#parsePartialJson(
              toolCall.args,
              toolCall.parsedArgs,
            );

            this.state.toolCalls.set(event.index, toolCall);
            this.emit('toolCall', {
              ...toolCall,
              deltaArgs: event.args,
            });
          }
          break;

        case 'tool-call-end':
          {
            const toolCall = this.state.toolCalls.get(event.index);

            if (!toolCall) {
              throw new Error(
                `Tool call with index: ${event.index} not found in the stream state.`,
              );
            }

            toolCall.status = 'end';

            this.state.toolCalls.set(event.index, toolCall);
            this.emit('toolCall', toolCall);
          }
          break;

        case 'tool-result':
          {
            let toolCall: ToolCallState | undefined;

            // Get corresponding tool call
            for (const [_, t] of this.state.toolCalls.entries()) {
              if (t.toolCallId === event.toolCallId) {
                toolCall = t;
                break;
              }
            }

            if (!toolCall) {
              throw new Error(
                `Tool call with toolCallId: ${event.toolCallId} not found in the stream state.`,
              );
            }

            toolCall.content = event.content;
            toolCall.artifact = event.artifact;
            toolCall.parsedArgs = event.args;

            this.state.toolCalls.set(toolCall.index, toolCall);
            this.emit('toolCall', toolCall);
          }
          break;
      }
    } catch (error) {
      this.emit('error', error);
    }
  }

  /**
   * Process multiple events in sequence
   */
  processEvents(events: StreamAdapterEvent[]): void {
    for (const event of events) {
      this.#processEvent(event);
    }
  }

  /**
   * Get initial processor state
   */
  #getInitialState(): ProcessorState {
    return {
      isComplete: false,
      texts: new Map<string, TextEventState>(),
      toolCalls: new Map<number, ToolCallState>(),
    };
  }

  /**
   * Try to parse partial JSON from the given string
   */
  #parsePartialJson(
    json: string,
    fallback: Record<string, unknown> = {},
  ): Record<string, unknown> {
    try {
      return parse(json, Allow.ALL);
    } catch {
      return fallback;
    }
  }
}
