Add max_upstream_conn parameter for each proxy_cache project (#22348)
Build Package Workflow / BUILD_PACKAGE (push) Has been cancelled Details
Code scanning - action / CodeQL-Build (push) Has been cancelled Details
Trivy Nightly Scan / Trivy Scan nightly (harbor-core, dev) (push) Has been cancelled Details
Trivy Nightly Scan / Trivy Scan nightly (harbor-core, v2.12.0-dev) (push) Has been cancelled Details
Trivy Nightly Scan / Trivy Scan nightly (harbor-db, dev) (push) Has been cancelled Details
Trivy Nightly Scan / Trivy Scan nightly (harbor-db, v2.12.0-dev) (push) Has been cancelled Details
Trivy Nightly Scan / Trivy Scan nightly (harbor-exporter, dev) (push) Has been cancelled Details
Trivy Nightly Scan / Trivy Scan nightly (harbor-exporter, v2.12.0-dev) (push) Has been cancelled Details
Trivy Nightly Scan / Trivy Scan nightly (harbor-jobservice, dev) (push) Has been cancelled Details
Trivy Nightly Scan / Trivy Scan nightly (harbor-jobservice, v2.12.0-dev) (push) Has been cancelled Details
Trivy Nightly Scan / Trivy Scan nightly (harbor-log, dev) (push) Has been cancelled Details
Trivy Nightly Scan / Trivy Scan nightly (harbor-log, v2.12.0-dev) (push) Has been cancelled Details
Trivy Nightly Scan / Trivy Scan nightly (harbor-portal, dev) (push) Has been cancelled Details
Trivy Nightly Scan / Trivy Scan nightly (harbor-portal, v2.12.0-dev) (push) Has been cancelled Details
Trivy Nightly Scan / Trivy Scan nightly (harbor-registryctl, dev) (push) Has been cancelled Details
Trivy Nightly Scan / Trivy Scan nightly (harbor-registryctl, v2.12.0-dev) (push) Has been cancelled Details
Trivy Nightly Scan / Trivy Scan nightly (prepare, dev) (push) Has been cancelled Details
Trivy Nightly Scan / Trivy Scan nightly (prepare, v2.12.0-dev) (push) Has been cancelled Details
CONFORMANCE_TEST / CONFORMANCE_TEST (push) Has been cancelled Details
Housekeeping - Close stale issues and PRs / stale (push) Has been cancelled Details

limit the proxy connection to upstream registry

Signed-off-by: stonezdj <stonezdj@gmail.com>
This commit is contained in:
stonezdj(Daojun Zhang) 2025-09-30 18:27:10 +08:00 committed by GitHub
parent 4da6070872
commit c004f2d3e6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 299 additions and 8 deletions

View File

@ -7321,6 +7321,10 @@ definitions:
type: string
description: 'The bandwidth limit of proxy cache, in Kbps (kilobits per second). It limits the communication between Harbor and the upstream registry, not the client and the Harbor.'
x-nullable: true
max_upstream_conn:
type: string
description: 'The max connection per artifact to the upstream registry in current proxy cache project, if it is -1, no limit to upstream registry connections'
x-nullable: true
ProjectSummary:
type: object
properties:

View File

@ -25,4 +25,5 @@ const (
ProMetaReuseSysCVEAllowlist = "reuse_sys_cve_allowlist"
ProMetaAutoSBOMGen = "auto_sbom_generation"
ProMetaProxySpeed = "proxy_speed_kb"
ProMetaMaxUpstreamConn = "max_upstream_conn"
)

View File

@ -21,6 +21,7 @@ import (
"strings"
"time"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/orm"
allowlist "github.com/goharbor/harbor/src/pkg/allowlist/models"
)
@ -169,6 +170,20 @@ func (p *Project) ProxyCacheSpeed() int32 {
return int32(speedInt)
}
// MaxUpstreamConnection ...
func (p *Project) MaxUpstreamConnection() int {
countVal, exist := p.GetMetadata(ProMetaMaxUpstreamConn)
if !exist {
return 0
}
cnt, err := strconv.ParseInt(countVal, 10, 32)
if err != nil {
log.Warningf("failed th parse the max_upstream_conn, val:%s error %v", countVal, err)
return 0
}
return int(cnt)
}
// FilterByPublic returns orm.QuerySeter with public filter
func (p *Project) FilterByPublic(_ context.Context, qs orm.QuerySeter, _ string, value any) orm.QuerySeter {
subQuery := `SELECT project_id FROM project_metadata WHERE name = 'public' AND value = '%s'`

View File

@ -0,0 +1,79 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package connection
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"github.com/goharbor/harbor/src/lib/log"
)
// ConLimiter is used to limit the number of connections to the upstream service
type ConnLimiter struct {
}
// Limiter is a global connection limiter instance
var Limiter = &ConnLimiter{}
// Used to compare and increase connection number in redis
//
// KEYS[1]: key of max_conn_upstream
// ARGV[1]: max connection limit
var increaseWithLimitText = `
local current = tonumber(redis.call('GET', KEYS[1]) or '0')
local max = tonumber(ARGV[1])
if current + 1 <= max then
redis.call('INCRBY', KEYS[1], 1)
redis.call('EXPIRE', KEYS[1], 3600) -- set expire to avoid always lock
return 1
else
return 0
end
`
var acquireScript = redis.NewScript(increaseWithLimitText)
// Acquire tries to acquire a connection, returns true if successful
func (c *ConnLimiter) Acquire(ctx context.Context, rdb *redis.Client, key string, limit int) bool {
result, err := acquireScript.Run(ctx, rdb, []string{key}, fmt.Sprintf("%v", limit)).Int()
if err != nil {
log.Errorf("failed to get the connection lock in redis, error %v", err)
return false
}
log.Debugf("Acquire script result is %d", result)
return result == 1
}
var decreaseText = `
local val = tonumber(redis.call("GET", KEYS[1]) or "0")
if val > 0 then
redis.call("DECR", KEYS[1])
end
return 0
`
var decreaseScript = redis.NewScript(decreaseText)
// Release releases a connection in redis
func (c *ConnLimiter) Release(ctx context.Context, rdb *redis.Client, key string) {
_, err := decreaseScript.Run(ctx, rdb, []string{key}).Int()
if err != nil {
log.Infof("release connection failed:%v", err)
}
}

View File

@ -0,0 +1,59 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package connection
import (
"context"
"fmt"
"os"
"testing"
"github.com/go-redis/redis/v8"
"github.com/stretchr/testify/assert"
)
func TestConnLimiter_Acquire_Release(t *testing.T) {
redisAddress := os.Getenv("REDIS_HOST")
redisHost := "localhost"
if len(redisAddress) > 0 {
redisHost = redisAddress
}
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:6379", redisHost), // Redis server address
Password: "", // No password set
DB: 0, // Use default DB
})
key := "test_max_connection_key"
maxConn := 10
for range 10 {
result := Limiter.Acquire(ctx, rdb, key, maxConn)
assert.True(t, result)
}
// after max connection reached, it should be false
result2 := Limiter.Acquire(ctx, rdb, key, maxConn)
assert.False(t, result2)
for range 10 {
Limiter.Release(ctx, rdb, key)
}
// connection in redis should be 0 finally
n, err := rdb.Get(ctx, key).Int()
assert.Nil(t, err)
assert.Equal(t, 0, n)
}

