import { ChatRole, ToolType } from '@kanbu/schema/enums';
import { StreamProcessor, StreamSerializer } from '@kanbu/shared';
import type { ChatMessageItem } from '@kanbu/shared-ui';

/**
 * Creates a new stream chunk decoder. Currently we only support
 * text streaming using TextDecoder.
 */
function createChunkDecoder() {
  const decoder = new TextDecoder();

  return (chunk: Uint8Array | undefined): string => {
    if (!chunk) {
      return '';
    }

    return decoder.decode(chunk, { stream: true });
  };
}

/**
 * Helper for processing chat text stream data. This function will
 * read the stream and update the chat messages based on the onUpdate
 * method provided.
 */
export async function processChatStream(args: {
  reader: ReadableStreamDefaultReader<Uint8Array>;
  onUpdate: (newMessage: ChatMessageItem) => void;
  abortController: () => null | AbortController;
  initialDelayMs?: number;
}): Promise<void> {
  const { reader, onUpdate, abortController, initialDelayMs = 500 } = args;
  const decoder = createChunkDecoder();

  const serializer = new StreamSerializer();
  const processor = new StreamProcessor();

  let createdAt = new Date().toISOString();
  let lastMessageId: string | null = null;
  let isFirstUpdate = true;
  let timeout: NodeJS.Timeout | null = null;

  /**
   * We want to delay the first message update so it prevents the UI
   * from flashing so much. All other updates are instant.
   */
  const delayedUpdate = (
    newMessage: Omit<ChatMessageItem, 'createdAt' | 'updatedAt'>,
  ) => {
    // Reset the createdAt for each new message.
    if (lastMessageId !== newMessage.id) {
      lastMessageId = newMessage.id;
      createdAt = new Date().toISOString();
    }

    // Add timestamps to the message
    const data: ChatMessageItem = {
      ...newMessage,
      createdAt,
      updatedAt: createdAt,
    };

    if (isFirstUpdate) {
      timeout = setTimeout(() => {
        onUpdate(data);
        isFirstUpdate = false;
      }, initialDelayMs);
    } else {
      if (timeout) {
        clearTimeout(timeout);
      }

      onUpdate(data);
    }
  };

  /**
   * Register handlers for the processor events.
   */
  processor
    .on('init', event => {
      if (lastMessageId === event.messageId) {
        lastMessageId = event.messageId;
        createdAt = new Date().toISOString();
      }

      /**
       * We use the message ID to show loading state for the first message.
       * Since the message ID is the same as the first message ID, we can
       * use it to show loading state for the first message. After we start
       * receiving content, the message will be updated with the actual content.
       *
       * This is actually not a good solution and doesn't work for more complex
       * tool calls, since the tool call might not be complete yet.
       */
      delayedUpdate({
        id: event.messageId,
        role: ChatRole.Assistant,
        status: 'start',
      });
    })
    .on('text', event => {
      delayedUpdate({
        id: event.id,
        role: ChatRole.Assistant,
        content: event.content,
        toolCalls: [],
        status: event.status,
      });
    })
    .on('toolCall', event => {
      delayedUpdate({
        id: event.id,
        role: ChatRole.Assistant,
        toolCalls: [
          {
            id: event.id,
            type: ToolType.Tool,
            name: event.name,
            args: event.parsedArgs,
            index: event.index,
            artifact: event.artifact,
            content: event.content,
          },
        ],
        status: event.status,
      });
    });

  /**
   * To prevent situations where we may have incomplete lines, we
   * buffer the data and process it in chunks.
   */
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();

    if (done) {
      // Process any remaining data in buffer
      if (buffer.trim()) {
        try {
          processor.processEvents([serializer.deserialize(buffer)]);
        } catch (error) {
          console.error('Error processing final buffer content', error);
        }
      }
      break;
    }

    const chunk = decoder(value);
    buffer += chunk;

    // Split buffer by newlines
    const lines = buffer.split('\n');

    // The last line might be incomplete, so we keep it in the buffer
    buffer = lines.pop() || '';

    // Process all complete lines
    const completeLines = lines.filter(line => line.trim() !== '');

    try {
      // Process the stream events
      processor.processEvents(completeLines.map(serializer.deserialize));
    } catch (error) {
      console.error('Error processing stream events', error);
    }

    // The request has been aborted, stop reading the stream.
    if (abortController?.() === null) {
      reader.cancel();
      break;
    }
  }

  // Cleanup event listeners
  processor.removeAllHandlers();
}
