//go:build integration

package integration

import (
	"context"
	"errors"
	"log/slog"
	"os"
	"strconv"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/ollama/ollama/api"
)
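
// TestMaxQueue floods a server configured with a small OLLAMA_MAX_QUEUE with more
// concurrent embedding requests than the queue can hold while a slow generate
// request occupies the runner, and verifies the overflow is rejected with "busy"
// errors rather than connection resets or cancellations.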
func TestMaxQueue(t *testing.T) {
	t.Skip("this test needs to be re-evaluated to use a proper embedding model")

	if os.Getenv("OLLAMA_TEST_EXISTING") != "" {
		t.Skip("Max Queue test requires spawning a local server so we can adjust the queue size")
		return
	}

	// Note: This test can be quite slow when running in CPU mode, so keep the threadCount low unless you're on a GPU
	// Also note that by default Darwin can't sustain > ~128 connections without adjusting limits
	threadCount := 16
	t.Setenv("OLLAMA_MAX_QUEUE", strconv.Itoa(threadCount))
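
	// The generate request below is deliberately long-form so it keeps the model
	// busy while the embedding requests further down stack up behind it in the queue.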
	req := api.GenerateRequest{
		Model:  smol,
		Prompt: "write a long historical fiction story about christopher columbus.  use at least 10 facts from his actual journey",
		Options: map[string]any{
			"seed":        42,
			"temperature": 0.0,
		},
	}
	resp := []string{"explore", "discover", "ocean"}

	// CPU mode takes much longer at the limit with a large queue setting
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, _, cleanup := InitServerConnection(ctx, t)
	defer cleanup()

	if err := PullIfMissing(ctx, client, req.Model); err != nil {
		t.Fatal(err)
	}

	// Context for the worker threads so we can shut them down
	// embedCtx, embedCancel := context.WithCancel(ctx)
	embedCtx := ctx
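
	// Run the single generate request in the background; the embedding workers
	// below are started while it is still in flight.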
	var genwg sync.WaitGroup
	genwg.Add(1)
	go func() {
		defer genwg.Done()
		slog.Info("Starting generate request")
		DoGenerate(ctx, t, client, req, resp, 45*time.Second, 5*time.Second)
		slog.Info("generate completed")
	}()

	// Give the generate a chance to get started before we start hammering on embed requests
	time.Sleep(10 * time.Millisecond)

	threadCount += 10 // Add a few extra to ensure we push the queue past its limit
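	// Tally how each embedding request resolves so the assertions at the end can
	// check that the queue overflowed (busy) without resets or cancellations.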
	busyCount := 0
	resetByPeerCount := 0
	canceledCount := 0
	successCount := 0
	counterMu := sync.Mutex{}
	var embedwg sync.WaitGroup
	for i := 0; i < threadCount; i++ {
		embedwg.Add(1)
		go func(i int) {
			defer embedwg.Done()
			slog.Info("embed started", "id", i)
			embedReq := api.EmbeddingRequest{
				Model:   req.Model,
				Prompt:  req.Prompt,
				Options: req.Options,
			}
			// Fresh client for every request
			client, _ = GetTestEndpoint()
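
			// When the queue is already full the server is expected to reject the
			// request with a "busy" error; classify each outcome under the mutex.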
			resp, genErr := client.Embeddings(embedCtx, &embedReq)
			counterMu.Lock()
			defer counterMu.Unlock()
			switch {
			case genErr == nil:
				successCount++
				if len(resp.Embedding) < 5 { // somewhat arbitrary, but sufficient to be reasonable
					t.Fatalf("embeddings shorter than expected: %d", len(resp.Embedding))
				}
			case errors.Is(genErr, context.Canceled):
				canceledCount++
			case strings.Contains(genErr.Error(), "busy"):
				busyCount++
			case strings.Contains(genErr.Error(), "connection reset by peer"):
				resetByPeerCount++
			default:
				if genErr != nil {
					t.Fatalf("request %d failed: %s", i, genErr)
				}
			}

			slog.Info("embed finished", "id", i)
		}(i)
	}
	genwg.Wait()
	slog.Info("generate done, waiting for embeds")
	embedwg.Wait()
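
	// With OLLAMA_MAX_QUEUE capped at the original threadCount and more embed
	// requests than that issued, some should have been turned away as busy;
	// resets or cancellations indicate a different problem.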
	slog.Info("embeds completed", "success", successCount, "busy", busyCount, "reset", resetByPeerCount, "canceled", canceledCount)
	if resetByPeerCount != 0 {
		t.Fatalf("%d connections were reset by peer, have you updated your fd and socket limits?", resetByPeerCount)
	}
	if busyCount == 0 {
		t.Fatalf("no requests hit the busy error, but some should have")
	}
	if canceledCount > 0 {
		t.Fatalf("%d requests were canceled due to timeout, but none should have been", canceledCount)
	}
}