View File

@ -19,6 +19,7 @@ import (
"fmt"
"io"
"net/http"
"os"
"strings"
"time"
@ -33,7 +34,9 @@ import (
httpLib "github.com/goharbor/harbor/src/lib/http"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/redis"
proModels "github.com/goharbor/harbor/src/pkg/project/models"
"github.com/goharbor/harbor/src/pkg/proxy/connection"
"github.com/goharbor/harbor/src/pkg/reg/model"
"github.com/goharbor/harbor/src/server/middleware"
)
@ -45,6 +48,7 @@ const (
etag = "Etag"
ensureTagInterval = 10 * time.Second
ensureTagMaxRetry = 60
upstreamRegistryLimitOnProject = "UPSTREAM_REGISTRY_LIMIT_ON_PROJECT" // if UPSTREAM_REGISTRY_LIMIT_ON_PROJECT is true, the upstream registry connection is based on project level, by default it is artifact level
)
var tooManyRequestsError = errors.New("too many requests to upstream registry").WithCode(errors.RateLimitCode)
@ -99,6 +103,22 @@ func handleBlob(w http.ResponseWriter, r *http.Request, next http.Handler) error
next.ServeHTTP(w, r)
return nil
}
if p.MaxUpstreamConnection() > 0 {
client, err := redis.GetHarborClient()
if err != nil {
return errors.NewErrs(err)
}
key := upstreamRegistryConnectionKey(art)
log.Debugf("handle blob, upstream registry connection limit key: %s", key)
if !connection.Limiter.Acquire(ctx, client, key, p.MaxUpstreamConnection()) {
log.Infof("current connection exceed max connections to upstream registry")
// send http code 429 to client
return tooManyRequestsError
}
defer connection.Limiter.Release(context.Background(), client, key) // use background context in defer to avoid been canceled
}
size, reader, err := proxyCtl.ProxyBlob(ctx, p, art)
if err != nil {
return err
@ -173,6 +193,15 @@ func defaultBlobURL(projectName string, name string, digest string) string {
return fmt.Sprintf("/v2/%s/library/%s/blobs/%s", projectName, name, digest)
}
// upstreamRegistryConnectionKey get upstream registry connection key
func upstreamRegistryConnectionKey(art lib.ArtifactInfo) string {
limitOnProject := os.Getenv(upstreamRegistryLimitOnProject)
if strings.EqualFold("true", limitOnProject) {
return fmt.Sprintf("{upstream_registry_connection}:%s", art.ProjectName)
}
return fmt.Sprintf("{upstream_registry_connection}:%s:%s", art.Repository, art.Digest)
}
func handleManifest(w http.ResponseWriter, r *http.Request, next http.Handler) error {
ctx := r.Context()
art, p, proxyCtl, err := preCheck(ctx, true)
@ -219,6 +248,20 @@ func handleManifest(w http.ResponseWriter, r *http.Request, next http.Handler) e
next.ServeHTTP(w, r)
return nil
}
if p.MaxUpstreamConnection() > 0 {
client, err := redis.GetHarborClient()
if err != nil {
return errors.NewErrs(err)
}
key := upstreamRegistryConnectionKey(art)
log.Debugf("handle manifest key %v", key)
if !connection.Limiter.Acquire(ctx, client, key, p.MaxUpstreamConnection()) {
log.Infof("current connection exceed max connections to upstream registry")
// send http code 429 to client
return tooManyRequestsError
}
defer connection.Limiter.Release(context.Background(), client, key) // use background context in defer to avoid been canceled
}
log.Debugf("the tag is %v, digest is %v", art.Tag, art.Digest)
if r.Method == http.MethodHead {

View File

@ -163,9 +163,10 @@ func (a *projectAPI) CreateProject(ctx context.Context, params operation.CreateP
}
}
// ignore metadata.proxy_speed_kb for non-proxy-cache project
// ignore metadata.proxy_speed_kb and metadata.max_upstream_conn for non-proxy-cache project
if req.RegistryID == nil {
req.Metadata.ProxySpeedKb = nil
req.Metadata.MaxUpstreamConn = nil
}
// ignore enable_content_trust metadata for proxy cache project
@ -566,9 +567,10 @@ func (a *projectAPI) UpdateProject(ctx context.Context, params operation.UpdateP
}
}
// ignore metadata.proxy_speed_kb for non-proxy-cache project
// ignore metadata.proxy_speed_kb and metadata.max_upstream_conn for non-proxy-cache project
if params.Project.Metadata != nil && !p.IsProxy() {
params.Project.Metadata.ProxySpeedKb = nil
params.Project.Metadata.MaxUpstreamConn = nil
}
// ignore enable_content_trust metadata for proxy cache project
@ -818,6 +820,12 @@ func (a *projectAPI) validateProjectReq(ctx context.Context, req *models.Project
return errors.BadRequestError(nil).WithMessagef("metadata.proxy_speed_kb should by an int32, but got: '%s', err: %s", *ps, err)
}
}
if cnt := req.Metadata.MaxUpstreamConn; cnt != nil {
if _, err := strconv.ParseInt(*cnt, 10, 32); err != nil {
return errors.BadRequestError(nil).WithMessagef("metadata.max_upstream_conn should be an int, but got '%s', err: %s", *cnt, err)
}
}
}
if req.StorageLimit != nil {

View File

@ -161,6 +161,12 @@ func (p *projectMetadataAPI) validate(metas map[string]string) (map[string]strin
return nil, errors.New(nil).WithCode(errors.BadRequestCode).WithMessagef("invalid value: %s", value)
}
metas[proModels.ProMetaProxySpeed] = strconv.FormatInt(v, 10)
case proModels.ProMetaMaxUpstreamConn:
v, err := strconv.ParseInt(value, 10, 32)
if err != nil {
return nil, errors.New(nil).WithCode(errors.BadRequestCode).WithMessagef("invalid value: %s", value)
}
metas[proModels.ProMetaMaxUpstreamConn] = strconv.FormatInt(v, 10)
default:
return nil, errors.New(nil).WithCode(errors.BadRequestCode).WithMessagef("invalid key: %s", key)
}

