mirror of https://github.com/grafana/grafana.git
				
				
				
			Alerting: Allow separate read and write path URLs for Loki state history (#62268)
Extract config parsing and add tests
This commit is contained in:
		
							parent
							
								
									8dab3bf36c
								
							
						
					
					
						commit
						e7ace4ed62
					
				|  | @ -392,16 +392,12 @@ func configureHistorianBackend(ctx context.Context, cfg setting.UnifiedAlertingS | |||
| 		return historian.NewAnnotationBackend(ar, ds, rs), nil | ||||
| 	} | ||||
| 	if cfg.Backend == "loki" { | ||||
| 		baseURL, err := url.Parse(cfg.LokiRemoteURL) | ||||
| 		lcfg, err := historian.NewLokiConfig(cfg) | ||||
| 		if err != nil { | ||||
| 			return nil, fmt.Errorf("failed to parse remote loki URL: %w", err) | ||||
| 			return nil, fmt.Errorf("invalid remote loki configuration: %w", err) | ||||
| 		} | ||||
| 		backend := historian.NewRemoteLokiBackend(historian.LokiConfig{ | ||||
| 			Url:               baseURL, | ||||
| 			BasicAuthUser:     cfg.LokiBasicAuthUsername, | ||||
| 			BasicAuthPassword: cfg.LokiBasicAuthPassword, | ||||
| 			TenantID:          cfg.LokiTenantID, | ||||
| 		}) | ||||
| 		backend := historian.NewRemoteLokiBackend(lcfg) | ||||
| 
 | ||||
| 		testConnCtx, cancelFunc := context.WithTimeout(ctx, 10*time.Second) | ||||
| 		defer cancelFunc() | ||||
| 		if err := backend.TestConnection(testConnCtx); err != nil { | ||||
|  |  | |||
|  | @ -11,18 +11,47 @@ import ( | |||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/grafana/grafana/pkg/infra/log" | ||||
| 	"github.com/grafana/grafana/pkg/setting" | ||||
| ) | ||||
| 
 | ||||
| const defaultClientTimeout = 30 * time.Second | ||||
| 
 | ||||
| type LokiConfig struct { | ||||
| 	Url               *url.URL | ||||
| 	ReadPathURL       *url.URL | ||||
| 	WritePathURL      *url.URL | ||||
| 	BasicAuthUser     string | ||||
| 	BasicAuthPassword string | ||||
| 	TenantID          string | ||||
| 	ExternalLabels    map[string]string | ||||
| } | ||||
| 
 | ||||
| func NewLokiConfig(cfg setting.UnifiedAlertingStateHistorySettings) (LokiConfig, error) { | ||||
| 	read, write := cfg.LokiReadURL, cfg.LokiWriteURL | ||||
| 	if read == "" { | ||||
| 		read = cfg.LokiRemoteURL | ||||
| 	} | ||||
| 	if write == "" { | ||||
| 		write = cfg.LokiRemoteURL | ||||
| 	} | ||||
| 
 | ||||
| 	readURL, err := url.Parse(read) | ||||
| 	if err != nil { | ||||
| 		return LokiConfig{}, fmt.Errorf("failed to parse loki remote read URL: %w", err) | ||||
| 	} | ||||
| 	writeURL, err := url.Parse(write) | ||||
| 	if err != nil { | ||||
| 		return LokiConfig{}, fmt.Errorf("failed to parse loki remote write URL: %w", err) | ||||
| 	} | ||||
| 
 | ||||
| 	return LokiConfig{ | ||||
| 		ReadPathURL:       readURL, | ||||
| 		WritePathURL:      writeURL, | ||||
| 		BasicAuthUser:     cfg.LokiBasicAuthUsername, | ||||
| 		BasicAuthPassword: cfg.LokiBasicAuthPassword, | ||||
| 		TenantID:          cfg.LokiTenantID, | ||||
| 	}, nil | ||||
| } | ||||
| 
 | ||||
| type httpLokiClient struct { | ||||
| 	client http.Client | ||||
| 	cfg    LokiConfig | ||||
|  | @ -40,7 +69,7 @@ func newLokiClient(cfg LokiConfig, logger log.Logger) *httpLokiClient { | |||
| } | ||||
| 
 | ||||
| func (c *httpLokiClient) ping(ctx context.Context) error { | ||||
| 	uri := c.cfg.Url.JoinPath("/loki/api/v1/labels") | ||||
| 	uri := c.cfg.ReadPathURL.JoinPath("/loki/api/v1/labels") | ||||
| 	req, err := http.NewRequest(http.MethodGet, uri.String(), nil) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("error creating request: %w", err) | ||||
|  | @ -92,7 +121,7 @@ func (c *httpLokiClient) push(ctx context.Context, s []stream) error { | |||
| 		return fmt.Errorf("failed to serialize Loki payload: %w", err) | ||||
| 	} | ||||
| 
 | ||||
| 	uri := c.cfg.Url.JoinPath("/loki/api/v1/push") | ||||
| 	uri := c.cfg.WritePathURL.JoinPath("/loki/api/v1/push") | ||||
| 	req, err := http.NewRequest(http.MethodPost, uri.String(), bytes.NewBuffer(enc)) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to create Loki request: %w", err) | ||||
|  |  | |||
|  | @ -5,11 +5,72 @@ import ( | |||
| 	"net/url" | ||||
| 	"testing" | ||||
| 
 | ||||
| 	"github.com/grafana/grafana/pkg/setting" | ||||
| 	"github.com/stretchr/testify/require" | ||||
| 
 | ||||
| 	"github.com/grafana/grafana/pkg/infra/log" | ||||
| ) | ||||
| 
 | ||||
| func TestLokiConfig(t *testing.T) { | ||||
| 	t.Run("test URL options", func(t *testing.T) { | ||||
| 		type testCase struct { | ||||
| 			name     string | ||||
| 			in       setting.UnifiedAlertingStateHistorySettings | ||||
| 			expRead  string | ||||
| 			expWrite string | ||||
| 			expErr   string | ||||
| 		} | ||||
| 
 | ||||
| 		cases := []testCase{ | ||||
| 			{ | ||||
| 				name: "remote url only", | ||||
| 				in: setting.UnifiedAlertingStateHistorySettings{ | ||||
| 					LokiRemoteURL: "http://url.com", | ||||
| 				}, | ||||
| 				expRead:  "http://url.com", | ||||
| 				expWrite: "http://url.com", | ||||
| 			}, | ||||
| 			{ | ||||
| 				name: "separate urls", | ||||
| 				in: setting.UnifiedAlertingStateHistorySettings{ | ||||
| 					LokiReadURL:  "http://read.url.com", | ||||
| 					LokiWriteURL: "http://write.url.com", | ||||
| 				}, | ||||
| 				expRead:  "http://read.url.com", | ||||
| 				expWrite: "http://write.url.com", | ||||
| 			}, | ||||
| 			{ | ||||
| 				name: "single fallback", | ||||
| 				in: setting.UnifiedAlertingStateHistorySettings{ | ||||
| 					LokiRemoteURL: "http://url.com", | ||||
| 					LokiReadURL:   "http://read.url.com", | ||||
| 				}, | ||||
| 				expRead:  "http://read.url.com", | ||||
| 				expWrite: "http://url.com", | ||||
| 			}, | ||||
| 			{ | ||||
| 				name: "invalid", | ||||
| 				in: setting.UnifiedAlertingStateHistorySettings{ | ||||
| 					LokiRemoteURL: "://://", | ||||
| 				}, | ||||
| 				expErr: "failed to parse", | ||||
| 			}, | ||||
| 		} | ||||
| 
 | ||||
| 		for _, tc := range cases { | ||||
| 			t.Run(tc.name, func(t *testing.T) { | ||||
| 				res, err := NewLokiConfig(tc.in) | ||||
| 				if tc.expErr != "" { | ||||
| 					require.ErrorContains(t, err, tc.expErr) | ||||
| 				} else { | ||||
| 					require.Equal(t, tc.expRead, res.ReadPathURL.String()) | ||||
| 					require.Equal(t, tc.expWrite, res.WritePathURL.String()) | ||||
| 				} | ||||
| 			}) | ||||
| 		} | ||||
| 	}) | ||||
| } | ||||
| 
 | ||||
| // This function can be used for local testing, just remove the skip call.
 | ||||
| func TestLokiHTTPClient(t *testing.T) { | ||||
| 	t.Skip() | ||||
|  | @ -19,7 +80,8 @@ func TestLokiHTTPClient(t *testing.T) { | |||
| 		require.NoError(t, err) | ||||
| 
 | ||||
| 		client := newLokiClient(LokiConfig{ | ||||
| 			Url: url, | ||||
| 			ReadPathURL:  url, | ||||
| 			WritePathURL: url, | ||||
| 		}, log.NewNopLogger()) | ||||
| 
 | ||||
| 		// Unauthorized request should fail against Grafana Cloud.
 | ||||
|  |  | |||
|  | @ -103,6 +103,8 @@ type UnifiedAlertingStateHistorySettings struct { | |||
| 	Enabled       bool | ||||
| 	Backend       string | ||||
| 	LokiRemoteURL string | ||||
| 	LokiReadURL   string | ||||
| 	LokiWriteURL  string | ||||
| 	LokiTenantID  string | ||||
| 	// LokiBasicAuthUsername and LokiBasicAuthPassword are used for basic auth
 | ||||
| 	// if one of them is set.
 | ||||
|  | @ -323,6 +325,8 @@ func (cfg *Cfg) ReadUnifiedAlertingSettings(iniFile *ini.File) error { | |||
| 		Enabled:               stateHistory.Key("enabled").MustBool(stateHistoryDefaultEnabled), | ||||
| 		Backend:               stateHistory.Key("backend").MustString("annotations"), | ||||
| 		LokiRemoteURL:         stateHistory.Key("loki_remote_url").MustString(""), | ||||
| 		LokiReadURL:           stateHistory.Key("loki_remote_read_url").MustString(""), | ||||
| 		LokiWriteURL:          stateHistory.Key("loki_remote_write_url").MustString(""), | ||||
| 		LokiTenantID:          stateHistory.Key("loki_tenant_id").MustString(""), | ||||
| 		LokiBasicAuthUsername: stateHistory.Key("loki_basic_auth_username").MustString(""), | ||||
| 		LokiBasicAuthPassword: stateHistory.Key("loki_basic_auth_password").MustString(""), | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue