import type { StructuredToolCall, TextToolCall, ToolCall } from '@kanbu/schema';
import type { StreamChunk } from '@kanbu/schema/contracts';
import { ChatRole, ToolType } from '@kanbu/schema/enums';
import { parse, Allow } from 'partial-json';

import type { ChatMessageItem } from '@/hooks/chatTypes';

type StreamedToolCall =
  | (StructuredToolCall & {
      rawArgs: string;
    })
  | TextToolCall;

/**
 * Creates a new stream chunk decoder. Currently we only support
 * text streaming using TextDecoder.
 */
function createChunkDecoder() {
  const decoder = new TextDecoder();

  return (chunk: Uint8Array | undefined): string => {
    if (!chunk) {
      return '';
    }

    return decoder.decode(chunk, { stream: true });
  };
}

/**
 * Helper for processing chat text stream data. This function will
 * read the stream and update the chat messages based on the onUpdate
 * method provided.
 */
export async function processChatStream(args: {
  reader: ReadableStreamDefaultReader<Uint8Array>;
  onUpdate: (newMessage: ChatMessageItem) => void;
  abortController: () => null | AbortController;
}) {
  const { reader, onUpdate, abortController } = args;
  const decoder = createChunkDecoder();

  const toolCalls: ToolCall[] = [];
  let currentToolCall: StreamedToolCall | null = null;
  let resultMessage: ChatMessageItem | null = null;

  // Closes any open tool call before creating new one
  const closeToolCall = () => {
    if (currentToolCall && Object.keys(currentToolCall).length > 0) {
      toolCalls.push(currentToolCall);
      currentToolCall = null;
    }
  };

  // Updates the message with the current tool calls, when there is some content
  const handleUpdate = () => {
    if (!resultMessage) {
      return;
    }

    if (resultMessage.content || toolCalls.length > 0 || currentToolCall) {
      const updatedToolCalls = [...toolCalls];

      if (currentToolCall) {
        if (currentToolCall.type === ToolType.Tool) {
          // Remove rawArgs from the tool call
          const { rawArgs, ...rest } = currentToolCall;

          updatedToolCalls.push({ ...rest });
        } else {
          updatedToolCalls.push({ ...currentToolCall });
        }
      }

      onUpdate({
        ...resultMessage,
        toolCalls: updatedToolCalls,
      });
    }
  };

  while (true) {
    const { done, value } = await reader.read();

    if (done) {
      break;
    }

    const chunk = decoder(value);
    const lines = chunk.split('\n').filter(line => line.trim() !== '');

    for (const line of lines) {
      const streamChunk: StreamChunk = JSON.parse(line);

      switch (streamChunk.type) {
        case 'init':
          resultMessage = {
            id: streamChunk.id,
            createdAt: new Date().toISOString(),
            updatedAt: new Date().toISOString(),
            role: ChatRole.Assistant,
          };
          break;

        case 'text':
          if (resultMessage) {
            if (!currentToolCall || currentToolCall.type !== ToolType.Text) {
              // Close any open tool call before opening a new one
              closeToolCall();

              // Open new text chunk
              currentToolCall = {
                id: resultMessage.id,
                type: ToolType.Text,
                content: '',
              };
            }

            // Concatenate the text content
            currentToolCall.content += streamChunk.content;
          }
          break;

        case 'tool':
          if (resultMessage) {
            if (
              !currentToolCall ||
              currentToolCall.type !== ToolType.Tool ||
              currentToolCall.id !== streamChunk.id
            ) {
              // Close any open tool call before opening a new one
              closeToolCall();

              // Open new tool call
              currentToolCall = {
                type: ToolType.Tool,
                args: {},
                rawArgs: '',
                name: streamChunk.name,
                id: streamChunk.id,
                index: streamChunk.index,
              };
            }

            // Concatenate the tool call args
            currentToolCall.rawArgs += streamChunk.args;

            // Parse the tool call args as partial JSON
            try {
              currentToolCall.args = parse(currentToolCall.rawArgs, Allow.ALL);
            } catch {
              // If the JSON is not valid, we will just ignore it and wait for the next chunk.
            }
          }
          break;

        default:
          break;
      }

      // Call onUpdate only when there is some content to show
      handleUpdate();
    }

    // The request has been aborted, stop reading the stream.
    if (abortController?.() === null) {
      reader.cancel();
      break;
    }
  }

  // Close any open existing tool calls on stream end
  closeToolCall();

  // Update the message with the final tool calls
  handleUpdate();

  return resultMessage;
}
