ollama/server/routes_harmony_streaming_te...

693 lines
21 KiB
Go

package server
// this test file is to test integration of harmony parser into routes.go (as
// opposed to harmonyparser_test.go, which tests the parser in isolation)
import (
"bytes"
"context"
"encoding/json"
"net/http"
"strings"
"testing"
"time"
"github.com/gin-gonic/gin"
"github.com/ollama/ollama/api"
"github.com/ollama/ollama/discover"
"github.com/ollama/ollama/fs/ggml"
"github.com/ollama/ollama/llm"
)
func getTestTools() []api.Tool {
return []api.Tool{
{
Type: "function",
Function: api.ToolFunction{
Name: "get_weather",
Description: "Get the current weather in a given location",
Parameters: struct {
Type string `json:"type"`
Defs any `json:"$defs,omitempty"`
Items any `json:"items,omitempty"`
Required []string `json:"required"`
Properties map[string]api.ToolProperty `json:"properties"`
}{
Type: "object",
Required: []string{"location"},
Properties: map[string]api.ToolProperty{
"location": {
Type: api.PropertyType{"string"},
Description: "The city and state, e.g. San Francisco, CA",
},
},
},
},
},
{
Type: "function",
Function: api.ToolFunction{
Name: "calculate",
Description: "Calculate a mathematical expression",
Parameters: struct {
Type string `json:"type"`
Defs any `json:"$defs,omitempty"`
Items any `json:"items,omitempty"`
Required []string `json:"required"`
Properties map[string]api.ToolProperty `json:"properties"`
}{
Type: "object",
Required: []string{"expression"},
Properties: map[string]api.ToolProperty{
"expression": {
Type: api.PropertyType{"string"},
Description: "The mathematical expression to calculate",
},
},
},
},
},
}
}
func createHarmonyTestModel(t *testing.T) (string, string) {
t.Helper()
return createBinFile(t, ggml.KV{
"general.architecture": "gptoss",
"llama.block_count": uint32(1),
"llama.context_length": uint32(8192),
"llama.embedding_length": uint32(4096),
"llama.attention.head_count": uint32(32),
"llama.attention.head_count_kv": uint32(8),
"tokenizer.ggml.tokens": []string{""},
"tokenizer.ggml.scores": []float32{0},
"tokenizer.ggml.token_type": []int32{0},
}, []*ggml.Tensor{
{Name: "token_embd.weight", Shape: []uint64{1}, WriterTo: bytes.NewReader(make([]byte, 4))},
{Name: "blk.0.attn_norm.weight", Shape: []uint64{1}, WriterTo: bytes.NewReader(make([]byte, 4))},
{Name: "blk.0.ffn_down.weight", Shape: []uint64{1}, WriterTo: bytes.NewReader(make([]byte, 4))},
{Name: "blk.0.ffn_gate.weight", Shape: []uint64{1}, WriterTo: bytes.NewReader(make([]byte, 4))},
{Name: "blk.0.ffn_up.weight", Shape: []uint64{1}, WriterTo: bytes.NewReader(make([]byte, 4))},
{Name: "blk.0.ffn_norm.weight", Shape: []uint64{1}, WriterTo: bytes.NewReader(make([]byte, 4))},
{Name: "blk.0.attn_k.weight", Shape: []uint64{1}, WriterTo: bytes.NewReader(make([]byte, 4))},
{Name: "blk.0.attn_output.weight", Shape: []uint64{1}, WriterTo: bytes.NewReader(make([]byte, 4))},
{Name: "blk.0.attn_q.weight", Shape: []uint64{1}, WriterTo: bytes.NewReader(make([]byte, 4))},
{Name: "blk.0.attn_v.weight", Shape: []uint64{1}, WriterTo: bytes.NewReader(make([]byte, 4))},
{Name: "output.weight", Shape: []uint64{1}, WriterTo: bytes.NewReader(make([]byte, 4))},
})
}
// TestChatHarmonyParserStreamingRealtime verifies that chunks are emitted as soon as they're available
func TestChatHarmonyParserStreamingRealtime(t *testing.T) {
gin.SetMode(gin.TestMode)
type step struct {
input llm.CompletionResponse
wantContent string
wantThinking string
wantToolCalls []api.ToolCall
}
testCases := []struct {
name string
steps []step
only bool
}{
{
name: "content streams as it arrives",
steps: []step{
{
input: llm.CompletionResponse{Content: "<|message|>Hello", Done: false},
wantContent: "Hello",
},
{
input: llm.CompletionResponse{Content: ", world", Done: false},
wantContent: ", world",
},
{
input: llm.CompletionResponse{Content: "!<|end|>", Done: true, DoneReason: llm.DoneReasonStop},
wantContent: "!",
},
},
},
{
name: "thinking streams separately from content",
steps: []step{
{
input: llm.CompletionResponse{Content: "<|channel|>analysis<|message|>Thinking...", Done: false},
wantThinking: "Thinking...",
},
{
input: llm.CompletionResponse{Content: "<|end|>", Done: false},
// No output expected - just closes the analysis message and resets state to normal
},
{
input: llm.CompletionResponse{Content: "<|start|>assistant<|message|>Answer", Done: false},
wantContent: "Answer", // After message end, state is reset to normal
},
{
input: llm.CompletionResponse{Content: "<|end|>", Done: true, DoneReason: llm.DoneReasonStop},
// No output expected - just closes the assistant message
},
},
},
{
name: "partial tags buffer until complete",
steps: []step{
{
input: llm.CompletionResponse{Content: "<|chan", Done: false},
// No output - partial tag
},
{
input: llm.CompletionResponse{Content: "nel|>analysis<|mess", Done: false},
// No output - still building tags
},
{
input: llm.CompletionResponse{Content: "age|>Deep ", Done: false},
wantThinking: "Deep ",
},
{
input: llm.CompletionResponse{Content: "thought<|end|>", Done: false},
wantThinking: "thought",
},
{
input: llm.CompletionResponse{Content: "<|start|>assistant<|message|>Done<|end|>", Done: true, DoneReason: llm.DoneReasonStop},
wantContent: "Done", // After message end, state is reset to normal
},
},
},
{
name: "simple assistant after analysis",
steps: []step{
{
input: llm.CompletionResponse{Content: "<|channel|>analysis<|message|>Think<|end|><|start|>assistant<|message|>Answer<|end|>", Done: true, DoneReason: llm.DoneReasonStop},
wantContent: "Answer",
wantThinking: "Think",
},
},
},
{
name: "tool call parsed and returned correctly",
steps: []step{
{
input: llm.CompletionResponse{Content: "<|channel|>commentary to=functions.get_weather<|message|>{\"location\":\"San Francisco\"}<|end|><|start|>assistant<|message|>The weather is sunny<|end|>", Done: true, DoneReason: llm.DoneReasonStop},
wantContent: "The weather is sunny",
wantToolCalls: []api.ToolCall{
{
Function: api.ToolCallFunction{
Name: "get_weather",
Arguments: api.ToolCallFunctionArguments{
"location": "San Francisco",
},
},
},
},
},
},
},
{
name: "tool call with streaming JSON across chunks",
steps: []step{
{
input: llm.CompletionResponse{Content: "<|channel|>commentary to=functions.calculate<|message|>{\"expr", Done: false},
// No output yet - incomplete JSON
},
{
input: llm.CompletionResponse{Content: "ession\":\"2+", Done: false},
// Still no output - incomplete JSON
},
{
input: llm.CompletionResponse{Content: "2\"}", Done: true},
wantToolCalls: []api.ToolCall{
{
Function: api.ToolCallFunction{
Name: "calculate",
Arguments: api.ToolCallFunctionArguments{
"expression": "2+2",
},
},
},
},
},
},
},
}
anyOnlies := false
for _, tc := range testCases {
if tc.only {
anyOnlies = true
}
}
for _, tc := range testCases {
if anyOnlies && !tc.only {
continue
}
t.Run(tc.name, func(t *testing.T) {
var chunks []api.ChatResponse
chunkIdx := 0
mockResponses := make([]llm.CompletionResponse, len(tc.steps))
for i, step := range tc.steps {
mockResponses[i] = step.input
}
mock := mockRunner{
CompletionFn: func(ctx context.Context, r llm.CompletionRequest, fn func(llm.CompletionResponse)) error {
for _, resp := range mockResponses {
fn(resp)
// Give the handler time to process each response
time.Sleep(30 * time.Millisecond)
}
return nil
},
}
s := Server{
sched: &Scheduler{
pendingReqCh: make(chan *LlmRequest, 1),
finishedReqCh: make(chan *LlmRequest, 1),
expiredCh: make(chan *runnerRef, 1),
unloadedCh: make(chan any, 1),
loaded: make(map[string]*runnerRef),
newServerFn: newMockServer(&mock),
getGpuFn: discover.GetGPUInfo,
getCpuFn: discover.GetCPUInfo,
reschedDelay: 100 * time.Millisecond,
loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ int) {
req.successCh <- &runnerRef{
llama: &mock,
}
},
},
}
go s.sched.Run(t.Context())
// Create a simple test model
_, digest := createHarmonyTestModel(t)
streamFalse := false
w := createRequest(t, s.CreateHandler, api.CreateRequest{
Model: "harmony-test-streaming",
Files: map[string]string{"test.gguf": digest},
Template: `<|start|><|end|>{{ with .Tools }}{{ end }}{{ .Prompt }}`,
Stream: &streamFalse,
})
if w.Code != 200 {
t.Fatalf("failed to create model: %d", w.Code)
}
// Test chat endpoint with streaming
streamTrue := true
w = createRequest(t, s.ChatHandler, api.ChatRequest{
Model: "harmony-test-streaming",
Messages: []api.Message{{Role: "user", Content: "Hello"}},
Stream: &streamTrue,
Tools: getTestTools(),
})
if w.Code != 200 {
t.Fatalf("chat request failed: %d - %s", w.Code, w.Body.String())
}
// Parse all chunks
decoder := json.NewDecoder(w.Body)
for decoder.More() {
var chunk api.ChatResponse
if err := decoder.Decode(&chunk); err != nil {
t.Fatalf("failed to decode chunk: %v", err)
}
if chunk.Message.Content != "" || chunk.Message.Thinking != "" || len(chunk.Message.ToolCalls) > 0 {
chunks = append(chunks, chunk)
}
}
// Log received chunks for debugging
if t.Failed() || len(chunks) == 0 {
t.Logf("Received %d chunks:", len(chunks))
for i, chunk := range chunks {
t.Logf(" Chunk %d: content=%q thinking=%q", i, chunk.Message.Content, chunk.Message.Thinking)
}
}
// Verify chunks match expected steps
for i, step := range tc.steps {
// Skip steps that don't expect any output
if step.wantContent == "" && step.wantThinking == "" && len(step.wantToolCalls) == 0 {
continue
}
if chunkIdx >= len(chunks) {
t.Errorf("step %d: expected chunk not received (wanted content=%q thinking=%q)",
i, step.wantContent, step.wantThinking)
continue
}
chunk := chunks[chunkIdx]
if chunk.Message.Content != step.wantContent || chunk.Message.Thinking != step.wantThinking {
t.Errorf("step %d: chunk mismatch: got (content=%q, thinking=%q), want (content=%q, thinking=%q)",
i, chunk.Message.Content, chunk.Message.Thinking, step.wantContent, step.wantThinking)
}
// Check tool calls if expected
if len(step.wantToolCalls) > 0 {
if len(chunk.Message.ToolCalls) != len(step.wantToolCalls) {
t.Errorf("step %d: tool calls count mismatch: got %d, want %d",
i, len(chunk.Message.ToolCalls), len(step.wantToolCalls))
} else {
for j, wantCall := range step.wantToolCalls {
if j >= len(chunk.Message.ToolCalls) {
break
}
gotCall := chunk.Message.ToolCalls[j]
if gotCall.Function.Name != wantCall.Function.Name {
t.Errorf("step %d, tool call %d: name mismatch: got %q, want %q",
i, j, gotCall.Function.Name, wantCall.Function.Name)
}
// Compare arguments as JSON strings for simplicity
gotArgs, _ := json.Marshal(gotCall.Function.Arguments)
wantArgs, _ := json.Marshal(wantCall.Function.Arguments)
if string(gotArgs) != string(wantArgs) {
t.Errorf("step %d, tool call %d: arguments mismatch: got %s, want %s",
i, j, string(gotArgs), string(wantArgs))
}
}
}
}
chunkIdx++
}
// Check if we have extra chunks
if chunkIdx < len(chunks) {
t.Errorf("received %d extra chunks", len(chunks)-chunkIdx)
for i := chunkIdx; i < len(chunks); i++ {
t.Logf(" extra chunk %d: content=%q thinking=%q",
i-chunkIdx, chunks[i].Message.Content, chunks[i].Message.Thinking)
}
}
})
}
}
// TestChatHarmonyParserStreamingSimple is a simpler test that just verifies basic streaming
func TestChatHarmonyParserStreamingSimple(t *testing.T) {
gin.SetMode(gin.TestMode)
mockResponses := []llm.CompletionResponse{
{Content: "<|message|>First ", Done: false},
{Content: "chunk ", Done: false},
{Content: "here<|end|>", Done: true, DoneReason: llm.DoneReasonStop},
}
mock := mockRunner{
CompletionFn: func(ctx context.Context, r llm.CompletionRequest, fn func(llm.CompletionResponse)) error {
t.Logf("Mock received prompt: %q", r.Prompt)
t.Logf("Mock sending %d responses", len(mockResponses))
for i, resp := range mockResponses {
t.Logf("Sending response %d: %q", i, resp.Content)
fn(resp)
}
return nil
},
}
s := Server{
sched: &Scheduler{
pendingReqCh: make(chan *LlmRequest, 1),
finishedReqCh: make(chan *LlmRequest, 1),
expiredCh: make(chan *runnerRef, 1),
unloadedCh: make(chan any, 1),
loaded: make(map[string]*runnerRef),
newServerFn: newMockServer(&mock),
getGpuFn: discover.GetGPUInfo,
getCpuFn: discover.GetCPUInfo,
reschedDelay: 100 * time.Millisecond,
loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ int) {
req.successCh <- &runnerRef{
llama: &mock,
}
},
},
}
go s.sched.Run(t.Context())
// Create model
_, digest := createHarmonyTestModel(t)
streamFalse := false
w := createRequest(t, s.CreateHandler, api.CreateRequest{
Model: "gpt-oss",
Files: map[string]string{"test.gguf": digest},
Template: `<|start|><|end|>{{ .Tools }}{{ .Prompt }}`,
Stream: &streamFalse,
})
if w.Code != 200 {
t.Fatalf("failed to create model: %d", w.Code)
}
// Test streaming
streamTrue := true
w = createRequest(t, s.ChatHandler, api.ChatRequest{
Model: "gpt-oss",
Messages: []api.Message{{Role: "user", Content: "Hello"}},
Stream: &streamTrue,
Tools: getTestTools(),
})
if w.Code != 200 {
t.Fatalf("chat request failed: %d - %s", w.Code, w.Body.String())
}
// Parse chunks
var chunks []api.ChatResponse
decoder := json.NewDecoder(w.Body)
for decoder.More() {
var chunk api.ChatResponse
if err := decoder.Decode(&chunk); err != nil {
t.Fatalf("failed to decode chunk: %v", err)
}
chunks = append(chunks, chunk)
t.Logf("Received chunk %d: content=%q thinking=%q done=%v",
len(chunks), chunk.Message.Content, chunk.Message.Thinking, chunk.Done)
}
// Verify we got chunks
if len(chunks) == 0 {
t.Fatal("expected streaming chunks, got none")
}
// Verify content
var content strings.Builder
for _, chunk := range chunks {
content.WriteString(chunk.Message.Content)
}
expectedContent := "First chunk here"
if content.String() != expectedContent {
t.Errorf("content mismatch: got %q, want %q", content.String(), expectedContent)
}
// Verify we got multiple chunks (streaming)
contentChunks := 0
for _, chunk := range chunks {
if chunk.Message.Content != "" {
contentChunks++
}
}
if contentChunks < 2 {
t.Errorf("expected at least 2 content chunks for streaming, got %d", contentChunks)
}
}
func TestChatHarmonyParserStreaming(t *testing.T) {
gin.SetMode(gin.TestMode)
type expectedChunk struct {
afterResponse int // Which mock response this chunk should appear after
content string // Expected content in this chunk
thinking string // Expected thinking in this chunk
}
testCases := []struct {
name string
mockResponses []llm.CompletionResponse
expectedChunks []expectedChunk
wantContent string
wantThinking string
}{
{
name: "simple message without thinking",
mockResponses: []llm.CompletionResponse{
{Content: "<|start|>assistant<|message|>Hello, ", Done: false},
{Content: "how can I help?", Done: false},
{Content: "<|end|>", Done: true, DoneReason: llm.DoneReasonStop},
},
expectedChunks: []expectedChunk{
{afterResponse: 1, content: "Hello, "},
{afterResponse: 2, content: "how can I help?"},
},
wantContent: "Hello, how can I help?",
},
{
name: "message with analysis channel for thinking",
mockResponses: []llm.CompletionResponse{
{Content: "<|channel|>analysis<|message|>", Done: false},
{Content: "Let me think ", Done: false},
{Content: "about this problem...", Done: false},
{Content: "<|end|>", Done: false},
{Content: "<|start|>assistant<|message|>", Done: false},
{Content: "The answer ", Done: false},
{Content: "is 42", Done: false},
{Content: "<|end|>", Done: true, DoneReason: llm.DoneReasonStop},
},
expectedChunks: []expectedChunk{
{afterResponse: 2, thinking: "Let me think "},
{afterResponse: 3, thinking: "about this problem..."},
{afterResponse: 6, content: "The answer "},
{afterResponse: 7, content: "is 42"},
},
wantContent: "The answer is 42",
wantThinking: "Let me think about this problem...",
},
{
name: "streaming with partial tags across boundaries",
mockResponses: []llm.CompletionResponse{
{Content: "<|chan", Done: false},
{Content: "nel|>analy", Done: false},
{Content: "sis<|mess", Done: false},
{Content: "age|>Think", Done: false},
{Content: "ing deeply...<|end|>", Done: false},
{Content: "<|start|>assi", Done: false},
{Content: "stant<|message|>Result ", Done: false},
{Content: "computed<|e", Done: false},
{Content: "nd|>", Done: true, DoneReason: llm.DoneReasonStop},
},
expectedChunks: []expectedChunk{
{afterResponse: 4, thinking: "Think"},
{afterResponse: 5, thinking: "ing deeply..."},
{afterResponse: 7, content: "Result "},
{afterResponse: 8, content: "computed"},
},
wantContent: "Result computed",
wantThinking: "Thinking deeply...",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Channel to synchronize mock responses with chunk verification
responsesSent := make(chan int, len(tc.mockResponses))
mock := mockRunner{
CompletionFn: func(ctx context.Context, r llm.CompletionRequest, fn func(llm.CompletionResponse)) error {
// Send mock responses one at a time, notifying when each is sent
for i, resp := range tc.mockResponses {
fn(resp)
responsesSent <- i + 1
}
close(responsesSent)
return nil
},
}
s := Server{
sched: &Scheduler{
pendingReqCh: make(chan *LlmRequest, 1),
finishedReqCh: make(chan *LlmRequest, 1),
expiredCh: make(chan *runnerRef, 1),
unloadedCh: make(chan any, 1),
loaded: make(map[string]*runnerRef),
newServerFn: newMockServer(&mock),
getGpuFn: discover.GetGPUInfo,
getCpuFn: discover.GetCPUInfo,
reschedDelay: 250 * time.Millisecond,
loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ int) {
req.successCh <- &runnerRef{
llama: &mock,
}
},
},
}
go s.sched.Run(t.Context())
// Create a minimal model
_, digest := createHarmonyTestModel(t)
// Create model with passthrough template
stream := false
w := createRequest(t, s.CreateHandler, api.CreateRequest{
Model: "harmony-test",
Files: map[string]string{"file.gguf": digest},
Template: `<|start|><|end|>{{ with .Tools }}{{ end }}{{ .Prompt }}`,
Stream: &stream,
})
if w.Code != http.StatusOK {
t.Fatalf("failed to create model: %d", w.Code)
}
// Test chat endpoint with streaming
streamTrue := true
w = createRequest(t, s.ChatHandler, api.ChatRequest{
Model: "harmony-test",
Messages: []api.Message{{Role: "user", Content: "Hello"}},
Stream: &streamTrue,
Tools: getTestTools(),
})
if w.Code != http.StatusOK {
t.Fatalf("chat request failed: %d - %s", w.Code, w.Body.String())
}
// Parse streaming response
var chunks []api.ChatResponse
var content, thinking strings.Builder
decoder := json.NewDecoder(w.Body)
for decoder.More() {
var chunk api.ChatResponse
if err := decoder.Decode(&chunk); err != nil {
t.Fatalf("failed to decode chunk: %v", err)
}
chunks = append(chunks, chunk)
// Accumulate content and thinking from each chunk
content.WriteString(chunk.Message.Content)
thinking.WriteString(chunk.Message.Thinking)
// Debug output
t.Logf("Chunk %d: content=%q thinking=%q done=%v", len(chunks), chunk.Message.Content, chunk.Message.Thinking, chunk.Done)
}
// Verify we got streaming chunks
if len(chunks) == 0 {
t.Fatal("expected streaming chunks, got none")
}
gotContent := content.String()
gotThinking := thinking.String()
if gotContent != tc.wantContent {
t.Errorf("content mismatch: got %q, want %q", gotContent, tc.wantContent)
}
if gotThinking != tc.wantThinking {
t.Errorf("thinking mismatch: got %q, want %q", gotThinking, tc.wantThinking)
}
// Verify last chunk has done=true
lastChunk := chunks[len(chunks)-1]
if !lastChunk.Done {
t.Error("expected last chunk to have done=true")
}
})
}
}