From d4bad37853054566bd1fcfea3d0db626c27bb16b Mon Sep 17 00:00:00 2001 From: Vadim Stepanov Date: Mon, 15 Sep 2025 15:23:51 +0100 Subject: [PATCH] Alerting: Move notification historian to `grafana/alerting` (#109078) * Move notification historian to grafana/alerting * wip * golangci-lint * Revert "golangci-lint" This reverts commit 10ccebad41baf871c184e72fc97c22a99e3087ec. * JSONEncoder * alertingInstrument * go mod tidy * go.work.sum * make update-workspace * merge * revert go.mod changes * github.com/grafana/alerting * make update-workspace * update github.com/grafana/alerting * merge --- go.mod | 4 +- go.sum | 8 +- go.work.sum | 15 +- .../annotationsimpl/loki/historian_store.go | 5 +- .../loki/historian_store_test.go | 13 +- pkg/services/ngalert/client/client.go | 125 ------ pkg/services/ngalert/client/client_test.go | 29 -- pkg/services/ngalert/lokiclient/client.go | 333 -------------- .../ngalert/lokiclient/client_test.go | 409 ------------------ pkg/services/ngalert/lokiclient/encode.go | 107 ----- pkg/services/ngalert/lokiclient/testing.go | 45 -- pkg/services/ngalert/lokiconfig/lokiconfig.go | 48 ++ .../ngalert/lokiconfig/lokiconfig_test.go | 94 ++++ pkg/services/ngalert/ngalert.go | 14 +- pkg/services/ngalert/notifier/historian.go | 190 -------- .../ngalert/notifier/historian_test.go | 126 ------ .../ngalert/remote/client/alertmanager.go | 10 +- pkg/services/ngalert/remote/client/mimir.go | 9 +- pkg/services/ngalert/state/historian/loki.go | 7 +- .../ngalert/state/historian/loki_test.go | 23 +- 20 files changed, 194 insertions(+), 1420 deletions(-) delete mode 100644 pkg/services/ngalert/client/client.go delete mode 100644 pkg/services/ngalert/client/client_test.go delete mode 100644 pkg/services/ngalert/lokiclient/client.go delete mode 100644 pkg/services/ngalert/lokiclient/client_test.go delete mode 100644 pkg/services/ngalert/lokiclient/encode.go delete mode 100644 pkg/services/ngalert/lokiclient/testing.go create mode 100644 pkg/services/ngalert/lokiconfig/lokiconfig.go create mode 100644 pkg/services/ngalert/lokiconfig/lokiconfig_test.go delete mode 100644 pkg/services/ngalert/notifier/historian.go delete mode 100644 pkg/services/ngalert/notifier/historian_test.go diff --git a/go.mod b/go.mod index c095043e143..ed63bcab828 100644 --- a/go.mod +++ b/go.mod @@ -85,7 +85,7 @@ require ( github.com/googleapis/gax-go/v2 v2.14.2 // @grafana/grafana-backend-group github.com/gorilla/mux v1.8.1 // @grafana/grafana-backend-group github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // @grafana/grafana-app-platform-squad - github.com/grafana/alerting v0.0.0-20250912123435-f2728ab090ee // @grafana/alerting-backend + github.com/grafana/alerting v0.0.0-20250915130141-a8ee25091876 // @grafana/alerting-backend github.com/grafana/authlib v0.0.0-20250909101823-1b466dbd19a1 // @grafana/identity-access-team github.com/grafana/authlib/types v0.0.0-20250721184729-1593a38e4933 // @grafana/identity-access-team github.com/grafana/dataplane/examples v0.0.1 // @grafana/observability-metrics @@ -438,7 +438,7 @@ require ( github.com/google/s2a-go v0.1.9 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect github.com/grafana/jsonparser v0.0.0-20240425183733-ea80629e1a32 // indirect - github.com/grafana/loki/pkg/push v0.0.0-20231124142027-e52380921608 // indirect + github.com/grafana/loki/pkg/push v0.0.0-20250823105456-332df2b20000 // indirect github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect github.com/grafana/sqlds/v4 v4.2.4 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.1-0.20191002090509-6af20e3a5340 // indirect diff --git a/go.sum b/go.sum index 5ed95314a2f..5f36a9f9dcc 100644 --- a/go.sum +++ b/go.sum @@ -1588,8 +1588,8 @@ github.com/gorilla/sessions v1.2.1 h1:DHd3rPN5lE3Ts3D8rKkQ8x/0kqfeNmBAaiSi+o7Fsg github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5THxAzdVpqr6/geYxZytqFMBCOtn/ujyeo= github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA= -github.com/grafana/alerting v0.0.0-20250912123435-f2728ab090ee h1:J/9l2w3Q5JDBEB3t5bDsxhxWldGtFd5KpYGoQi0m/hc= -github.com/grafana/alerting v0.0.0-20250912123435-f2728ab090ee/go.mod h1:XWqj/rlsy4OV/E9XNNyFn+a7U4GNsSugPb2rDBj9+58= +github.com/grafana/alerting v0.0.0-20250915130141-a8ee25091876 h1:BzoGpzARwRCNOHcqQdYPAFp2LS1pqnkLWhIuDdq1zho= +github.com/grafana/alerting v0.0.0-20250915130141-a8ee25091876/go.mod h1:T5sitas9VhVj8/S9LeRLy6H75kTBdh/sCCqHo7gaQI8= github.com/grafana/authlib v0.0.0-20250909101823-1b466dbd19a1 h1:qdH5s5FV+0Dyja8O1tBJq7MGd8nPCfxfsMimcYq5cRI= github.com/grafana/authlib v0.0.0-20250909101823-1b466dbd19a1/go.mod h1:C6CmTG6vfiqebjJswKsc6zes+1F/OtTCi6aAtL5Um6A= github.com/grafana/authlib/types v0.0.0-20250721184729-1593a38e4933 h1:GjiMR5NIO1/bYSCnt8x7VUeOMaupv2qXJkeLDVAddxQ= @@ -1630,8 +1630,8 @@ github.com/grafana/grafana/pkg/semconv v0.0.0-20250804150913-990f1c69ecc2 h1:A65 github.com/grafana/grafana/pkg/semconv v0.0.0-20250804150913-990f1c69ecc2/go.mod h1:2HRzUK/xQEYc+8d5If/XSusMcaYq9IptnBSHACiQcOQ= github.com/grafana/jsonparser v0.0.0-20240425183733-ea80629e1a32 h1:NznuPwItog+rwdVg8hAuGKP29ndRSzJAwhxKldkP8oQ= github.com/grafana/jsonparser v0.0.0-20240425183733-ea80629e1a32/go.mod h1:796sq+UcONnSlzA3RtlBZ+b/hrerkZXiEmO8oMjyRwY= -github.com/grafana/loki/pkg/push v0.0.0-20231124142027-e52380921608 h1:ZYk42718kSXOiIKdjZKljWLgBpzL5z1yutKABksQCMg= -github.com/grafana/loki/pkg/push v0.0.0-20231124142027-e52380921608/go.mod h1:f3JSoxBTPXX5ec4FxxeC19nTBSxoTz+cBgS3cYLMcr0= +github.com/grafana/loki/pkg/push v0.0.0-20250823105456-332df2b20000 h1:/5LKSYgLmAhwA4m6iGUD4w1YkydEWWjazn9qxCFT8W0= +github.com/grafana/loki/pkg/push v0.0.0-20250823105456-332df2b20000/go.mod h1:/ZklAgE1i4f3Z8uriXwESmCr1VLF8lBGaJspuaGuf78= github.com/grafana/loki/v3 v3.2.1 h1:VB7u+KHfvL5aHAxgoVBvz5wVhsdGuqKC7uuOFOOe7jw= github.com/grafana/loki/v3 v3.2.1/go.mod h1:WvdLl6wOS+yahaeQY+xhD2m2XzkHDfKr5FZaX7D/X2Y= github.com/grafana/nanogit v0.0.0-20250723104447-68f58f5ecec0 h1:cS0SlJGIlZbmDLctNj5vIYGemrJDLy25wwoiIyZWVN8= diff --git a/go.work.sum b/go.work.sum index 3faf53bae1c..fd97d5c1ebf 100644 --- a/go.work.sum +++ b/go.work.sum @@ -997,6 +997,7 @@ github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 h1:qhugDMdQ4 github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2/go.mod h1:w/aiO1POVIeXUQyl0VQSZjl5OAGDTL5aX+4v0RA1tcw= github.com/grafana/cog v0.0.38 h1:V7gRRn/mh7Bg1ptrCxo0bv6K0SnG9TiDZk+3Ppftn6s= github.com/grafana/cog v0.0.38/go.mod h1:UDstzYqMdgIROmbfkHL8fB9XWQO2lnf5z+4W/eJo4Dc= +github.com/grafana/dskit v0.0.0-20250818234656-8ff9c6532e85/go.mod h1:kImsvJ1xnmeT9Z6StK+RdEKLzlpzBsKwJbEQfmBJdFs= github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak= github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90= github.com/grafana/gomemcache v0.0.0-20250228145437-da7b95fd2ac1/go.mod h1:j/s0jkda4UXTemDs7Pgw/vMT06alWc42CHisvYac0qw= @@ -1041,6 +1042,7 @@ github.com/grafana/grafana/pkg/aggregator v0.0.0-20250121113133-e747350fee2d/go. github.com/grafana/grafana/pkg/semconv v0.0.0-20250121113133-e747350fee2d/go.mod h1:tfLnBpPYgwrBMRz4EXqPCZJyCjEG4Ev37FSlXnocJ2c= github.com/grafana/grafana/pkg/storage/unified/apistore v0.0.0-20250121113133-e747350fee2d/go.mod h1:CXpwZ3Mkw6xVlGKc0SqUxqXCP3Uv182q6qAQnLaLxRg= github.com/grafana/grafana/pkg/storage/unified/apistore v0.0.0-20250514132646-acbc7b54ed9e/go.mod h1:xrKQcxQxz+IUF90ybtfENFeEXtlj9nAsX/3Fw0KEIeQ= +github.com/grafana/loki/pkg/push v0.0.0-20250823105456-332df2b20000 h1:/5LKSYgLmAhwA4m6iGUD4w1YkydEWWjazn9qxCFT8W0= github.com/grafana/nanogit v0.0.0-20250616082354-5e94194d02ed h1:59JF1WhHLT+lNX89Tm1OzOEySMVMASAhaPbsRjtp8Kc= github.com/grafana/nanogit v0.0.0-20250616082354-5e94194d02ed/go.mod h1:OIAAKNgG5fpuJQRNO1lUSj9nc18Xl3O7M8fjIlBO1cI= github.com/grafana/nanogit v0.0.0-20250619160700-ebf70d342aa5 h1:MAQ2B0cu0V1S91ZjVa7NomNZFjaR2SmdtvdwhqBtyhU= @@ -1370,12 +1372,8 @@ github.com/pquerna/cachecontrol v0.1.0/go.mod h1:NrUG3Z7Rdu85UNR3vm7SOsl1nFIeSiQ github.com/pquerna/ffjson v0.0.0-20190930134022-aa0246cd15f7 h1:xoIK0ctDddBMnc74udxJYBqlo9Ylnsp1waqjLsnef20= github.com/pquerna/ffjson v0.0.0-20190930134022-aa0246cd15f7/go.mod h1:YARuvh7BUWHNhzDq2OM5tzR2RiCcN2D7sapiKyCel/M= github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= -github.com/prometheus/client_golang v1.23.1 h1:w6gXMLQGgd0jXXlote9lRHMe0nG01EbnJT+C0EJru2Y= -github.com/prometheus/client_golang v1.23.1/go.mod h1:br8j//v2eg2K5Vvna5klK8Ku5pcU5r4ll73v6ik5dIQ= github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I= github.com/prometheus/common v0.64.0/go.mod h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8= -github.com/prometheus/common v0.66.0 h1:K/rJPHrG3+AoQs50r2+0t7zMnMzek2Vbv31OFVsMeVY= -github.com/prometheus/common v0.66.0/go.mod h1:Ux6NtV1B4LatamKE63tJBntoxD++xmtI/lK0VtEplN4= github.com/prometheus/common/assets v0.2.0 h1:0P5OrzoHrYBOSM1OigWL3mY8ZvV2N4zIE/5AahrSrfM= github.com/prometheus/exporter-toolkit v0.10.1-0.20230714054209-2f4150c63f97/go.mod h1:LoBCZeRh+5hX+fSULNyFnagYlQG/gBsyA/deNzROkq8= github.com/prometheus/statsd_exporter v0.26.1 h1:ucbIAdPmwAUcA+dU+Opok8Qt81Aw8HanlO+2N/Wjv7w= @@ -1437,12 +1435,7 @@ github.com/spf13/afero v1.10.0/go.mod h1:UBogFpq8E9Hx+xc5CNTTEpTnuHVmXDwZcZcE1eb github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= github.com/spf13/cobra v1.4.0/go.mod h1:Wo4iy3BUC+X2Fybo0PDqwJIv3dNRiZLHQymsfxlB84g= github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= -github.com/spf13/cobra v1.10.1 h1:lJeBwCfmrnXthfAupyUTzJ/J4Nc1RsHC/mSRU2dll/s= -github.com/spf13/cobra v1.10.1/go.mod h1:7SmJGaTHFVBY0jW4NXGluQoLvhqFQM+6XSKD+P4XaB0= github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk= -github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= -github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= -github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad h1:fiWzISvDn0Csy5H0iwgAuJGQTUpVfEMJJd4nRFXogbc= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/substrait-io/substrait v0.66.1-0.20250205013839-a30b3e2d7ec6 h1:XqtxwYFCjS4L0o1QD4ipGHCuFG94U0f6BeldbilGQjU= @@ -1947,8 +1940,6 @@ google.golang.org/genproto/googleapis/api v0.0.0-20250528174236-200df99c418a/go. google.golang.org/genproto/googleapis/api v0.0.0-20250707201910-8d1bb00bc6a7/go.mod h1:kXqgZtrWaf6qS3jZOCnCH7WYfrvFjkC51bM8fz3RsCA= google.golang.org/genproto/googleapis/api v0.0.0-20250728155136-f173205681a0/go.mod h1:8ytArBbtOy2xfht+y2fqKd5DRDJRUQhqbyEnQ4bDChs= google.golang.org/genproto/googleapis/api v0.0.0-20250818200422-3122310a409c/go.mod h1:ea2MjsO70ssTfCjiwHgI0ZFqcw45Ksuk2ckf9G468GA= -google.golang.org/genproto/googleapis/api v0.0.0-20250908214217-97024824d090 h1:d8Nakh1G+ur7+P3GcMjpRDEkoLUcLW2iU92XVqR+XMQ= -google.golang.org/genproto/googleapis/api v0.0.0-20250908214217-97024824d090/go.mod h1:U8EXRNSd8sUYyDfs/It7KVWodQr+Hf9xtxyxWudSwEw= google.golang.org/genproto/googleapis/bytestream v0.0.0-20250505200425-f936aa4a68b2 h1:DbpkGFGRkd4GORg+IWQW2EhxUaa/My/PM8d1CGyTDMY= google.golang.org/genproto/googleapis/bytestream v0.0.0-20250505200425-f936aa4a68b2/go.mod h1:h6yxum/C2qRb4txaZRLDHK8RyS0H/o2oEDeKY4onY/Y= google.golang.org/genproto/googleapis/bytestream v0.0.0-20250512202823-5a2f75b736a9 h1:YI36gCL8AQMhzYN6+jH8PdV/iZ0On+Zd0rO/7lCH3k8= @@ -1977,8 +1968,6 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7/go. google.golang.org/genproto/googleapis/rpc v0.0.0-20250728155136-f173205681a0/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= google.golang.org/genproto/googleapis/rpc v0.0.0-20250818200422-3122310a409c/go.mod h1:gw1tLEfykwDz2ET4a12jcXt4couGAm7IwsVaTy0Sflo= google.golang.org/genproto/googleapis/rpc v0.0.0-20250826171959-ef028d996bc1/go.mod h1:GmFNa4BdJZ2a8G+wCe9Bg3wwThLrJun751XstdJt5Og= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250908214217-97024824d090 h1:/OQuEa4YWtDt7uQWHd3q3sUMb+QOLQUg1xa8CEsRv5w= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250908214217-97024824d090/go.mod h1:GmFNa4BdJZ2a8G+wCe9Bg3wwThLrJun751XstdJt5Og= google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= diff --git a/pkg/services/annotations/annotationsimpl/loki/historian_store.go b/pkg/services/annotations/annotationsimpl/loki/historian_store.go index 59ed1494c41..5af4dece39f 100644 --- a/pkg/services/annotations/annotationsimpl/loki/historian_store.go +++ b/pkg/services/annotations/annotationsimpl/loki/historian_store.go @@ -8,7 +8,8 @@ import ( "sort" "time" - "github.com/grafana/grafana/pkg/services/ngalert/lokiclient" + "github.com/grafana/alerting/notify/historian/lokiclient" + "github.com/grafana/grafana/pkg/services/ngalert/lokiconfig" "golang.org/x/exp/constraints" "github.com/grafana/grafana/pkg/components/simplejson" @@ -61,7 +62,7 @@ func NewLokiHistorianStore(cfg setting.UnifiedAlertingStateHistorySettings, db d if !useStore(cfg) { return nil } - lokiCfg, err := lokiclient.NewLokiConfig(cfg.LokiSettings) + lokiCfg, err := lokiconfig.NewLokiConfig(cfg.LokiSettings) if err != nil { // this config error is already handled elsewhere return nil diff --git a/pkg/services/annotations/annotationsimpl/loki/historian_store_test.go b/pkg/services/annotations/annotationsimpl/loki/historian_store_test.go index 967aa88c1d9..348aa38d477 100644 --- a/pkg/services/annotations/annotationsimpl/loki/historian_store_test.go +++ b/pkg/services/annotations/annotationsimpl/loki/historian_store_test.go @@ -10,11 +10,13 @@ import ( "testing" "time" + "github.com/grafana/alerting/notify/historian/lokiclient" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "golang.org/x/exp/maps" - "github.com/grafana/grafana/pkg/services/ngalert/lokiclient" + alertingInstrument "github.com/grafana/alerting/http/instrument" + "github.com/grafana/alerting/http/instrument/instrumenttest" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/db" @@ -24,7 +26,6 @@ import ( "github.com/grafana/grafana/pkg/services/annotations/testutil" "github.com/grafana/grafana/pkg/services/dashboards" "github.com/grafana/grafana/pkg/services/featuremgmt" - "github.com/grafana/grafana/pkg/services/ngalert/client" "github.com/grafana/grafana/pkg/services/ngalert/eval" "github.com/grafana/grafana/pkg/services/ngalert/metrics" ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models" @@ -811,7 +812,7 @@ func compareAnnotationItem(t *testing.T, expected, actual *annotations.ItemDTO) } type FakeLokiClient struct { - client client.Requester + client alertingInstrument.Requester cfg lokiclient.LokiConfig metrics *metrics.Historian log log.Logger @@ -820,15 +821,15 @@ type FakeLokiClient struct { func NewFakeLokiClient() *FakeLokiClient { url, _ := url.Parse("http://some.url") - req := lokiclient.NewFakeRequester() + req := instrumenttest.NewFakeRequester() metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry(), "annotations_test") return &FakeLokiClient{ - client: client.NewTimedClient(req, metrics.WriteDuration), + client: alertingInstrument.NewTimedClient(req, metrics.WriteDuration), cfg: lokiclient.LokiConfig{ WritePathURL: url, ReadPathURL: url, - Encoder: lokiclient.JsonEncoder{}, + Encoder: lokiclient.JSONEncoder{}, MaxQueryLength: 721 * time.Hour, MaxQuerySize: 65536, }, diff --git a/pkg/services/ngalert/client/client.go b/pkg/services/ngalert/client/client.go deleted file mode 100644 index b6c99d32be4..00000000000 --- a/pkg/services/ngalert/client/client.go +++ /dev/null @@ -1,125 +0,0 @@ -package client - -import ( - "context" - "fmt" - "net/http" - "strconv" - - "github.com/grafana/dskit/instrument" - "github.com/grafana/grafana/pkg/infra/tracing" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - semconv "go.opentelemetry.io/otel/semconv/v1.17.0" - "go.opentelemetry.io/otel/trace" -) - -// Requester executes an HTTP request. -type Requester interface { - Do(req *http.Request) (*http.Response, error) -} - -// TimedClient instruments a request with metrics. It implements Requester. -type TimedClient struct { - client Requester - collector instrument.Collector -} - -type contextKey int - -// OperationNameContextKey specifies the operation name location within the context -// for instrumentation. -const OperationNameContextKey contextKey = 0 - -// NewTimedClient creates a Requester that instruments requests on `client`. -func NewTimedClient(client Requester, collector instrument.Collector) *TimedClient { - return &TimedClient{ - client: client, - collector: collector, - } -} - -// Do executes the request. -func (c TimedClient) Do(r *http.Request) (*http.Response, error) { - return TimeRequest(r.Context(), c.operationName(r), c.collector, c.client, r) -} - -// RoundTrip implements the RoundTripper interface. -func (c TimedClient) RoundTrip(r *http.Request) (*http.Response, error) { - return c.Do(r) -} - -func (c TimedClient) operationName(r *http.Request) string { - operation, _ := r.Context().Value(OperationNameContextKey).(string) - if operation == "" { - operation = r.URL.Path - } - return operation -} - -// TimeRequest performs an HTTP client request and records the duration in a histogram. -func TimeRequest(ctx context.Context, operation string, coll instrument.Collector, client Requester, request *http.Request) (*http.Response, error) { - var response *http.Response - doRequest := func(_ context.Context) error { - var err error - response, err = client.Do(request) // nolint:bodyclose - return err - } - toStatusCode := func(err error) string { - if err == nil { - return strconv.Itoa(response.StatusCode) - } - return "error" - } - err := instrument.CollectedRequest(ctx, fmt.Sprintf("%s %s", request.Method, operation), - coll, toStatusCode, doRequest) - return response, err -} - -// TracedClient instruments a request with tracing. It implements Requester. -type TracedClient struct { - client Requester - tracer tracing.Tracer - name string -} - -func NewTracedClient(client Requester, tracer tracing.Tracer, name string) *TracedClient { - return &TracedClient{ - client: client, - tracer: tracer, - name: name, - } -} - -// Do executes the request. -func (c TracedClient) Do(r *http.Request) (*http.Response, error) { - ctx, span := c.tracer.Start(r.Context(), c.name, trace.WithSpanKind(trace.SpanKindClient)) - defer span.End() - - span.SetAttributes(semconv.HTTPURL(r.URL.String())) - span.SetAttributes(semconv.HTTPMethod(r.Method)) - - c.tracer.Inject(ctx, r.Header, span) - - r = r.WithContext(ctx) - resp, err := c.client.Do(r) - if err != nil { - span.SetStatus(codes.Error, "request failed") - span.RecordError(err) - } else { - if resp.ContentLength > 0 { - span.SetAttributes(attribute.Int64("http.content_length", resp.ContentLength)) - } - span.SetAttributes(semconv.HTTPStatusCode(resp.StatusCode)) - if resp.StatusCode >= 400 && resp.StatusCode < 600 { - span.RecordError(fmt.Errorf("error with HTTP status code %d", resp.StatusCode)) - } - } - - return resp, err -} - -// RoundTrip implements the RoundTripper interface. -func (c TracedClient) RoundTrip(r *http.Request) (*http.Response, error) { - return c.Do(r) -} diff --git a/pkg/services/ngalert/client/client_test.go b/pkg/services/ngalert/client/client_test.go deleted file mode 100644 index 25d26576e66..00000000000 --- a/pkg/services/ngalert/client/client_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package client - -import ( - "context" - "net/http" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestTimedClient_operationName(t *testing.T) { - r, err := http.NewRequest("GET", "https://weave.test", nil) - assert.NoError(t, err) - - r = r.WithContext(context.WithValue(context.Background(), OperationNameContextKey, "opp")) - c := NewTimedClient(http.DefaultClient, nil) - - assert.Equal(t, "opp", c.operationName(r)) -} - -func TestTimedClient_operationName_Default(t *testing.T) { - r, err := http.NewRequest("GET", "https://weave.test/you/know/me", nil) - assert.NoError(t, err) - - r = r.WithContext(context.Background()) - c := NewTimedClient(http.DefaultClient, nil) - - assert.Equal(t, "/you/know/me", c.operationName(r)) -} diff --git a/pkg/services/ngalert/lokiclient/client.go b/pkg/services/ngalert/lokiclient/client.go deleted file mode 100644 index 0b1fca30daf..00000000000 --- a/pkg/services/ngalert/lokiclient/client.go +++ /dev/null @@ -1,333 +0,0 @@ -package lokiclient - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "io" - "net/http" - "net/url" - "strconv" - "time" - - "github.com/grafana/dskit/instrument" - "github.com/grafana/grafana/pkg/infra/log" - "github.com/grafana/grafana/pkg/infra/tracing" - "github.com/grafana/grafana/pkg/services/ngalert/client" - "github.com/grafana/grafana/pkg/setting" - "github.com/prometheus/client_golang/prometheus" -) - -const defaultPageSize = 1000 -const maximumPageSize = 5000 - -func NewRequester() client.Requester { - return &http.Client{} -} - -// encoder serializes log streams to some byte format. -type encoder interface { - // encode serializes a set of log streams to bytes. - encode(s []Stream) ([]byte, error) - // headers returns a set of HTTP-style headers that describes the encoding scheme used. - headers() map[string]string -} - -type LokiConfig struct { - ReadPathURL *url.URL - WritePathURL *url.URL - BasicAuthUser string - BasicAuthPassword string - TenantID string - ExternalLabels map[string]string - Encoder encoder - MaxQueryLength time.Duration - MaxQuerySize int -} - -func NewLokiConfig(cfg setting.UnifiedAlertingLokiSettings) (LokiConfig, error) { - read, write := cfg.LokiReadURL, cfg.LokiWriteURL - if read == "" { - read = cfg.LokiRemoteURL - } - if write == "" { - write = cfg.LokiRemoteURL - } - - if read == "" { - return LokiConfig{}, fmt.Errorf("either read path URL or remote Loki URL must be provided") - } - if write == "" { - return LokiConfig{}, fmt.Errorf("either write path URL or remote Loki URL must be provided") - } - - readURL, err := url.Parse(read) - if err != nil { - return LokiConfig{}, fmt.Errorf("failed to parse loki remote read URL: %w", err) - } - writeURL, err := url.Parse(write) - if err != nil { - return LokiConfig{}, fmt.Errorf("failed to parse loki remote write URL: %w", err) - } - - return LokiConfig{ - ReadPathURL: readURL, - WritePathURL: writeURL, - BasicAuthUser: cfg.LokiBasicAuthUsername, - BasicAuthPassword: cfg.LokiBasicAuthPassword, - TenantID: cfg.LokiTenantID, - ExternalLabels: cfg.ExternalLabels, - MaxQueryLength: cfg.LokiMaxQueryLength, - MaxQuerySize: cfg.LokiMaxQuerySize, - // Snappy-compressed protobuf is the default, same goes for Promtail. - Encoder: SnappyProtoEncoder{}, - }, nil -} - -type HttpLokiClient struct { - client client.Requester - encoder encoder - cfg LokiConfig - bytesWritten prometheus.Counter - log log.Logger -} - -// Kind of Operation (=, !=, =~, !~) -type Operator string - -const ( - // Equal operator (=) - Eq Operator = "=" - // Not Equal operator (!=) - Neq Operator = "!=" - // Equal operator supporting RegEx (=~) - EqRegEx Operator = "=~" - // Not Equal operator supporting RegEx (!~) - NeqRegEx Operator = "!~" -) - -func NewLokiClient(cfg LokiConfig, req client.Requester, bytesWritten prometheus.Counter, writeDuration *instrument.HistogramCollector, logger log.Logger, tracer tracing.Tracer, spanName string) *HttpLokiClient { - tc := client.NewTimedClient(req, writeDuration) - trc := client.NewTracedClient(tc, tracer, spanName) - return &HttpLokiClient{ - client: trc, - encoder: cfg.Encoder, - cfg: cfg, - bytesWritten: bytesWritten, - log: logger.New("protocol", "http"), - } -} - -func (c *HttpLokiClient) Ping(ctx context.Context) error { - log := c.log.FromContext(ctx) - uri := c.cfg.ReadPathURL.JoinPath("/loki/api/v1/labels") - req, err := http.NewRequest(http.MethodGet, uri.String(), nil) - if err != nil { - return fmt.Errorf("error creating request: %w", err) - } - c.setAuthAndTenantHeaders(req) - - req = req.WithContext(ctx) - res, err := c.client.Do(req) - if res != nil { - defer func() { - if err := res.Body.Close(); err != nil { - log.Warn("Failed to close response body", "err", err) - } - }() - } - if err != nil { - return fmt.Errorf("error sending request: %w", err) - } - - if res.StatusCode < 200 || res.StatusCode >= 300 { - return fmt.Errorf("ping request to loki endpoint returned a non-200 status code: %d", res.StatusCode) - } - log.Debug("Ping request to Loki endpoint succeeded", "status", res.StatusCode) - return nil -} - -type Stream struct { - Stream map[string]string `json:"stream"` - Values []Sample `json:"values"` -} - -type Sample struct { - T time.Time - V string -} - -func (r *Sample) MarshalJSON() ([]byte, error) { - return json.Marshal([2]string{ - fmt.Sprintf("%d", r.T.UnixNano()), r.V, - }) -} - -func (r *Sample) UnmarshalJSON(b []byte) error { - // A Loki stream sample is formatted like a list with two elements, [At, Val] - // At is a string wrapping a timestamp, in nanosecond unix epoch. - // Val is a string containing the log line. - var tuple [2]string - if err := json.Unmarshal(b, &tuple); err != nil { - return fmt.Errorf("failed to deserialize sample in Loki response: %w", err) - } - nano, err := strconv.ParseInt(tuple[0], 10, 64) - if err != nil { - return fmt.Errorf("timestamp in Loki sample not convertible to nanosecond epoch: %v", tuple[0]) - } - r.T = time.Unix(0, nano) - r.V = tuple[1] - return nil -} - -func (c *HttpLokiClient) Push(ctx context.Context, s []Stream) error { - log := c.log.FromContext(ctx) - enc, err := c.encoder.encode(s) - if err != nil { - return err - } - - uri := c.cfg.WritePathURL.JoinPath("/loki/api/v1/push") - req, err := http.NewRequest(http.MethodPost, uri.String(), bytes.NewBuffer(enc)) - if err != nil { - return fmt.Errorf("failed to create Loki request: %w", err) - } - - c.setAuthAndTenantHeaders(req) - for k, v := range c.encoder.headers() { - req.Header.Add(k, v) - } - - c.bytesWritten.Add(float64(len(enc))) - req = req.WithContext(ctx) - resp, err := c.client.Do(req) - if err != nil { - return fmt.Errorf("failed to send request: %w", err) - } - defer func() { - if err := resp.Body.Close(); err != nil { - log.Warn("Failed to close response body", "err", err) - } - }() - - _, err = c.handleLokiResponse(log, resp) - if err != nil { - return err - } - - return nil -} - -func (c *HttpLokiClient) setAuthAndTenantHeaders(req *http.Request) { - if c.cfg.BasicAuthUser != "" || c.cfg.BasicAuthPassword != "" { - req.SetBasicAuth(c.cfg.BasicAuthUser, c.cfg.BasicAuthPassword) - } - - if c.cfg.TenantID != "" { - req.Header.Add("X-Scope-OrgID", c.cfg.TenantID) - } -} - -func (c *HttpLokiClient) RangeQuery(ctx context.Context, logQL string, start, end, limit int64) (QueryRes, error) { - log := c.log.FromContext(ctx) - // Run the pre-flight checks for the query. - if start > end { - return QueryRes{}, fmt.Errorf("start time cannot be after end time") - } - start, end = ClampRange(start, end, c.cfg.MaxQueryLength.Nanoseconds()) - if limit < 1 { - limit = defaultPageSize - } - if limit > maximumPageSize { - limit = maximumPageSize - } - - queryURL := c.cfg.ReadPathURL.JoinPath("/loki/api/v1/query_range") - - values := url.Values{} - values.Set("query", logQL) - values.Set("start", fmt.Sprintf("%d", start)) - values.Set("end", fmt.Sprintf("%d", end)) - values.Set("limit", fmt.Sprintf("%d", limit)) - - queryURL.RawQuery = values.Encode() - log.Debug("Sending query request", "query", logQL, "start", start, "end", end, "limit", limit) - req, err := http.NewRequest(http.MethodGet, - queryURL.String(), nil) - if err != nil { - return QueryRes{}, fmt.Errorf("error creating request: %w", err) - } - - req = req.WithContext(ctx) - c.setAuthAndTenantHeaders(req) - - res, err := c.client.Do(req) - if err != nil { - return QueryRes{}, fmt.Errorf("error executing request: %w", err) - } - defer func() { - if err := res.Body.Close(); err != nil { - log.Warn("Failed to close response body", "err", err) - } - }() - - data, err := c.handleLokiResponse(log, res) - if err != nil { - return QueryRes{}, err - } - - result := QueryRes{} - err = json.Unmarshal(data, &result) - if err != nil { - fmt.Println(string(data)) - return QueryRes{}, fmt.Errorf("error parsing request response: %w", err) - } - - return result, nil -} - -func (c *HttpLokiClient) MaxQuerySize() int { - return c.cfg.MaxQuerySize -} - -type QueryRes struct { - Data QueryData `json:"data"` -} - -type QueryData struct { - Result []Stream `json:"result"` -} - -func (c *HttpLokiClient) handleLokiResponse(log log.Logger, res *http.Response) ([]byte, error) { - if res == nil { - return nil, fmt.Errorf("response is nil") - } - - data, err := io.ReadAll(res.Body) - if err != nil { - return nil, fmt.Errorf("error reading request response: %w", err) - } - - if res.StatusCode < 200 || res.StatusCode >= 300 { - if len(data) > 0 { - log.Error("Error response from Loki", "response", string(data), "status", res.StatusCode) - } else { - log.Error("Error response from Loki with an empty body", "status", res.StatusCode) - } - return nil, fmt.Errorf("received a non-200 response from loki, status: %d", res.StatusCode) - } - - return data, nil -} - -// ClampRange ensures that the time range is within the configured maximum query length. -func ClampRange(start, end, maxTimeRange int64) (newStart int64, newEnd int64) { - newStart, newEnd = start, end - - if maxTimeRange != 0 && end-start > maxTimeRange { - newStart = end - maxTimeRange - } - - return newStart, newEnd -} diff --git a/pkg/services/ngalert/lokiclient/client_test.go b/pkg/services/ngalert/lokiclient/client_test.go deleted file mode 100644 index 5568ec1601c..00000000000 --- a/pkg/services/ngalert/lokiclient/client_test.go +++ /dev/null @@ -1,409 +0,0 @@ -package lokiclient - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "io" - "net/http" - "net/url" - "testing" - "time" - - "github.com/grafana/grafana/pkg/services/ngalert/client" - "github.com/grafana/grafana/pkg/services/ngalert/metrics" - "github.com/grafana/grafana/pkg/setting" - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/require" - - "github.com/grafana/grafana/pkg/infra/log" - "github.com/grafana/grafana/pkg/infra/tracing" -) - -const lokiClientSpanName = "testLokiClientSpanName" - -func TestLokiConfig(t *testing.T) { - t.Run("test URL options", func(t *testing.T) { - type testCase struct { - name string - in setting.UnifiedAlertingLokiSettings - expRead string - expWrite string - expErr string - } - - cases := []testCase{ - { - name: "remote url only", - in: setting.UnifiedAlertingLokiSettings{ - LokiRemoteURL: "http://url.com", - }, - expRead: "http://url.com", - expWrite: "http://url.com", - }, - { - name: "separate urls", - in: setting.UnifiedAlertingLokiSettings{ - LokiReadURL: "http://read.url.com", - LokiWriteURL: "http://write.url.com", - }, - expRead: "http://read.url.com", - expWrite: "http://write.url.com", - }, - { - name: "single fallback", - in: setting.UnifiedAlertingLokiSettings{ - LokiRemoteURL: "http://url.com", - LokiReadURL: "http://read.url.com", - }, - expRead: "http://read.url.com", - expWrite: "http://url.com", - }, - { - name: "missing read", - in: setting.UnifiedAlertingLokiSettings{ - LokiWriteURL: "http://url.com", - }, - expErr: "either read path URL or remote", - }, - { - name: "missing write", - in: setting.UnifiedAlertingLokiSettings{ - LokiReadURL: "http://url.com", - }, - expErr: "either write path URL or remote", - }, - { - name: "invalid", - in: setting.UnifiedAlertingLokiSettings{ - LokiRemoteURL: "://://", - }, - expErr: "failed to parse", - }, - } - - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - res, err := NewLokiConfig(tc.in) - if tc.expErr != "" { - require.ErrorContains(t, err, tc.expErr) - } else { - require.Equal(t, tc.expRead, res.ReadPathURL.String()) - require.Equal(t, tc.expWrite, res.WritePathURL.String()) - } - }) - } - }) - - t.Run("captures external labels", func(t *testing.T) { - set := setting.UnifiedAlertingLokiSettings{ - LokiRemoteURL: "http://url.com", - ExternalLabels: map[string]string{"a": "b"}, - } - - res, err := NewLokiConfig(set) - - require.NoError(t, err) - require.Contains(t, res.ExternalLabels, "a") - }) -} - -func TestLokiHTTPClient(t *testing.T) { - t.Run("push formats expected data", func(t *testing.T) { - req := NewFakeRequester() - client := createTestLokiClient(req) - now := time.Now().UTC() - data := []Stream{ - { - Stream: map[string]string{}, - Values: []Sample{ - { - T: now, - V: "some line", - }, - }, - }, - } - - err := client.Push(context.Background(), data) - - require.NoError(t, err) - require.Contains(t, "/loki/api/v1/push", req.LastRequest.URL.Path) - sent := reqBody(t, req.LastRequest) - exp := fmt.Sprintf(`{"streams": [{"stream": {}, "values": [["%d", "some line"]]}]}`, now.UnixNano()) - require.JSONEq(t, exp, sent) - }) - - t.Run("range query", func(t *testing.T) { - t.Run("passes along page size", func(t *testing.T) { - req := NewFakeRequester().WithResponse(&http.Response{ - Status: "200 OK", - StatusCode: 200, - Body: io.NopCloser(bytes.NewBufferString(`{}`)), - ContentLength: int64(0), - Header: make(http.Header, 0), - }) - client := createTestLokiClient(req) - now := time.Now().UTC().UnixNano() - q := `{from="state-history"}` - - _, err := client.RangeQuery(context.Background(), q, now-100, now, 1100) - - require.NoError(t, err) - params := req.LastRequest.URL.Query() - require.True(t, params.Has("limit"), "query params did not contain 'limit': %#v", params) - require.Equal(t, fmt.Sprint(1100), params.Get("limit")) - }) - - t.Run("uses default page size if limit not provided", func(t *testing.T) { - req := NewFakeRequester().WithResponse(&http.Response{ - Status: "200 OK", - StatusCode: 200, - Body: io.NopCloser(bytes.NewBufferString(`{}`)), - ContentLength: int64(0), - Header: make(http.Header, 0), - }) - client := createTestLokiClient(req) - now := time.Now().UTC().UnixNano() - q := `{from="state-history"}` - - _, err := client.RangeQuery(context.Background(), q, now-100, now, 0) - - require.NoError(t, err) - params := req.LastRequest.URL.Query() - require.True(t, params.Has("limit"), "query params did not contain 'limit': %#v", params) - require.Equal(t, fmt.Sprint(defaultPageSize), params.Get("limit")) - }) - - t.Run("uses default page size if limit invalid", func(t *testing.T) { - req := NewFakeRequester().WithResponse(&http.Response{ - Status: "200 OK", - StatusCode: 200, - Body: io.NopCloser(bytes.NewBufferString(`{}`)), - ContentLength: int64(0), - Header: make(http.Header, 0), - }) - client := createTestLokiClient(req) - now := time.Now().UTC().UnixNano() - q := `{from="state-history"}` - - _, err := client.RangeQuery(context.Background(), q, now-100, now, -100) - - require.NoError(t, err) - params := req.LastRequest.URL.Query() - require.True(t, params.Has("limit"), "query params did not contain 'limit': %#v", params) - require.Equal(t, fmt.Sprint(defaultPageSize), params.Get("limit")) - }) - - t.Run("uses maximum page size if limit too big", func(t *testing.T) { - req := NewFakeRequester().WithResponse(&http.Response{ - Status: "200 OK", - StatusCode: 200, - Body: io.NopCloser(bytes.NewBufferString(`{}`)), - ContentLength: int64(0), - Header: make(http.Header, 0), - }) - client := createTestLokiClient(req) - now := time.Now().UTC().UnixNano() - q := `{from="state-history"}` - - _, err := client.RangeQuery(context.Background(), q, now-100, now, maximumPageSize+1000) - - require.NoError(t, err) - params := req.LastRequest.URL.Query() - require.True(t, params.Has("limit"), "query params did not contain 'limit': %#v", params) - require.Equal(t, fmt.Sprint(maximumPageSize), params.Get("limit")) - }) - }) -} - -// This function can be used for local testing, just remove the skip call. -func TestLokiHTTPClient_Manual(t *testing.T) { - t.Skip() - - t.Run("smoke test pinging Loki", func(t *testing.T) { - url, err := url.Parse("https://logs-prod-eu-west-0.grafana.net") - require.NoError(t, err) - - metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem) - client := NewLokiClient(LokiConfig{ - ReadPathURL: url, - WritePathURL: url, - Encoder: JsonEncoder{}, - }, NewRequester(), metrics.BytesWritten, metrics.WriteDuration, log.NewNopLogger(), tracing.InitializeTracerForTest(), lokiClientSpanName) - - // Unauthorized request should fail against Grafana Cloud. - err = client.Ping(context.Background()) - require.Error(t, err) - - client.cfg.BasicAuthUser = "" - client.cfg.BasicAuthPassword = "" - - // When running on prem, you might need to set the tenant id, - // so the x-scope-orgid header is set. - // client.cfg.TenantID = "" - - // Authorized request should not fail against Grafana Cloud. - err = client.Ping(context.Background()) - require.NoError(t, err) - }) - - t.Run("smoke test range querying Loki", func(t *testing.T) { - url, err := url.Parse("https://logs-prod-eu-west-0.grafana.net") - require.NoError(t, err) - - metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem) - client := NewLokiClient(LokiConfig{ - ReadPathURL: url, - WritePathURL: url, - BasicAuthUser: "", - BasicAuthPassword: "", - Encoder: JsonEncoder{}, - }, NewRequester(), metrics.BytesWritten, metrics.WriteDuration, log.NewNopLogger(), tracing.InitializeTracerForTest(), lokiClientSpanName) - - // When running on prem, you might need to set the tenant id, - // so the x-scope-orgid header is set. - // client.cfg.TenantID = "" - - logQL := `{probe="Paris"}` - - // Define the query time range - start := time.Now().Add(-30 * time.Minute).UnixNano() - end := time.Now().UnixNano() - - // Authorized request should not fail against Grafana Cloud. - res, err := client.RangeQuery(context.Background(), logQL, start, end, defaultPageSize) - require.NoError(t, err) - require.NotNil(t, res) - }) -} - -func TestRow(t *testing.T) { - t.Run("marshal", func(t *testing.T) { - row := Sample{ - T: time.Unix(0, 1234), - V: "some sample", - } - - jsn, err := json.Marshal(&row) - - require.NoError(t, err) - require.JSONEq(t, `["1234", "some sample"]`, string(jsn)) - }) - - t.Run("unmarshal", func(t *testing.T) { - jsn := []byte(`["1234", "some sample"]`) - - row := Sample{} - err := json.Unmarshal(jsn, &row) - - require.NoError(t, err) - require.Equal(t, int64(1234), row.T.UnixNano()) - require.Equal(t, "some sample", row.V) - }) - - t.Run("unmarshal invalid", func(t *testing.T) { - jsn := []byte(`{"key": "wrong shape"}`) - - row := Sample{} - err := json.Unmarshal(jsn, &row) - - require.ErrorContains(t, err, "failed to deserialize sample") - }) - - t.Run("unmarshal bad timestamp", func(t *testing.T) { - jsn := []byte(`["not-unix-nano", "some sample"]`) - - row := Sample{} - err := json.Unmarshal(jsn, &row) - - require.ErrorContains(t, err, "timestamp in Loki sample") - }) -} - -func TestStream(t *testing.T) { - t.Run("marshal", func(t *testing.T) { - stream := Stream{ - Stream: map[string]string{"a": "b"}, - Values: []Sample{ - {T: time.Unix(0, 1), V: "one"}, - {T: time.Unix(0, 2), V: "two"}, - }, - } - - jsn, err := json.Marshal(stream) - - require.NoError(t, err) - require.JSONEq( - t, - `{"stream": {"a": "b"}, "values": [["1", "one"], ["2", "two"]]}`, - string(jsn), - ) - }) -} - -func TestClampRange(t *testing.T) { - tc := []struct { - name string - oldRange []int64 - max int64 - newRange []int64 - }{ - { - name: "clamps start value if max is smaller than range", - oldRange: []int64{5, 10}, - max: 1, - newRange: []int64{9, 10}, - }, - { - name: "returns same values if max is greater than range", - oldRange: []int64{5, 10}, - max: 20, - newRange: []int64{5, 10}, - }, - { - name: "returns same values if max is equal to range", - oldRange: []int64{5, 10}, - max: 5, - newRange: []int64{5, 10}, - }, - { - name: "returns same values if max is zero", - oldRange: []int64{5, 10}, - max: 0, - newRange: []int64{5, 10}, - }, - } - - for _, c := range tc { - t.Run(c.name, func(t *testing.T) { - start, end := ClampRange(c.oldRange[0], c.oldRange[1], c.max) - - require.Equal(t, c.newRange[0], start) - require.Equal(t, c.newRange[1], end) - }) - } -} - -func createTestLokiClient(req client.Requester) *HttpLokiClient { - url, _ := url.Parse("http://some.url") - cfg := LokiConfig{ - WritePathURL: url, - ReadPathURL: url, - Encoder: JsonEncoder{}, - } - met := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem) - return NewLokiClient(cfg, req, met.BytesWritten, met.WriteDuration, log.NewNopLogger(), tracing.InitializeTracerForTest(), lokiClientSpanName) -} - -func reqBody(t *testing.T, req *http.Request) string { - t.Helper() - - defer func() { - _ = req.Body.Close() - }() - byt, err := io.ReadAll(req.Body) - require.NoError(t, err) - return string(byt) -} diff --git a/pkg/services/ngalert/lokiclient/encode.go b/pkg/services/ngalert/lokiclient/encode.go deleted file mode 100644 index 16dd827f6c5..00000000000 --- a/pkg/services/ngalert/lokiclient/encode.go +++ /dev/null @@ -1,107 +0,0 @@ -package lokiclient - -import ( - "encoding/json" - "fmt" - "slices" - "strconv" - "strings" - - "github.com/gogo/protobuf/proto" - "github.com/golang/snappy" - "github.com/prometheus/common/model" - - "github.com/grafana/grafana/pkg/components/loki/logproto" -) - -type JsonEncoder struct{} - -func (e JsonEncoder) encode(s []Stream) ([]byte, error) { - body := struct { - Streams []Stream `json:"streams"` - }{Streams: s} - enc, err := json.Marshal(body) - if err != nil { - return nil, fmt.Errorf("failed to serialize Loki payload: %w", err) - } - return enc, nil -} - -func (e JsonEncoder) headers() map[string]string { - return map[string]string{ - "Content-Type": "application/json", - } -} - -type SnappyProtoEncoder struct{} - -func (e SnappyProtoEncoder) encode(s []Stream) ([]byte, error) { - body := logproto.PushRequest{ - Streams: make([]logproto.Stream, 0, len(s)), - } - - for _, str := range s { - entries := make([]logproto.Entry, 0, len(str.Values)) - for _, sample := range str.Values { - entries = append(entries, logproto.Entry{ - Timestamp: sample.T, - Line: sample.V, - }) - } - body.Streams = append(body.Streams, logproto.Stream{ - Labels: labelsMapToString(str.Stream, ""), - Entries: entries, - // Hash seems to be mainly used for query responses. Promtail does not seem to calculate this field on push. - }) - } - - buf, err := proto.Marshal(&body) - if err != nil { - return nil, fmt.Errorf("failed to serialize Loki payload to proto: %w", err) - } - buf = snappy.Encode(nil, buf) - return buf, nil -} - -func (e SnappyProtoEncoder) headers() map[string]string { - return map[string]string{ - "Content-Type": "application/x-protobuf", - "Content-Encoding": "snappy", - } -} - -// Copied from promtail. -// Modified slightly to work in terms of plain map[string]string to avoid some unnecessary copies and type casts. -// TODO: pkg/components/loki/lokihttp/batch.go contains an older (loki 2.7.4 released) version of this. -// TODO: Consider replacing that one, with this one. -func labelsMapToString(ls map[string]string, without model.LabelName) string { - var b strings.Builder - totalSize := 2 - lstrs := make([]string, 0, len(ls)) - - for l, v := range ls { - if l == string(without) { - continue - } - - lstrs = append(lstrs, l) - // guess size increase: 2 for `, ` between labels and 3 for the `=` and quotes around label value - totalSize += len(l) + 2 + len(v) + 3 - } - - b.Grow(totalSize) - b.WriteByte('{') - slices.Sort(lstrs) - for i, l := range lstrs { - if i > 0 { - b.WriteString(", ") - } - - b.WriteString(l) - b.WriteString(`=`) - b.WriteString(strconv.Quote(ls[l])) - } - b.WriteByte('}') - - return b.String() -} diff --git a/pkg/services/ngalert/lokiclient/testing.go b/pkg/services/ngalert/lokiclient/testing.go deleted file mode 100644 index 8d3cd097f0c..00000000000 --- a/pkg/services/ngalert/lokiclient/testing.go +++ /dev/null @@ -1,45 +0,0 @@ -package lokiclient - -import ( - "bytes" - "io" - "net/http" -) - -type FakeRequester struct { - LastRequest *http.Request - Resp *http.Response -} - -func NewFakeRequester() *FakeRequester { - return &FakeRequester{ - Resp: &http.Response{ - Status: "200 OK", - StatusCode: 200, - Body: io.NopCloser(bytes.NewBufferString("")), - ContentLength: int64(0), - Header: make(http.Header, 0), - }, - } -} - -func (f *FakeRequester) WithResponse(resp *http.Response) *FakeRequester { - f.Resp = resp - return f -} - -func (f *FakeRequester) Do(req *http.Request) (*http.Response, error) { - f.LastRequest = req - f.Resp.Request = req // Not concurrency-safe! - return f.Resp, nil -} - -func BadResponse() *http.Response { - return &http.Response{ - Status: "400 Bad Request", - StatusCode: http.StatusBadRequest, - Body: io.NopCloser(bytes.NewBufferString("")), - ContentLength: int64(0), - Header: make(http.Header, 0), - } -} diff --git a/pkg/services/ngalert/lokiconfig/lokiconfig.go b/pkg/services/ngalert/lokiconfig/lokiconfig.go new file mode 100644 index 00000000000..c99f3731492 --- /dev/null +++ b/pkg/services/ngalert/lokiconfig/lokiconfig.go @@ -0,0 +1,48 @@ +package lokiconfig + +import ( + "fmt" + "net/url" + + "github.com/grafana/alerting/notify/historian/lokiclient" + "github.com/grafana/grafana/pkg/setting" +) + +func NewLokiConfig(cfg setting.UnifiedAlertingLokiSettings) (lokiclient.LokiConfig, error) { + read, write := cfg.LokiReadURL, cfg.LokiWriteURL + if read == "" { + read = cfg.LokiRemoteURL + } + if write == "" { + write = cfg.LokiRemoteURL + } + + if read == "" { + return lokiclient.LokiConfig{}, fmt.Errorf("either read path URL or remote Loki URL must be provided") + } + if write == "" { + return lokiclient.LokiConfig{}, fmt.Errorf("either write path URL or remote Loki URL must be provided") + } + + readURL, err := url.Parse(read) + if err != nil { + return lokiclient.LokiConfig{}, fmt.Errorf("failed to parse loki remote read URL: %w", err) + } + writeURL, err := url.Parse(write) + if err != nil { + return lokiclient.LokiConfig{}, fmt.Errorf("failed to parse loki remote write URL: %w", err) + } + + return lokiclient.LokiConfig{ + ReadPathURL: readURL, + WritePathURL: writeURL, + BasicAuthUser: cfg.LokiBasicAuthUsername, + BasicAuthPassword: cfg.LokiBasicAuthPassword, + TenantID: cfg.LokiTenantID, + ExternalLabels: cfg.ExternalLabels, + MaxQueryLength: cfg.LokiMaxQueryLength, + MaxQuerySize: cfg.LokiMaxQuerySize, + // Snappy-compressed protobuf is the default, same goes for Promtail. + Encoder: lokiclient.SnappyProtoEncoder{}, + }, nil +} diff --git a/pkg/services/ngalert/lokiconfig/lokiconfig_test.go b/pkg/services/ngalert/lokiconfig/lokiconfig_test.go new file mode 100644 index 00000000000..7c2bc0ef1b8 --- /dev/null +++ b/pkg/services/ngalert/lokiconfig/lokiconfig_test.go @@ -0,0 +1,94 @@ +package lokiconfig + +import ( + "testing" + + "github.com/grafana/grafana/pkg/setting" + "github.com/stretchr/testify/require" +) + +func TestLokiConfig(t *testing.T) { + t.Run("test URL options", func(t *testing.T) { + type testCase struct { + name string + in setting.UnifiedAlertingLokiSettings + expRead string + expWrite string + expErr string + } + + cases := []testCase{ + { + name: "remote url only", + in: setting.UnifiedAlertingLokiSettings{ + LokiRemoteURL: "http://url.com", + }, + expRead: "http://url.com", + expWrite: "http://url.com", + }, + { + name: "separate urls", + in: setting.UnifiedAlertingLokiSettings{ + LokiReadURL: "http://read.url.com", + LokiWriteURL: "http://write.url.com", + }, + expRead: "http://read.url.com", + expWrite: "http://write.url.com", + }, + { + name: "single fallback", + in: setting.UnifiedAlertingLokiSettings{ + LokiRemoteURL: "http://url.com", + LokiReadURL: "http://read.url.com", + }, + expRead: "http://read.url.com", + expWrite: "http://url.com", + }, + { + name: "missing read", + in: setting.UnifiedAlertingLokiSettings{ + LokiWriteURL: "http://url.com", + }, + expErr: "either read path URL or remote", + }, + { + name: "missing write", + in: setting.UnifiedAlertingLokiSettings{ + LokiReadURL: "http://url.com", + }, + expErr: "either write path URL or remote", + }, + { + name: "invalid", + in: setting.UnifiedAlertingLokiSettings{ + LokiRemoteURL: "://://", + }, + expErr: "failed to parse", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + res, err := NewLokiConfig(tc.in) + if tc.expErr != "" { + require.ErrorContains(t, err, tc.expErr) + } else { + require.Equal(t, tc.expRead, res.ReadPathURL.String()) + require.Equal(t, tc.expWrite, res.WritePathURL.String()) + } + }) + } + }) + + t.Run("captures external labels", func(t *testing.T) { + set := setting.UnifiedAlertingLokiSettings{ + LokiRemoteURL: "http://url.com", + ExternalLabels: map[string]string{"a": "b"}, + } + + res, err := NewLokiConfig(set) + + require.NoError(t, err) + require.Contains(t, res.ExternalLabels, "a") + }) +} diff --git a/pkg/services/ngalert/ngalert.go b/pkg/services/ngalert/ngalert.go index 511980684e6..3dd1b410c86 100644 --- a/pkg/services/ngalert/ngalert.go +++ b/pkg/services/ngalert/ngalert.go @@ -7,8 +7,10 @@ import ( "time" "github.com/benbjohnson/clock" + notificationHistorian "github.com/grafana/alerting/notify/historian" + "github.com/grafana/alerting/notify/historian/lokiclient" "github.com/grafana/alerting/notify/nfstatus" - "github.com/grafana/grafana/pkg/services/ngalert/lokiclient" + "github.com/grafana/grafana/pkg/services/ngalert/lokiconfig" "github.com/prometheus/alertmanager/featurecontrol" "github.com/prometheus/alertmanager/matchers/compat" "golang.org/x/sync/errgroup" @@ -659,7 +661,7 @@ func configureHistorianBackend( return historian.NewAnnotationBackend(annotationBackendLogger, store, rs, met, ac), nil } if backend == historian.BackendTypeLoki { - lcfg, err := lokiclient.NewLokiConfig(cfg.LokiSettings) + lcfg, err := lokiconfig.NewLokiConfig(cfg.LokiSettings) if err != nil { return nil, fmt.Errorf("invalid remote loki configuration: %w", err) } @@ -712,20 +714,20 @@ func configureNotificationHistorian( } met.Info.Set(1) - lcfg, err := lokiclient.NewLokiConfig(cfg.LokiSettings) + lcfg, err := lokiconfig.NewLokiConfig(cfg.LokiSettings) if err != nil { return nil, fmt.Errorf("invalid remote loki configuration: %w", err) } req := lokiclient.NewRequester() logger := log.New("ngalert.notifier.historian").FromContext(ctx) - notificationHistorian := notifier.NewNotificationHistorian(logger, lcfg, req, met, tracer) + nh := notificationHistorian.NewNotificationHistorian(logger, lcfg, req, met.BytesWritten, met.WriteDuration, met.WritesTotal, met.WritesFailed, tracer) testConnCtx, cancelFunc := context.WithTimeout(ctx, 10*time.Second) defer cancelFunc() - if err := notificationHistorian.TestConnection(testConnCtx); err != nil { + if err := nh.TestConnection(testConnCtx); err != nil { l.Error("Failed to communicate with configured remote Loki backend, notification history may not be persisted", "error", err) } - return notificationHistorian, nil + return nh, nil } func createRecordingWriter(settings setting.RecordingRuleSettings, httpClientProvider httpclient.Provider, datasourceService datasources.DataSourceService, pluginContextProvider *plugincontext.Provider, clock clock.Clock, m *metrics.RemoteWriter) (schedule.RecordingWriter, error) { diff --git a/pkg/services/ngalert/notifier/historian.go b/pkg/services/ngalert/notifier/historian.go deleted file mode 100644 index f56175de20c..00000000000 --- a/pkg/services/ngalert/notifier/historian.go +++ /dev/null @@ -1,190 +0,0 @@ -package notifier - -import ( - "context" - "encoding/json" - "fmt" - "strings" - "time" - - alertingModels "github.com/grafana/alerting/models" - "github.com/grafana/grafana/pkg/infra/log" - "github.com/grafana/grafana/pkg/infra/tracing" - "github.com/grafana/grafana/pkg/services/ngalert/client" - "github.com/grafana/grafana/pkg/services/ngalert/lokiclient" - "github.com/grafana/grafana/pkg/services/ngalert/metrics" - "github.com/prometheus/alertmanager/notify" - "github.com/prometheus/alertmanager/types" - prometheusModel "github.com/prometheus/common/model" - "go.opentelemetry.io/otel/trace" -) - -const LokiClientSpanName = "ngalert.notification-historian.client" -const NotificationHistoryWriteTimeout = time.Minute -const NotificationHistoryKey = "from" -const NotificationHistoryLabelValue = "notify-history" - -type NotificationHistoryLokiEntry struct { - SchemaVersion int `json:"schemaVersion"` - Receiver string `json:"receiver"` - Status string `json:"status"` - GroupLabels map[string]string `json:"groupLabels"` - Alerts []NotificationHistoryLokiEntryAlert `json:"alerts"` - Retry bool `json:"retry"` - Error string `json:"error,omitempty"` - Duration int64 `json:"duration"` -} - -type NotificationHistoryLokiEntryAlert struct { - Status string `json:"status"` - Labels map[string]string `json:"labels"` - Annotations map[string]string `json:"annotations"` - StartsAt time.Time `json:"startsAt"` - EndsAt time.Time `json:"endsAt"` - RuleUID string `json:"ruleUID"` -} - -type remoteLokiClient interface { - Ping(context.Context) error - Push(context.Context, []lokiclient.Stream) error -} - -type NotificationHistorian struct { - client remoteLokiClient - externalLabels map[string]string - metrics *metrics.NotificationHistorian - log log.Logger -} - -func NewNotificationHistorian(logger log.Logger, cfg lokiclient.LokiConfig, req client.Requester, metrics *metrics.NotificationHistorian, tracer tracing.Tracer) *NotificationHistorian { - return &NotificationHistorian{ - client: lokiclient.NewLokiClient(cfg, req, metrics.BytesWritten, metrics.WriteDuration, logger, tracer, LokiClientSpanName), - externalLabels: cfg.ExternalLabels, - metrics: metrics, - log: logger, - } -} - -func (h *NotificationHistorian) TestConnection(ctx context.Context) error { - return h.client.Ping(ctx) -} - -func (h *NotificationHistorian) Record(ctx context.Context, alerts []*types.Alert, retry bool, notificationErr error, duration time.Duration) <-chan error { - stream, err := h.prepareStream(ctx, alerts, retry, notificationErr, duration) - logger := h.log.FromContext(ctx) - errCh := make(chan error, 1) - if err != nil { - logger.Error("Failed to convert notification history to stream", "error", err) - errCh <- fmt.Errorf("failed to convert notification history to stream: %w", err) - close(errCh) - return errCh - } - - // This is a new background job, so let's create a new context for it. - // We want it to be isolated, i.e. we don't want grafana shutdowns to interrupt this work - // immediately but rather try to flush writes. - // This also prevents timeouts or other lingering objects (like transactions) from being - // incorrectly propagated here from other areas. - writeCtx := context.Background() - writeCtx, cancel := context.WithTimeout(writeCtx, NotificationHistoryWriteTimeout) - writeCtx = trace.ContextWithSpan(writeCtx, trace.SpanFromContext(ctx)) - - go func(ctx context.Context) { - defer cancel() - defer close(errCh) - logger := h.log.FromContext(ctx) - logger.Debug("Saving notification history") - h.metrics.WritesTotal.Inc() - - if err := h.recordStream(ctx, stream, logger); err != nil { - logger.Error("Failed to save notification history", "error", err) - h.metrics.WritesFailed.Inc() - errCh <- fmt.Errorf("failed to save notification history: %w", err) - } - }(writeCtx) - return errCh -} - -func (h *NotificationHistorian) prepareStream(ctx context.Context, alerts []*types.Alert, retry bool, notificationErr error, duration time.Duration) (lokiclient.Stream, error) { - receiverName, ok := notify.ReceiverName(ctx) - if !ok { - return lokiclient.Stream{}, fmt.Errorf("receiver name not found in context") - } - groupLabels, ok := notify.GroupLabels(ctx) - if !ok { - return lokiclient.Stream{}, fmt.Errorf("group labels not found in context") - } - now, ok := notify.Now(ctx) - if !ok { - return lokiclient.Stream{}, fmt.Errorf("now not found in context") - } - - entryAlerts := make([]NotificationHistoryLokiEntryAlert, len(alerts)) - for i, alert := range alerts { - labels := prepareLabels(alert.Labels) - annotations := prepareLabels(alert.Annotations) - entryAlerts[i] = NotificationHistoryLokiEntryAlert{ - Labels: labels, - Annotations: annotations, - Status: string(alert.StatusAt(now)), - StartsAt: alert.StartsAt, - EndsAt: alert.EndsAt, - RuleUID: string(alert.Labels[alertingModels.RuleUIDLabel]), - } - } - - notificationErrStr := "" - if notificationErr != nil { - notificationErrStr = notificationErr.Error() - } - - entry := NotificationHistoryLokiEntry{ - SchemaVersion: 1, - Receiver: receiverName, - Status: string(types.Alerts(alerts...).StatusAt(now)), - GroupLabels: prepareLabels(groupLabels), - Alerts: entryAlerts, - Retry: retry, - Error: notificationErrStr, - Duration: duration.Milliseconds(), - } - - entryJSON, err := json.Marshal(entry) - if err != nil { - return lokiclient.Stream{}, err - } - - streamLabels := make(map[string]string) - streamLabels[NotificationHistoryKey] = NotificationHistoryLabelValue - for k, v := range h.externalLabels { - streamLabels[k] = v - } - - return lokiclient.Stream{ - Stream: streamLabels, - Values: []lokiclient.Sample{ - { - T: now, - V: string(entryJSON), - }}, - }, nil -} - -func (h *NotificationHistorian) recordStream(ctx context.Context, stream lokiclient.Stream, logger log.Logger) error { - if err := h.client.Push(ctx, []lokiclient.Stream{stream}); err != nil { - return err - } - logger.Debug("Done saving notification history") - return nil -} - -func prepareLabels(labels prometheusModel.LabelSet) map[string]string { - result := make(map[string]string) - for k, v := range labels { - // Remove private labels - if !strings.HasPrefix(string(k), "__") && !strings.HasSuffix(string(k), "__") { - result[string(k)] = string(v) - } - } - return result -} diff --git a/pkg/services/ngalert/notifier/historian_test.go b/pkg/services/ngalert/notifier/historian_test.go deleted file mode 100644 index 92056499dc3..00000000000 --- a/pkg/services/ngalert/notifier/historian_test.go +++ /dev/null @@ -1,126 +0,0 @@ -package notifier - -import ( - "bytes" - "context" - "errors" - "io" - "net/url" - "testing" - "time" - - alertingModels "github.com/grafana/alerting/models" - "github.com/grafana/grafana/pkg/infra/log" - "github.com/grafana/grafana/pkg/infra/tracing" - "github.com/grafana/grafana/pkg/services/ngalert/client" - "github.com/grafana/grafana/pkg/services/ngalert/lokiclient" - "github.com/grafana/grafana/pkg/services/ngalert/metrics" - "github.com/prometheus/alertmanager/notify" - "github.com/prometheus/alertmanager/types" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/prometheus/common/model" - "github.com/stretchr/testify/require" -) - -var testNow = time.Date(2025, time.July, 15, 16, 55, 0, 0, time.UTC) -var testAlerts = []*types.Alert{ - { - Alert: model.Alert{ - Labels: model.LabelSet{"alertname": "Alert1", alertingModels.RuleUIDLabel: "testRuleUID"}, - Annotations: model.LabelSet{"foo": "bar", "__private__": "baz"}, - StartsAt: testNow, - EndsAt: testNow, - GeneratorURL: "http://localhost/test", - }, - }, -} - -func TestRecord(t *testing.T) { - t.Run("write notification history to Loki", func(t *testing.T) { - testCases := []struct { - name string - retry bool - notificationErr error - expected string - }{ - { - "successful notification", - false, - nil, - "{\"streams\":[{\"stream\":{\"externalLabelKey\":\"externalLabelValue\",\"from\":\"notify-history\"},\"values\":[[\"1752598500000000000\",\"{\\\"schemaVersion\\\":1,\\\"receiver\\\":\\\"testReceiverName\\\",\\\"status\\\":\\\"resolved\\\",\\\"groupLabels\\\":{\\\"foo\\\":\\\"bar\\\"},\\\"alerts\\\":[{\\\"status\\\":\\\"resolved\\\",\\\"labels\\\":{\\\"alertname\\\":\\\"Alert1\\\"},\\\"annotations\\\":{\\\"foo\\\":\\\"bar\\\"},\\\"startsAt\\\":\\\"2025-07-15T16:55:00Z\\\",\\\"endsAt\\\":\\\"2025-07-15T16:55:00Z\\\",\\\"ruleUID\\\":\\\"testRuleUID\\\"}],\\\"retry\\\":false,\\\"duration\\\":1000}\"]]}]}", - }, - { - "failed notification", - true, - errors.New("test notification error"), - "{\"streams\":[{\"stream\":{\"externalLabelKey\":\"externalLabelValue\",\"from\":\"notify-history\"},\"values\":[[\"1752598500000000000\",\"{\\\"schemaVersion\\\":1,\\\"receiver\\\":\\\"testReceiverName\\\",\\\"status\\\":\\\"resolved\\\",\\\"groupLabels\\\":{\\\"foo\\\":\\\"bar\\\"},\\\"alerts\\\":[{\\\"status\\\":\\\"resolved\\\",\\\"labels\\\":{\\\"alertname\\\":\\\"Alert1\\\"},\\\"annotations\\\":{\\\"foo\\\":\\\"bar\\\"},\\\"startsAt\\\":\\\"2025-07-15T16:55:00Z\\\",\\\"endsAt\\\":\\\"2025-07-15T16:55:00Z\\\",\\\"ruleUID\\\":\\\"testRuleUID\\\"}],\\\"retry\\\":true,\\\"error\\\":\\\"test notification error\\\",\\\"duration\\\":1000}\"]]}]}", - }, - } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - req := lokiclient.NewFakeRequester() - met := metrics.NewNotificationHistorianMetrics(prometheus.NewRegistry()) - h := createTestNotificationHistorian(req, met) - - err := <-h.Record(recordCtx(), testAlerts, tc.retry, tc.notificationErr, time.Second) - require.NoError(t, err) - - reqBody, err := io.ReadAll(req.LastRequest.Body) - require.NoError(t, err) - require.Equal(t, tc.expected, string(reqBody)) - }) - } - }) - - t.Run("emits expected write metrics", func(t *testing.T) { - reg := prometheus.NewRegistry() - met := metrics.NewNotificationHistorianMetrics(reg) - goodHistorian := createTestNotificationHistorian(lokiclient.NewFakeRequester(), met) - badHistorian := createTestNotificationHistorian(lokiclient.NewFakeRequester().WithResponse(lokiclient.BadResponse()), met) - - <-goodHistorian.Record(recordCtx(), testAlerts, false, nil, time.Second) - <-badHistorian.Record(recordCtx(), testAlerts, false, nil, time.Second) - - exp := bytes.NewBufferString(` -# HELP grafana_alerting_notification_history_writes_failed_total The total number of failed writes of notification history batches. -# TYPE grafana_alerting_notification_history_writes_failed_total counter -grafana_alerting_notification_history_writes_failed_total 1 -# HELP grafana_alerting_notification_history_writes_total The total number of notification history batches that were attempted to be written. -# TYPE grafana_alerting_notification_history_writes_total counter -grafana_alerting_notification_history_writes_total 2 -`) - err := testutil.GatherAndCompare(reg, exp, - "grafana_alerting_notification_history_writes_total", - "grafana_alerting_notification_history_writes_failed_total", - ) - require.NoError(t, err) - }) - - t.Run("returns error when context is missing required fields", func(t *testing.T) { - req := lokiclient.NewFakeRequester() - met := metrics.NewNotificationHistorianMetrics(prometheus.NewRegistry()) - h := createTestNotificationHistorian(req, met) - - err := <-h.Record(context.Background(), testAlerts, false, nil, time.Second) - require.Error(t, err) - }) -} - -func createTestNotificationHistorian(req client.Requester, met *metrics.NotificationHistorian) *NotificationHistorian { - writePathURL, _ := url.Parse("http://some.url") - cfg := lokiclient.LokiConfig{ - WritePathURL: writePathURL, - ExternalLabels: map[string]string{"externalLabelKey": "externalLabelValue"}, - Encoder: lokiclient.JsonEncoder{}, - } - tracer := tracing.InitializeTracerForTest() - return NewNotificationHistorian(log.NewNopLogger(), cfg, req, met, tracer) -} - -func recordCtx() context.Context { - ctx := notify.WithReceiverName(context.Background(), "testReceiverName") - ctx = notify.WithGroupLabels(ctx, model.LabelSet{"foo": "bar"}) - ctx = notify.WithNow(ctx, testNow) - return ctx -} diff --git a/pkg/services/ngalert/remote/client/alertmanager.go b/pkg/services/ngalert/remote/client/alertmanager.go index 52739aab594..60e36ad142b 100644 --- a/pkg/services/ngalert/remote/client/alertmanager.go +++ b/pkg/services/ngalert/remote/client/alertmanager.go @@ -8,9 +8,9 @@ import ( "time" httptransport "github.com/go-openapi/runtime/client" + alertingInstrument "github.com/grafana/alerting/http/instrument" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/tracing" - "github.com/grafana/grafana/pkg/services/ngalert/client" "github.com/grafana/grafana/pkg/services/ngalert/metrics" "github.com/grafana/grafana/pkg/util/httpclient" amclient "github.com/prometheus/alertmanager/api/v2/client" @@ -29,7 +29,7 @@ type AlertmanagerConfig struct { type Alertmanager struct { *amclient.AlertmanagerAPI - httpClient client.Requester + httpClient alertingInstrument.Requester url *url.URL logger log.Logger } @@ -45,8 +45,8 @@ func NewAlertmanager(cfg *AlertmanagerConfig, metrics *metrics.RemoteAlertmanage Timeout: cfg.Timeout, } - tc := client.NewTimedClient(c, metrics.RequestLatency) - trc := client.NewTracedClient(tc, tracer, "remote.alertmanager.client") + tc := alertingInstrument.NewTimedClient(c, metrics.RequestLatency) + trc := alertingInstrument.NewTracedClient(tc, tracer, "remote.alertmanager.client") apiEndpoint := *cfg.URL // Next, make sure you set the right path. @@ -66,7 +66,7 @@ func NewAlertmanager(cfg *AlertmanagerConfig, metrics *metrics.RemoteAlertmanage // GetAuthedClient returns a client.Requester that includes a configured MimirAuthRoundTripper. // Requests using this client are fully authenticated. -func (am *Alertmanager) GetAuthedClient() client.Requester { +func (am *Alertmanager) GetAuthedClient() alertingInstrument.Requester { return am.httpClient } diff --git a/pkg/services/ngalert/remote/client/mimir.go b/pkg/services/ngalert/remote/client/mimir.go index d8f9a51f327..36a42f364ea 100644 --- a/pkg/services/ngalert/remote/client/mimir.go +++ b/pkg/services/ngalert/remote/client/mimir.go @@ -14,10 +14,11 @@ import ( alertingNotify "github.com/grafana/alerting/notify" + alertingInstrument "github.com/grafana/alerting/http/instrument" + "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/tracing" apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions" - "github.com/grafana/grafana/pkg/services/ngalert/client" "github.com/grafana/grafana/pkg/services/ngalert/metrics" "github.com/grafana/grafana/pkg/util/httpclient" ) @@ -41,7 +42,7 @@ type MimirClient interface { } type Mimir struct { - client client.Requester + client alertingInstrument.Requester endpoint *url.URL logger log.Logger metrics *metrics.RemoteAlertmanager @@ -98,8 +99,8 @@ func New(cfg *Config, metrics *metrics.RemoteAlertmanager, tracer tracing.Tracer c := &http.Client{ Transport: rt, } - tc := client.NewTimedClient(c, metrics.RequestLatency) - trc := client.NewTracedClient(tc, tracer, "remote.alertmanager.client") + tc := alertingInstrument.NewTimedClient(c, metrics.RequestLatency) + trc := alertingInstrument.NewTracedClient(tc, tracer, "remote.alertmanager.client") return &Mimir{ endpoint: cfg.URL, diff --git a/pkg/services/ngalert/state/historian/loki.go b/pkg/services/ngalert/state/historian/loki.go index 4ea2ce2c845..f31b993ad9d 100644 --- a/pkg/services/ngalert/state/historian/loki.go +++ b/pkg/services/ngalert/state/historian/loki.go @@ -12,16 +12,17 @@ import ( "time" "github.com/benbjohnson/clock" + "github.com/grafana/alerting/notify/historian/lokiclient" "github.com/grafana/grafana-plugin-sdk-go/data" - "github.com/grafana/grafana/pkg/services/ngalert/lokiclient" "go.opentelemetry.io/otel/trace" + alertingInstrument "github.com/grafana/alerting/http/instrument" + "github.com/grafana/grafana/pkg/apimachinery/errutil" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/services/ngalert/accesscontrol" - "github.com/grafana/grafana/pkg/services/ngalert/client" "github.com/grafana/grafana/pkg/services/ngalert/eval" "github.com/grafana/grafana/pkg/services/ngalert/metrics" "github.com/grafana/grafana/pkg/services/ngalert/models" @@ -85,7 +86,7 @@ type RemoteLokiBackend struct { ruleStore RuleStore } -func NewRemoteLokiBackend(logger log.Logger, cfg lokiclient.LokiConfig, req client.Requester, metrics *metrics.Historian, tracer tracing.Tracer, ruleStore RuleStore, ac AccessControl) *RemoteLokiBackend { +func NewRemoteLokiBackend(logger log.Logger, cfg lokiclient.LokiConfig, req alertingInstrument.Requester, metrics *metrics.Historian, tracer tracing.Tracer, ruleStore RuleStore, ac AccessControl) *RemoteLokiBackend { return &RemoteLokiBackend{ client: lokiclient.NewLokiClient(cfg, req, metrics.BytesWritten, metrics.WriteDuration, logger, tracer, LokiClientSpanName), externalLabels: cfg.ExternalLabels, diff --git a/pkg/services/ngalert/state/historian/loki_test.go b/pkg/services/ngalert/state/historian/loki_test.go index 08487fb27e7..99d9aff3b79 100644 --- a/pkg/services/ngalert/state/historian/loki_test.go +++ b/pkg/services/ngalert/state/historian/loki_test.go @@ -13,13 +13,15 @@ import ( "testing" "time" + "github.com/grafana/alerting/notify/historian/lokiclient" "github.com/grafana/grafana-plugin-sdk-go/data" - "github.com/grafana/grafana/pkg/services/ngalert/lokiclient" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + alertingInstrument "github.com/grafana/alerting/http/instrument" + "github.com/grafana/alerting/http/instrument/instrumenttest" "github.com/grafana/grafana/pkg/apimachinery/identity" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/tracing" @@ -27,7 +29,6 @@ import ( "github.com/grafana/grafana/pkg/services/folder" rulesAuthz "github.com/grafana/grafana/pkg/services/ngalert/accesscontrol" acfakes "github.com/grafana/grafana/pkg/services/ngalert/accesscontrol/fakes" - "github.com/grafana/grafana/pkg/services/ngalert/client" "github.com/grafana/grafana/pkg/services/ngalert/eval" "github.com/grafana/grafana/pkg/services/ngalert/metrics" "github.com/grafana/grafana/pkg/services/ngalert/models" @@ -668,7 +669,7 @@ func TestMerge(t *testing.T) { func TestRecordStates(t *testing.T) { t.Run("writes state transitions to loki", func(t *testing.T) { - req := lokiclient.NewFakeRequester() + req := instrumenttest.NewFakeRequester() loki := createTestLokiBackend(t, req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)) rule := createTestRule() states := singleFromNormal(&state.State{ @@ -685,8 +686,8 @@ func TestRecordStates(t *testing.T) { t.Run("emits expected write metrics", func(t *testing.T) { reg := prometheus.NewRegistry() met := metrics.NewHistorianMetrics(reg, metrics.Subsystem) - loki := createTestLokiBackend(t, lokiclient.NewFakeRequester(), met) - errLoki := createTestLokiBackend(t, lokiclient.NewFakeRequester().WithResponse(lokiclient.BadResponse()), met) //nolint:bodyclose + loki := createTestLokiBackend(t, instrumenttest.NewFakeRequester(), met) + errLoki := createTestLokiBackend(t, instrumenttest.NewFakeRequester().WithResponse(instrumenttest.BadResponse()), met) //nolint:bodyclose rule := createTestRule() states := singleFromNormal(&state.State{ State: eval.Alerting, @@ -720,7 +721,7 @@ grafana_alerting_state_history_writes_total{backend="loki",org="1"} 2 }) t.Run("elides request if nothing to send", func(t *testing.T) { - req := lokiclient.NewFakeRequester() + req := instrumenttest.NewFakeRequester() loki := createTestLokiBackend(t, req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)) rule := createTestRule() states := []state.StateTransition{} @@ -732,7 +733,7 @@ grafana_alerting_state_history_writes_total{backend="loki",org="1"} 2 }) t.Run("succeeds with special chars in labels", func(t *testing.T) { - req := lokiclient.NewFakeRequester() + req := instrumenttest.NewFakeRequester() loki := createTestLokiBackend(t, req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)) rule := createTestRule() states := singleFromNormal(&state.State{ @@ -755,7 +756,7 @@ grafana_alerting_state_history_writes_total{backend="loki",org="1"} 2 }) t.Run("adds external labels to log lines", func(t *testing.T) { - req := lokiclient.NewFakeRequester() + req := instrumenttest.NewFakeRequester() loki := createTestLokiBackend(t, req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)) rule := createTestRule() states := singleFromNormal(&state.State{ @@ -783,7 +784,7 @@ func TestGetFolderUIDsForFilter(t *testing.T) { usr := accesscontrol.BackgroundUser("test", 1, org.RoleNone, nil) createLoki := func(ac AccessControl) *RemoteLokiBackend { - req := lokiclient.NewFakeRequester() + req := instrumenttest.NewFakeRequester() loki := createTestLokiBackend(t, req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)) rules := fakes.NewRuleStore(t) f := make([]*folder.Folder, 0, len(folders)) @@ -923,12 +924,12 @@ func TestGetFolderUIDsForFilter(t *testing.T) { }) } -func createTestLokiBackend(t *testing.T, req client.Requester, met *metrics.Historian) *RemoteLokiBackend { +func createTestLokiBackend(t *testing.T, req alertingInstrument.Requester, met *metrics.Historian) *RemoteLokiBackend { url, _ := url.Parse("http://some.url") cfg := lokiclient.LokiConfig{ WritePathURL: url, ReadPathURL: url, - Encoder: lokiclient.JsonEncoder{}, + Encoder: lokiclient.JSONEncoder{}, ExternalLabels: map[string]string{"externalLabelKey": "externalLabelValue"}, } lokiBackendLogger := log.New("ngalert.state.historian", "backend", "loki")