2023-04-13 00:30:33 +08:00
package clientmiddleware
import (
"context"
2023-07-21 23:34:07 +08:00
"strconv"
2023-04-13 00:30:33 +08:00
"time"
2023-07-21 23:34:07 +08:00
"github.com/grafana/grafana-aws-sdk/pkg/awsds"
2023-04-13 00:30:33 +08:00
"github.com/grafana/grafana-plugin-sdk-go/backend"
2023-11-15 04:50:27 +08:00
"github.com/prometheus/client_golang/prometheus"
2023-04-13 00:30:33 +08:00
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/services/caching"
"github.com/grafana/grafana/pkg/services/contexthandler"
2023-07-21 23:34:07 +08:00
"github.com/grafana/grafana/pkg/services/featuremgmt"
2023-04-13 00:30:33 +08:00
)
2023-07-21 23:34:07 +08:00
// shouldCacheQuery is an indirection over awsds.ShouldCacheQuery so that
// tests can substitute their own implementation.
var shouldCacheQuery = awsds.ShouldCacheQuery
2023-04-13 00:30:33 +08:00
// NewCachingMiddleware creates a new plugins.ClientMiddleware that will
// attempt to read and write query results to the cache
func NewCachingMiddleware ( cachingService caching . CachingService ) plugins . ClientMiddleware {
2023-07-21 23:34:07 +08:00
return NewCachingMiddlewareWithFeatureManager ( cachingService , nil )
}
// NewCachingMiddlewareWithFeatureManager creates a new plugins.ClientMiddleware that will
// attempt to read and write query results to the cache with a feature manager
2024-01-10 02:38:06 +08:00
func NewCachingMiddlewareWithFeatureManager ( cachingService caching . CachingService , features featuremgmt . FeatureToggles ) plugins . ClientMiddleware {
2023-04-13 00:30:33 +08:00
log := log . New ( "caching_middleware" )
if err := prometheus . Register ( QueryCachingRequestHistogram ) ; err != nil {
2023-09-05 04:25:43 +08:00
log . Error ( "Error registering prometheus collector 'QueryRequestHistogram'" , "error" , err )
2023-04-13 00:30:33 +08:00
}
if err := prometheus . Register ( ResourceCachingRequestHistogram ) ; err != nil {
2023-09-05 04:25:43 +08:00
log . Error ( "Error registering prometheus collector 'ResourceRequestHistogram'" , "error" , err )
2023-04-13 00:30:33 +08:00
}
return plugins . ClientMiddlewareFunc ( func ( next plugins . Client ) plugins . Client {
return & CachingMiddleware {
2024-05-14 21:27:40 +08:00
baseMiddleware : baseMiddleware {
next : next ,
} ,
2023-07-21 23:34:07 +08:00
caching : cachingService ,
log : log ,
features : features ,
2023-04-13 00:30:33 +08:00
}
} )
}
type CachingMiddleware struct {
2024-05-14 21:27:40 +08:00
baseMiddleware
2023-07-21 23:34:07 +08:00
caching caching . CachingService
log log . Logger
2024-01-10 02:38:06 +08:00
features featuremgmt . FeatureToggles
2023-04-13 00:30:33 +08:00
}
// QueryData receives a data request and attempts to access results already stored in the cache for that request.
// If data is found, it will return it immediately. Otherwise, it will perform the queries as usual, then write the response to the cache.
// If the cache service is implemented, we capture the request duration as a metric. The service is expected to write any response headers.
func ( m * CachingMiddleware ) QueryData ( ctx context . Context , req * backend . QueryDataRequest ) ( * backend . QueryDataResponse , error ) {
if req == nil {
return m . next . QueryData ( ctx , req )
}
reqCtx := contexthandler . FromContext ( ctx )
if reqCtx == nil {
return m . next . QueryData ( ctx , req )
}
// time how long this request takes
start := time . Now ( )
// First look in the query cache if enabled
hit , cr := m . caching . HandleQueryRequest ( ctx , req )
2023-04-26 01:05:37 +08:00
// record request duration if caching was used
2023-07-21 23:34:07 +08:00
ch := reqCtx . Resp . Header ( ) . Get ( caching . XCacheHeader )
if ch != "" {
2023-04-26 01:05:37 +08:00
defer func ( ) {
2023-04-13 00:30:33 +08:00
QueryCachingRequestHistogram . With ( prometheus . Labels {
"datasource_type" : req . PluginContext . DataSourceInstanceSettings . Type ,
"cache" : ch ,
"query_type" : getQueryType ( reqCtx ) ,
} ) . Observe ( time . Since ( start ) . Seconds ( ) )
2023-04-26 01:05:37 +08:00
} ( )
}
2023-04-13 00:30:33 +08:00
// Cache hit; return the response
if hit {
return cr . Response , nil
}
// Cache miss; do the actual queries
resp , err := m . next . QueryData ( ctx , req )
// Update the query cache with the result for this metrics request
if err == nil && cr . UpdateCacheFn != nil {
2023-07-21 23:34:07 +08:00
// If AWS async caching is not enabled, use the old code path
2023-11-15 04:50:27 +08:00
if m . features == nil || ! m . features . IsEnabled ( ctx , featuremgmt . FlagAwsAsyncQueryCaching ) {
2023-07-21 23:34:07 +08:00
cr . UpdateCacheFn ( ctx , resp )
} else {
// time how long shouldCacheQuery takes
startShouldCacheQuery := time . Now ( )
shouldCache := shouldCacheQuery ( resp )
ShouldCacheQueryHistogram . With ( prometheus . Labels {
"datasource_type" : req . PluginContext . DataSourceInstanceSettings . Type ,
"cache" : ch ,
"shouldCache" : strconv . FormatBool ( shouldCache ) ,
"query_type" : getQueryType ( reqCtx ) ,
} ) . Observe ( time . Since ( startShouldCacheQuery ) . Seconds ( ) )
// If AWS async caching is enabled and resp is for a running async query, don't cache it
if shouldCache {
cr . UpdateCacheFn ( ctx , resp )
}
}
2023-04-13 00:30:33 +08:00
}
return resp , err
}
// CallResource receives a resource request and attempts to access results already stored in the cache for that request.
// If data is found, it will return it immediately. Otherwise, it will perform the request as usual. The caller of CallResource is expected to explicitly update the cache with any responses.
// If the cache service is implemented, we capture the request duration as a metric. The service is expected to write any response headers.
func ( m * CachingMiddleware ) CallResource ( ctx context . Context , req * backend . CallResourceRequest , sender backend . CallResourceResponseSender ) error {
if req == nil {
return m . next . CallResource ( ctx , req , sender )
}
reqCtx := contexthandler . FromContext ( ctx )
if reqCtx == nil {
return m . next . CallResource ( ctx , req , sender )
}
// time how long this request takes
start := time . Now ( )
// First look in the resource cache if enabled
2023-04-22 01:03:49 +08:00
hit , cr := m . caching . HandleResourceRequest ( ctx , req )
2023-04-13 00:30:33 +08:00
2023-04-26 01:05:37 +08:00
// record request duration if caching was used
if ch := reqCtx . Resp . Header ( ) . Get ( caching . XCacheHeader ) ; ch != "" {
defer func ( ) {
2023-04-13 00:30:33 +08:00
ResourceCachingRequestHistogram . With ( prometheus . Labels {
"plugin_id" : req . PluginContext . PluginID ,
"cache" : ch ,
} ) . Observe ( time . Since ( start ) . Seconds ( ) )
2023-04-26 01:05:37 +08:00
} ( )
}
2023-04-13 00:30:33 +08:00
// Cache hit; send the response and return
if hit {
2023-04-22 01:03:49 +08:00
return sender . Send ( cr . Response )
2023-04-13 00:30:33 +08:00
}
// Cache miss; do the actual request
2023-04-22 01:03:49 +08:00
// If there is no update cache func, just pass in the original sender
if cr . UpdateCacheFn == nil {
return m . next . CallResource ( ctx , req , sender )
}
// Otherwise, intercept the responses in a wrapped sender so we can cache them first
2023-04-26 01:44:32 +08:00
cacheSender := callResourceResponseSenderFunc ( func ( res * backend . CallResourceResponse ) error {
2023-04-22 01:03:49 +08:00
cr . UpdateCacheFn ( ctx , res )
return sender . Send ( res )
} )
return m . next . CallResource ( ctx , req , cacheSender )
2023-04-13 00:30:33 +08:00
}