// Last modified: 2023-04-13 00:30:33 +08:00
package clientmiddleware
import (
"context"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/services/caching"
"github.com/grafana/grafana/pkg/services/contexthandler"
"github.com/prometheus/client_golang/prometheus"
)
// NewCachingMiddleware creates a new plugins.ClientMiddleware that will
// attempt to read and write query results to the cache.
func NewCachingMiddleware(cachingService caching.CachingService) plugins.ClientMiddleware {
	log := log.New("caching_middleware")
	// Registration can fail (e.g. a duplicate registration); log and continue
	// rather than aborting middleware construction — metrics are best-effort.
	if err := prometheus.Register(QueryCachingRequestHistogram); err != nil {
		log.Error("error registering prometheus collector 'QueryCachingRequestHistogram'", "error", err)
	}
	if err := prometheus.Register(ResourceCachingRequestHistogram); err != nil {
		log.Error("error registering prometheus collector 'ResourceCachingRequestHistogram'", "error", err)
	}
	return plugins.ClientMiddlewareFunc(func(next plugins.Client) plugins.Client {
		return &CachingMiddleware{
			next:    next,
			caching: cachingService,
			log:     log,
		}
	})
}
// CachingMiddleware is a plugins.Client wrapper that consults a
// caching.CachingService before (and after) delegating to the next client.
type CachingMiddleware struct {
	next    plugins.Client         // wrapped client; invoked on cache miss and for pass-through methods
	caching caching.CachingService // cache lookup/update service
	log     log.Logger             // scoped logger ("caching_middleware")
}
// QueryData receives a data request and attempts to access results already stored in the cache for that request.
// If data is found, it will return it immediately. Otherwise, it will perform the queries as usual, then write the response to the cache.
// If the cache service is implemented, we capture the request duration as a metric. The service is expected to write any response headers.
func ( m * CachingMiddleware ) QueryData ( ctx context . Context , req * backend . QueryDataRequest ) ( * backend . QueryDataResponse , error ) {
if req == nil {
return m . next . QueryData ( ctx , req )
}
reqCtx := contexthandler . FromContext ( ctx )
if reqCtx == nil {
return m . next . QueryData ( ctx , req )
}
// time how long this request takes
start := time . Now ( )
// First look in the query cache if enabled
hit , cr := m . caching . HandleQueryRequest ( ctx , req )
2023-04-26 01:05:37 +08:00
// record request duration if caching was used
if ch := reqCtx . Resp . Header ( ) . Get ( caching . XCacheHeader ) ; ch != "" {
defer func ( ) {
2023-04-13 00:30:33 +08:00
QueryCachingRequestHistogram . With ( prometheus . Labels {
"datasource_type" : req . PluginContext . DataSourceInstanceSettings . Type ,
"cache" : ch ,
"query_type" : getQueryType ( reqCtx ) ,
} ) . Observe ( time . Since ( start ) . Seconds ( ) )
2023-04-26 01:05:37 +08:00
} ( )
}
2023-04-13 00:30:33 +08:00
// Cache hit; return the response
if hit {
return cr . Response , nil
}
// Cache miss; do the actual queries
resp , err := m . next . QueryData ( ctx , req )
// Update the query cache with the result for this metrics request
if err == nil && cr . UpdateCacheFn != nil {
cr . UpdateCacheFn ( ctx , resp )
}
return resp , err
}
// CallResource receives a resource request and attempts to access results already stored in the cache for that request.
// If data is found, it will return it immediately. Otherwise, it will perform the request as usual. The caller of CallResource is expected to explicitly update the cache with any responses.
// If the cache service is implemented, we capture the request duration as a metric. The service is expected to write any response headers.
func ( m * CachingMiddleware ) CallResource ( ctx context . Context , req * backend . CallResourceRequest , sender backend . CallResourceResponseSender ) error {
if req == nil {
return m . next . CallResource ( ctx , req , sender )
}
reqCtx := contexthandler . FromContext ( ctx )
if reqCtx == nil {
return m . next . CallResource ( ctx , req , sender )
}
// time how long this request takes
start := time . Now ( )
// First look in the resource cache if enabled
2023-04-22 01:03:49 +08:00
hit , cr := m . caching . HandleResourceRequest ( ctx , req )
2023-04-13 00:30:33 +08:00
2023-04-26 01:05:37 +08:00
// record request duration if caching was used
if ch := reqCtx . Resp . Header ( ) . Get ( caching . XCacheHeader ) ; ch != "" {
defer func ( ) {
2023-04-13 00:30:33 +08:00
ResourceCachingRequestHistogram . With ( prometheus . Labels {
"plugin_id" : req . PluginContext . PluginID ,
"cache" : ch ,
} ) . Observe ( time . Since ( start ) . Seconds ( ) )
2023-04-26 01:05:37 +08:00
} ( )
}
2023-04-13 00:30:33 +08:00
// Cache hit; send the response and return
if hit {
2023-04-22 01:03:49 +08:00
return sender . Send ( cr . Response )
2023-04-13 00:30:33 +08:00
}
// Cache miss; do the actual request
2023-04-22 01:03:49 +08:00
// If there is no update cache func, just pass in the original sender
if cr . UpdateCacheFn == nil {
return m . next . CallResource ( ctx , req , sender )
}
// Otherwise, intercept the responses in a wrapped sender so we can cache them first
2023-04-26 01:44:32 +08:00
cacheSender := callResourceResponseSenderFunc ( func ( res * backend . CallResourceResponse ) error {
2023-04-22 01:03:49 +08:00
cr . UpdateCacheFn ( ctx , res )
return sender . Send ( res )
} )
return m . next . CallResource ( ctx , req , cacheSender )
2023-04-13 00:30:33 +08:00
}
// CheckHealth delegates to the next client; health checks are not cached.
func (m *CachingMiddleware) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
	return m.next.CheckHealth(ctx, req)
}
// CollectMetrics delegates to the next client; metrics collection is not cached.
func (m *CachingMiddleware) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) {
	return m.next.CollectMetrics(ctx, req)
}
// SubscribeStream delegates to the next client; stream subscriptions are not cached.
func (m *CachingMiddleware) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
	return m.next.SubscribeStream(ctx, req)
}
// PublishStream delegates to the next client; stream publishes are not cached.
func (m *CachingMiddleware) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
	return m.next.PublishStream(ctx, req)
}
// RunStream delegates to the next client; running streams are not cached.
func (m *CachingMiddleware) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
	return m.next.RunStream(ctx, req, sender)
}