implement proper sse

This commit is contained in:
Simon Knott 2025-02-13 10:05:10 +01:00
parent 9dc816f197
commit 75c239bf16
No known key found for this signature in database
GPG key ID: 8CEDC00028084AEC

View file

@ -26,21 +26,68 @@ export interface LLM {
chatCompletion(messages: LLMMessage[], signal: AbortSignal): AsyncGenerator<string>;
}
async function *parseSSE(body: Response['body']): AsyncGenerator<string> {
const reader = body!.pipeThrough(new TextDecoderStream()).getReader();
// https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream
async function *parseSSE(body: NonNullable<Response['body']>): AsyncGenerator<{ type: string, data: string, id: string }> {
const reader = body.pipeThrough(new TextDecoderStream()).getReader();
let buffer = '';
let lastEventId = '';
let type: string = '';
let data = '';
while (true) {
const { value, done } = await reader.read();
if (done)
break;
buffer += value;
const events = buffer.split('\n\n');
buffer = events.pop()!;
for (const event of events) {
const contentStart = event.indexOf('data: ');
if (contentStart === -1)
const lines = buffer.split('\n');
buffer = lines.pop()!; // last line is either empty or incomplete
for (const line of lines) {
if (line.length === 0) {
if (data === '') {
data = '';
type = '';
continue;
}
if (data[data.length - 1] === '\n')
data = data.substring(0, data.length - 1);
const event = { type: type || 'message', data, id: lastEventId };
type = '';
data = '';
yield event;
}
if (line[0] === ':')
continue;
yield event.substring(contentStart + 'data: '.length);
let name = '';
let value = '';
const colon = line.indexOf(':');
if (colon === -1) {
name = line;
} else {
name = line.substring(0, colon);
value = line[colon + 1] === ' ' ? line.substring(colon + 2) : line.substring(colon + 1);
}
switch (name) {
case 'event':
type = value;
break;
case 'data':
data += value + '\n';
break;
case 'id':
lastEventId = value;
break;
case 'retry':
default:
// not implemented
break;
}
}
}
}
@ -68,10 +115,8 @@ export class OpenAI implements LLM {
if (response.status !== 200 || !response.body)
throw new Error('Failed to chat with OpenAI, unexpected status: ' + response.status + await response.text());
for await (const eventString of parseSSE(response.body)) {
if (eventString === '[DONE]')
break;
const event = JSON.parse(eventString);
for await (const sseEvent of parseSSE(response.body)) {
const event = JSON.parse(sseEvent.data);
if (event.object === 'chat.completion.chunk') {
if (event.choices[0].finish_reason)
break;
@ -104,8 +149,8 @@ export class Anthropic implements LLM {
if (response.status !== 200 || !response.body)
throw new Error('Failed to chat with Anthropic, unexpected status: ' + response.status + await response.text());
for await (const eventString of parseSSE(response.body)) {
const event = JSON.parse(eventString);
for await (const sseEvent of parseSSE(response.body)) {
const event = JSON.parse(sseEvent.data);
if (event.type === 'content_block_delta')
yield event.delta.text;
}