ollamarunner: measure only active time

Michael Yang 2025-09-29 12:29:26 -07:00
parent 3fde0300c8
commit df23ca2307
1 changed file with 39 additions and 28 deletions


@@ -91,10 +91,10 @@ type Sequence struct {
 	doneReason llm.DoneReason
 
 	// Metrics
-	startProcessingTime time.Time
-	startGenerationTime time.Time
-	numPredicted        int
-	numPromptInputs     int
+	processingDuration time.Duration
+	generationDuration time.Duration
+	numPredicted       int
+	numPromptInputs    int
 }
 
 type NewSequenceParams struct {
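Swapping time.Time stamps for time.Duration accumulators is what lets the runner count only active time: idle gaps between scheduled batches never enter the sum. A minimal, self-contained sketch of the difference (the loop and sleeps are hypothetical stand-ins for batch work and scheduler gaps, not code from this commit):

package main

import (
	"fmt"
	"time"
)

func main() {
	var active time.Duration // accumulated across batch windows
	wallStart := time.Now()

	for i := 0; i < 3; i++ {
		start := time.Now()
		time.Sleep(10 * time.Millisecond) // the active window: one batch of work
		active += time.Since(start)

		time.Sleep(20 * time.Millisecond) // idle gap: sequence waits its turn
	}

	// Wall clock includes the idle gaps; the accumulated duration does not.
	fmt.Printf("wall: %v  active: %v\n", time.Since(wallStart), active)
}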
@@ -108,8 +108,6 @@ type NewSequenceParams struct {
 func (s *Server) NewSequence(prompt string, images []llm.ImageData, params NewSequenceParams) (*Sequence, error) {
 	s.ready.Wait()
 
-	startTime := time.Now()
-
 	inputs, ctxs, mmStore, err := s.inputs(prompt, images)
 	if err != nil {
 		return nil, fmt.Errorf("failed to process inputs: %w", err)
@@ -164,20 +162,19 @@ func (s *Server) NewSequence(prompt string, images []llm.ImageData, params NewSe
 
 	// TODO(jessegross): Ingest cached history for grammar
 
 	return &Sequence{
-		ctxs:                ctxs,
-		mmStore:             mmStore,
-		inputs:              inputs,
-		numPromptInputs:     len(inputs),
-		startProcessingTime: startTime,
-		numPredict:          params.numPredict,
-		pendingResponses:    make([]string, 0),
-		responses:           make(chan string, 100),
-		quit:                make(chan bool, 1),
-		embedding:           make(chan []float32, 1),
-		sampler:             params.sampler,
-		embeddingOnly:       params.embedding,
-		stop:                params.stop,
-		numKeep:             params.numKeep,
+		ctxs:             ctxs,
+		mmStore:          mmStore,
+		inputs:           inputs,
+		numPromptInputs:  len(inputs),
+		numPredict:       params.numPredict,
+		pendingResponses: make([]string, 0),
+		responses:        make(chan string, 100),
+		quit:             make(chan bool, 1),
+		embedding:        make(chan []float32, 1),
+		sampler:          params.sampler,
+		embeddingOnly:    params.embedding,
+		stop:             params.stop,
+		numKeep:          params.numKeep,
 	}, nil
 }
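With durations instead of a start stamp, NewSequence has nothing to record up front: time.Duration's zero value is already a valid accumulator. A sketch, where seqMetrics is a hypothetical stand-in for the metric fields on Sequence:

package main

import (
	"fmt"
	"time"
)

// seqMetrics stands in for the metric fields on Sequence.
type seqMetrics struct {
	processingDuration time.Duration // zero value is ready to accumulate
	generationDuration time.Duration
}

func main() {
	var m seqMetrics // no start timestamp needed at construction
	m.processingDuration += 40 * time.Millisecond
	m.generationDuration += 15 * time.Millisecond
	fmt.Println(m.processingDuration, m.generationDuration)
}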
@@ -290,6 +287,11 @@ type batchState struct {
 
 	// Signaled when this batches outputs are complete and the next batch can proceed
 	outputsReadyCh chan struct{}
+
+	process, generate struct {
+		startedAt time.Time
+		stoppedAt time.Time
+	}
 }
 
 type Server struct {
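Each batch now carries two start/stop windows: one for batch preparation (stamped in forwardBatch) and one for compute (stamped in computeBatch). A self-contained sketch of that shape; the duration helper is hypothetical, not part of the commit:

package main

import (
	"fmt"
	"time"
)

// window mirrors the anonymous process/generate struct in batchState.
type window struct {
	startedAt time.Time
	stoppedAt time.Time
}

// duration is a hypothetical helper: the length of the window.
func (w window) duration() time.Duration {
	return w.stoppedAt.Sub(w.startedAt)
}

func main() {
	var w window
	w.startedAt = time.Now()
	time.Sleep(5 * time.Millisecond) // stand-in for the timed phase
	w.stoppedAt = time.Now()
	fmt.Println(w.duration())
}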
@@ -408,7 +410,7 @@ func (s *Server) run(ctx context.Context) {
 	supportsAsync := pooling.Type(s.model.Backend().Config().Uint("pooling_type")) == pooling.TypeNone
 
-	var activeBatch batchState
+	var previousBatch batchState
 	for {
 		select {
 		case <-ctx.Done():
@@ -417,16 +419,18 @@ func (s *Server) run(ctx context.Context) {
 			panic(err)
 		default:
 			var err error
-			activeBatch, err = s.forwardBatch(activeBatch)
+			nextBatch, err := s.forwardBatch(previousBatch)
 			if err != nil {
 				panic(err)
 			}
 
 			if supportsAsync {
-				go s.computeBatch(activeBatch)
+				go s.computeBatch(nextBatch)
 			} else {
-				s.computeBatch(activeBatch)
+				s.computeBatch(nextBatch)
 			}
+
+			previousBatch = nextBatch
 		}
 	}
 }
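The rename from activeBatch to previousBatch/nextBatch makes the pipelining explicit: forwardBatch builds the next batch while computeBatch may still be running asynchronously on the previous one. A minimal sketch of the handoff pattern (prepare and compute are hypothetical stand-ins for forwardBatch and computeBatch):

package main

import "fmt"

type batch struct{ id int }

// prepare builds the next batch from the previous one, like forwardBatch.
func prepare(previous batch) batch { return batch{id: previous.id + 1} }

// compute reports completion, like computeBatch signaling its outputs.
func compute(b batch, done chan<- int) { done <- b.id }

func main() {
	done := make(chan int, 3)
	var previous batch
	for i := 0; i < 3; i++ {
		next := prepare(previous) // next batch is built while the previous may still be computing
		go compute(next, done)    // async path, as when supportsAsync is true
		previous = next
	}
	for i := 0; i < 3; i++ {
		fmt.Println("batch done:", <-done)
	}
}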
@@ -454,8 +458,10 @@ func (s *Server) forwardBatch(pendingBatch batchState) (nextBatch batchState, er
 	}
 	defer s.mu.Unlock()
 
+	nextBatch.process.startedAt = time.Now()
 	nextBatch.ctx = s.model.Backend().NewContext()
 	defer func() {
+		nextBatch.process.stoppedAt = time.Now()
 		if err != nil {
 			nextBatch.ctx.Close()
 			nextBatch.ctx = nil
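Stamping stoppedAt inside the existing deferred cleanup means the process window closes on every exit from forwardBatch, error paths included. A hypothetical, self-contained sketch of that pattern:

package main

import (
	"errors"
	"fmt"
	"time"
)

// timedPhase sketches the forwardBatch pattern: the deferred stamp fills
// the named result on every return path, including failures.
func timedPhase(fail bool) (d time.Duration, err error) {
	start := time.Now()
	defer func() { d = time.Since(start) }()

	time.Sleep(2 * time.Millisecond) // stand-in for building the batch
	if fail {
		return 0, errors.New("batch construction failed")
	}
	return
}

func main() {
	for _, fail := range []bool{false, true} {
		d, err := timedPhase(fail)
		fmt.Println(d, err) // duration is recorded either way
	}
}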
@@ -673,6 +679,7 @@ func (s *Server) computeBatch(activeBatch batchState) {
 	// At this point the seqs are ready for forwardBatch to move forward so unblock
 	s.mu.Unlock()
 
+	activeBatch.generate.startedAt = time.Now()
 	activeBatch.batch.Inputs.SetValueFromIntSlice(batchInputs)
 	activeBatch.ctx.ComputeWithNotify(
 		func() {
@@ -682,6 +689,8 @@ func (s *Server) computeBatch(activeBatch batchState) {
 		activeBatch.modelOutput)
 
 	outputs := activeBatch.modelOutput.Floats()
+	activeBatch.generate.stoppedAt = time.Now()
+	activeDuration := activeBatch.generate.stoppedAt.Sub(activeBatch.generate.startedAt) + activeBatch.process.stoppedAt.Sub(activeBatch.process.startedAt)
 
 	logutil.Trace("computeBatch: logits ready", "batchID", activeBatch.id)
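activeDuration is the sum of the two window lengths, not stoppedAt of the last window minus startedAt of the first, so any gap between preparing the batch and computing it is excluded. A worked example with hypothetical timestamps:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical stamps: process runs 0-8ms, generate runs 30-50ms.
	t0 := time.Date(2025, 9, 29, 12, 0, 0, 0, time.UTC)
	processStart, processStop := t0, t0.Add(8*time.Millisecond)
	generateStart, generateStop := t0.Add(30*time.Millisecond), t0.Add(50*time.Millisecond)

	active := generateStop.Sub(generateStart) + processStop.Sub(processStart)
	fmt.Println(active) // 28ms; the 22ms gap between the windows is not counted
}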
@@ -694,8 +703,10 @@ func (s *Server) computeBatch(activeBatch batchState) {
 				continue
 			}
 
-			if seq.numPredicted == 1 {
-				seq.startGenerationTime = time.Now()
+			if seq.numPredicted > 1 {
+				seq.generationDuration += activeDuration
+			} else {
+				seq.processingDuration += activeDuration
 			}
 
 			// if done processing the prompt, generate an embedding and return
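The attribution rule: while a sequence has at most one predicted token, the batch is still prompt processing, since prompt evaluation is what produces the first token; every later batch counts as generation. A self-contained sketch (attribute is a hypothetical helper mirroring the branch above):

package main

import (
	"fmt"
	"time"
)

// attribute mirrors the branch above: a batch's active time counts as
// generation only once the sequence is past its first predicted token.
func attribute(numPredicted int, active time.Duration, processing, generation *time.Duration) {
	if numPredicted > 1 {
		*generation += active
	} else {
		*processing += active
	}
}

func main() {
	var processing, generation time.Duration
	attribute(1, 40*time.Millisecond, &processing, &generation) // prompt batch yields the first token
	attribute(2, 15*time.Millisecond, &processing, &generation) // decode batches
	attribute(3, 15*time.Millisecond, &processing, &generation)
	fmt.Println(processing, generation) // 40ms 30ms
}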
@@ -887,9 +898,9 @@ func (s *Server) completion(w http.ResponseWriter, r *http.Request) {
 				Done:            true,
 				DoneReason:      seq.doneReason,
 				PromptEvalCount: seq.numPromptInputs,
-				PromptEvalDuration: seq.startGenerationTime.Sub(seq.startProcessingTime),
+				PromptEvalDuration: seq.processingDuration,
 				EvalCount:       seq.numPredicted,
-				EvalDuration: time.Since(seq.startGenerationTime),
+				EvalDuration: seq.generationDuration,
 			}); err != nil {
 				http.Error(w, fmt.Sprintf("failed to encode final response: %v", err), http.StatusInternalServerError)
 			}