View File

@ -0,0 +1,76 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package handler
import (
"testing"
proModels "github.com/goharbor/harbor/src/pkg/project/models"
"github.com/stretchr/testify/assert"
)
func TestValidate(t *testing.T) {
api := &projectMetadataAPI{}
tests := []struct {
name string
metas map[string]string
expectErr bool
}{
{
name: "Invalid max upstream conn value",
metas: map[string]string{proModels.ProMetaMaxUpstreamConn: "invalid"},
expectErr: true,
},
{
name: "max upstream conn value 0",
metas: map[string]string{proModels.ProMetaMaxUpstreamConn: "0"},
expectErr: false,
},
{
name: "max upstream conn value -1",
metas: map[string]string{proModels.ProMetaMaxUpstreamConn: "-1"},
expectErr: false,
},
{
name: "normal max upstream conn value",
metas: map[string]string{proModels.ProMetaMaxUpstreamConn: "30"},
expectErr: false,
},
{
name: "Unsupported key",
metas: map[string]string{"unsupported_key": "value"},
expectErr: true,
},
{
name: "Empty map",
metas: map[string]string{},
expectErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := api.validate(tt.metas)
if tt.expectErr {
assert.Error(t, err)
assert.Nil(t, result)
} else {
assert.NoError(t, err)
assert.NotNil(t, result)
}
})
}
}