KEP-5334: Image Pull Progress

This commit is contained in:
Shiming Zhang 2025-05-23 20:05:32 +08:00
parent 5be7941786
commit 00eabf0d8b
15 changed files with 1306 additions and 555 deletions

View File

@ -151,6 +151,7 @@ type StreamingRuntime interface {
GetExec(ctx context.Context, id ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error)
GetAttach(ctx context.Context, id ContainerID, stdin, stdout, stderr, tty bool) (*url.URL, error)
GetPortForward(ctx context.Context, podName, podNamespace string, podUID types.UID, ports []int32) (*url.URL, error)
GetImagePullProgress(ctx context.Context, podName, podNamespace string, podUID types.UID) (*url.URL, error)
}
// ImageService interfaces allows to work with image service.

View File

@ -511,6 +511,14 @@ func (f *FakeStreamingRuntime) GetPortForward(_ context.Context, podName, podNam
return &url.URL{Host: FakeHost}, f.Err
}
func (f *FakeStreamingRuntime) GetImagePullProgress(ctx context.Context, podName, podNamespace string, podUID types.UID) (*url.URL, error) {
f.Lock()
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "GetImagePullProgress")
return &url.URL{Host: FakeHost}, f.Err
}
type FakeContainerCommandRunner struct {
// what to return
Stdout string

View File

@ -44,6 +44,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
"k8s.io/kubelet/pkg/cri/streaming/imagepullprogress"
"k8s.io/kubelet/pkg/cri/streaming/portforward"
remotecommandserver "k8s.io/kubelet/pkg/cri/streaming/remotecommand"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@ -2450,6 +2451,24 @@ func (kl *Kubelet) GetPortForward(ctx context.Context, podName, podNamespace str
return kl.streamingRuntime.GetPortForward(ctx, podName, podNamespace, podUID, portForwardOpts.Ports)
}
// GetImagePullProgress gets the URL the image-pull-progress will be served from, or nil if the Kubelet will serve it.
func (kl *Kubelet) GetImagePullProgress(ctx context.Context, podName, podNamespace string, podUID types.UID, portForwardOpts imagepullprogress.Options) (*url.URL, error) {
pods, err := kl.containerRuntime.GetPods(ctx, false)
if err != nil {
return nil, err
}
// Resolve and type convert back again.
// We need the static pod UID but the kubecontainer API works with types.UID.
podUID = types.UID(kl.podManager.TranslatePodUID(podUID))
podFullName := kubecontainer.BuildPodFullName(podName, podNamespace)
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
if pod.IsEmpty() {
return nil, fmt.Errorf("pod not found (%q)", podFullName)
}
return kl.streamingRuntime.GetImagePullProgress(ctx, podName, podNamespace, podUID)
}
// cleanupOrphanedPodCgroups removes cgroups that should no longer exist.
// it reconciles the cached state of cgroupPods with the specified list of runningPods
func (kl *Kubelet) cleanupOrphanedPodCgroups(pcm cm.PodContainerManager, cgroupPods map[types.UID]cm.CgroupName, possiblyRunningPods map[types.UID]sets.Empty) {

View File

@ -272,6 +272,15 @@ func (in instrumentedRuntimeService) PortForward(ctx context.Context, req *runti
return resp, err
}
func (in instrumentedRuntimeService) ImagePullProgress(ctx context.Context, req *runtimeapi.ImagePullProgressRequest) (*runtimeapi.ImagePullProgressResponse, error) {
const operation = "image_pull_progress"
defer recordOperation(operation, time.Now())
resp, err := in.service.ImagePullProgress(ctx, req)
recordError(operation, err)
return resp, err
}
func (in instrumentedRuntimeService) UpdatePodSandboxResources(ctx context.Context, req *runtimeapi.UpdatePodSandboxResourcesRequest) (*runtimeapi.UpdatePodSandboxResourcesResponse, error) {
const operation = "update_podsandbox_resources"
defer recordOperation(operation, time.Now())

View File

@ -380,3 +380,22 @@ func (m *kubeGenericRuntimeManager) GetPortForward(ctx context.Context, podName,
}
return url.Parse(resp.Url)
}
// GetImagePullProgress gets the endpoint the runtime will serve the image-pull-progress request from.
func (m *kubeGenericRuntimeManager) GetImagePullProgress(ctx context.Context, podName, podNamespace string, podUID kubetypes.UID) (*url.URL, error) {
sandboxIDs, err := m.getSandboxIDByPodUID(ctx, podUID, nil)
if err != nil {
return nil, fmt.Errorf("failed to find sandboxID for pod %s: %v", format.PodDesc(podName, podNamespace, podUID), err)
}
if len(sandboxIDs) == 0 {
return nil, fmt.Errorf("failed to find sandboxID for pod %s", format.PodDesc(podName, podNamespace, podUID))
}
req := &runtimeapi.ImagePullProgressRequest{
PodSandboxId: sandboxIDs[0],
}
resp, err := m.runtimeService.ImagePullProgress(ctx, req)
if err != nil {
return nil, err
}
return url.Parse(resp.Url)
}

View File

@ -273,6 +273,21 @@ func (fk *fakeKubelet) GetPortForward(ctx context.Context, podName, podNamespace
return url.Parse(resp.GetUrl())
}
func (fk *fakeKubelet) GetPortForward(ctx context.Context, podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error) {
if fk.getPortForwardCheck != nil {
fk.getPortForwardCheck(podName, podNamespace, podUID, portForwardOpts)
}
// Always use testPodSandboxID
resp, err := fk.streamingRuntime.GetPortForward(&runtimeapi.PortForwardRequest{
PodSandboxId: testPodSandboxID,
Port: portForwardOpts.Ports,
})
if err != nil {
return nil, err
}
return url.Parse(resp.GetUrl())
}
// Unused functions
func (*fakeKubelet) GetNode() (*v1.Node, error) { return nil, nil }
func (*fakeKubelet) GetNodeConfig() cm.NodeConfig { return cm.NodeConfig{} }

View File

@ -0,0 +1,24 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package imagepullprogress
type Layer struct {
Name string `json:"name"`
Progress int64 `json:"progress"`
Total int64 `json:"total"`
Error string `json:"error,omitempty"`
}

File diff suppressed because it is too large Load Diff

View File

@ -97,6 +97,8 @@ service RuntimeService {
rpc Attach(AttachRequest) returns (AttachResponse) {}
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox.
rpc PortForward(PortForwardRequest) returns (PortForwardResponse) {}
// ImagePullProgress prepares a streaming endpoint to image pull from a PodSandbox.
rpc ImagePullProgress(ImagePullProgressRequest) returns (ImagePullProgressResponse) {}
// ContainerStats returns stats of the container. If the container does not
// exist, the call returns an error.
@ -1571,6 +1573,16 @@ message PortForwardResponse {
string url = 1;
}
message ImagePullProgressRequest {
// ID of the pod to which to progresss of the image pull.
string pod_sandbox_id = 1;
}
message ImagePullProgressResponse {
// Fully qualified URL of the image pull progresss streaming server.
string url = 1;
}
message ImageFilter {
// Spec of the image.
ImageSpec image = 1;

View File

@ -54,6 +54,8 @@ type ContainerManager interface {
Exec(ctx context.Context, request *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error)
// Attach prepares a streaming endpoint to attach to a running container, and returns the address.
Attach(ctx context.Context, req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error)
// ImagePullProgress prepares a streaming endpoint to image pull progress from a PodSandbox, and returns the address.
ImagePullProgress(ctx context.Context, req *runtimeapi.ImagePullProgressRequest) (*runtimeapi.ImagePullProgressResponse, error)
// ReopenContainerLog asks runtime to reopen the stdout/stderr log file
// for the container. If it returns error, new container log file MUST NOT
// be created.

View File

@ -610,6 +610,15 @@ func (r *remoteRuntimeService) portForwardV1(ctx context.Context, req *runtimeap
return resp, nil
}
// ImagePullProgress prepares a streaming endpoint to image pull progress from a PodSandbox, and returns the address.
func (r *remoteRuntimeService) ImagePullProgress(ctx context.Context, req *runtimeapi.ImagePullProgressRequest) (*runtimeapi.ImagePullProgressResponse, error) {
r.log(10, "[RemoteRuntimeService] ImagePullProgress", "podSandboxID", req.PodSandboxId, "timeout", r.timeout)
ctx, cancel := context.WithTimeout(ctx, r.timeout)
defer cancel()
return r.runtimeClient.ImagePullProgress(ctx, req)
}
// UpdatePodSandboxResources synchronously updates the PodSandboxConfig with
// the pod-level resource configuration.
func (r *remoteRuntimeService) UpdatePodSandboxResources(ctx context.Context, req *runtimeapi.UpdatePodSandboxResourcesRequest) (*runtimeapi.UpdatePodSandboxResourcesResponse, error) {

View File

@ -0,0 +1,24 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package imagepullprogress contains server-side logic for handling image pull progress requests.
package imagepullprogress
// ProtocolV1Name is the name of the subprotocol used for image pull progress.
const ProtocolV1Name = "imagepullprogress.k8s.io"
// SupportedProtocols are the supported image pull progress protocols.
var SupportedProtocols = []string{ProtocolV1Name}

View File

@ -0,0 +1,117 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package imagepullprogress
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"time"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/httpstream/spdy"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/imagepullprogress"
"k8s.io/klog/v2"
)
func handleHTTPStreams(req *http.Request, w http.ResponseWriter, imagePullProgresser ImagePullProgresser, podName string, uid types.UID, supportedImagePullProgressProtocols []string, idleTimeout time.Duration) error {
_, err := httpstream.Handshake(req, w, supportedImagePullProgressProtocols)
// negotiated protocol isn't currently used server side, but could be in the future
if err != nil {
// Handshake writes the error to the client
return err
}
streamChan := make(chan httpstream.Stream, 1)
klog.V(5).InfoS("Upgrading image pull progress response")
upgrader := spdy.NewResponseUpgrader()
conn := upgrader.UpgradeResponse(w, req, httpStreamReceived(streamChan))
if conn == nil {
return errors.New("unable to upgrade httpstream connection")
}
defer conn.Close()
klog.V(5).InfoS("Connection setting image pull progress streaming connection idle timeout", "connection", conn, "idleTimeout", idleTimeout)
conn.SetIdleTimeout(idleTimeout)
h := &httpStreamHandler{
conn: conn,
streamChan: streamChan,
pod: podName,
uid: uid,
progresser: imagePullProgresser,
}
h.run(req.Context())
return nil
}
// httpStreamReceived is the httpstream.NewStreamHandler for image pull progress streams.
func httpStreamReceived(streams chan httpstream.Stream) func(httpstream.Stream, <-chan struct{}) error {
return func(stream httpstream.Stream, replySent <-chan struct{}) error {
streams <- stream
return nil
}
}
type httpStreamHandler struct {
conn httpstream.Connection
streamChan chan httpstream.Stream
pod string
uid types.UID
progresser ImagePullProgresser
}
func (h *httpStreamHandler) run(ctx context.Context) {
klog.V(5).InfoS("Connection waiting for port forward streams", "connection", h.conn)
Loop:
for {
select {
case <-h.conn.CloseChan():
klog.V(5).InfoS("Connection upgraded connection closed", "connection", h.conn)
break Loop
case stream := <-h.streamChan:
go h.imagePullProgress(ctx, stream)
}
}
}
func (h *httpStreamHandler) imagePullProgress(ctx context.Context, stream httpstream.Stream) {
layerProgress := make(chan imagepullprogress.Layer)
encoder := json.NewEncoder(stream)
go func() {
for layer := range layerProgress {
err := encoder.Encode(layer)
if err != nil {
msg := fmt.Errorf("error sending image pull progress for pod %s, uid %v: %v", h.pod, h.uid, err)
utilruntime.HandleError(msg)
}
}
}()
err := h.progresser.ImagePullProgress(ctx, h.pod, h.uid, layerProgress)
if err != nil {
msg := fmt.Errorf("error get image pull progress for pod %s, uid %v: %v", h.pod, h.uid, err)
utilruntime.HandleError(msg)
}
}

View File

@ -0,0 +1,42 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package imagepullprogress
import (
"context"
"net/http"
"time"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/imagepullprogress"
)
type Options struct {
}
type ImagePullProgresser interface {
ImagePullProgress(ctx context.Context, name string, uid types.UID, layerProgress chan<- imagepullprogress.Layer) error
}
func ServeImagePullProgressed(w http.ResponseWriter, req *http.Request, imagePullProgresser ImagePullProgresser, podName string, uid types.UID, idleTimeout time.Duration, supportedProtocols []string) {
err := handleHTTPStreams(req, w, imagePullProgresser, podName, uid, supportedProtocols, idleTimeout)
if err != nil {
runtime.HandleError(err)
return
}
}

View File

@ -34,8 +34,10 @@ import (
"k8s.io/apimachinery/pkg/types"
remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
imagepullprogress "k8s.io/client-go/tools/imagepullprogress"
"k8s.io/client-go/tools/remotecommand"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
imagepullprogressserver "k8s.io/kubelet/pkg/cri/streaming/imagepullprogress"
"k8s.io/kubelet/pkg/cri/streaming/portforward"
remotecommandserver "k8s.io/kubelet/pkg/cri/streaming/remotecommand"
)
@ -65,6 +67,7 @@ type Runtime interface {
Exec(ctx context.Context, containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
Attach(ctx context.Context, containerID string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
PortForward(ctx context.Context, podSandboxID string, port int32, stream io.ReadWriteCloser) error
ImagePullProgress(ctx context.Context, podSandboxID string, layerProgress chan<- imagepullprogress.Layer) error
}
// Config defines the options used for running the stream server.
@ -131,6 +134,7 @@ func NewServer(config Config, runtime Runtime) (Server, error) {
{"/exec/{token}", s.serveExec},
{"/attach/{token}", s.serveAttach},
{"/portforward/{token}", s.servePortForward},
{"/imagepullprogress/{token}", s.serveImagePullProgress},
}
// If serving relative to a base path, set that here.
pathPrefix := path.Dir(s.config.BaseURL.Path)
@ -231,6 +235,19 @@ func (s *server) GetPortForward(req *runtimeapi.PortForwardRequest) (*runtimeapi
}, nil
}
func (s *server) GetImagePullProgress(req *runtimeapi.ImagePullProgressRequest) (*runtimeapi.ImagePullProgressResponse, error) {
if req.PodSandboxId == "" {
return nil, status.Errorf(codes.InvalidArgument, "missing required pod_sandbox_id")
}
token, err := s.cache.Insert(req)
if err != nil {
return nil, err
}
return &runtimeapi.ImagePullProgressResponse{
Url: s.buildURL("imagepullprogress", token),
}, nil
}
func (s *server) Start(stayUp bool) error {
if !stayUp {
// TODO(tallclair): Implement this.
@ -360,6 +377,30 @@ func (s *server) servePortForward(req *restful.Request, resp *restful.Response)
s.config.SupportedPortForwardProtocols)
}
func (s *server) serveImagePullProgress(req *restful.Request, resp *restful.Response) {
token := req.PathParameter("token")
cachedRequest, ok := s.cache.Consume(token)
if !ok {
http.NotFound(resp.ResponseWriter, req.Request)
return
}
pf, ok := cachedRequest.(*runtimeapi.PortForwardRequest)
if !ok {
http.NotFound(resp.ResponseWriter, req.Request)
return
}
imagepullprogressserver.ServeImagePullProgressed(
resp.ResponseWriter,
req.Request,
s.runtime,
pf.PodSandboxId,
"",
s.config.StreamIdleTimeout,
s.config.SupportedPortForwardProtocols,
)
}
// criAdapter wraps the Runtime functions to conform to the remotecommand interfaces.
// The adapter binds the container ID to the container name argument, and the pod sandbox ID to the pod name.
type criAdapter struct {
@ -381,3 +422,7 @@ func (a *criAdapter) AttachContainer(ctx context.Context, podName string, podUID
func (a *criAdapter) PortForward(ctx context.Context, podName string, podUID types.UID, port int32, stream io.ReadWriteCloser) error {
return a.Runtime.PortForward(ctx, podName, port, stream)
}
func (a *criAdapter) ImagePullProgress(ctx context.Context, podSandboxID string, podUID types.UID, layerProgress chan<- imagepullprogress.Layer) error {
return a.Runtime.ImagePullProgress(ctx, podSandboxID, layerProgress)
}