| 
									
										
										
										
											2016-06-06 16:31:21 +08:00
										 |  |  | package tsdb | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-12-07 18:10:42 +08:00
										 |  |  | import "context" | 
					
						
							| 
									
										
										
										
											2016-06-06 16:31:21 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | type Batch struct { | 
					
						
							|  |  |  | 	DataSourceId int64 | 
					
						
							|  |  |  | 	Queries      QuerySlice | 
					
						
							|  |  |  | 	Depends      map[string]bool | 
					
						
							|  |  |  | 	Done         bool | 
					
						
							|  |  |  | 	Started      bool | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type BatchSlice []*Batch | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func newBatch(dsId int64, queries QuerySlice) *Batch { | 
					
						
							|  |  |  | 	return &Batch{ | 
					
						
							|  |  |  | 		DataSourceId: dsId, | 
					
						
							|  |  |  | 		Queries:      queries, | 
					
						
							|  |  |  | 		Depends:      make(map[string]bool), | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-10-03 15:38:03 +08:00
										 |  |  | func (bg *Batch) process(ctx context.Context, queryContext *QueryContext) { | 
					
						
							| 
									
										
										
										
											2016-12-07 18:10:42 +08:00
										 |  |  | 	executor, err := getExecutorFor(bg.Queries[0].DataSource) | 
					
						
							| 
									
										
										
										
											2016-06-06 16:31:21 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-12-07 18:10:42 +08:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2016-06-06 16:31:21 +08:00
										 |  |  | 		bg.Done = true | 
					
						
							|  |  |  | 		result := &BatchResult{ | 
					
						
							| 
									
										
										
										
											2016-12-07 18:10:42 +08:00
										 |  |  | 			Error:        err, | 
					
						
							| 
									
										
										
										
											2016-06-06 16:31:21 +08:00
										 |  |  | 			QueryResults: make(map[string]*QueryResult), | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		for _, query := range bg.Queries { | 
					
						
							|  |  |  | 			result.QueryResults[query.RefId] = &QueryResult{Error: result.Error} | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2016-10-03 15:38:03 +08:00
										 |  |  | 		queryContext.ResultsChan <- result | 
					
						
							| 
									
										
										
										
											2016-06-06 16:31:21 +08:00
										 |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-10-03 15:38:03 +08:00
										 |  |  | 	res := executor.Execute(ctx, bg.Queries, queryContext) | 
					
						
							| 
									
										
										
										
											2016-06-06 16:31:21 +08:00
										 |  |  | 	bg.Done = true | 
					
						
							| 
									
										
										
										
											2016-10-03 15:38:03 +08:00
										 |  |  | 	queryContext.ResultsChan <- res | 
					
						
							| 
									
										
										
										
											2016-06-06 16:31:21 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (bg *Batch) addQuery(query *Query) { | 
					
						
							|  |  |  | 	bg.Queries = append(bg.Queries, query) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (bg *Batch) allDependenciesAreIn(context *QueryContext) bool { | 
					
						
							|  |  |  | 	for key := range bg.Depends { | 
					
						
							|  |  |  | 		if _, exists := context.Results[key]; !exists { | 
					
						
							|  |  |  | 			return false | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return true | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func getBatches(req *Request) (BatchSlice, error) { | 
					
						
							|  |  |  | 	batches := make(BatchSlice, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for _, query := range req.Queries { | 
					
						
							|  |  |  | 		if foundBatch := findMatchingBatchGroup(query, batches); foundBatch != nil { | 
					
						
							|  |  |  | 			foundBatch.addQuery(query) | 
					
						
							|  |  |  | 		} else { | 
					
						
							|  |  |  | 			newBatch := newBatch(query.DataSource.Id, QuerySlice{query}) | 
					
						
							|  |  |  | 			batches = append(batches, newBatch) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 			for _, refId := range query.Depends { | 
					
						
							|  |  |  | 				for _, batch := range batches { | 
					
						
							|  |  |  | 					for _, batchQuery := range batch.Queries { | 
					
						
							|  |  |  | 						if batchQuery.RefId == refId { | 
					
						
							|  |  |  | 							newBatch.Depends[refId] = true | 
					
						
							|  |  |  | 						} | 
					
						
							|  |  |  | 					} | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return batches, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func findMatchingBatchGroup(query *Query, batches BatchSlice) *Batch { | 
					
						
							|  |  |  | 	for _, batch := range batches { | 
					
						
							|  |  |  | 		if batch.DataSourceId == query.DataSource.Id { | 
					
						
							|  |  |  | 			return batch | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } |