mirror of https://github.com/grafana/grafana.git
Alerting: Move notification historian to `grafana/alerting` (#109078)
* Move notification historian to grafana/alerting
* wip
* golangci-lint
* Revert "golangci-lint"
This reverts commit 10ccebad41
.
* JSONEncoder
* alertingInstrument
* go mod tidy
* go.work.sum
* make update-workspace
* merge
* revert go.mod changes
* github.com/grafana/alerting
* make update-workspace
* update github.com/grafana/alerting
* merge
This commit is contained in:
parent
7bba151416
commit
d4bad37853
4
go.mod
4
go.mod
|
@ -85,7 +85,7 @@ require (
|
|||
github.com/googleapis/gax-go/v2 v2.14.2 // @grafana/grafana-backend-group
|
||||
github.com/gorilla/mux v1.8.1 // @grafana/grafana-backend-group
|
||||
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // @grafana/grafana-app-platform-squad
|
||||
github.com/grafana/alerting v0.0.0-20250912123435-f2728ab090ee // @grafana/alerting-backend
|
||||
github.com/grafana/alerting v0.0.0-20250915130141-a8ee25091876 // @grafana/alerting-backend
|
||||
github.com/grafana/authlib v0.0.0-20250909101823-1b466dbd19a1 // @grafana/identity-access-team
|
||||
github.com/grafana/authlib/types v0.0.0-20250721184729-1593a38e4933 // @grafana/identity-access-team
|
||||
github.com/grafana/dataplane/examples v0.0.1 // @grafana/observability-metrics
|
||||
|
@ -438,7 +438,7 @@ require (
|
|||
github.com/google/s2a-go v0.1.9 // indirect
|
||||
github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
|
||||
github.com/grafana/jsonparser v0.0.0-20240425183733-ea80629e1a32 // indirect
|
||||
github.com/grafana/loki/pkg/push v0.0.0-20231124142027-e52380921608 // indirect
|
||||
github.com/grafana/loki/pkg/push v0.0.0-20250823105456-332df2b20000 // indirect
|
||||
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect
|
||||
github.com/grafana/sqlds/v4 v4.2.4 // indirect
|
||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.1-0.20191002090509-6af20e3a5340 // indirect
|
||||
|
|
8
go.sum
8
go.sum
|
@ -1588,8 +1588,8 @@ github.com/gorilla/sessions v1.2.1 h1:DHd3rPN5lE3Ts3D8rKkQ8x/0kqfeNmBAaiSi+o7Fsg
|
|||
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
|
||||
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5THxAzdVpqr6/geYxZytqFMBCOtn/ujyeo=
|
||||
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA=
|
||||
github.com/grafana/alerting v0.0.0-20250912123435-f2728ab090ee h1:J/9l2w3Q5JDBEB3t5bDsxhxWldGtFd5KpYGoQi0m/hc=
|
||||
github.com/grafana/alerting v0.0.0-20250912123435-f2728ab090ee/go.mod h1:XWqj/rlsy4OV/E9XNNyFn+a7U4GNsSugPb2rDBj9+58=
|
||||
github.com/grafana/alerting v0.0.0-20250915130141-a8ee25091876 h1:BzoGpzARwRCNOHcqQdYPAFp2LS1pqnkLWhIuDdq1zho=
|
||||
github.com/grafana/alerting v0.0.0-20250915130141-a8ee25091876/go.mod h1:T5sitas9VhVj8/S9LeRLy6H75kTBdh/sCCqHo7gaQI8=
|
||||
github.com/grafana/authlib v0.0.0-20250909101823-1b466dbd19a1 h1:qdH5s5FV+0Dyja8O1tBJq7MGd8nPCfxfsMimcYq5cRI=
|
||||
github.com/grafana/authlib v0.0.0-20250909101823-1b466dbd19a1/go.mod h1:C6CmTG6vfiqebjJswKsc6zes+1F/OtTCi6aAtL5Um6A=
|
||||
github.com/grafana/authlib/types v0.0.0-20250721184729-1593a38e4933 h1:GjiMR5NIO1/bYSCnt8x7VUeOMaupv2qXJkeLDVAddxQ=
|
||||
|
@ -1630,8 +1630,8 @@ github.com/grafana/grafana/pkg/semconv v0.0.0-20250804150913-990f1c69ecc2 h1:A65
|
|||
github.com/grafana/grafana/pkg/semconv v0.0.0-20250804150913-990f1c69ecc2/go.mod h1:2HRzUK/xQEYc+8d5If/XSusMcaYq9IptnBSHACiQcOQ=
|
||||
github.com/grafana/jsonparser v0.0.0-20240425183733-ea80629e1a32 h1:NznuPwItog+rwdVg8hAuGKP29ndRSzJAwhxKldkP8oQ=
|
||||
github.com/grafana/jsonparser v0.0.0-20240425183733-ea80629e1a32/go.mod h1:796sq+UcONnSlzA3RtlBZ+b/hrerkZXiEmO8oMjyRwY=
|
||||
github.com/grafana/loki/pkg/push v0.0.0-20231124142027-e52380921608 h1:ZYk42718kSXOiIKdjZKljWLgBpzL5z1yutKABksQCMg=
|
||||
github.com/grafana/loki/pkg/push v0.0.0-20231124142027-e52380921608/go.mod h1:f3JSoxBTPXX5ec4FxxeC19nTBSxoTz+cBgS3cYLMcr0=
|
||||
github.com/grafana/loki/pkg/push v0.0.0-20250823105456-332df2b20000 h1:/5LKSYgLmAhwA4m6iGUD4w1YkydEWWjazn9qxCFT8W0=
|
||||
github.com/grafana/loki/pkg/push v0.0.0-20250823105456-332df2b20000/go.mod h1:/ZklAgE1i4f3Z8uriXwESmCr1VLF8lBGaJspuaGuf78=
|
||||
github.com/grafana/loki/v3 v3.2.1 h1:VB7u+KHfvL5aHAxgoVBvz5wVhsdGuqKC7uuOFOOe7jw=
|
||||
github.com/grafana/loki/v3 v3.2.1/go.mod h1:WvdLl6wOS+yahaeQY+xhD2m2XzkHDfKr5FZaX7D/X2Y=
|
||||
github.com/grafana/nanogit v0.0.0-20250723104447-68f58f5ecec0 h1:cS0SlJGIlZbmDLctNj5vIYGemrJDLy25wwoiIyZWVN8=
|
||||
|
|
15
go.work.sum
15
go.work.sum
|
@ -997,6 +997,7 @@ github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 h1:qhugDMdQ4
|
|||
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2/go.mod h1:w/aiO1POVIeXUQyl0VQSZjl5OAGDTL5aX+4v0RA1tcw=
|
||||
github.com/grafana/cog v0.0.38 h1:V7gRRn/mh7Bg1ptrCxo0bv6K0SnG9TiDZk+3Ppftn6s=
|
||||
github.com/grafana/cog v0.0.38/go.mod h1:UDstzYqMdgIROmbfkHL8fB9XWQO2lnf5z+4W/eJo4Dc=
|
||||
github.com/grafana/dskit v0.0.0-20250818234656-8ff9c6532e85/go.mod h1:kImsvJ1xnmeT9Z6StK+RdEKLzlpzBsKwJbEQfmBJdFs=
|
||||
github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak=
|
||||
github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90=
|
||||
github.com/grafana/gomemcache v0.0.0-20250228145437-da7b95fd2ac1/go.mod h1:j/s0jkda4UXTemDs7Pgw/vMT06alWc42CHisvYac0qw=
|
||||
|
@ -1041,6 +1042,7 @@ github.com/grafana/grafana/pkg/aggregator v0.0.0-20250121113133-e747350fee2d/go.
|
|||
github.com/grafana/grafana/pkg/semconv v0.0.0-20250121113133-e747350fee2d/go.mod h1:tfLnBpPYgwrBMRz4EXqPCZJyCjEG4Ev37FSlXnocJ2c=
|
||||
github.com/grafana/grafana/pkg/storage/unified/apistore v0.0.0-20250121113133-e747350fee2d/go.mod h1:CXpwZ3Mkw6xVlGKc0SqUxqXCP3Uv182q6qAQnLaLxRg=
|
||||
github.com/grafana/grafana/pkg/storage/unified/apistore v0.0.0-20250514132646-acbc7b54ed9e/go.mod h1:xrKQcxQxz+IUF90ybtfENFeEXtlj9nAsX/3Fw0KEIeQ=
|
||||
github.com/grafana/loki/pkg/push v0.0.0-20250823105456-332df2b20000 h1:/5LKSYgLmAhwA4m6iGUD4w1YkydEWWjazn9qxCFT8W0=
|
||||
github.com/grafana/nanogit v0.0.0-20250616082354-5e94194d02ed h1:59JF1WhHLT+lNX89Tm1OzOEySMVMASAhaPbsRjtp8Kc=
|
||||
github.com/grafana/nanogit v0.0.0-20250616082354-5e94194d02ed/go.mod h1:OIAAKNgG5fpuJQRNO1lUSj9nc18Xl3O7M8fjIlBO1cI=
|
||||
github.com/grafana/nanogit v0.0.0-20250619160700-ebf70d342aa5 h1:MAQ2B0cu0V1S91ZjVa7NomNZFjaR2SmdtvdwhqBtyhU=
|
||||
|
@ -1370,12 +1372,8 @@ github.com/pquerna/cachecontrol v0.1.0/go.mod h1:NrUG3Z7Rdu85UNR3vm7SOsl1nFIeSiQ
|
|||
github.com/pquerna/ffjson v0.0.0-20190930134022-aa0246cd15f7 h1:xoIK0ctDddBMnc74udxJYBqlo9Ylnsp1waqjLsnef20=
|
||||
github.com/pquerna/ffjson v0.0.0-20190930134022-aa0246cd15f7/go.mod h1:YARuvh7BUWHNhzDq2OM5tzR2RiCcN2D7sapiKyCel/M=
|
||||
github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
|
||||
github.com/prometheus/client_golang v1.23.1 h1:w6gXMLQGgd0jXXlote9lRHMe0nG01EbnJT+C0EJru2Y=
|
||||
github.com/prometheus/client_golang v1.23.1/go.mod h1:br8j//v2eg2K5Vvna5klK8Ku5pcU5r4ll73v6ik5dIQ=
|
||||
github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I=
|
||||
github.com/prometheus/common v0.64.0/go.mod h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8=
|
||||
github.com/prometheus/common v0.66.0 h1:K/rJPHrG3+AoQs50r2+0t7zMnMzek2Vbv31OFVsMeVY=
|
||||
github.com/prometheus/common v0.66.0/go.mod h1:Ux6NtV1B4LatamKE63tJBntoxD++xmtI/lK0VtEplN4=
|
||||
github.com/prometheus/common/assets v0.2.0 h1:0P5OrzoHrYBOSM1OigWL3mY8ZvV2N4zIE/5AahrSrfM=
|
||||
github.com/prometheus/exporter-toolkit v0.10.1-0.20230714054209-2f4150c63f97/go.mod h1:LoBCZeRh+5hX+fSULNyFnagYlQG/gBsyA/deNzROkq8=
|
||||
github.com/prometheus/statsd_exporter v0.26.1 h1:ucbIAdPmwAUcA+dU+Opok8Qt81Aw8HanlO+2N/Wjv7w=
|
||||
|
@ -1437,12 +1435,7 @@ github.com/spf13/afero v1.10.0/go.mod h1:UBogFpq8E9Hx+xc5CNTTEpTnuHVmXDwZcZcE1eb
|
|||
github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
|
||||
github.com/spf13/cobra v1.4.0/go.mod h1:Wo4iy3BUC+X2Fybo0PDqwJIv3dNRiZLHQymsfxlB84g=
|
||||
github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y=
|
||||
github.com/spf13/cobra v1.10.1 h1:lJeBwCfmrnXthfAupyUTzJ/J4Nc1RsHC/mSRU2dll/s=
|
||||
github.com/spf13/cobra v1.10.1/go.mod h1:7SmJGaTHFVBY0jW4NXGluQoLvhqFQM+6XSKD+P4XaB0=
|
||||
github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk=
|
||||
github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
|
||||
github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk=
|
||||
github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
|
||||
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad h1:fiWzISvDn0Csy5H0iwgAuJGQTUpVfEMJJd4nRFXogbc=
|
||||
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
|
||||
github.com/substrait-io/substrait v0.66.1-0.20250205013839-a30b3e2d7ec6 h1:XqtxwYFCjS4L0o1QD4ipGHCuFG94U0f6BeldbilGQjU=
|
||||
|
@ -1947,8 +1940,6 @@ google.golang.org/genproto/googleapis/api v0.0.0-20250528174236-200df99c418a/go.
|
|||
google.golang.org/genproto/googleapis/api v0.0.0-20250707201910-8d1bb00bc6a7/go.mod h1:kXqgZtrWaf6qS3jZOCnCH7WYfrvFjkC51bM8fz3RsCA=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250728155136-f173205681a0/go.mod h1:8ytArBbtOy2xfht+y2fqKd5DRDJRUQhqbyEnQ4bDChs=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250818200422-3122310a409c/go.mod h1:ea2MjsO70ssTfCjiwHgI0ZFqcw45Ksuk2ckf9G468GA=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250908214217-97024824d090 h1:d8Nakh1G+ur7+P3GcMjpRDEkoLUcLW2iU92XVqR+XMQ=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250908214217-97024824d090/go.mod h1:U8EXRNSd8sUYyDfs/It7KVWodQr+Hf9xtxyxWudSwEw=
|
||||
google.golang.org/genproto/googleapis/bytestream v0.0.0-20250505200425-f936aa4a68b2 h1:DbpkGFGRkd4GORg+IWQW2EhxUaa/My/PM8d1CGyTDMY=
|
||||
google.golang.org/genproto/googleapis/bytestream v0.0.0-20250505200425-f936aa4a68b2/go.mod h1:h6yxum/C2qRb4txaZRLDHK8RyS0H/o2oEDeKY4onY/Y=
|
||||
google.golang.org/genproto/googleapis/bytestream v0.0.0-20250512202823-5a2f75b736a9 h1:YI36gCL8AQMhzYN6+jH8PdV/iZ0On+Zd0rO/7lCH3k8=
|
||||
|
@ -1977,8 +1968,6 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7/go.
|
|||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250728155136-f173205681a0/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250818200422-3122310a409c/go.mod h1:gw1tLEfykwDz2ET4a12jcXt4couGAm7IwsVaTy0Sflo=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250826171959-ef028d996bc1/go.mod h1:GmFNa4BdJZ2a8G+wCe9Bg3wwThLrJun751XstdJt5Og=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250908214217-97024824d090 h1:/OQuEa4YWtDt7uQWHd3q3sUMb+QOLQUg1xa8CEsRv5w=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250908214217-97024824d090/go.mod h1:GmFNa4BdJZ2a8G+wCe9Bg3wwThLrJun751XstdJt5Og=
|
||||
google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
|
||||
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
|
||||
google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA=
|
||||
|
|
|
@ -8,7 +8,8 @@ import (
|
|||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/lokiclient"
|
||||
"github.com/grafana/alerting/notify/historian/lokiclient"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/lokiconfig"
|
||||
"golang.org/x/exp/constraints"
|
||||
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
|
@ -61,7 +62,7 @@ func NewLokiHistorianStore(cfg setting.UnifiedAlertingStateHistorySettings, db d
|
|||
if !useStore(cfg) {
|
||||
return nil
|
||||
}
|
||||
lokiCfg, err := lokiclient.NewLokiConfig(cfg.LokiSettings)
|
||||
lokiCfg, err := lokiconfig.NewLokiConfig(cfg.LokiSettings)
|
||||
if err != nil {
|
||||
// this config error is already handled elsewhere
|
||||
return nil
|
||||
|
|
|
@ -10,11 +10,13 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/alerting/notify/historian/lokiclient"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/exp/maps"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/lokiclient"
|
||||
alertingInstrument "github.com/grafana/alerting/http/instrument"
|
||||
"github.com/grafana/alerting/http/instrument/instrumenttest"
|
||||
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
"github.com/grafana/grafana/pkg/infra/db"
|
||||
|
@ -24,7 +26,6 @@ import (
|
|||
"github.com/grafana/grafana/pkg/services/annotations/testutil"
|
||||
"github.com/grafana/grafana/pkg/services/dashboards"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/client"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/eval"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
|
@ -811,7 +812,7 @@ func compareAnnotationItem(t *testing.T, expected, actual *annotations.ItemDTO)
|
|||
}
|
||||
|
||||
type FakeLokiClient struct {
|
||||
client client.Requester
|
||||
client alertingInstrument.Requester
|
||||
cfg lokiclient.LokiConfig
|
||||
metrics *metrics.Historian
|
||||
log log.Logger
|
||||
|
@ -820,15 +821,15 @@ type FakeLokiClient struct {
|
|||
|
||||
func NewFakeLokiClient() *FakeLokiClient {
|
||||
url, _ := url.Parse("http://some.url")
|
||||
req := lokiclient.NewFakeRequester()
|
||||
req := instrumenttest.NewFakeRequester()
|
||||
metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry(), "annotations_test")
|
||||
|
||||
return &FakeLokiClient{
|
||||
client: client.NewTimedClient(req, metrics.WriteDuration),
|
||||
client: alertingInstrument.NewTimedClient(req, metrics.WriteDuration),
|
||||
cfg: lokiclient.LokiConfig{
|
||||
WritePathURL: url,
|
||||
ReadPathURL: url,
|
||||
Encoder: lokiclient.JsonEncoder{},
|
||||
Encoder: lokiclient.JSONEncoder{},
|
||||
MaxQueryLength: 721 * time.Hour,
|
||||
MaxQuerySize: 65536,
|
||||
},
|
||||
|
|
|
@ -1,125 +0,0 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/grafana/dskit/instrument"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// Requester executes an HTTP request.
|
||||
type Requester interface {
|
||||
Do(req *http.Request) (*http.Response, error)
|
||||
}
|
||||
|
||||
// TimedClient instruments a request with metrics. It implements Requester.
|
||||
type TimedClient struct {
|
||||
client Requester
|
||||
collector instrument.Collector
|
||||
}
|
||||
|
||||
type contextKey int
|
||||
|
||||
// OperationNameContextKey specifies the operation name location within the context
|
||||
// for instrumentation.
|
||||
const OperationNameContextKey contextKey = 0
|
||||
|
||||
// NewTimedClient creates a Requester that instruments requests on `client`.
|
||||
func NewTimedClient(client Requester, collector instrument.Collector) *TimedClient {
|
||||
return &TimedClient{
|
||||
client: client,
|
||||
collector: collector,
|
||||
}
|
||||
}
|
||||
|
||||
// Do executes the request.
|
||||
func (c TimedClient) Do(r *http.Request) (*http.Response, error) {
|
||||
return TimeRequest(r.Context(), c.operationName(r), c.collector, c.client, r)
|
||||
}
|
||||
|
||||
// RoundTrip implements the RoundTripper interface.
|
||||
func (c TimedClient) RoundTrip(r *http.Request) (*http.Response, error) {
|
||||
return c.Do(r)
|
||||
}
|
||||
|
||||
func (c TimedClient) operationName(r *http.Request) string {
|
||||
operation, _ := r.Context().Value(OperationNameContextKey).(string)
|
||||
if operation == "" {
|
||||
operation = r.URL.Path
|
||||
}
|
||||
return operation
|
||||
}
|
||||
|
||||
// TimeRequest performs an HTTP client request and records the duration in a histogram.
|
||||
func TimeRequest(ctx context.Context, operation string, coll instrument.Collector, client Requester, request *http.Request) (*http.Response, error) {
|
||||
var response *http.Response
|
||||
doRequest := func(_ context.Context) error {
|
||||
var err error
|
||||
response, err = client.Do(request) // nolint:bodyclose
|
||||
return err
|
||||
}
|
||||
toStatusCode := func(err error) string {
|
||||
if err == nil {
|
||||
return strconv.Itoa(response.StatusCode)
|
||||
}
|
||||
return "error"
|
||||
}
|
||||
err := instrument.CollectedRequest(ctx, fmt.Sprintf("%s %s", request.Method, operation),
|
||||
coll, toStatusCode, doRequest)
|
||||
return response, err
|
||||
}
|
||||
|
||||
// TracedClient instruments a request with tracing. It implements Requester.
|
||||
type TracedClient struct {
|
||||
client Requester
|
||||
tracer tracing.Tracer
|
||||
name string
|
||||
}
|
||||
|
||||
func NewTracedClient(client Requester, tracer tracing.Tracer, name string) *TracedClient {
|
||||
return &TracedClient{
|
||||
client: client,
|
||||
tracer: tracer,
|
||||
name: name,
|
||||
}
|
||||
}
|
||||
|
||||
// Do executes the request.
|
||||
func (c TracedClient) Do(r *http.Request) (*http.Response, error) {
|
||||
ctx, span := c.tracer.Start(r.Context(), c.name, trace.WithSpanKind(trace.SpanKindClient))
|
||||
defer span.End()
|
||||
|
||||
span.SetAttributes(semconv.HTTPURL(r.URL.String()))
|
||||
span.SetAttributes(semconv.HTTPMethod(r.Method))
|
||||
|
||||
c.tracer.Inject(ctx, r.Header, span)
|
||||
|
||||
r = r.WithContext(ctx)
|
||||
resp, err := c.client.Do(r)
|
||||
if err != nil {
|
||||
span.SetStatus(codes.Error, "request failed")
|
||||
span.RecordError(err)
|
||||
} else {
|
||||
if resp.ContentLength > 0 {
|
||||
span.SetAttributes(attribute.Int64("http.content_length", resp.ContentLength))
|
||||
}
|
||||
span.SetAttributes(semconv.HTTPStatusCode(resp.StatusCode))
|
||||
if resp.StatusCode >= 400 && resp.StatusCode < 600 {
|
||||
span.RecordError(fmt.Errorf("error with HTTP status code %d", resp.StatusCode))
|
||||
}
|
||||
}
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// RoundTrip implements the RoundTripper interface.
|
||||
func (c TracedClient) RoundTrip(r *http.Request) (*http.Response, error) {
|
||||
return c.Do(r)
|
||||
}
|
|
@ -1,29 +0,0 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestTimedClient_operationName(t *testing.T) {
|
||||
r, err := http.NewRequest("GET", "https://weave.test", nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
r = r.WithContext(context.WithValue(context.Background(), OperationNameContextKey, "opp"))
|
||||
c := NewTimedClient(http.DefaultClient, nil)
|
||||
|
||||
assert.Equal(t, "opp", c.operationName(r))
|
||||
}
|
||||
|
||||
func TestTimedClient_operationName_Default(t *testing.T) {
|
||||
r, err := http.NewRequest("GET", "https://weave.test/you/know/me", nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
r = r.WithContext(context.Background())
|
||||
c := NewTimedClient(http.DefaultClient, nil)
|
||||
|
||||
assert.Equal(t, "/you/know/me", c.operationName(r))
|
||||
}
|
|
@ -1,333 +0,0 @@
|
|||
package lokiclient
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/dskit/instrument"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/client"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const defaultPageSize = 1000
|
||||
const maximumPageSize = 5000
|
||||
|
||||
func NewRequester() client.Requester {
|
||||
return &http.Client{}
|
||||
}
|
||||
|
||||
// encoder serializes log streams to some byte format.
|
||||
type encoder interface {
|
||||
// encode serializes a set of log streams to bytes.
|
||||
encode(s []Stream) ([]byte, error)
|
||||
// headers returns a set of HTTP-style headers that describes the encoding scheme used.
|
||||
headers() map[string]string
|
||||
}
|
||||
|
||||
type LokiConfig struct {
|
||||
ReadPathURL *url.URL
|
||||
WritePathURL *url.URL
|
||||
BasicAuthUser string
|
||||
BasicAuthPassword string
|
||||
TenantID string
|
||||
ExternalLabels map[string]string
|
||||
Encoder encoder
|
||||
MaxQueryLength time.Duration
|
||||
MaxQuerySize int
|
||||
}
|
||||
|
||||
func NewLokiConfig(cfg setting.UnifiedAlertingLokiSettings) (LokiConfig, error) {
|
||||
read, write := cfg.LokiReadURL, cfg.LokiWriteURL
|
||||
if read == "" {
|
||||
read = cfg.LokiRemoteURL
|
||||
}
|
||||
if write == "" {
|
||||
write = cfg.LokiRemoteURL
|
||||
}
|
||||
|
||||
if read == "" {
|
||||
return LokiConfig{}, fmt.Errorf("either read path URL or remote Loki URL must be provided")
|
||||
}
|
||||
if write == "" {
|
||||
return LokiConfig{}, fmt.Errorf("either write path URL or remote Loki URL must be provided")
|
||||
}
|
||||
|
||||
readURL, err := url.Parse(read)
|
||||
if err != nil {
|
||||
return LokiConfig{}, fmt.Errorf("failed to parse loki remote read URL: %w", err)
|
||||
}
|
||||
writeURL, err := url.Parse(write)
|
||||
if err != nil {
|
||||
return LokiConfig{}, fmt.Errorf("failed to parse loki remote write URL: %w", err)
|
||||
}
|
||||
|
||||
return LokiConfig{
|
||||
ReadPathURL: readURL,
|
||||
WritePathURL: writeURL,
|
||||
BasicAuthUser: cfg.LokiBasicAuthUsername,
|
||||
BasicAuthPassword: cfg.LokiBasicAuthPassword,
|
||||
TenantID: cfg.LokiTenantID,
|
||||
ExternalLabels: cfg.ExternalLabels,
|
||||
MaxQueryLength: cfg.LokiMaxQueryLength,
|
||||
MaxQuerySize: cfg.LokiMaxQuerySize,
|
||||
// Snappy-compressed protobuf is the default, same goes for Promtail.
|
||||
Encoder: SnappyProtoEncoder{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
type HttpLokiClient struct {
|
||||
client client.Requester
|
||||
encoder encoder
|
||||
cfg LokiConfig
|
||||
bytesWritten prometheus.Counter
|
||||
log log.Logger
|
||||
}
|
||||
|
||||
// Kind of Operation (=, !=, =~, !~)
|
||||
type Operator string
|
||||
|
||||
const (
|
||||
// Equal operator (=)
|
||||
Eq Operator = "="
|
||||
// Not Equal operator (!=)
|
||||
Neq Operator = "!="
|
||||
// Equal operator supporting RegEx (=~)
|
||||
EqRegEx Operator = "=~"
|
||||
// Not Equal operator supporting RegEx (!~)
|
||||
NeqRegEx Operator = "!~"
|
||||
)
|
||||
|
||||
func NewLokiClient(cfg LokiConfig, req client.Requester, bytesWritten prometheus.Counter, writeDuration *instrument.HistogramCollector, logger log.Logger, tracer tracing.Tracer, spanName string) *HttpLokiClient {
|
||||
tc := client.NewTimedClient(req, writeDuration)
|
||||
trc := client.NewTracedClient(tc, tracer, spanName)
|
||||
return &HttpLokiClient{
|
||||
client: trc,
|
||||
encoder: cfg.Encoder,
|
||||
cfg: cfg,
|
||||
bytesWritten: bytesWritten,
|
||||
log: logger.New("protocol", "http"),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *HttpLokiClient) Ping(ctx context.Context) error {
|
||||
log := c.log.FromContext(ctx)
|
||||
uri := c.cfg.ReadPathURL.JoinPath("/loki/api/v1/labels")
|
||||
req, err := http.NewRequest(http.MethodGet, uri.String(), nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating request: %w", err)
|
||||
}
|
||||
c.setAuthAndTenantHeaders(req)
|
||||
|
||||
req = req.WithContext(ctx)
|
||||
res, err := c.client.Do(req)
|
||||
if res != nil {
|
||||
defer func() {
|
||||
if err := res.Body.Close(); err != nil {
|
||||
log.Warn("Failed to close response body", "err", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("error sending request: %w", err)
|
||||
}
|
||||
|
||||
if res.StatusCode < 200 || res.StatusCode >= 300 {
|
||||
return fmt.Errorf("ping request to loki endpoint returned a non-200 status code: %d", res.StatusCode)
|
||||
}
|
||||
log.Debug("Ping request to Loki endpoint succeeded", "status", res.StatusCode)
|
||||
return nil
|
||||
}
|
||||
|
||||
type Stream struct {
|
||||
Stream map[string]string `json:"stream"`
|
||||
Values []Sample `json:"values"`
|
||||
}
|
||||
|
||||
type Sample struct {
|
||||
T time.Time
|
||||
V string
|
||||
}
|
||||
|
||||
func (r *Sample) MarshalJSON() ([]byte, error) {
|
||||
return json.Marshal([2]string{
|
||||
fmt.Sprintf("%d", r.T.UnixNano()), r.V,
|
||||
})
|
||||
}
|
||||
|
||||
func (r *Sample) UnmarshalJSON(b []byte) error {
|
||||
// A Loki stream sample is formatted like a list with two elements, [At, Val]
|
||||
// At is a string wrapping a timestamp, in nanosecond unix epoch.
|
||||
// Val is a string containing the log line.
|
||||
var tuple [2]string
|
||||
if err := json.Unmarshal(b, &tuple); err != nil {
|
||||
return fmt.Errorf("failed to deserialize sample in Loki response: %w", err)
|
||||
}
|
||||
nano, err := strconv.ParseInt(tuple[0], 10, 64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("timestamp in Loki sample not convertible to nanosecond epoch: %v", tuple[0])
|
||||
}
|
||||
r.T = time.Unix(0, nano)
|
||||
r.V = tuple[1]
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *HttpLokiClient) Push(ctx context.Context, s []Stream) error {
|
||||
log := c.log.FromContext(ctx)
|
||||
enc, err := c.encoder.encode(s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
uri := c.cfg.WritePathURL.JoinPath("/loki/api/v1/push")
|
||||
req, err := http.NewRequest(http.MethodPost, uri.String(), bytes.NewBuffer(enc))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create Loki request: %w", err)
|
||||
}
|
||||
|
||||
c.setAuthAndTenantHeaders(req)
|
||||
for k, v := range c.encoder.headers() {
|
||||
req.Header.Add(k, v)
|
||||
}
|
||||
|
||||
c.bytesWritten.Add(float64(len(enc)))
|
||||
req = req.WithContext(ctx)
|
||||
resp, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send request: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := resp.Body.Close(); err != nil {
|
||||
log.Warn("Failed to close response body", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
_, err = c.handleLokiResponse(log, resp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *HttpLokiClient) setAuthAndTenantHeaders(req *http.Request) {
|
||||
if c.cfg.BasicAuthUser != "" || c.cfg.BasicAuthPassword != "" {
|
||||
req.SetBasicAuth(c.cfg.BasicAuthUser, c.cfg.BasicAuthPassword)
|
||||
}
|
||||
|
||||
if c.cfg.TenantID != "" {
|
||||
req.Header.Add("X-Scope-OrgID", c.cfg.TenantID)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *HttpLokiClient) RangeQuery(ctx context.Context, logQL string, start, end, limit int64) (QueryRes, error) {
|
||||
log := c.log.FromContext(ctx)
|
||||
// Run the pre-flight checks for the query.
|
||||
if start > end {
|
||||
return QueryRes{}, fmt.Errorf("start time cannot be after end time")
|
||||
}
|
||||
start, end = ClampRange(start, end, c.cfg.MaxQueryLength.Nanoseconds())
|
||||
if limit < 1 {
|
||||
limit = defaultPageSize
|
||||
}
|
||||
if limit > maximumPageSize {
|
||||
limit = maximumPageSize
|
||||
}
|
||||
|
||||
queryURL := c.cfg.ReadPathURL.JoinPath("/loki/api/v1/query_range")
|
||||
|
||||
values := url.Values{}
|
||||
values.Set("query", logQL)
|
||||
values.Set("start", fmt.Sprintf("%d", start))
|
||||
values.Set("end", fmt.Sprintf("%d", end))
|
||||
values.Set("limit", fmt.Sprintf("%d", limit))
|
||||
|
||||
queryURL.RawQuery = values.Encode()
|
||||
log.Debug("Sending query request", "query", logQL, "start", start, "end", end, "limit", limit)
|
||||
req, err := http.NewRequest(http.MethodGet,
|
||||
queryURL.String(), nil)
|
||||
if err != nil {
|
||||
return QueryRes{}, fmt.Errorf("error creating request: %w", err)
|
||||
}
|
||||
|
||||
req = req.WithContext(ctx)
|
||||
c.setAuthAndTenantHeaders(req)
|
||||
|
||||
res, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
return QueryRes{}, fmt.Errorf("error executing request: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := res.Body.Close(); err != nil {
|
||||
log.Warn("Failed to close response body", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
data, err := c.handleLokiResponse(log, res)
|
||||
if err != nil {
|
||||
return QueryRes{}, err
|
||||
}
|
||||
|
||||
result := QueryRes{}
|
||||
err = json.Unmarshal(data, &result)
|
||||
if err != nil {
|
||||
fmt.Println(string(data))
|
||||
return QueryRes{}, fmt.Errorf("error parsing request response: %w", err)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c *HttpLokiClient) MaxQuerySize() int {
|
||||
return c.cfg.MaxQuerySize
|
||||
}
|
||||
|
||||
type QueryRes struct {
|
||||
Data QueryData `json:"data"`
|
||||
}
|
||||
|
||||
type QueryData struct {
|
||||
Result []Stream `json:"result"`
|
||||
}
|
||||
|
||||
func (c *HttpLokiClient) handleLokiResponse(log log.Logger, res *http.Response) ([]byte, error) {
|
||||
if res == nil {
|
||||
return nil, fmt.Errorf("response is nil")
|
||||
}
|
||||
|
||||
data, err := io.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error reading request response: %w", err)
|
||||
}
|
||||
|
||||
if res.StatusCode < 200 || res.StatusCode >= 300 {
|
||||
if len(data) > 0 {
|
||||
log.Error("Error response from Loki", "response", string(data), "status", res.StatusCode)
|
||||
} else {
|
||||
log.Error("Error response from Loki with an empty body", "status", res.StatusCode)
|
||||
}
|
||||
return nil, fmt.Errorf("received a non-200 response from loki, status: %d", res.StatusCode)
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// ClampRange ensures that the time range is within the configured maximum query length.
|
||||
func ClampRange(start, end, maxTimeRange int64) (newStart int64, newEnd int64) {
|
||||
newStart, newEnd = start, end
|
||||
|
||||
if maxTimeRange != 0 && end-start > maxTimeRange {
|
||||
newStart = end - maxTimeRange
|
||||
}
|
||||
|
||||
return newStart, newEnd
|
||||
}
|
|
@ -1,409 +0,0 @@
|
|||
package lokiclient
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/client"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
)
|
||||
|
||||
const lokiClientSpanName = "testLokiClientSpanName"
|
||||
|
||||
func TestLokiConfig(t *testing.T) {
|
||||
t.Run("test URL options", func(t *testing.T) {
|
||||
type testCase struct {
|
||||
name string
|
||||
in setting.UnifiedAlertingLokiSettings
|
||||
expRead string
|
||||
expWrite string
|
||||
expErr string
|
||||
}
|
||||
|
||||
cases := []testCase{
|
||||
{
|
||||
name: "remote url only",
|
||||
in: setting.UnifiedAlertingLokiSettings{
|
||||
LokiRemoteURL: "http://url.com",
|
||||
},
|
||||
expRead: "http://url.com",
|
||||
expWrite: "http://url.com",
|
||||
},
|
||||
{
|
||||
name: "separate urls",
|
||||
in: setting.UnifiedAlertingLokiSettings{
|
||||
LokiReadURL: "http://read.url.com",
|
||||
LokiWriteURL: "http://write.url.com",
|
||||
},
|
||||
expRead: "http://read.url.com",
|
||||
expWrite: "http://write.url.com",
|
||||
},
|
||||
{
|
||||
name: "single fallback",
|
||||
in: setting.UnifiedAlertingLokiSettings{
|
||||
LokiRemoteURL: "http://url.com",
|
||||
LokiReadURL: "http://read.url.com",
|
||||
},
|
||||
expRead: "http://read.url.com",
|
||||
expWrite: "http://url.com",
|
||||
},
|
||||
{
|
||||
name: "missing read",
|
||||
in: setting.UnifiedAlertingLokiSettings{
|
||||
LokiWriteURL: "http://url.com",
|
||||
},
|
||||
expErr: "either read path URL or remote",
|
||||
},
|
||||
{
|
||||
name: "missing write",
|
||||
in: setting.UnifiedAlertingLokiSettings{
|
||||
LokiReadURL: "http://url.com",
|
||||
},
|
||||
expErr: "either write path URL or remote",
|
||||
},
|
||||
{
|
||||
name: "invalid",
|
||||
in: setting.UnifiedAlertingLokiSettings{
|
||||
LokiRemoteURL: "://://",
|
||||
},
|
||||
expErr: "failed to parse",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
res, err := NewLokiConfig(tc.in)
|
||||
if tc.expErr != "" {
|
||||
require.ErrorContains(t, err, tc.expErr)
|
||||
} else {
|
||||
require.Equal(t, tc.expRead, res.ReadPathURL.String())
|
||||
require.Equal(t, tc.expWrite, res.WritePathURL.String())
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("captures external labels", func(t *testing.T) {
|
||||
set := setting.UnifiedAlertingLokiSettings{
|
||||
LokiRemoteURL: "http://url.com",
|
||||
ExternalLabels: map[string]string{"a": "b"},
|
||||
}
|
||||
|
||||
res, err := NewLokiConfig(set)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, res.ExternalLabels, "a")
|
||||
})
|
||||
}
|
||||
|
||||
func TestLokiHTTPClient(t *testing.T) {
|
||||
t.Run("push formats expected data", func(t *testing.T) {
|
||||
req := NewFakeRequester()
|
||||
client := createTestLokiClient(req)
|
||||
now := time.Now().UTC()
|
||||
data := []Stream{
|
||||
{
|
||||
Stream: map[string]string{},
|
||||
Values: []Sample{
|
||||
{
|
||||
T: now,
|
||||
V: "some line",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err := client.Push(context.Background(), data)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, "/loki/api/v1/push", req.LastRequest.URL.Path)
|
||||
sent := reqBody(t, req.LastRequest)
|
||||
exp := fmt.Sprintf(`{"streams": [{"stream": {}, "values": [["%d", "some line"]]}]}`, now.UnixNano())
|
||||
require.JSONEq(t, exp, sent)
|
||||
})
|
||||
|
||||
t.Run("range query", func(t *testing.T) {
|
||||
t.Run("passes along page size", func(t *testing.T) {
|
||||
req := NewFakeRequester().WithResponse(&http.Response{
|
||||
Status: "200 OK",
|
||||
StatusCode: 200,
|
||||
Body: io.NopCloser(bytes.NewBufferString(`{}`)),
|
||||
ContentLength: int64(0),
|
||||
Header: make(http.Header, 0),
|
||||
})
|
||||
client := createTestLokiClient(req)
|
||||
now := time.Now().UTC().UnixNano()
|
||||
q := `{from="state-history"}`
|
||||
|
||||
_, err := client.RangeQuery(context.Background(), q, now-100, now, 1100)
|
||||
|
||||
require.NoError(t, err)
|
||||
params := req.LastRequest.URL.Query()
|
||||
require.True(t, params.Has("limit"), "query params did not contain 'limit': %#v", params)
|
||||
require.Equal(t, fmt.Sprint(1100), params.Get("limit"))
|
||||
})
|
||||
|
||||
t.Run("uses default page size if limit not provided", func(t *testing.T) {
|
||||
req := NewFakeRequester().WithResponse(&http.Response{
|
||||
Status: "200 OK",
|
||||
StatusCode: 200,
|
||||
Body: io.NopCloser(bytes.NewBufferString(`{}`)),
|
||||
ContentLength: int64(0),
|
||||
Header: make(http.Header, 0),
|
||||
})
|
||||
client := createTestLokiClient(req)
|
||||
now := time.Now().UTC().UnixNano()
|
||||
q := `{from="state-history"}`
|
||||
|
||||
_, err := client.RangeQuery(context.Background(), q, now-100, now, 0)
|
||||
|
||||
require.NoError(t, err)
|
||||
params := req.LastRequest.URL.Query()
|
||||
require.True(t, params.Has("limit"), "query params did not contain 'limit': %#v", params)
|
||||
require.Equal(t, fmt.Sprint(defaultPageSize), params.Get("limit"))
|
||||
})
|
||||
|
||||
t.Run("uses default page size if limit invalid", func(t *testing.T) {
|
||||
req := NewFakeRequester().WithResponse(&http.Response{
|
||||
Status: "200 OK",
|
||||
StatusCode: 200,
|
||||
Body: io.NopCloser(bytes.NewBufferString(`{}`)),
|
||||
ContentLength: int64(0),
|
||||
Header: make(http.Header, 0),
|
||||
})
|
||||
client := createTestLokiClient(req)
|
||||
now := time.Now().UTC().UnixNano()
|
||||
q := `{from="state-history"}`
|
||||
|
||||
_, err := client.RangeQuery(context.Background(), q, now-100, now, -100)
|
||||
|
||||
require.NoError(t, err)
|
||||
params := req.LastRequest.URL.Query()
|
||||
require.True(t, params.Has("limit"), "query params did not contain 'limit': %#v", params)
|
||||
require.Equal(t, fmt.Sprint(defaultPageSize), params.Get("limit"))
|
||||
})
|
||||
|
||||
t.Run("uses maximum page size if limit too big", func(t *testing.T) {
|
||||
req := NewFakeRequester().WithResponse(&http.Response{
|
||||
Status: "200 OK",
|
||||
StatusCode: 200,
|
||||
Body: io.NopCloser(bytes.NewBufferString(`{}`)),
|
||||
ContentLength: int64(0),
|
||||
Header: make(http.Header, 0),
|
||||
})
|
||||
client := createTestLokiClient(req)
|
||||
now := time.Now().UTC().UnixNano()
|
||||
q := `{from="state-history"}`
|
||||
|
||||
_, err := client.RangeQuery(context.Background(), q, now-100, now, maximumPageSize+1000)
|
||||
|
||||
require.NoError(t, err)
|
||||
params := req.LastRequest.URL.Query()
|
||||
require.True(t, params.Has("limit"), "query params did not contain 'limit': %#v", params)
|
||||
require.Equal(t, fmt.Sprint(maximumPageSize), params.Get("limit"))
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// This function can be used for local testing, just remove the skip call.
|
||||
func TestLokiHTTPClient_Manual(t *testing.T) {
|
||||
t.Skip()
|
||||
|
||||
t.Run("smoke test pinging Loki", func(t *testing.T) {
|
||||
url, err := url.Parse("https://logs-prod-eu-west-0.grafana.net")
|
||||
require.NoError(t, err)
|
||||
|
||||
metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)
|
||||
client := NewLokiClient(LokiConfig{
|
||||
ReadPathURL: url,
|
||||
WritePathURL: url,
|
||||
Encoder: JsonEncoder{},
|
||||
}, NewRequester(), metrics.BytesWritten, metrics.WriteDuration, log.NewNopLogger(), tracing.InitializeTracerForTest(), lokiClientSpanName)
|
||||
|
||||
// Unauthorized request should fail against Grafana Cloud.
|
||||
err = client.Ping(context.Background())
|
||||
require.Error(t, err)
|
||||
|
||||
client.cfg.BasicAuthUser = "<your_username>"
|
||||
client.cfg.BasicAuthPassword = "<your_password>"
|
||||
|
||||
// When running on prem, you might need to set the tenant id,
|
||||
// so the x-scope-orgid header is set.
|
||||
// client.cfg.TenantID = "<your_tenant_id>"
|
||||
|
||||
// Authorized request should not fail against Grafana Cloud.
|
||||
err = client.Ping(context.Background())
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("smoke test range querying Loki", func(t *testing.T) {
|
||||
url, err := url.Parse("https://logs-prod-eu-west-0.grafana.net")
|
||||
require.NoError(t, err)
|
||||
|
||||
metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)
|
||||
client := NewLokiClient(LokiConfig{
|
||||
ReadPathURL: url,
|
||||
WritePathURL: url,
|
||||
BasicAuthUser: "<your_username>",
|
||||
BasicAuthPassword: "<your_password>",
|
||||
Encoder: JsonEncoder{},
|
||||
}, NewRequester(), metrics.BytesWritten, metrics.WriteDuration, log.NewNopLogger(), tracing.InitializeTracerForTest(), lokiClientSpanName)
|
||||
|
||||
// When running on prem, you might need to set the tenant id,
|
||||
// so the x-scope-orgid header is set.
|
||||
// client.cfg.TenantID = "<your_tenant_id>"
|
||||
|
||||
logQL := `{probe="Paris"}`
|
||||
|
||||
// Define the query time range
|
||||
start := time.Now().Add(-30 * time.Minute).UnixNano()
|
||||
end := time.Now().UnixNano()
|
||||
|
||||
// Authorized request should not fail against Grafana Cloud.
|
||||
res, err := client.RangeQuery(context.Background(), logQL, start, end, defaultPageSize)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, res)
|
||||
})
|
||||
}
|
||||
|
||||
func TestRow(t *testing.T) {
|
||||
t.Run("marshal", func(t *testing.T) {
|
||||
row := Sample{
|
||||
T: time.Unix(0, 1234),
|
||||
V: "some sample",
|
||||
}
|
||||
|
||||
jsn, err := json.Marshal(&row)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.JSONEq(t, `["1234", "some sample"]`, string(jsn))
|
||||
})
|
||||
|
||||
t.Run("unmarshal", func(t *testing.T) {
|
||||
jsn := []byte(`["1234", "some sample"]`)
|
||||
|
||||
row := Sample{}
|
||||
err := json.Unmarshal(jsn, &row)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(1234), row.T.UnixNano())
|
||||
require.Equal(t, "some sample", row.V)
|
||||
})
|
||||
|
||||
t.Run("unmarshal invalid", func(t *testing.T) {
|
||||
jsn := []byte(`{"key": "wrong shape"}`)
|
||||
|
||||
row := Sample{}
|
||||
err := json.Unmarshal(jsn, &row)
|
||||
|
||||
require.ErrorContains(t, err, "failed to deserialize sample")
|
||||
})
|
||||
|
||||
t.Run("unmarshal bad timestamp", func(t *testing.T) {
|
||||
jsn := []byte(`["not-unix-nano", "some sample"]`)
|
||||
|
||||
row := Sample{}
|
||||
err := json.Unmarshal(jsn, &row)
|
||||
|
||||
require.ErrorContains(t, err, "timestamp in Loki sample")
|
||||
})
|
||||
}
|
||||
|
||||
func TestStream(t *testing.T) {
|
||||
t.Run("marshal", func(t *testing.T) {
|
||||
stream := Stream{
|
||||
Stream: map[string]string{"a": "b"},
|
||||
Values: []Sample{
|
||||
{T: time.Unix(0, 1), V: "one"},
|
||||
{T: time.Unix(0, 2), V: "two"},
|
||||
},
|
||||
}
|
||||
|
||||
jsn, err := json.Marshal(stream)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.JSONEq(
|
||||
t,
|
||||
`{"stream": {"a": "b"}, "values": [["1", "one"], ["2", "two"]]}`,
|
||||
string(jsn),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
func TestClampRange(t *testing.T) {
|
||||
tc := []struct {
|
||||
name string
|
||||
oldRange []int64
|
||||
max int64
|
||||
newRange []int64
|
||||
}{
|
||||
{
|
||||
name: "clamps start value if max is smaller than range",
|
||||
oldRange: []int64{5, 10},
|
||||
max: 1,
|
||||
newRange: []int64{9, 10},
|
||||
},
|
||||
{
|
||||
name: "returns same values if max is greater than range",
|
||||
oldRange: []int64{5, 10},
|
||||
max: 20,
|
||||
newRange: []int64{5, 10},
|
||||
},
|
||||
{
|
||||
name: "returns same values if max is equal to range",
|
||||
oldRange: []int64{5, 10},
|
||||
max: 5,
|
||||
newRange: []int64{5, 10},
|
||||
},
|
||||
{
|
||||
name: "returns same values if max is zero",
|
||||
oldRange: []int64{5, 10},
|
||||
max: 0,
|
||||
newRange: []int64{5, 10},
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range tc {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
start, end := ClampRange(c.oldRange[0], c.oldRange[1], c.max)
|
||||
|
||||
require.Equal(t, c.newRange[0], start)
|
||||
require.Equal(t, c.newRange[1], end)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func createTestLokiClient(req client.Requester) *HttpLokiClient {
|
||||
url, _ := url.Parse("http://some.url")
|
||||
cfg := LokiConfig{
|
||||
WritePathURL: url,
|
||||
ReadPathURL: url,
|
||||
Encoder: JsonEncoder{},
|
||||
}
|
||||
met := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)
|
||||
return NewLokiClient(cfg, req, met.BytesWritten, met.WriteDuration, log.NewNopLogger(), tracing.InitializeTracerForTest(), lokiClientSpanName)
|
||||
}
|
||||
|
||||
func reqBody(t *testing.T, req *http.Request) string {
|
||||
t.Helper()
|
||||
|
||||
defer func() {
|
||||
_ = req.Body.Close()
|
||||
}()
|
||||
byt, err := io.ReadAll(req.Body)
|
||||
require.NoError(t, err)
|
||||
return string(byt)
|
||||
}
|
|
@ -1,107 +0,0 @@
|
|||
package lokiclient
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/golang/snappy"
|
||||
"github.com/prometheus/common/model"
|
||||
|
||||
"github.com/grafana/grafana/pkg/components/loki/logproto"
|
||||
)
|
||||
|
||||
type JsonEncoder struct{}
|
||||
|
||||
func (e JsonEncoder) encode(s []Stream) ([]byte, error) {
|
||||
body := struct {
|
||||
Streams []Stream `json:"streams"`
|
||||
}{Streams: s}
|
||||
enc, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to serialize Loki payload: %w", err)
|
||||
}
|
||||
return enc, nil
|
||||
}
|
||||
|
||||
func (e JsonEncoder) headers() map[string]string {
|
||||
return map[string]string{
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
}
|
||||
|
||||
type SnappyProtoEncoder struct{}
|
||||
|
||||
func (e SnappyProtoEncoder) encode(s []Stream) ([]byte, error) {
|
||||
body := logproto.PushRequest{
|
||||
Streams: make([]logproto.Stream, 0, len(s)),
|
||||
}
|
||||
|
||||
for _, str := range s {
|
||||
entries := make([]logproto.Entry, 0, len(str.Values))
|
||||
for _, sample := range str.Values {
|
||||
entries = append(entries, logproto.Entry{
|
||||
Timestamp: sample.T,
|
||||
Line: sample.V,
|
||||
})
|
||||
}
|
||||
body.Streams = append(body.Streams, logproto.Stream{
|
||||
Labels: labelsMapToString(str.Stream, ""),
|
||||
Entries: entries,
|
||||
// Hash seems to be mainly used for query responses. Promtail does not seem to calculate this field on push.
|
||||
})
|
||||
}
|
||||
|
||||
buf, err := proto.Marshal(&body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to serialize Loki payload to proto: %w", err)
|
||||
}
|
||||
buf = snappy.Encode(nil, buf)
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
func (e SnappyProtoEncoder) headers() map[string]string {
|
||||
return map[string]string{
|
||||
"Content-Type": "application/x-protobuf",
|
||||
"Content-Encoding": "snappy",
|
||||
}
|
||||
}
|
||||
|
||||
// Copied from promtail.
|
||||
// Modified slightly to work in terms of plain map[string]string to avoid some unnecessary copies and type casts.
|
||||
// TODO: pkg/components/loki/lokihttp/batch.go contains an older (loki 2.7.4 released) version of this.
|
||||
// TODO: Consider replacing that one, with this one.
|
||||
func labelsMapToString(ls map[string]string, without model.LabelName) string {
|
||||
var b strings.Builder
|
||||
totalSize := 2
|
||||
lstrs := make([]string, 0, len(ls))
|
||||
|
||||
for l, v := range ls {
|
||||
if l == string(without) {
|
||||
continue
|
||||
}
|
||||
|
||||
lstrs = append(lstrs, l)
|
||||
// guess size increase: 2 for `, ` between labels and 3 for the `=` and quotes around label value
|
||||
totalSize += len(l) + 2 + len(v) + 3
|
||||
}
|
||||
|
||||
b.Grow(totalSize)
|
||||
b.WriteByte('{')
|
||||
slices.Sort(lstrs)
|
||||
for i, l := range lstrs {
|
||||
if i > 0 {
|
||||
b.WriteString(", ")
|
||||
}
|
||||
|
||||
b.WriteString(l)
|
||||
b.WriteString(`=`)
|
||||
b.WriteString(strconv.Quote(ls[l]))
|
||||
}
|
||||
b.WriteByte('}')
|
||||
|
||||
return b.String()
|
||||
}
|
|
@ -1,45 +0,0 @@
|
|||
package lokiclient
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
type FakeRequester struct {
|
||||
LastRequest *http.Request
|
||||
Resp *http.Response
|
||||
}
|
||||
|
||||
func NewFakeRequester() *FakeRequester {
|
||||
return &FakeRequester{
|
||||
Resp: &http.Response{
|
||||
Status: "200 OK",
|
||||
StatusCode: 200,
|
||||
Body: io.NopCloser(bytes.NewBufferString("")),
|
||||
ContentLength: int64(0),
|
||||
Header: make(http.Header, 0),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (f *FakeRequester) WithResponse(resp *http.Response) *FakeRequester {
|
||||
f.Resp = resp
|
||||
return f
|
||||
}
|
||||
|
||||
func (f *FakeRequester) Do(req *http.Request) (*http.Response, error) {
|
||||
f.LastRequest = req
|
||||
f.Resp.Request = req // Not concurrency-safe!
|
||||
return f.Resp, nil
|
||||
}
|
||||
|
||||
func BadResponse() *http.Response {
|
||||
return &http.Response{
|
||||
Status: "400 Bad Request",
|
||||
StatusCode: http.StatusBadRequest,
|
||||
Body: io.NopCloser(bytes.NewBufferString("")),
|
||||
ContentLength: int64(0),
|
||||
Header: make(http.Header, 0),
|
||||
}
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
package lokiconfig
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
|
||||
"github.com/grafana/alerting/notify/historian/lokiclient"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
func NewLokiConfig(cfg setting.UnifiedAlertingLokiSettings) (lokiclient.LokiConfig, error) {
|
||||
read, write := cfg.LokiReadURL, cfg.LokiWriteURL
|
||||
if read == "" {
|
||||
read = cfg.LokiRemoteURL
|
||||
}
|
||||
if write == "" {
|
||||
write = cfg.LokiRemoteURL
|
||||
}
|
||||
|
||||
if read == "" {
|
||||
return lokiclient.LokiConfig{}, fmt.Errorf("either read path URL or remote Loki URL must be provided")
|
||||
}
|
||||
if write == "" {
|
||||
return lokiclient.LokiConfig{}, fmt.Errorf("either write path URL or remote Loki URL must be provided")
|
||||
}
|
||||
|
||||
readURL, err := url.Parse(read)
|
||||
if err != nil {
|
||||
return lokiclient.LokiConfig{}, fmt.Errorf("failed to parse loki remote read URL: %w", err)
|
||||
}
|
||||
writeURL, err := url.Parse(write)
|
||||
if err != nil {
|
||||
return lokiclient.LokiConfig{}, fmt.Errorf("failed to parse loki remote write URL: %w", err)
|
||||
}
|
||||
|
||||
return lokiclient.LokiConfig{
|
||||
ReadPathURL: readURL,
|
||||
WritePathURL: writeURL,
|
||||
BasicAuthUser: cfg.LokiBasicAuthUsername,
|
||||
BasicAuthPassword: cfg.LokiBasicAuthPassword,
|
||||
TenantID: cfg.LokiTenantID,
|
||||
ExternalLabels: cfg.ExternalLabels,
|
||||
MaxQueryLength: cfg.LokiMaxQueryLength,
|
||||
MaxQuerySize: cfg.LokiMaxQuerySize,
|
||||
// Snappy-compressed protobuf is the default, same goes for Promtail.
|
||||
Encoder: lokiclient.SnappyProtoEncoder{},
|
||||
}, nil
|
||||
}
|
|
@ -0,0 +1,94 @@
|
|||
package lokiconfig
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestLokiConfig(t *testing.T) {
|
||||
t.Run("test URL options", func(t *testing.T) {
|
||||
type testCase struct {
|
||||
name string
|
||||
in setting.UnifiedAlertingLokiSettings
|
||||
expRead string
|
||||
expWrite string
|
||||
expErr string
|
||||
}
|
||||
|
||||
cases := []testCase{
|
||||
{
|
||||
name: "remote url only",
|
||||
in: setting.UnifiedAlertingLokiSettings{
|
||||
LokiRemoteURL: "http://url.com",
|
||||
},
|
||||
expRead: "http://url.com",
|
||||
expWrite: "http://url.com",
|
||||
},
|
||||
{
|
||||
name: "separate urls",
|
||||
in: setting.UnifiedAlertingLokiSettings{
|
||||
LokiReadURL: "http://read.url.com",
|
||||
LokiWriteURL: "http://write.url.com",
|
||||
},
|
||||
expRead: "http://read.url.com",
|
||||
expWrite: "http://write.url.com",
|
||||
},
|
||||
{
|
||||
name: "single fallback",
|
||||
in: setting.UnifiedAlertingLokiSettings{
|
||||
LokiRemoteURL: "http://url.com",
|
||||
LokiReadURL: "http://read.url.com",
|
||||
},
|
||||
expRead: "http://read.url.com",
|
||||
expWrite: "http://url.com",
|
||||
},
|
||||
{
|
||||
name: "missing read",
|
||||
in: setting.UnifiedAlertingLokiSettings{
|
||||
LokiWriteURL: "http://url.com",
|
||||
},
|
||||
expErr: "either read path URL or remote",
|
||||
},
|
||||
{
|
||||
name: "missing write",
|
||||
in: setting.UnifiedAlertingLokiSettings{
|
||||
LokiReadURL: "http://url.com",
|
||||
},
|
||||
expErr: "either write path URL or remote",
|
||||
},
|
||||
{
|
||||
name: "invalid",
|
||||
in: setting.UnifiedAlertingLokiSettings{
|
||||
LokiRemoteURL: "://://",
|
||||
},
|
||||
expErr: "failed to parse",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
res, err := NewLokiConfig(tc.in)
|
||||
if tc.expErr != "" {
|
||||
require.ErrorContains(t, err, tc.expErr)
|
||||
} else {
|
||||
require.Equal(t, tc.expRead, res.ReadPathURL.String())
|
||||
require.Equal(t, tc.expWrite, res.WritePathURL.String())
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("captures external labels", func(t *testing.T) {
|
||||
set := setting.UnifiedAlertingLokiSettings{
|
||||
LokiRemoteURL: "http://url.com",
|
||||
ExternalLabels: map[string]string{"a": "b"},
|
||||
}
|
||||
|
||||
res, err := NewLokiConfig(set)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, res.ExternalLabels, "a")
|
||||
})
|
||||
}
|
|
@ -7,8 +7,10 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/benbjohnson/clock"
|
||||
notificationHistorian "github.com/grafana/alerting/notify/historian"
|
||||
"github.com/grafana/alerting/notify/historian/lokiclient"
|
||||
"github.com/grafana/alerting/notify/nfstatus"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/lokiclient"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/lokiconfig"
|
||||
"github.com/prometheus/alertmanager/featurecontrol"
|
||||
"github.com/prometheus/alertmanager/matchers/compat"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
@ -659,7 +661,7 @@ func configureHistorianBackend(
|
|||
return historian.NewAnnotationBackend(annotationBackendLogger, store, rs, met, ac), nil
|
||||
}
|
||||
if backend == historian.BackendTypeLoki {
|
||||
lcfg, err := lokiclient.NewLokiConfig(cfg.LokiSettings)
|
||||
lcfg, err := lokiconfig.NewLokiConfig(cfg.LokiSettings)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid remote loki configuration: %w", err)
|
||||
}
|
||||
|
@ -712,20 +714,20 @@ func configureNotificationHistorian(
|
|||
}
|
||||
|
||||
met.Info.Set(1)
|
||||
lcfg, err := lokiclient.NewLokiConfig(cfg.LokiSettings)
|
||||
lcfg, err := lokiconfig.NewLokiConfig(cfg.LokiSettings)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid remote loki configuration: %w", err)
|
||||
}
|
||||
req := lokiclient.NewRequester()
|
||||
logger := log.New("ngalert.notifier.historian").FromContext(ctx)
|
||||
notificationHistorian := notifier.NewNotificationHistorian(logger, lcfg, req, met, tracer)
|
||||
nh := notificationHistorian.NewNotificationHistorian(logger, lcfg, req, met.BytesWritten, met.WriteDuration, met.WritesTotal, met.WritesFailed, tracer)
|
||||
|
||||
testConnCtx, cancelFunc := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer cancelFunc()
|
||||
if err := notificationHistorian.TestConnection(testConnCtx); err != nil {
|
||||
if err := nh.TestConnection(testConnCtx); err != nil {
|
||||
l.Error("Failed to communicate with configured remote Loki backend, notification history may not be persisted", "error", err)
|
||||
}
|
||||
return notificationHistorian, nil
|
||||
return nh, nil
|
||||
}
|
||||
|
||||
func createRecordingWriter(settings setting.RecordingRuleSettings, httpClientProvider httpclient.Provider, datasourceService datasources.DataSourceService, pluginContextProvider *plugincontext.Provider, clock clock.Clock, m *metrics.RemoteWriter) (schedule.RecordingWriter, error) {
|
||||
|
|
|
@ -1,190 +0,0 @@
|
|||
package notifier
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
alertingModels "github.com/grafana/alerting/models"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/client"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/lokiclient"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||
"github.com/prometheus/alertmanager/notify"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
prometheusModel "github.com/prometheus/common/model"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
const LokiClientSpanName = "ngalert.notification-historian.client"
|
||||
const NotificationHistoryWriteTimeout = time.Minute
|
||||
const NotificationHistoryKey = "from"
|
||||
const NotificationHistoryLabelValue = "notify-history"
|
||||
|
||||
type NotificationHistoryLokiEntry struct {
|
||||
SchemaVersion int `json:"schemaVersion"`
|
||||
Receiver string `json:"receiver"`
|
||||
Status string `json:"status"`
|
||||
GroupLabels map[string]string `json:"groupLabels"`
|
||||
Alerts []NotificationHistoryLokiEntryAlert `json:"alerts"`
|
||||
Retry bool `json:"retry"`
|
||||
Error string `json:"error,omitempty"`
|
||||
Duration int64 `json:"duration"`
|
||||
}
|
||||
|
||||
type NotificationHistoryLokiEntryAlert struct {
|
||||
Status string `json:"status"`
|
||||
Labels map[string]string `json:"labels"`
|
||||
Annotations map[string]string `json:"annotations"`
|
||||
StartsAt time.Time `json:"startsAt"`
|
||||
EndsAt time.Time `json:"endsAt"`
|
||||
RuleUID string `json:"ruleUID"`
|
||||
}
|
||||
|
||||
type remoteLokiClient interface {
|
||||
Ping(context.Context) error
|
||||
Push(context.Context, []lokiclient.Stream) error
|
||||
}
|
||||
|
||||
type NotificationHistorian struct {
|
||||
client remoteLokiClient
|
||||
externalLabels map[string]string
|
||||
metrics *metrics.NotificationHistorian
|
||||
log log.Logger
|
||||
}
|
||||
|
||||
func NewNotificationHistorian(logger log.Logger, cfg lokiclient.LokiConfig, req client.Requester, metrics *metrics.NotificationHistorian, tracer tracing.Tracer) *NotificationHistorian {
|
||||
return &NotificationHistorian{
|
||||
client: lokiclient.NewLokiClient(cfg, req, metrics.BytesWritten, metrics.WriteDuration, logger, tracer, LokiClientSpanName),
|
||||
externalLabels: cfg.ExternalLabels,
|
||||
metrics: metrics,
|
||||
log: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *NotificationHistorian) TestConnection(ctx context.Context) error {
|
||||
return h.client.Ping(ctx)
|
||||
}
|
||||
|
||||
func (h *NotificationHistorian) Record(ctx context.Context, alerts []*types.Alert, retry bool, notificationErr error, duration time.Duration) <-chan error {
|
||||
stream, err := h.prepareStream(ctx, alerts, retry, notificationErr, duration)
|
||||
logger := h.log.FromContext(ctx)
|
||||
errCh := make(chan error, 1)
|
||||
if err != nil {
|
||||
logger.Error("Failed to convert notification history to stream", "error", err)
|
||||
errCh <- fmt.Errorf("failed to convert notification history to stream: %w", err)
|
||||
close(errCh)
|
||||
return errCh
|
||||
}
|
||||
|
||||
// This is a new background job, so let's create a new context for it.
|
||||
// We want it to be isolated, i.e. we don't want grafana shutdowns to interrupt this work
|
||||
// immediately but rather try to flush writes.
|
||||
// This also prevents timeouts or other lingering objects (like transactions) from being
|
||||
// incorrectly propagated here from other areas.
|
||||
writeCtx := context.Background()
|
||||
writeCtx, cancel := context.WithTimeout(writeCtx, NotificationHistoryWriteTimeout)
|
||||
writeCtx = trace.ContextWithSpan(writeCtx, trace.SpanFromContext(ctx))
|
||||
|
||||
go func(ctx context.Context) {
|
||||
defer cancel()
|
||||
defer close(errCh)
|
||||
logger := h.log.FromContext(ctx)
|
||||
logger.Debug("Saving notification history")
|
||||
h.metrics.WritesTotal.Inc()
|
||||
|
||||
if err := h.recordStream(ctx, stream, logger); err != nil {
|
||||
logger.Error("Failed to save notification history", "error", err)
|
||||
h.metrics.WritesFailed.Inc()
|
||||
errCh <- fmt.Errorf("failed to save notification history: %w", err)
|
||||
}
|
||||
}(writeCtx)
|
||||
return errCh
|
||||
}
|
||||
|
||||
func (h *NotificationHistorian) prepareStream(ctx context.Context, alerts []*types.Alert, retry bool, notificationErr error, duration time.Duration) (lokiclient.Stream, error) {
|
||||
receiverName, ok := notify.ReceiverName(ctx)
|
||||
if !ok {
|
||||
return lokiclient.Stream{}, fmt.Errorf("receiver name not found in context")
|
||||
}
|
||||
groupLabels, ok := notify.GroupLabels(ctx)
|
||||
if !ok {
|
||||
return lokiclient.Stream{}, fmt.Errorf("group labels not found in context")
|
||||
}
|
||||
now, ok := notify.Now(ctx)
|
||||
if !ok {
|
||||
return lokiclient.Stream{}, fmt.Errorf("now not found in context")
|
||||
}
|
||||
|
||||
entryAlerts := make([]NotificationHistoryLokiEntryAlert, len(alerts))
|
||||
for i, alert := range alerts {
|
||||
labels := prepareLabels(alert.Labels)
|
||||
annotations := prepareLabels(alert.Annotations)
|
||||
entryAlerts[i] = NotificationHistoryLokiEntryAlert{
|
||||
Labels: labels,
|
||||
Annotations: annotations,
|
||||
Status: string(alert.StatusAt(now)),
|
||||
StartsAt: alert.StartsAt,
|
||||
EndsAt: alert.EndsAt,
|
||||
RuleUID: string(alert.Labels[alertingModels.RuleUIDLabel]),
|
||||
}
|
||||
}
|
||||
|
||||
notificationErrStr := ""
|
||||
if notificationErr != nil {
|
||||
notificationErrStr = notificationErr.Error()
|
||||
}
|
||||
|
||||
entry := NotificationHistoryLokiEntry{
|
||||
SchemaVersion: 1,
|
||||
Receiver: receiverName,
|
||||
Status: string(types.Alerts(alerts...).StatusAt(now)),
|
||||
GroupLabels: prepareLabels(groupLabels),
|
||||
Alerts: entryAlerts,
|
||||
Retry: retry,
|
||||
Error: notificationErrStr,
|
||||
Duration: duration.Milliseconds(),
|
||||
}
|
||||
|
||||
entryJSON, err := json.Marshal(entry)
|
||||
if err != nil {
|
||||
return lokiclient.Stream{}, err
|
||||
}
|
||||
|
||||
streamLabels := make(map[string]string)
|
||||
streamLabels[NotificationHistoryKey] = NotificationHistoryLabelValue
|
||||
for k, v := range h.externalLabels {
|
||||
streamLabels[k] = v
|
||||
}
|
||||
|
||||
return lokiclient.Stream{
|
||||
Stream: streamLabels,
|
||||
Values: []lokiclient.Sample{
|
||||
{
|
||||
T: now,
|
||||
V: string(entryJSON),
|
||||
}},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (h *NotificationHistorian) recordStream(ctx context.Context, stream lokiclient.Stream, logger log.Logger) error {
|
||||
if err := h.client.Push(ctx, []lokiclient.Stream{stream}); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Debug("Done saving notification history")
|
||||
return nil
|
||||
}
|
||||
|
||||
func prepareLabels(labels prometheusModel.LabelSet) map[string]string {
|
||||
result := make(map[string]string)
|
||||
for k, v := range labels {
|
||||
// Remove private labels
|
||||
if !strings.HasPrefix(string(k), "__") && !strings.HasSuffix(string(k), "__") {
|
||||
result[string(k)] = string(v)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
|
@ -1,126 +0,0 @@
|
|||
package notifier
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"net/url"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
alertingModels "github.com/grafana/alerting/models"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/client"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/lokiclient"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||
"github.com/prometheus/alertmanager/notify"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/testutil"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var testNow = time.Date(2025, time.July, 15, 16, 55, 0, 0, time.UTC)
|
||||
var testAlerts = []*types.Alert{
|
||||
{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{"alertname": "Alert1", alertingModels.RuleUIDLabel: "testRuleUID"},
|
||||
Annotations: model.LabelSet{"foo": "bar", "__private__": "baz"},
|
||||
StartsAt: testNow,
|
||||
EndsAt: testNow,
|
||||
GeneratorURL: "http://localhost/test",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func TestRecord(t *testing.T) {
|
||||
t.Run("write notification history to Loki", func(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
retry bool
|
||||
notificationErr error
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
"successful notification",
|
||||
false,
|
||||
nil,
|
||||
"{\"streams\":[{\"stream\":{\"externalLabelKey\":\"externalLabelValue\",\"from\":\"notify-history\"},\"values\":[[\"1752598500000000000\",\"{\\\"schemaVersion\\\":1,\\\"receiver\\\":\\\"testReceiverName\\\",\\\"status\\\":\\\"resolved\\\",\\\"groupLabels\\\":{\\\"foo\\\":\\\"bar\\\"},\\\"alerts\\\":[{\\\"status\\\":\\\"resolved\\\",\\\"labels\\\":{\\\"alertname\\\":\\\"Alert1\\\"},\\\"annotations\\\":{\\\"foo\\\":\\\"bar\\\"},\\\"startsAt\\\":\\\"2025-07-15T16:55:00Z\\\",\\\"endsAt\\\":\\\"2025-07-15T16:55:00Z\\\",\\\"ruleUID\\\":\\\"testRuleUID\\\"}],\\\"retry\\\":false,\\\"duration\\\":1000}\"]]}]}",
|
||||
},
|
||||
{
|
||||
"failed notification",
|
||||
true,
|
||||
errors.New("test notification error"),
|
||||
"{\"streams\":[{\"stream\":{\"externalLabelKey\":\"externalLabelValue\",\"from\":\"notify-history\"},\"values\":[[\"1752598500000000000\",\"{\\\"schemaVersion\\\":1,\\\"receiver\\\":\\\"testReceiverName\\\",\\\"status\\\":\\\"resolved\\\",\\\"groupLabels\\\":{\\\"foo\\\":\\\"bar\\\"},\\\"alerts\\\":[{\\\"status\\\":\\\"resolved\\\",\\\"labels\\\":{\\\"alertname\\\":\\\"Alert1\\\"},\\\"annotations\\\":{\\\"foo\\\":\\\"bar\\\"},\\\"startsAt\\\":\\\"2025-07-15T16:55:00Z\\\",\\\"endsAt\\\":\\\"2025-07-15T16:55:00Z\\\",\\\"ruleUID\\\":\\\"testRuleUID\\\"}],\\\"retry\\\":true,\\\"error\\\":\\\"test notification error\\\",\\\"duration\\\":1000}\"]]}]}",
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
req := lokiclient.NewFakeRequester()
|
||||
met := metrics.NewNotificationHistorianMetrics(prometheus.NewRegistry())
|
||||
h := createTestNotificationHistorian(req, met)
|
||||
|
||||
err := <-h.Record(recordCtx(), testAlerts, tc.retry, tc.notificationErr, time.Second)
|
||||
require.NoError(t, err)
|
||||
|
||||
reqBody, err := io.ReadAll(req.LastRequest.Body)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expected, string(reqBody))
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("emits expected write metrics", func(t *testing.T) {
|
||||
reg := prometheus.NewRegistry()
|
||||
met := metrics.NewNotificationHistorianMetrics(reg)
|
||||
goodHistorian := createTestNotificationHistorian(lokiclient.NewFakeRequester(), met)
|
||||
badHistorian := createTestNotificationHistorian(lokiclient.NewFakeRequester().WithResponse(lokiclient.BadResponse()), met)
|
||||
|
||||
<-goodHistorian.Record(recordCtx(), testAlerts, false, nil, time.Second)
|
||||
<-badHistorian.Record(recordCtx(), testAlerts, false, nil, time.Second)
|
||||
|
||||
exp := bytes.NewBufferString(`
|
||||
# HELP grafana_alerting_notification_history_writes_failed_total The total number of failed writes of notification history batches.
|
||||
# TYPE grafana_alerting_notification_history_writes_failed_total counter
|
||||
grafana_alerting_notification_history_writes_failed_total 1
|
||||
# HELP grafana_alerting_notification_history_writes_total The total number of notification history batches that were attempted to be written.
|
||||
# TYPE grafana_alerting_notification_history_writes_total counter
|
||||
grafana_alerting_notification_history_writes_total 2
|
||||
`)
|
||||
err := testutil.GatherAndCompare(reg, exp,
|
||||
"grafana_alerting_notification_history_writes_total",
|
||||
"grafana_alerting_notification_history_writes_failed_total",
|
||||
)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("returns error when context is missing required fields", func(t *testing.T) {
|
||||
req := lokiclient.NewFakeRequester()
|
||||
met := metrics.NewNotificationHistorianMetrics(prometheus.NewRegistry())
|
||||
h := createTestNotificationHistorian(req, met)
|
||||
|
||||
err := <-h.Record(context.Background(), testAlerts, false, nil, time.Second)
|
||||
require.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func createTestNotificationHistorian(req client.Requester, met *metrics.NotificationHistorian) *NotificationHistorian {
|
||||
writePathURL, _ := url.Parse("http://some.url")
|
||||
cfg := lokiclient.LokiConfig{
|
||||
WritePathURL: writePathURL,
|
||||
ExternalLabels: map[string]string{"externalLabelKey": "externalLabelValue"},
|
||||
Encoder: lokiclient.JsonEncoder{},
|
||||
}
|
||||
tracer := tracing.InitializeTracerForTest()
|
||||
return NewNotificationHistorian(log.NewNopLogger(), cfg, req, met, tracer)
|
||||
}
|
||||
|
||||
func recordCtx() context.Context {
|
||||
ctx := notify.WithReceiverName(context.Background(), "testReceiverName")
|
||||
ctx = notify.WithGroupLabels(ctx, model.LabelSet{"foo": "bar"})
|
||||
ctx = notify.WithNow(ctx, testNow)
|
||||
return ctx
|
||||
}
|
|
@ -8,9 +8,9 @@ import (
|
|||
"time"
|
||||
|
||||
httptransport "github.com/go-openapi/runtime/client"
|
||||
alertingInstrument "github.com/grafana/alerting/http/instrument"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/client"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||
"github.com/grafana/grafana/pkg/util/httpclient"
|
||||
amclient "github.com/prometheus/alertmanager/api/v2/client"
|
||||
|
@ -29,7 +29,7 @@ type AlertmanagerConfig struct {
|
|||
|
||||
type Alertmanager struct {
|
||||
*amclient.AlertmanagerAPI
|
||||
httpClient client.Requester
|
||||
httpClient alertingInstrument.Requester
|
||||
url *url.URL
|
||||
logger log.Logger
|
||||
}
|
||||
|
@ -45,8 +45,8 @@ func NewAlertmanager(cfg *AlertmanagerConfig, metrics *metrics.RemoteAlertmanage
|
|||
Timeout: cfg.Timeout,
|
||||
}
|
||||
|
||||
tc := client.NewTimedClient(c, metrics.RequestLatency)
|
||||
trc := client.NewTracedClient(tc, tracer, "remote.alertmanager.client")
|
||||
tc := alertingInstrument.NewTimedClient(c, metrics.RequestLatency)
|
||||
trc := alertingInstrument.NewTracedClient(tc, tracer, "remote.alertmanager.client")
|
||||
apiEndpoint := *cfg.URL
|
||||
|
||||
// Next, make sure you set the right path.
|
||||
|
@ -66,7 +66,7 @@ func NewAlertmanager(cfg *AlertmanagerConfig, metrics *metrics.RemoteAlertmanage
|
|||
|
||||
// GetAuthedClient returns a client.Requester that includes a configured MimirAuthRoundTripper.
|
||||
// Requests using this client are fully authenticated.
|
||||
func (am *Alertmanager) GetAuthedClient() client.Requester {
|
||||
func (am *Alertmanager) GetAuthedClient() alertingInstrument.Requester {
|
||||
return am.httpClient
|
||||
}
|
||||
|
||||
|
|
|
@ -14,10 +14,11 @@ import (
|
|||
|
||||
alertingNotify "github.com/grafana/alerting/notify"
|
||||
|
||||
alertingInstrument "github.com/grafana/alerting/http/instrument"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/client"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||
"github.com/grafana/grafana/pkg/util/httpclient"
|
||||
)
|
||||
|
@ -41,7 +42,7 @@ type MimirClient interface {
|
|||
}
|
||||
|
||||
type Mimir struct {
|
||||
client client.Requester
|
||||
client alertingInstrument.Requester
|
||||
endpoint *url.URL
|
||||
logger log.Logger
|
||||
metrics *metrics.RemoteAlertmanager
|
||||
|
@ -98,8 +99,8 @@ func New(cfg *Config, metrics *metrics.RemoteAlertmanager, tracer tracing.Tracer
|
|||
c := &http.Client{
|
||||
Transport: rt,
|
||||
}
|
||||
tc := client.NewTimedClient(c, metrics.RequestLatency)
|
||||
trc := client.NewTracedClient(tc, tracer, "remote.alertmanager.client")
|
||||
tc := alertingInstrument.NewTimedClient(c, metrics.RequestLatency)
|
||||
trc := alertingInstrument.NewTracedClient(tc, tracer, "remote.alertmanager.client")
|
||||
|
||||
return &Mimir{
|
||||
endpoint: cfg.URL,
|
||||
|
|
|
@ -12,16 +12,17 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/benbjohnson/clock"
|
||||
"github.com/grafana/alerting/notify/historian/lokiclient"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/lokiclient"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
alertingInstrument "github.com/grafana/alerting/http/instrument"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/errutil"
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/accesscontrol"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/client"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/eval"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
|
@ -85,7 +86,7 @@ type RemoteLokiBackend struct {
|
|||
ruleStore RuleStore
|
||||
}
|
||||
|
||||
func NewRemoteLokiBackend(logger log.Logger, cfg lokiclient.LokiConfig, req client.Requester, metrics *metrics.Historian, tracer tracing.Tracer, ruleStore RuleStore, ac AccessControl) *RemoteLokiBackend {
|
||||
func NewRemoteLokiBackend(logger log.Logger, cfg lokiclient.LokiConfig, req alertingInstrument.Requester, metrics *metrics.Historian, tracer tracing.Tracer, ruleStore RuleStore, ac AccessControl) *RemoteLokiBackend {
|
||||
return &RemoteLokiBackend{
|
||||
client: lokiclient.NewLokiClient(cfg, req, metrics.BytesWritten, metrics.WriteDuration, logger, tracer, LokiClientSpanName),
|
||||
externalLabels: cfg.ExternalLabels,
|
||||
|
|
|
@ -13,13 +13,15 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/alerting/notify/historian/lokiclient"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/lokiclient"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
alertingInstrument "github.com/grafana/alerting/http/instrument"
|
||||
"github.com/grafana/alerting/http/instrument/instrumenttest"
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
|
@ -27,7 +29,6 @@ import (
|
|||
"github.com/grafana/grafana/pkg/services/folder"
|
||||
rulesAuthz "github.com/grafana/grafana/pkg/services/ngalert/accesscontrol"
|
||||
acfakes "github.com/grafana/grafana/pkg/services/ngalert/accesscontrol/fakes"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/client"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/eval"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
|
@ -668,7 +669,7 @@ func TestMerge(t *testing.T) {
|
|||
|
||||
func TestRecordStates(t *testing.T) {
|
||||
t.Run("writes state transitions to loki", func(t *testing.T) {
|
||||
req := lokiclient.NewFakeRequester()
|
||||
req := instrumenttest.NewFakeRequester()
|
||||
loki := createTestLokiBackend(t, req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem))
|
||||
rule := createTestRule()
|
||||
states := singleFromNormal(&state.State{
|
||||
|
@ -685,8 +686,8 @@ func TestRecordStates(t *testing.T) {
|
|||
t.Run("emits expected write metrics", func(t *testing.T) {
|
||||
reg := prometheus.NewRegistry()
|
||||
met := metrics.NewHistorianMetrics(reg, metrics.Subsystem)
|
||||
loki := createTestLokiBackend(t, lokiclient.NewFakeRequester(), met)
|
||||
errLoki := createTestLokiBackend(t, lokiclient.NewFakeRequester().WithResponse(lokiclient.BadResponse()), met) //nolint:bodyclose
|
||||
loki := createTestLokiBackend(t, instrumenttest.NewFakeRequester(), met)
|
||||
errLoki := createTestLokiBackend(t, instrumenttest.NewFakeRequester().WithResponse(instrumenttest.BadResponse()), met) //nolint:bodyclose
|
||||
rule := createTestRule()
|
||||
states := singleFromNormal(&state.State{
|
||||
State: eval.Alerting,
|
||||
|
@ -720,7 +721,7 @@ grafana_alerting_state_history_writes_total{backend="loki",org="1"} 2
|
|||
})
|
||||
|
||||
t.Run("elides request if nothing to send", func(t *testing.T) {
|
||||
req := lokiclient.NewFakeRequester()
|
||||
req := instrumenttest.NewFakeRequester()
|
||||
loki := createTestLokiBackend(t, req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem))
|
||||
rule := createTestRule()
|
||||
states := []state.StateTransition{}
|
||||
|
@ -732,7 +733,7 @@ grafana_alerting_state_history_writes_total{backend="loki",org="1"} 2
|
|||
})
|
||||
|
||||
t.Run("succeeds with special chars in labels", func(t *testing.T) {
|
||||
req := lokiclient.NewFakeRequester()
|
||||
req := instrumenttest.NewFakeRequester()
|
||||
loki := createTestLokiBackend(t, req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem))
|
||||
rule := createTestRule()
|
||||
states := singleFromNormal(&state.State{
|
||||
|
@ -755,7 +756,7 @@ grafana_alerting_state_history_writes_total{backend="loki",org="1"} 2
|
|||
})
|
||||
|
||||
t.Run("adds external labels to log lines", func(t *testing.T) {
|
||||
req := lokiclient.NewFakeRequester()
|
||||
req := instrumenttest.NewFakeRequester()
|
||||
loki := createTestLokiBackend(t, req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem))
|
||||
rule := createTestRule()
|
||||
states := singleFromNormal(&state.State{
|
||||
|
@ -783,7 +784,7 @@ func TestGetFolderUIDsForFilter(t *testing.T) {
|
|||
usr := accesscontrol.BackgroundUser("test", 1, org.RoleNone, nil)
|
||||
|
||||
createLoki := func(ac AccessControl) *RemoteLokiBackend {
|
||||
req := lokiclient.NewFakeRequester()
|
||||
req := instrumenttest.NewFakeRequester()
|
||||
loki := createTestLokiBackend(t, req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem))
|
||||
rules := fakes.NewRuleStore(t)
|
||||
f := make([]*folder.Folder, 0, len(folders))
|
||||
|
@ -923,12 +924,12 @@ func TestGetFolderUIDsForFilter(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func createTestLokiBackend(t *testing.T, req client.Requester, met *metrics.Historian) *RemoteLokiBackend {
|
||||
func createTestLokiBackend(t *testing.T, req alertingInstrument.Requester, met *metrics.Historian) *RemoteLokiBackend {
|
||||
url, _ := url.Parse("http://some.url")
|
||||
cfg := lokiclient.LokiConfig{
|
||||
WritePathURL: url,
|
||||
ReadPathURL: url,
|
||||
Encoder: lokiclient.JsonEncoder{},
|
||||
Encoder: lokiclient.JSONEncoder{},
|
||||
ExternalLabels: map[string]string{"externalLabelKey": "externalLabelValue"},
|
||||
}
|
||||
lokiBackendLogger := log.New("ngalert.state.historian", "backend", "loki")
|
||||
|
|
Loading…
Reference in New Issue