diff --git a/api/v2.0/swagger.yaml b/api/v2.0/swagger.yaml index 16d5d12044..6821ed65ca 100644 --- a/api/v2.0/swagger.yaml +++ b/api/v2.0/swagger.yaml @@ -7321,6 +7321,10 @@ definitions: type: string description: 'The bandwidth limit of proxy cache, in Kbps (kilobits per second). It limits the communication between Harbor and the upstream registry, not the client and the Harbor.' x-nullable: true + max_upstream_conn: + type: string + description: 'The max connection per artifact to the upstream registry in current proxy cache project, if it is -1, no limit to upstream registry connections' + x-nullable: true ProjectSummary: type: object properties: diff --git a/src/pkg/project/models/pro_meta.go b/src/pkg/project/models/pro_meta.go index 25f7e41bee..6bb82b1b33 100644 --- a/src/pkg/project/models/pro_meta.go +++ b/src/pkg/project/models/pro_meta.go @@ -25,4 +25,5 @@ const ( ProMetaReuseSysCVEAllowlist = "reuse_sys_cve_allowlist" ProMetaAutoSBOMGen = "auto_sbom_generation" ProMetaProxySpeed = "proxy_speed_kb" + ProMetaMaxUpstreamConn = "max_upstream_conn" ) diff --git a/src/pkg/project/models/project.go b/src/pkg/project/models/project.go index ae8256a7d7..42332697fa 100644 --- a/src/pkg/project/models/project.go +++ b/src/pkg/project/models/project.go @@ -21,6 +21,7 @@ import ( "strings" "time" + "github.com/goharbor/harbor/src/lib/log" "github.com/goharbor/harbor/src/lib/orm" allowlist "github.com/goharbor/harbor/src/pkg/allowlist/models" ) @@ -169,6 +170,20 @@ func (p *Project) ProxyCacheSpeed() int32 { return int32(speedInt) } +// MaxUpstreamConnection ... +func (p *Project) MaxUpstreamConnection() int { + countVal, exist := p.GetMetadata(ProMetaMaxUpstreamConn) + if !exist { + return 0 + } + cnt, err := strconv.ParseInt(countVal, 10, 32) + if err != nil { + log.Warningf("failed th parse the max_upstream_conn, val:%s error %v", countVal, err) + return 0 + } + return int(cnt) +} + // FilterByPublic returns orm.QuerySeter with public filter func (p *Project) FilterByPublic(_ context.Context, qs orm.QuerySeter, _ string, value any) orm.QuerySeter { subQuery := `SELECT project_id FROM project_metadata WHERE name = 'public' AND value = '%s'` diff --git a/src/pkg/proxy/connection/limit.go b/src/pkg/proxy/connection/limit.go new file mode 100644 index 0000000000..72a20261b9 --- /dev/null +++ b/src/pkg/proxy/connection/limit.go @@ -0,0 +1,79 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connection + +import ( + "context" + "fmt" + + "github.com/go-redis/redis/v8" + + "github.com/goharbor/harbor/src/lib/log" +) + +// ConLimiter is used to limit the number of connections to the upstream service +type ConnLimiter struct { +} + +// Limiter is a global connection limiter instance +var Limiter = &ConnLimiter{} + +// Used to compare and increase connection number in redis +// +// KEYS[1]: key of max_conn_upstream +// ARGV[1]: max connection limit +var increaseWithLimitText = ` +local current = tonumber(redis.call('GET', KEYS[1]) or '0') +local max = tonumber(ARGV[1]) + +if current + 1 <= max then + redis.call('INCRBY', KEYS[1], 1) + redis.call('EXPIRE', KEYS[1], 3600) -- set expire to avoid always lock + return 1 +else + return 0 +end +` + +var acquireScript = redis.NewScript(increaseWithLimitText) + +// Acquire tries to acquire a connection, returns true if successful +func (c *ConnLimiter) Acquire(ctx context.Context, rdb *redis.Client, key string, limit int) bool { + result, err := acquireScript.Run(ctx, rdb, []string{key}, fmt.Sprintf("%v", limit)).Int() + if err != nil { + log.Errorf("failed to get the connection lock in redis, error %v", err) + return false + } + log.Debugf("Acquire script result is %d", result) + return result == 1 +} + +var decreaseText = ` +local val = tonumber(redis.call("GET", KEYS[1]) or "0") +if val > 0 then + redis.call("DECR", KEYS[1]) +end +return 0 +` + +var decreaseScript = redis.NewScript(decreaseText) + +// Release releases a connection in redis +func (c *ConnLimiter) Release(ctx context.Context, rdb *redis.Client, key string) { + _, err := decreaseScript.Run(ctx, rdb, []string{key}).Int() + if err != nil { + log.Infof("release connection failed:%v", err) + } +} diff --git a/src/pkg/proxy/connection/limit_test.go b/src/pkg/proxy/connection/limit_test.go new file mode 100644 index 0000000000..309a9c2b65 --- /dev/null +++ b/src/pkg/proxy/connection/limit_test.go @@ -0,0 +1,59 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connection + +import ( + "context" + "fmt" + "os" + "testing" + + "github.com/go-redis/redis/v8" + "github.com/stretchr/testify/assert" +) + +func TestConnLimiter_Acquire_Release(t *testing.T) { + redisAddress := os.Getenv("REDIS_HOST") + redisHost := "localhost" + if len(redisAddress) > 0 { + redisHost = redisAddress + } + + ctx := context.Background() + rdb := redis.NewClient(&redis.Options{ + Addr: fmt.Sprintf("%s:6379", redisHost), // Redis server address + Password: "", // No password set + DB: 0, // Use default DB + }) + key := "test_max_connection_key" + maxConn := 10 + for range 10 { + result := Limiter.Acquire(ctx, rdb, key, maxConn) + assert.True(t, result) + } + // after max connection reached, it should be false + result2 := Limiter.Acquire(ctx, rdb, key, maxConn) + assert.False(t, result2) + + for range 10 { + Limiter.Release(ctx, rdb, key) + } + + // connection in redis should be 0 finally + n, err := rdb.Get(ctx, key).Int() + assert.Nil(t, err) + assert.Equal(t, 0, n) + +} diff --git a/src/server/middleware/repoproxy/proxy.go b/src/server/middleware/repoproxy/proxy.go index 75abdcae26..a3524add3f 100644 --- a/src/server/middleware/repoproxy/proxy.go +++ b/src/server/middleware/repoproxy/proxy.go @@ -19,6 +19,7 @@ import ( "fmt" "io" "net/http" + "os" "strings" "time" @@ -33,18 +34,21 @@ import ( httpLib "github.com/goharbor/harbor/src/lib/http" "github.com/goharbor/harbor/src/lib/log" "github.com/goharbor/harbor/src/lib/orm" + "github.com/goharbor/harbor/src/lib/redis" proModels "github.com/goharbor/harbor/src/pkg/project/models" + "github.com/goharbor/harbor/src/pkg/proxy/connection" "github.com/goharbor/harbor/src/pkg/reg/model" "github.com/goharbor/harbor/src/server/middleware" ) const ( - contentLength = "Content-Length" - contentType = "Content-Type" - dockerContentDigest = "Docker-Content-Digest" - etag = "Etag" - ensureTagInterval = 10 * time.Second - ensureTagMaxRetry = 60 + contentLength = "Content-Length" + contentType = "Content-Type" + dockerContentDigest = "Docker-Content-Digest" + etag = "Etag" + ensureTagInterval = 10 * time.Second + ensureTagMaxRetry = 60 + upstreamRegistryLimitOnProject = "UPSTREAM_REGISTRY_LIMIT_ON_PROJECT" // if UPSTREAM_REGISTRY_LIMIT_ON_PROJECT is true, the upstream registry connection is based on project level, by default it is artifact level ) var tooManyRequestsError = errors.New("too many requests to upstream registry").WithCode(errors.RateLimitCode) @@ -99,6 +103,22 @@ func handleBlob(w http.ResponseWriter, r *http.Request, next http.Handler) error next.ServeHTTP(w, r) return nil } + + if p.MaxUpstreamConnection() > 0 { + client, err := redis.GetHarborClient() + if err != nil { + return errors.NewErrs(err) + } + key := upstreamRegistryConnectionKey(art) + log.Debugf("handle blob, upstream registry connection limit key: %s", key) + if !connection.Limiter.Acquire(ctx, client, key, p.MaxUpstreamConnection()) { + log.Infof("current connection exceed max connections to upstream registry") + // send http code 429 to client + return tooManyRequestsError + } + defer connection.Limiter.Release(context.Background(), client, key) // use background context in defer to avoid been canceled + } + size, reader, err := proxyCtl.ProxyBlob(ctx, p, art) if err != nil { return err @@ -173,6 +193,15 @@ func defaultBlobURL(projectName string, name string, digest string) string { return fmt.Sprintf("/v2/%s/library/%s/blobs/%s", projectName, name, digest) } +// upstreamRegistryConnectionKey get upstream registry connection key +func upstreamRegistryConnectionKey(art lib.ArtifactInfo) string { + limitOnProject := os.Getenv(upstreamRegistryLimitOnProject) + if strings.EqualFold("true", limitOnProject) { + return fmt.Sprintf("{upstream_registry_connection}:%s", art.ProjectName) + } + return fmt.Sprintf("{upstream_registry_connection}:%s:%s", art.Repository, art.Digest) +} + func handleManifest(w http.ResponseWriter, r *http.Request, next http.Handler) error { ctx := r.Context() art, p, proxyCtl, err := preCheck(ctx, true) @@ -219,6 +248,20 @@ func handleManifest(w http.ResponseWriter, r *http.Request, next http.Handler) e next.ServeHTTP(w, r) return nil } + if p.MaxUpstreamConnection() > 0 { + client, err := redis.GetHarborClient() + if err != nil { + return errors.NewErrs(err) + } + key := upstreamRegistryConnectionKey(art) + log.Debugf("handle manifest key %v", key) + if !connection.Limiter.Acquire(ctx, client, key, p.MaxUpstreamConnection()) { + log.Infof("current connection exceed max connections to upstream registry") + // send http code 429 to client + return tooManyRequestsError + } + defer connection.Limiter.Release(context.Background(), client, key) // use background context in defer to avoid been canceled + } log.Debugf("the tag is %v, digest is %v", art.Tag, art.Digest) if r.Method == http.MethodHead { diff --git a/src/server/v2.0/handler/project.go b/src/server/v2.0/handler/project.go index 23d4835754..de75072b8c 100644 --- a/src/server/v2.0/handler/project.go +++ b/src/server/v2.0/handler/project.go @@ -163,9 +163,10 @@ func (a *projectAPI) CreateProject(ctx context.Context, params operation.CreateP } } - // ignore metadata.proxy_speed_kb for non-proxy-cache project + // ignore metadata.proxy_speed_kb and metadata.max_upstream_conn for non-proxy-cache project if req.RegistryID == nil { req.Metadata.ProxySpeedKb = nil + req.Metadata.MaxUpstreamConn = nil } // ignore enable_content_trust metadata for proxy cache project @@ -566,9 +567,10 @@ func (a *projectAPI) UpdateProject(ctx context.Context, params operation.UpdateP } } - // ignore metadata.proxy_speed_kb for non-proxy-cache project + // ignore metadata.proxy_speed_kb and metadata.max_upstream_conn for non-proxy-cache project if params.Project.Metadata != nil && !p.IsProxy() { params.Project.Metadata.ProxySpeedKb = nil + params.Project.Metadata.MaxUpstreamConn = nil } // ignore enable_content_trust metadata for proxy cache project @@ -818,6 +820,12 @@ func (a *projectAPI) validateProjectReq(ctx context.Context, req *models.Project return errors.BadRequestError(nil).WithMessagef("metadata.proxy_speed_kb should by an int32, but got: '%s', err: %s", *ps, err) } } + + if cnt := req.Metadata.MaxUpstreamConn; cnt != nil { + if _, err := strconv.ParseInt(*cnt, 10, 32); err != nil { + return errors.BadRequestError(nil).WithMessagef("metadata.max_upstream_conn should be an int, but got '%s', err: %s", *cnt, err) + } + } } if req.StorageLimit != nil { diff --git a/src/server/v2.0/handler/project_metadata.go b/src/server/v2.0/handler/project_metadata.go index b58c59ffa6..61933af25b 100644 --- a/src/server/v2.0/handler/project_metadata.go +++ b/src/server/v2.0/handler/project_metadata.go @@ -161,6 +161,12 @@ func (p *projectMetadataAPI) validate(metas map[string]string) (map[string]strin return nil, errors.New(nil).WithCode(errors.BadRequestCode).WithMessagef("invalid value: %s", value) } metas[proModels.ProMetaProxySpeed] = strconv.FormatInt(v, 10) + case proModels.ProMetaMaxUpstreamConn: + v, err := strconv.ParseInt(value, 10, 32) + if err != nil { + return nil, errors.New(nil).WithCode(errors.BadRequestCode).WithMessagef("invalid value: %s", value) + } + metas[proModels.ProMetaMaxUpstreamConn] = strconv.FormatInt(v, 10) default: return nil, errors.New(nil).WithCode(errors.BadRequestCode).WithMessagef("invalid key: %s", key) } diff --git a/src/server/v2.0/handler/project_metadata_test.go b/src/server/v2.0/handler/project_metadata_test.go new file mode 100644 index 0000000000..46a8f398ae --- /dev/null +++ b/src/server/v2.0/handler/project_metadata_test.go @@ -0,0 +1,76 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handler + +import ( + "testing" + + proModels "github.com/goharbor/harbor/src/pkg/project/models" + "github.com/stretchr/testify/assert" +) + +func TestValidate(t *testing.T) { + api := &projectMetadataAPI{} + + tests := []struct { + name string + metas map[string]string + expectErr bool + }{ + { + name: "Invalid max upstream conn value", + metas: map[string]string{proModels.ProMetaMaxUpstreamConn: "invalid"}, + expectErr: true, + }, + { + name: "max upstream conn value 0", + metas: map[string]string{proModels.ProMetaMaxUpstreamConn: "0"}, + expectErr: false, + }, + { + name: "max upstream conn value -1", + metas: map[string]string{proModels.ProMetaMaxUpstreamConn: "-1"}, + expectErr: false, + }, + { + name: "normal max upstream conn value", + metas: map[string]string{proModels.ProMetaMaxUpstreamConn: "30"}, + expectErr: false, + }, + { + name: "Unsupported key", + metas: map[string]string{"unsupported_key": "value"}, + expectErr: true, + }, + { + name: "Empty map", + metas: map[string]string{}, + expectErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := api.validate(tt.metas) + if tt.expectErr { + assert.Error(t, err) + assert.Nil(t, result) + } else { + assert.NoError(t, err) + assert.NotNil(t, result) + } + }) + } +}