Usage Stats: Calculate concurrent users as a histogram (#30006)

Usage Stats: Calculate concurrent users as a histogram

Metric will help to understand if users are sharing their credentials or using one account for concurrent sessions. This will help to make more informed decisions when enforcing any license limitations on credentials usage.
This commit is contained in:
Vardan Torosyan 2021-01-06 16:57:31 +01:00 committed by GitHub
parent c77fb9fa13
commit 7ff37bc6bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 208 additions and 11 deletions

View File

@ -2,6 +2,7 @@ package usagestats
import ( import (
"context" "context"
"fmt"
"time" "time"
"github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/bus"
@ -22,7 +23,7 @@ func init() {
} }
type UsageStats interface { type UsageStats interface {
GetUsageReport() (UsageReport, error) GetUsageReport(ctx context.Context) (UsageReport, error)
RegisterMetric(name string, fn MetricFunc) RegisterMetric(name string, fn MetricFunc)
} }
@ -38,8 +39,9 @@ type UsageStatsService struct {
log log.Logger log log.Logger
oauthProviders map[string]bool oauthProviders map[string]bool
externalMetrics map[string]MetricFunc externalMetrics map[string]MetricFunc
concurrentUserStatsCache memoConcurrentUserStats
} }
func (uss *UsageStatsService) Init() error { func (uss *UsageStatsService) Init() error {
@ -60,7 +62,7 @@ func (uss *UsageStatsService) Run(ctx context.Context) error {
for { for {
select { select {
case <-onceEveryDayTick.C: case <-onceEveryDayTick.C:
if err := uss.sendUsageStats(); err != nil { if err := uss.sendUsageStats(ctx); err != nil {
metricsLogger.Warn("Failed to send usage stats", "err", err) metricsLogger.Warn("Failed to send usage stats", "err", err)
} }
case <-everyMinuteTicker.C: case <-everyMinuteTicker.C:
@ -70,3 +72,43 @@ func (uss *UsageStatsService) Run(ctx context.Context) error {
} }
} }
} }
type memoConcurrentUserStats struct {
stats *concurrentUsersStats
memoized time.Time
}
const concurrentUserStatsCacheLifetime = time.Hour
func (uss *UsageStatsService) GetConcurrentUsersStats(ctx context.Context) (*concurrentUsersStats, error) {
memoizationPeriod := time.Now().Add(-concurrentUserStatsCacheLifetime)
if !uss.concurrentUserStatsCache.memoized.Before(memoizationPeriod) {
return uss.concurrentUserStatsCache.stats, nil
}
uss.concurrentUserStatsCache.stats = &concurrentUsersStats{}
err := uss.SQLStore.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
// Retrieves concurrent users stats as a histogram. Buckets are accumulative and upper bound is inclusive.
rawSQL := `
SELECT
COUNT(CASE WHEN tokens <= 3 THEN 1 END) AS bucket_le_3,
COUNT(CASE WHEN tokens <= 6 THEN 1 END) AS bucket_le_6,
COUNT(CASE WHEN tokens <= 9 THEN 1 END) AS bucket_le_9,
COUNT(CASE WHEN tokens <= 12 THEN 1 END) AS bucket_le_12,
COUNT(CASE WHEN tokens <= 15 THEN 1 END) AS bucket_le_15,
COUNT(1) AS bucket_le_inf
FROM (select count(1) as tokens from user_auth_token group by user_id) uat;`
_, err := sess.SQL(rawSQL).Get(uss.concurrentUserStatsCache.stats)
if err != nil {
return err
}
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to get concurrent users stats from database: %w", err)
}
uss.concurrentUserStatsCache.memoized = time.Now()
return uss.concurrentUserStatsCache.stats, nil
}

View File

@ -0,0 +1,10 @@
package usagestats
type concurrentUsersStats struct {
BucketLE3 int32 `xorm:"bucket_le_3"`
BucketLE6 int32 `xorm:"bucket_le_6"`
BucketLE9 int32 `xorm:"bucket_le_9"`
BucketLE12 int32 `xorm:"bucket_le_12"`
BucketLE15 int32 `xorm:"bucket_le_15"`
BucketLEInf int32 `xorm:"bucket_le_inf"`
}

View File

@ -2,6 +2,7 @@ package usagestats
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
@ -27,7 +28,7 @@ type UsageReport struct {
Packaging string `json:"packaging"` Packaging string `json:"packaging"`
} }
func (uss *UsageStatsService) GetUsageReport() (UsageReport, error) { func (uss *UsageStatsService) GetUsageReport(ctx context.Context) (UsageReport, error) {
version := strings.ReplaceAll(setting.BuildVersion, ".", "_") version := strings.ReplaceAll(setting.BuildVersion, ".", "_")
metrics := map[string]interface{}{} metrics := map[string]interface{}{}
@ -185,6 +186,21 @@ func (uss *UsageStatsService) GetUsageReport() (UsageReport, error) {
metrics["stats.auth_enabled."+authType+".count"] = enabledValue metrics["stats.auth_enabled."+authType+".count"] = enabledValue
} }
// Get concurrent users stats as histogram
concurrentUsersStats, err := uss.GetConcurrentUsersStats(ctx)
if err != nil {
metricsLogger.Error("Failed to get concurrent users stats", "error", err)
return report, err
}
// Histogram is cumulative and metric name has a postfix of le_"<upper inclusive bound>"
metrics["stats.auth_token_per_user_le_3"] = concurrentUsersStats.BucketLE3
metrics["stats.auth_token_per_user_le_6"] = concurrentUsersStats.BucketLE6
metrics["stats.auth_token_per_user_le_9"] = concurrentUsersStats.BucketLE9
metrics["stats.auth_token_per_user_le_12"] = concurrentUsersStats.BucketLE12
metrics["stats.auth_token_per_user_le_15"] = concurrentUsersStats.BucketLE15
metrics["stats.auth_token_per_user_le_inf"] = concurrentUsersStats.BucketLEInf
return report, nil return report, nil
} }
@ -203,14 +219,14 @@ func (uss *UsageStatsService) RegisterMetric(name string, fn MetricFunc) {
uss.externalMetrics[name] = fn uss.externalMetrics[name] = fn
} }
func (uss *UsageStatsService) sendUsageStats() error { func (uss *UsageStatsService) sendUsageStats(ctx context.Context) error {
if !setting.ReportingEnabled { if !setting.ReportingEnabled {
return nil return nil
} }
metricsLogger.Debug(fmt.Sprintf("Sending anonymous usage stats to %s", usageStatsURL)) metricsLogger.Debug(fmt.Sprintf("Sending anonymous usage stats to %s", usageStatsURL))
report, err := uss.GetUsageReport() report, err := uss.GetUsageReport(ctx)
if err != nil { if err != nil {
return err return err
} }

View File

@ -0,0 +1,106 @@
package usagestats
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"testing"
"time"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/services/licensing"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/util"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestUsageStatsService_GetConcurrentUsersStats(t *testing.T) {
sqlStore := sqlstore.InitTestDB(t)
uss := &UsageStatsService{
Bus: bus.New(),
SQLStore: sqlStore,
License: &licensing.OSSLicensingService{},
}
createConcurrentTokens(t, sqlStore)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(func() {
cancel()
})
actualResult, err := uss.GetConcurrentUsersStats(ctx)
require.NoError(t, err)
expectedCachedResult := &concurrentUsersStats{
BucketLE3: 1,
BucketLE6: 2,
BucketLE9: 3,
BucketLE12: 4,
BucketLE15: 5,
BucketLEInf: 6,
}
assert.Equal(t, expectedCachedResult, actualResult)
createToken(t, 8, sqlStore)
require.NoError(t, err)
actualResult, err = uss.GetConcurrentUsersStats(ctx)
require.NoError(t, err)
assert.Equal(t, expectedCachedResult, actualResult)
}
func createToken(t *testing.T, uID int, sqlStore *sqlstore.SQLStore) {
t.Helper()
token, err := util.RandomHex(16)
require.NoError(t, err)
tokenWithSecret := fmt.Sprintf("%ssecret%d", token, uID)
hashBytes := sha256.Sum256([]byte(tokenWithSecret))
hashedToken := hex.EncodeToString(hashBytes[:])
now := time.Now().Unix()
userAuthToken := userAuthToken{
UserID: int64(uID),
AuthToken: hashedToken,
PrevAuthToken: hashedToken,
ClientIP: "192.168.10.11",
UserAgent: "Mozilla",
RotatedAt: now,
CreatedAt: now,
UpdatedAt: now,
SeenAt: 0,
AuthTokenSeen: false,
}
err = sqlStore.WithDbSession(context.Background(), func(dbSession *sqlstore.DBSession) error {
_, err = dbSession.Insert(&userAuthToken)
return err
})
require.NoError(t, err)
}
func createConcurrentTokens(t *testing.T, sqlStore *sqlstore.SQLStore) {
t.Helper()
for u := 1; u <= 6; u++ {
for tkn := 1; tkn <= u*3; tkn++ {
createToken(t, u, sqlStore)
}
}
}
type userAuthToken struct {
UserID int64 `xorm:"user_id"`
AuthToken string
PrevAuthToken string
UserAgent string
ClientIP string `xorm:"client_ip"`
AuthTokenSeen bool
SeenAt int64
RotatedAt int64
CreatedAt int64
UpdatedAt int64
}

View File

@ -2,6 +2,7 @@ package usagestats
import ( import (
"bytes" "bytes"
"context"
"errors" "errors"
"io/ioutil" "io/ioutil"
"runtime" "runtime"
@ -159,6 +160,7 @@ func TestMetrics(t *testing.T) {
return nil return nil
}) })
createConcurrentTokens(t, uss.SQLStore)
uss.AlertingUsageStats = &alertingUsageMock{} uss.AlertingUsageStats = &alertingUsageMock{}
var wg sync.WaitGroup var wg sync.WaitGroup
@ -186,12 +188,12 @@ func TestMetrics(t *testing.T) {
"grafana_com": true, "grafana_com": true,
} }
err := uss.sendUsageStats() err := uss.sendUsageStats(context.Background())
require.NoError(t, err) require.NoError(t, err)
t.Run("Given reporting not enabled and sending usage stats", func(t *testing.T) { t.Run("Given reporting not enabled and sending usage stats", func(t *testing.T) {
setting.ReportingEnabled = false setting.ReportingEnabled = false
err := uss.sendUsageStats() err := uss.sendUsageStats(context.Background())
require.NoError(t, err) require.NoError(t, err)
t.Run("Should not gather stats or call http endpoint", func(t *testing.T) { t.Run("Should not gather stats or call http endpoint", func(t *testing.T) {
@ -212,7 +214,7 @@ func TestMetrics(t *testing.T) {
setting.Packaging = "deb" setting.Packaging = "deb"
wg.Add(1) wg.Add(1)
err := uss.sendUsageStats() err := uss.sendUsageStats(context.Background())
require.NoError(t, err) require.NoError(t, err)
t.Run("Should gather stats and call http endpoint", func(t *testing.T) { t.Run("Should gather stats and call http endpoint", func(t *testing.T) {
@ -291,6 +293,13 @@ func TestMetrics(t *testing.T) {
assert.Equal(t, 1, metrics.Get("stats.auth_enabled.oauth_grafana_com.count").MustInt()) assert.Equal(t, 1, metrics.Get("stats.auth_enabled.oauth_grafana_com.count").MustInt())
assert.Equal(t, 1, metrics.Get("stats.packaging.deb.count").MustInt()) assert.Equal(t, 1, metrics.Get("stats.packaging.deb.count").MustInt())
assert.Equal(t, 1, metrics.Get("stats.auth_token_per_user_le_3").MustInt())
assert.Equal(t, 2, metrics.Get("stats.auth_token_per_user_le_6").MustInt())
assert.Equal(t, 3, metrics.Get("stats.auth_token_per_user_le_9").MustInt())
assert.Equal(t, 4, metrics.Get("stats.auth_token_per_user_le_12").MustInt())
assert.Equal(t, 5, metrics.Get("stats.auth_token_per_user_le_15").MustInt())
assert.Equal(t, 6, metrics.Get("stats.auth_token_per_user_le_inf").MustInt())
}) })
}) })
}) })
@ -419,12 +428,26 @@ func TestMetrics(t *testing.T) {
return nil return nil
}) })
createConcurrentTokens(t, uss.SQLStore)
t.Run("Should include metrics for concurrent users", func(t *testing.T) {
report, err := uss.GetUsageReport(context.Background())
require.NoError(t, err)
assert.Equal(t, int32(1), report.Metrics["stats.auth_token_per_user_le_3"])
assert.Equal(t, int32(2), report.Metrics["stats.auth_token_per_user_le_6"])
assert.Equal(t, int32(3), report.Metrics["stats.auth_token_per_user_le_9"])
assert.Equal(t, int32(4), report.Metrics["stats.auth_token_per_user_le_12"])
assert.Equal(t, int32(5), report.Metrics["stats.auth_token_per_user_le_15"])
assert.Equal(t, int32(6), report.Metrics["stats.auth_token_per_user_le_inf"])
})
t.Run("Should include external metrics", func(t *testing.T) { t.Run("Should include external metrics", func(t *testing.T) {
uss.RegisterMetric(metricName, func() (interface{}, error) { uss.RegisterMetric(metricName, func() (interface{}, error) {
return 1, nil return 1, nil
}) })
report, err := uss.GetUsageReport() report, err := uss.GetUsageReport(context.Background())
assert.Nil(t, err, "Expected no error") assert.Nil(t, err, "Expected no error")
metric := report.Metrics[metricName] metric := report.Metrics[metricName]