mirror of https://github.com/helm/helm.git
ref(*): Refactors kube client to be a bit more friendly
This changes most of the KubeClient interface to only ever build objects once and then pass in everything as lists of resources. As a consequence, we needed to refactor several of the actions. I took the opportunity to refactor out some duplicated code while I was in the same area Signed-off-by: Taylor Thomas <taylor.thomas@microsoft.com>
This commit is contained in:
parent
1dac8421ef
commit
15fc57f8a3
File diff suppressed because it is too large
Load Diff
24
Gopkg.toml
24
Gopkg.toml
|
@ -58,6 +58,18 @@
|
|||
name = "github.com/stretchr/testify"
|
||||
version = "^1.3.0"
|
||||
|
||||
[[constraint]]
|
||||
name = "github.com/xeipuuv/gojsonschema"
|
||||
version = "1.1.0"
|
||||
|
||||
[[constraint]]
|
||||
name = "github.com/spf13/cobra"
|
||||
version = "0.0.4"
|
||||
|
||||
[[constraint]]
|
||||
name = "sigs.k8s.io/yaml"
|
||||
version = "1.1.0"
|
||||
|
||||
[[override]]
|
||||
name = "sigs.k8s.io/kustomize"
|
||||
version = "2.0.3"
|
||||
|
@ -104,15 +116,3 @@
|
|||
|
||||
[prune]
|
||||
go-tests = true
|
||||
|
||||
[[constraint]]
|
||||
name = "github.com/xeipuuv/gojsonschema"
|
||||
version = "1.1.0"
|
||||
|
||||
[[constraint]]
|
||||
name = "github.com/spf13/cobra"
|
||||
version = "0.0.4"
|
||||
|
||||
[[constraint]]
|
||||
name = "sigs.k8s.io/yaml"
|
||||
version = "1.1.0"
|
||||
|
|
|
@ -17,8 +17,11 @@ limitations under the License.
|
|||
package action
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"path"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
@ -27,6 +30,7 @@ import (
|
|||
"k8s.io/client-go/rest"
|
||||
|
||||
"helm.sh/helm/pkg/chartutil"
|
||||
"helm.sh/helm/pkg/hooks"
|
||||
"helm.sh/helm/pkg/kube"
|
||||
"helm.sh/helm/pkg/registry"
|
||||
"helm.sh/helm/pkg/release"
|
||||
|
@ -190,3 +194,73 @@ type RESTClientGetter interface {
|
|||
ToDiscoveryClient() (discovery.CachedDiscoveryInterface, error)
|
||||
ToRESTMapper() (meta.RESTMapper, error)
|
||||
}
|
||||
|
||||
// execHooks is a method for exec-ing all hooks of the given type. This is to
|
||||
// avoid duplicate code in various actions
|
||||
func execHooks(client kube.Interface, hs []*release.Hook, hook string, timeout time.Duration) error {
|
||||
executingHooks := []*release.Hook{}
|
||||
|
||||
for _, h := range hs {
|
||||
for _, e := range h.Events {
|
||||
if string(e) == hook {
|
||||
executingHooks = append(executingHooks, h)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sort.Sort(hookByWeight(executingHooks))
|
||||
for _, h := range executingHooks {
|
||||
if err := deleteHookByPolicy(client, h, hooks.BeforeHookCreation); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resources, err := client.Build(bytes.NewBufferString(h.Manifest))
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "unable to build kubernetes object for %s hook %s", hook, h.Path)
|
||||
}
|
||||
if _, err := client.Create(resources); err != nil {
|
||||
return errors.Wrapf(err, "warning: Hook %s %s failed", hook, h.Path)
|
||||
}
|
||||
|
||||
if err := client.WatchUntilReady(resources, timeout); err != nil {
|
||||
// If a hook is failed, checkout the annotation of the hook to determine whether the hook should be deleted
|
||||
// under failed condition. If so, then clear the corresponding resource object in the hook
|
||||
if err := deleteHookByPolicy(client, h, hooks.HookFailed); err != nil {
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// If all hooks are succeeded, checkout the annotation of each hook to determine whether the hook should be deleted
|
||||
// under succeeded condition. If so, then clear the corresponding resource object in each hook
|
||||
for _, h := range executingHooks {
|
||||
if err := deleteHookByPolicy(client, h, hooks.HookSucceeded); err != nil {
|
||||
return err
|
||||
}
|
||||
h.LastRun = time.Now()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteHookByPolicy deletes a hook if the hook policy instructs it to
|
||||
func deleteHookByPolicy(client kube.Interface, h *release.Hook, policy string) error {
|
||||
if hookHasDeletePolicy(h, policy) {
|
||||
resources, err := client.Build(bytes.NewBufferString(h.Manifest))
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "unable to build kubernetes object for deleting hook %s", h.Path)
|
||||
}
|
||||
_, errs := client.Delete(resources)
|
||||
return errors.New(joinErrors(errs))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func joinErrors(errs []error) string {
|
||||
es := make([]string, 0, len(errs))
|
||||
for _, e := range errs {
|
||||
es = append(es, e.Error())
|
||||
}
|
||||
return strings.Join(es, "; ")
|
||||
}
|
||||
|
|
|
@ -19,13 +19,11 @@ package action
|
|||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"text/template"
|
||||
"time"
|
||||
|
@ -170,8 +168,10 @@ func (i *Install) Run(chrt *chart.Chart) (*release.Release, error) {
|
|||
|
||||
// Mark this release as in-progress
|
||||
rel.SetStatus(release.StatusPendingInstall, "Initial install underway")
|
||||
if err := i.validateManifest(manifestDoc); err != nil {
|
||||
return rel, err
|
||||
|
||||
resources, err := i.cfg.KubeClient.Build(bytes.NewBufferString(rel.Manifest))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "unable to build kubernetes objects from release manifest")
|
||||
}
|
||||
|
||||
// Bail out here if it is a dry run
|
||||
|
@ -198,7 +198,7 @@ func (i *Install) Run(chrt *chart.Chart) (*release.Release, error) {
|
|||
|
||||
// pre-install hooks
|
||||
if !i.DisableHooks {
|
||||
if err := i.execHook(rel.Hooks, hooks.PreInstall); err != nil {
|
||||
if err := execHooks(i.cfg.KubeClient, rel.Hooks, hooks.PreInstall, i.Timeout); err != nil {
|
||||
return i.failRelease(rel, fmt.Errorf("failed pre-install: %s", err))
|
||||
}
|
||||
}
|
||||
|
@ -206,21 +206,19 @@ func (i *Install) Run(chrt *chart.Chart) (*release.Release, error) {
|
|||
// At this point, we can do the install. Note that before we were detecting whether to
|
||||
// do an update, but it's not clear whether we WANT to do an update if the re-use is set
|
||||
// to true, since that is basically an upgrade operation.
|
||||
buf := bytes.NewBufferString(rel.Manifest)
|
||||
if err := i.cfg.KubeClient.Create(buf); err != nil {
|
||||
if _, err := i.cfg.KubeClient.Create(resources); err != nil {
|
||||
return i.failRelease(rel, err)
|
||||
}
|
||||
|
||||
if i.Wait {
|
||||
buf := bytes.NewBufferString(rel.Manifest)
|
||||
if err := i.cfg.KubeClient.Wait(buf, i.Timeout); err != nil {
|
||||
if err := i.cfg.KubeClient.Wait(resources, i.Timeout); err != nil {
|
||||
return i.failRelease(rel, err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if !i.DisableHooks {
|
||||
if err := i.execHook(rel.Hooks, hooks.PostInstall); err != nil {
|
||||
if err := execHooks(i.cfg.KubeClient, rel.Hooks, hooks.PostInstall, i.Timeout); err != nil {
|
||||
return i.failRelease(rel, fmt.Errorf("failed post-install: %s", err))
|
||||
}
|
||||
}
|
||||
|
@ -455,60 +453,6 @@ func ensureDirectoryForFile(file string) error {
|
|||
return os.MkdirAll(baseDir, defaultDirectoryPermission)
|
||||
}
|
||||
|
||||
// validateManifest checks to see whether the given manifest is valid for the current Kubernetes
|
||||
func (i *Install) validateManifest(manifest io.Reader) error {
|
||||
_, err := i.cfg.KubeClient.BuildUnstructured(manifest)
|
||||
return err
|
||||
}
|
||||
|
||||
// execHook executes all of the hooks for the given hook event.
|
||||
func (i *Install) execHook(hs []*release.Hook, hook string) error {
|
||||
executingHooks := []*release.Hook{}
|
||||
|
||||
for _, h := range hs {
|
||||
for _, e := range h.Events {
|
||||
if string(e) == hook {
|
||||
executingHooks = append(executingHooks, h)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sort.Sort(hookByWeight(executingHooks))
|
||||
|
||||
for _, h := range executingHooks {
|
||||
if err := deleteHookByPolicy(i.cfg, h, hooks.BeforeHookCreation); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b := bytes.NewBufferString(h.Manifest)
|
||||
if err := i.cfg.KubeClient.Create(b); err != nil {
|
||||
return errors.Wrapf(err, "warning: Release %s %s %s failed", i.ReleaseName, hook, h.Path)
|
||||
}
|
||||
b.Reset()
|
||||
b.WriteString(h.Manifest)
|
||||
|
||||
if err := i.cfg.KubeClient.WatchUntilReady(b, i.Timeout); err != nil {
|
||||
// If a hook is failed, checkout the annotation of the hook to determine whether the hook should be deleted
|
||||
// under failed condition. If so, then clear the corresponding resource object in the hook
|
||||
if err := deleteHookByPolicy(i.cfg, h, hooks.HookFailed); err != nil {
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// If all hooks are succeeded, checkout the annotation of each hook to determine whether the hook should be deleted
|
||||
// under succeeded condition. If so, then clear the corresponding resource object in each hook
|
||||
for _, h := range executingHooks {
|
||||
if err := deleteHookByPolicy(i.cfg, h, hooks.HookSucceeded); err != nil {
|
||||
return err
|
||||
}
|
||||
h.LastRun = time.Now()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// deletePolices represents a mapping between the key in the annotation for label deleting policy and its real meaning
|
||||
// FIXME: Can we refactor this out?
|
||||
var deletePolices = map[string]release.HookDeletePolicy{
|
||||
|
|
|
@ -19,7 +19,6 @@ package action
|
|||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
@ -132,25 +131,32 @@ func (r *Rollback) prepareRollback(name string) (*release.Release, *release.Rele
|
|||
}
|
||||
|
||||
func (r *Rollback) performRollback(currentRelease, targetRelease *release.Release) (*release.Release, error) {
|
||||
|
||||
if r.DryRun {
|
||||
r.cfg.Log("dry run for %s", targetRelease.Name)
|
||||
return targetRelease, nil
|
||||
}
|
||||
|
||||
current, err := r.cfg.KubeClient.Build(bytes.NewBufferString(currentRelease.Manifest))
|
||||
if err != nil {
|
||||
return targetRelease, errors.Wrap(err, "unable to build kubernetes objects from current release manifest")
|
||||
}
|
||||
target, err := r.cfg.KubeClient.Build(bytes.NewBufferString(targetRelease.Manifest))
|
||||
if err != nil {
|
||||
return targetRelease, errors.Wrap(err, "unable to build kubernetes objects from new release manifest")
|
||||
}
|
||||
|
||||
// pre-rollback hooks
|
||||
if !r.DisableHooks {
|
||||
if err := r.execHook(targetRelease.Hooks, hooks.PreRollback); err != nil {
|
||||
if err := execHooks(r.cfg.KubeClient, targetRelease.Hooks, hooks.PreRollback, r.Timeout); err != nil {
|
||||
return targetRelease, err
|
||||
}
|
||||
} else {
|
||||
r.cfg.Log("rollback hooks disabled for %s", targetRelease.Name)
|
||||
}
|
||||
|
||||
cr := bytes.NewBufferString(currentRelease.Manifest)
|
||||
tr := bytes.NewBufferString(targetRelease.Manifest)
|
||||
results, err := r.cfg.KubeClient.Update(current, target, r.Force)
|
||||
|
||||
if err := r.cfg.KubeClient.Update(cr, tr, r.Force, r.Recreate); err != nil {
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err)
|
||||
r.cfg.Log("warning: %s", msg)
|
||||
currentRelease.Info.Status = release.StatusSuperseded
|
||||
|
@ -161,9 +167,18 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
|
|||
return targetRelease, err
|
||||
}
|
||||
|
||||
if r.Recreate {
|
||||
// NOTE: Because this is not critical for a release to succeed, we just
|
||||
// log if an error occurs and continue onward. If we ever introduce log
|
||||
// levels, we should make these error level logs so users are notified
|
||||
// that they'll need to go do the cleanup on their own
|
||||
if err := recreate(r.cfg.KubeClient, results.Updated); err != nil {
|
||||
r.cfg.Log(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
if r.Wait {
|
||||
buf := bytes.NewBufferString(targetRelease.Manifest)
|
||||
if err := r.cfg.KubeClient.Wait(buf, r.Timeout); err != nil {
|
||||
if err := r.cfg.KubeClient.Wait(target, r.Timeout); err != nil {
|
||||
targetRelease.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", targetRelease.Name, err.Error()))
|
||||
r.cfg.recordRelease(currentRelease)
|
||||
r.cfg.recordRelease(targetRelease)
|
||||
|
@ -173,7 +188,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
|
|||
|
||||
// post-rollback hooks
|
||||
if !r.DisableHooks {
|
||||
if err := r.execHook(targetRelease.Hooks, hooks.PostRollback); err != nil {
|
||||
if err := execHooks(r.cfg.KubeClient, targetRelease.Hooks, hooks.PostRollback, r.Timeout); err != nil {
|
||||
return targetRelease, err
|
||||
}
|
||||
}
|
||||
|
@ -193,61 +208,3 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
|
|||
|
||||
return targetRelease, nil
|
||||
}
|
||||
|
||||
// execHook executes all of the hooks for the given hook event.
|
||||
func (r *Rollback) execHook(hs []*release.Hook, hook string) error {
|
||||
timeout := r.Timeout
|
||||
executingHooks := []*release.Hook{}
|
||||
|
||||
for _, h := range hs {
|
||||
for _, e := range h.Events {
|
||||
if string(e) == hook {
|
||||
executingHooks = append(executingHooks, h)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sort.Sort(hookByWeight(executingHooks))
|
||||
|
||||
for _, h := range executingHooks {
|
||||
if err := deleteHookByPolicy(r.cfg, h, hooks.BeforeHookCreation); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b := bytes.NewBufferString(h.Manifest)
|
||||
if err := r.cfg.KubeClient.Create(b); err != nil {
|
||||
return errors.Wrapf(err, "warning: Hook %s %s failed", hook, h.Path)
|
||||
}
|
||||
b.Reset()
|
||||
b.WriteString(h.Manifest)
|
||||
|
||||
if err := r.cfg.KubeClient.WatchUntilReady(b, timeout); err != nil {
|
||||
// If a hook is failed, checkout the annotation of the hook to determine whether the hook should be deleted
|
||||
// under failed condition. If so, then clear the corresponding resource object in the hook
|
||||
if err := deleteHookByPolicy(r.cfg, h, hooks.HookFailed); err != nil {
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// If all hooks are succeeded, checkout the annotation of each hook to determine whether the hook should be deleted
|
||||
// under succeeded condition. If so, then clear the corresponding resource object in each hook
|
||||
for _, h := range executingHooks {
|
||||
if err := deleteHookByPolicy(r.cfg, h, hooks.HookSucceeded); err != nil {
|
||||
return err
|
||||
}
|
||||
h.LastRun = time.Now()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteHookByPolicy deletes a hook if the hook policy instructs it to
|
||||
func deleteHookByPolicy(cfg *Configuration, h *release.Hook, policy string) error {
|
||||
if hookHasDeletePolicy(h, policy) {
|
||||
b := bytes.NewBufferString(h.Manifest)
|
||||
return cfg.KubeClient.Delete(b)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -17,15 +17,12 @@ limitations under the License.
|
|||
package action
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"helm.sh/helm/pkg/hooks"
|
||||
"helm.sh/helm/pkg/kube"
|
||||
"helm.sh/helm/pkg/release"
|
||||
"helm.sh/helm/pkg/releaseutil"
|
||||
)
|
||||
|
@ -94,7 +91,7 @@ func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error)
|
|||
res := &release.UninstallReleaseResponse{Release: rel}
|
||||
|
||||
if !u.DisableHooks {
|
||||
if err := u.execHook(rel.Hooks, hooks.PreDelete); err != nil {
|
||||
if err := execHooks(u.cfg.KubeClient, rel.Hooks, hooks.PreDelete, u.Timeout); err != nil {
|
||||
return res, err
|
||||
}
|
||||
} else {
|
||||
|
@ -111,7 +108,7 @@ func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error)
|
|||
res.Info = kept
|
||||
|
||||
if !u.DisableHooks {
|
||||
if err := u.execHook(rel.Hooks, hooks.PostDelete); err != nil {
|
||||
if err := execHooks(u.cfg.KubeClient, rel.Hooks, hooks.PostDelete, u.Timeout); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
|
@ -153,64 +150,8 @@ func (u *Uninstall) purgeReleases(rels ...*release.Release) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func joinErrors(errs []error) string {
|
||||
es := make([]string, 0, len(errs))
|
||||
for _, e := range errs {
|
||||
es = append(es, e.Error())
|
||||
}
|
||||
return strings.Join(es, "; ")
|
||||
}
|
||||
|
||||
// execHook executes all of the hooks for the given hook event.
|
||||
func (u *Uninstall) execHook(hs []*release.Hook, hook string) error {
|
||||
executingHooks := []*release.Hook{}
|
||||
|
||||
for _, h := range hs {
|
||||
for _, e := range h.Events {
|
||||
if string(e) == hook {
|
||||
executingHooks = append(executingHooks, h)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sort.Sort(hookByWeight(executingHooks))
|
||||
|
||||
for _, h := range executingHooks {
|
||||
if err := deleteHookByPolicy(u.cfg, h, hooks.BeforeHookCreation); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b := bytes.NewBufferString(h.Manifest)
|
||||
if err := u.cfg.KubeClient.Create(b); err != nil {
|
||||
return errors.Wrapf(err, "warning: Hook %s %s failed", hook, h.Path)
|
||||
}
|
||||
b.Reset()
|
||||
b.WriteString(h.Manifest)
|
||||
|
||||
if err := u.cfg.KubeClient.WatchUntilReady(b, u.Timeout); err != nil {
|
||||
// If a hook is failed, checkout the annotation of the hook to determine whether the hook should be deleted
|
||||
// under failed condition. If so, then clear the corresponding resource object in the hook
|
||||
if err := deleteHookByPolicy(u.cfg, h, hooks.HookFailed); err != nil {
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// If all hooks are succeeded, checkout the annotation of each hook to determine whether the hook should be deleted
|
||||
// under succeeded condition. If so, then clear the corresponding resource object in each hook
|
||||
for _, h := range executingHooks {
|
||||
if err := deleteHookByPolicy(u.cfg, h, hooks.HookSucceeded); err != nil {
|
||||
return err
|
||||
}
|
||||
h.LastRun = time.Now()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteRelease deletes the release and returns manifests that were kept in the deletion process
|
||||
func (u *Uninstall) deleteRelease(rel *release.Release) (kept string, errs []error) {
|
||||
func (u *Uninstall) deleteRelease(rel *release.Release) (string, []error) {
|
||||
caps, err := u.cfg.getCapabilities()
|
||||
if err != nil {
|
||||
return rel.Manifest, []error{errors.Wrap(err, "could not get apiVersions from Kubernetes")}
|
||||
|
@ -227,23 +168,20 @@ func (u *Uninstall) deleteRelease(rel *release.Release) (kept string, errs []err
|
|||
}
|
||||
|
||||
filesToKeep, filesToDelete := filterManifestsToKeep(files)
|
||||
var kept string
|
||||
for _, f := range filesToKeep {
|
||||
kept += f.Name + "\n"
|
||||
}
|
||||
|
||||
var builder strings.Builder
|
||||
for _, file := range filesToDelete {
|
||||
b := bytes.NewBufferString(strings.TrimSpace(file.Content))
|
||||
if b.Len() == 0 {
|
||||
continue
|
||||
}
|
||||
if err := u.cfg.KubeClient.Delete(b); err != nil {
|
||||
u.cfg.Log("uninstall: Failed deletion of %q: %s", rel.Name, err)
|
||||
if err == kube.ErrNoObjectsVisited {
|
||||
// Rewrite the message from "no objects visited"
|
||||
err = errors.New("object not found, skipping delete")
|
||||
}
|
||||
errs = append(errs, err)
|
||||
}
|
||||
builder.WriteString(file.Content)
|
||||
}
|
||||
resources, err := u.cfg.KubeClient.Build(strings.NewReader(builder.String()))
|
||||
if err != nil {
|
||||
return "", []error{errors.Wrap(err, "unable to build kubernetes objects for delete")}
|
||||
}
|
||||
|
||||
_, errs := u.cfg.KubeClient.Delete(resources)
|
||||
return kept, errs
|
||||
}
|
||||
|
|
|
@ -19,10 +19,10 @@ package action
|
|||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
"helm.sh/helm/pkg/chart"
|
||||
"helm.sh/helm/pkg/chartutil"
|
||||
|
@ -76,7 +76,7 @@ func (u *Upgrade) Run(name string, chart *chart.Chart) (*release.Release, error)
|
|||
u.Wait = u.Wait || u.Atomic
|
||||
|
||||
if err := validateReleaseName(name); err != nil {
|
||||
return nil, errors.Errorf("upgradeRelease: Release name is invalid: %s", name)
|
||||
return nil, errors.Errorf("release name is invalid: %s", name)
|
||||
}
|
||||
u.cfg.Log("preparing upgrade for %s", name)
|
||||
currentRelease, upgradedRelease, err := u.prepareUpgrade(name, chart)
|
||||
|
@ -199,22 +199,42 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea
|
|||
return upgradedRelease, nil
|
||||
}
|
||||
|
||||
current, err := u.cfg.KubeClient.Build(bytes.NewBufferString(originalRelease.Manifest))
|
||||
if err != nil {
|
||||
return upgradedRelease, errors.Wrap(err, "unable to build kubernetes objects from current release manifest")
|
||||
}
|
||||
target, err := u.cfg.KubeClient.Build(bytes.NewBufferString(upgradedRelease.Manifest))
|
||||
if err != nil {
|
||||
return upgradedRelease, errors.Wrap(err, "unable to build kubernetes objects from new release manifest")
|
||||
}
|
||||
|
||||
// pre-upgrade hooks
|
||||
if !u.DisableHooks {
|
||||
if err := u.execHook(upgradedRelease.Hooks, hooks.PreUpgrade); err != nil {
|
||||
if err := execHooks(u.cfg.KubeClient, upgradedRelease.Hooks, hooks.PreUpgrade, u.Timeout); err != nil {
|
||||
return u.failRelease(upgradedRelease, fmt.Errorf("pre-upgrade hooks failed: %s", err))
|
||||
}
|
||||
} else {
|
||||
u.cfg.Log("upgrade hooks disabled for %s", upgradedRelease.Name)
|
||||
}
|
||||
if err := u.upgradeRelease(originalRelease, upgradedRelease); err != nil {
|
||||
|
||||
results, err := u.cfg.KubeClient.Update(current, target, u.Force)
|
||||
if err != nil {
|
||||
u.cfg.recordRelease(originalRelease)
|
||||
return u.failRelease(upgradedRelease, err)
|
||||
}
|
||||
|
||||
if u.Recreate {
|
||||
// NOTE: Because this is not critical for a release to succeed, we just
|
||||
// log if an error occurs and continue onward. If we ever introduce log
|
||||
// levels, we should make these error level logs so users are notified
|
||||
// that they'll need to go do the cleanup on their own
|
||||
if err := recreate(u.cfg.KubeClient, results.Updated); err != nil {
|
||||
u.cfg.Log(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
if u.Wait {
|
||||
buf := bytes.NewBufferString(upgradedRelease.Manifest)
|
||||
if err := u.cfg.KubeClient.Wait(buf, u.Timeout); err != nil {
|
||||
if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil {
|
||||
u.cfg.recordRelease(originalRelease)
|
||||
return u.failRelease(upgradedRelease, err)
|
||||
}
|
||||
|
@ -222,7 +242,7 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea
|
|||
|
||||
// post-upgrade hooks
|
||||
if !u.DisableHooks {
|
||||
if err := u.execHook(upgradedRelease.Hooks, hooks.PostUpgrade); err != nil {
|
||||
if err := execHooks(u.cfg.KubeClient, upgradedRelease.Hooks, hooks.PostUpgrade, u.Timeout); err != nil {
|
||||
return u.failRelease(upgradedRelease, fmt.Errorf("post-upgrade hooks failed: %s", err))
|
||||
}
|
||||
}
|
||||
|
@ -282,14 +302,6 @@ func (u *Upgrade) failRelease(rel *release.Release, err error) (*release.Release
|
|||
return rel, err
|
||||
}
|
||||
|
||||
// upgradeRelease performs an upgrade from current to target release
|
||||
func (u *Upgrade) upgradeRelease(current, target *release.Release) error {
|
||||
cm := bytes.NewBufferString(current.Manifest)
|
||||
tm := bytes.NewBufferString(target.Manifest)
|
||||
// TODO add wait
|
||||
return u.cfg.KubeClient.Update(cm, tm, u.Force, u.Recreate)
|
||||
}
|
||||
|
||||
// reuseValues copies values from the current release to a new release if the
|
||||
// new release does not have any values.
|
||||
//
|
||||
|
@ -334,50 +346,38 @@ func validateManifest(c kube.Interface, manifest []byte) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// execHook executes all of the hooks for the given hook event.
|
||||
func (u *Upgrade) execHook(hs []*release.Hook, hook string) error {
|
||||
timeout := u.Timeout
|
||||
executingHooks := []*release.Hook{}
|
||||
// recreate captures all the logic for recreating pods for both upgrade and
|
||||
// rollback. If we end up refactoring rollback to use upgrade, this can just be
|
||||
// made an unexported method on the upgrade action.
|
||||
func recreate(client kube.Interface, resources kube.ResourceList) error {
|
||||
for _, res := range resources {
|
||||
versioned := kube.AsVersioned(res)
|
||||
selector, err := kube.SelectorsForObject(versioned)
|
||||
if err != nil {
|
||||
// If no selector is returned, it means this object is
|
||||
// definitely not a pod, so continue onward
|
||||
continue
|
||||
}
|
||||
|
||||
for _, h := range hs {
|
||||
for _, e := range h.Events {
|
||||
if string(e) == hook {
|
||||
executingHooks = append(executingHooks, h)
|
||||
client, err := client.KubernetesClientSet()
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "unable to recreate pods for object %s/%s because an error occurred", res.Namespace, res.Name)
|
||||
}
|
||||
|
||||
pods, err := client.CoreV1().Pods(res.Namespace).List(metav1.ListOptions{
|
||||
LabelSelector: selector.String(),
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "unable to recreate pods for object %s/%s because an error occurred", res.Namespace, res.Name)
|
||||
}
|
||||
|
||||
// Restart pods
|
||||
for _, pod := range pods.Items {
|
||||
// Delete each pod for get them restarted with changed spec.
|
||||
if err := client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, metav1.NewPreconditionDeleteOptions(string(pod.UID))); err != nil {
|
||||
return errors.Wrapf(err, "unable to recreate pods for object %s/%s because an error occurred", res.Namespace, res.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sort.Sort(hookByWeight(executingHooks))
|
||||
for _, h := range executingHooks {
|
||||
if err := deleteHookByPolicy(u.cfg, h, hooks.BeforeHookCreation); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b := bytes.NewBufferString(h.Manifest)
|
||||
if err := u.cfg.KubeClient.Create(b); err != nil {
|
||||
return errors.Wrapf(err, "warning: Hook %s %s failed", hook, h.Path)
|
||||
}
|
||||
b.Reset()
|
||||
b.WriteString(h.Manifest)
|
||||
|
||||
if err := u.cfg.KubeClient.WatchUntilReady(b, timeout); err != nil {
|
||||
// If a hook is failed, checkout the annotation of the hook to determine whether the hook should be deleted
|
||||
// under failed condition. If so, then clear the corresponding resource object in the hook
|
||||
if err := deleteHookByPolicy(u.cfg, h, hooks.HookFailed); err != nil {
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// If all hooks are succeeded, checkout the annotation of each hook to determine whether the hook should be deleted
|
||||
// under succeeded condition. If so, then clear the corresponding resource object in each hook
|
||||
for _, h := range executingHooks {
|
||||
if err := deleteHookByPolicy(u.cfg, h, hooks.HookSucceeded); err != nil {
|
||||
return err
|
||||
}
|
||||
h.LastRun = time.Now()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -39,7 +39,6 @@ import (
|
|||
"k8s.io/cli-runtime/pkg/genericclioptions"
|
||||
"k8s.io/cli-runtime/pkg/resource"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
watchtools "k8s.io/client-go/tools/watch"
|
||||
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
||||
)
|
||||
|
@ -65,30 +64,23 @@ func New(getter genericclioptions.RESTClientGetter) *Client {
|
|||
}
|
||||
|
||||
// KubernetesClientSet returns a client set from the client factory.
|
||||
func (c *Client) KubernetesClientSet() (*kubernetes.Clientset, error) {
|
||||
func (c *Client) KubernetesClientSet() (kubernetes.Interface, error) {
|
||||
return c.Factory.KubernetesClientSet()
|
||||
}
|
||||
|
||||
var nopLogger = func(_ string, _ ...interface{}) {}
|
||||
|
||||
// Create creates Kubernetes resources from an io.reader.
|
||||
//
|
||||
// Namespace will set the namespace.
|
||||
func (c *Client) Create(reader io.Reader) error {
|
||||
c.Log("building resources from manifest")
|
||||
infos, err := c.BuildUnstructured(reader)
|
||||
if err != nil {
|
||||
return err
|
||||
// Create creates Kubernetes resources specified in the resource list.
|
||||
func (c *Client) Create(resources ResourceList) (*Result, error) {
|
||||
c.Log("creating %d resource(s)", len(resources))
|
||||
if err := perform(resources, createResource); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.Log("creating %d resource(s)", len(infos))
|
||||
return perform(infos, createResource)
|
||||
return &Result{Created: resources}, nil
|
||||
}
|
||||
|
||||
func (c *Client) Wait(reader io.Reader, timeout time.Duration) error {
|
||||
infos, err := c.BuildUnstructured(reader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Wait up to the given timeout for the specified resources to be ready
|
||||
func (c *Client) Wait(resources ResourceList, timeout time.Duration) error {
|
||||
cs, err := c.KubernetesClientSet()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -98,7 +90,7 @@ func (c *Client) Wait(reader io.Reader, timeout time.Duration) error {
|
|||
log: c.Log,
|
||||
timeout: timeout,
|
||||
}
|
||||
return w.waitForResources(infos)
|
||||
return w.waitForResources(resources)
|
||||
}
|
||||
|
||||
func (c *Client) namespace() string {
|
||||
|
@ -125,8 +117,8 @@ func (c *Client) validator() resource.ContentValidator {
|
|||
return schema
|
||||
}
|
||||
|
||||
// BuildUnstructured validates for Kubernetes objects and returns unstructured infos.
|
||||
func (c *Client) BuildUnstructured(reader io.Reader) (ResourceList, error) {
|
||||
// Build validates for Kubernetes objects and returns unstructured infos.
|
||||
func (c *Client) Build(reader io.Reader) (ResourceList, error) {
|
||||
result, err := c.newBuilder().
|
||||
Unstructured().
|
||||
Stream(reader, "").
|
||||
|
@ -134,39 +126,16 @@ func (c *Client) BuildUnstructured(reader io.Reader) (ResourceList, error) {
|
|||
return result, scrubValidationError(err)
|
||||
}
|
||||
|
||||
// Build validates for Kubernetes objects and returns resource Infos from a io.Reader.
|
||||
func (c *Client) Build(reader io.Reader) (ResourceList, error) {
|
||||
result, err := c.newBuilder().
|
||||
WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
|
||||
Schema(c.validator()).
|
||||
Stream(reader, "").
|
||||
Do().
|
||||
Infos()
|
||||
return result, scrubValidationError(err)
|
||||
}
|
||||
|
||||
// Update reads in the current configuration and a target configuration from io.reader
|
||||
// and creates resources that don't already exists, updates resources that have been modified
|
||||
// in the target configuration and deletes resources from the current configuration that are
|
||||
// not present in the target configuration.
|
||||
//
|
||||
// Namespace will set the namespaces.
|
||||
func (c *Client) Update(originalReader, targetReader io.Reader, force, recreate bool) error {
|
||||
original, err := c.BuildUnstructured(originalReader)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed decoding reader into objects")
|
||||
}
|
||||
|
||||
c.Log("building resources from updated manifest")
|
||||
target, err := c.BuildUnstructured(targetReader)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed decoding reader into objects")
|
||||
}
|
||||
|
||||
func (c *Client) Update(original, target ResourceList, force bool) (*Result, error) {
|
||||
updateErrors := []string{}
|
||||
res := &Result{}
|
||||
|
||||
c.Log("checking %d resources for changes", len(target))
|
||||
err = target.Visit(func(info *resource.Info, err error) error {
|
||||
err := target.Visit(func(info *resource.Info, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -182,6 +151,9 @@ func (c *Client) Update(originalReader, targetReader io.Reader, force, recreate
|
|||
return errors.Wrap(err, "failed to create resource")
|
||||
}
|
||||
|
||||
// Append the created resource to the results
|
||||
res.Created = append(res.Created, info)
|
||||
|
||||
kind := info.Mapping.GroupVersionKind.Kind
|
||||
c.Log("Created a new %s called %q\n", kind, info.Name)
|
||||
return nil
|
||||
|
@ -193,43 +165,64 @@ func (c *Client) Update(originalReader, targetReader io.Reader, force, recreate
|
|||
return errors.Errorf("no %s with the name %q found", kind, info.Name)
|
||||
}
|
||||
|
||||
if err := updateResource(c, info, originalInfo.Object, force, recreate); err != nil {
|
||||
if err := updateResource(c, info, originalInfo.Object, force); err != nil {
|
||||
c.Log("error updating the resource %q:\n\t %v", info.Name, err)
|
||||
updateErrors = append(updateErrors, err.Error())
|
||||
}
|
||||
// Because we check for errors later, append the info regardless
|
||||
res.Updated = append(res.Updated, info)
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
switch {
|
||||
case err != nil:
|
||||
return err
|
||||
return nil, err
|
||||
case len(updateErrors) != 0:
|
||||
return errors.Errorf(strings.Join(updateErrors, " && "))
|
||||
return nil, errors.Errorf(strings.Join(updateErrors, " && "))
|
||||
}
|
||||
|
||||
for _, info := range original.Difference(target) {
|
||||
c.Log("Deleting %q in %s...", info.Name, info.Namespace)
|
||||
if err := deleteResource(info); err != nil {
|
||||
c.Log("Failed to delete %q, err: %s", info.Name, err)
|
||||
} else {
|
||||
// Only append ones we succeeded in deleting
|
||||
res.Deleted = append(res.Deleted, info)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// Delete deletes Kubernetes resources from an io.reader.
|
||||
//
|
||||
// Namespace will set the namespace.
|
||||
func (c *Client) Delete(reader io.Reader) error {
|
||||
infos, err := c.BuildUnstructured(reader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return perform(infos, func(info *resource.Info) error {
|
||||
// Delete deletes Kubernetes resources specified in the resources list. It will
|
||||
// attempt to delete all resources even if one or more fail and collect any
|
||||
// errors. All successfully deleted items will be returned in the `Deleted`
|
||||
// ResourceList that is part of the result.
|
||||
func (c *Client) Delete(resources ResourceList) (*Result, []error) {
|
||||
var errs []error
|
||||
res := &Result{}
|
||||
err := perform(resources, func(info *resource.Info) error {
|
||||
c.Log("Starting delete for %q %s", info.Name, info.Mapping.GroupVersionKind.Kind)
|
||||
err := deleteResource(info)
|
||||
return c.skipIfNotFound(err)
|
||||
if err := c.skipIfNotFound(deleteResource(info)); err != nil {
|
||||
// Collect the error and continue on
|
||||
errs = append(errs, err)
|
||||
} else {
|
||||
res.Deleted = append(res.Deleted, info)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
// Rewrite the message from "no objects visited" if that is what we got
|
||||
// back
|
||||
if err == ErrNoObjectsVisited {
|
||||
err = errors.New("object not found, skipping delete")
|
||||
}
|
||||
errs = append(errs, err)
|
||||
}
|
||||
if errs != nil {
|
||||
return nil, errs
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (c *Client) skipIfNotFound(err error) error {
|
||||
|
@ -246,7 +239,7 @@ func (c *Client) watchTimeout(t time.Duration) func(*resource.Info) error {
|
|||
}
|
||||
}
|
||||
|
||||
// WatchUntilReady watches the resource given in the reader, and waits until it is ready.
|
||||
// WatchUntilReady watches the resources given and waits until it is ready.
|
||||
//
|
||||
// This function is mainly for hook implementations. It watches for a resource to
|
||||
// hit a particular milestone. The milestone depends on the Kind.
|
||||
|
@ -258,14 +251,10 @@ func (c *Client) watchTimeout(t time.Duration) func(*resource.Info) error {
|
|||
// ascertained by watching the Status fields in a job's output.
|
||||
//
|
||||
// Handling for other kinds will be added as necessary.
|
||||
func (c *Client) WatchUntilReady(reader io.Reader, timeout time.Duration) error {
|
||||
infos, err := c.Build(reader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
func (c *Client) WatchUntilReady(resources ResourceList, timeout time.Duration) error {
|
||||
// For jobs, there's also the option to do poll c.Jobs(namespace).Get():
|
||||
// https://github.com/adamreese/kubernetes/blob/master/test/e2e/job.go#L291-L300
|
||||
return perform(infos, c.watchTimeout(timeout))
|
||||
return perform(resources, c.watchTimeout(timeout))
|
||||
}
|
||||
|
||||
func perform(infos ResourceList, fn func(*resource.Info) error) error {
|
||||
|
@ -315,7 +304,7 @@ func createPatch(target *resource.Info, current runtime.Object) ([]byte, types.P
|
|||
}
|
||||
|
||||
// Get a versioned object
|
||||
versionedObject := asVersioned(target)
|
||||
versionedObject := AsVersioned(target)
|
||||
|
||||
// Unstructured objects, such as CRDs, may not have an not registered error
|
||||
// returned from ConvertToVersion. Anything that's unstructured should
|
||||
|
@ -330,7 +319,7 @@ func createPatch(target *resource.Info, current runtime.Object) ([]byte, types.P
|
|||
return patch, types.StrategicMergePatchType, err
|
||||
}
|
||||
|
||||
func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, force, recreate bool) error {
|
||||
func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, force bool) error {
|
||||
patch, patchType, err := createPatch(target, currentObj)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to create patch")
|
||||
|
@ -377,37 +366,6 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object,
|
|||
}
|
||||
}
|
||||
|
||||
if !recreate {
|
||||
return nil
|
||||
}
|
||||
|
||||
versioned := asVersioned(target)
|
||||
selector, err := selectorsForObject(versioned)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
client, err := c.KubernetesClientSet()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
pods, err := client.CoreV1().Pods(target.Namespace).List(metav1.ListOptions{
|
||||
LabelSelector: selector.String(),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Restart pods
|
||||
for _, pod := range pods.Items {
|
||||
c.Log("Restarting pod: %v/%v", pod.Namespace, pod.Name)
|
||||
|
||||
// Delete each pod for get them restarted with changed spec.
|
||||
if err := client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, metav1.NewPreconditionDeleteOptions(string(pod.UID))); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -142,7 +142,16 @@ func TestUpdate(t *testing.T) {
|
|||
}
|
||||
}),
|
||||
}
|
||||
if err := c.Update(objBody(&listA), objBody(&listB), false, false); err != nil {
|
||||
first, err := c.Build(objBody(&listA))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
second, err := c.Build(objBody(&listB))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if _, err := c.Update(first, second, false); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// TODO: Find a way to test methods that use Client Set
|
||||
|
@ -188,11 +197,6 @@ func TestBuild(t *testing.T) {
|
|||
namespace: "test",
|
||||
reader: strings.NewReader(guestbookManifest),
|
||||
count: 6,
|
||||
}, {
|
||||
name: "Invalid schema",
|
||||
namespace: "test",
|
||||
reader: strings.NewReader(testInvalidServiceManifest),
|
||||
err: true,
|
||||
}, {
|
||||
name: "Valid input, deploying resources into different namespaces",
|
||||
namespace: "test",
|
||||
|
@ -272,23 +276,40 @@ func TestPerform(t *testing.T) {
|
|||
func TestReal(t *testing.T) {
|
||||
t.Skip("This is a live test, comment this line to run")
|
||||
c := New(nil)
|
||||
if err := c.Create(strings.NewReader(guestbookManifest)); err != nil {
|
||||
resources, err := c.Build(strings.NewReader(guestbookManifest))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := c.Create(resources); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
testSvcEndpointManifest := testServiceManifest + "\n---\n" + testEndpointManifest
|
||||
c = New(nil)
|
||||
if err := c.Create(strings.NewReader(testSvcEndpointManifest)); err != nil {
|
||||
resources, err = c.Build(strings.NewReader(testSvcEndpointManifest))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := c.Create(resources); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := c.Delete(strings.NewReader(testEndpointManifest)); err != nil {
|
||||
resources, err = c.Build(strings.NewReader(testEndpointManifest))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if _, errs := c.Delete(resources); errs != nil {
|
||||
t.Fatal(errs)
|
||||
}
|
||||
|
||||
resources, err = c.Build(strings.NewReader(testSvcEndpointManifest))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// ensures that delete does not fail if a resource is not found
|
||||
if err := c.Delete(strings.NewReader(testSvcEndpointManifest)); err != nil {
|
||||
t.Fatal(err)
|
||||
if _, errs := c.Delete(resources); errs != nil {
|
||||
t.Fatal(errs)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -23,7 +23,9 @@ import (
|
|||
"k8s.io/client-go/kubernetes/scheme"
|
||||
)
|
||||
|
||||
func asVersioned(info *resource.Info) runtime.Object {
|
||||
// AsVersioned converts the given info into a runtime.Object with the correct
|
||||
// group and version set
|
||||
func AsVersioned(info *resource.Info) runtime.Object {
|
||||
gv := runtime.GroupVersioner(schema.GroupVersions(scheme.Scheme.PrioritizedVersionsAllGroups()))
|
||||
if info.Mapping != nil {
|
||||
gv = info.Mapping.GroupVersionKind.GroupVersion()
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/cli-runtime/pkg/resource"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
|
||||
"helm.sh/helm/pkg/kube"
|
||||
)
|
||||
|
@ -34,7 +35,6 @@ type FailingKubeClient struct {
|
|||
PrintingKubeClient
|
||||
CreateError error
|
||||
WaitError error
|
||||
GetError error
|
||||
DeleteError error
|
||||
WatchUntilReadyError error
|
||||
UpdateError error
|
||||
|
@ -44,51 +44,43 @@ type FailingKubeClient struct {
|
|||
}
|
||||
|
||||
// Create returns the configured error if set or prints
|
||||
func (f *FailingKubeClient) Create(r io.Reader) error {
|
||||
func (f *FailingKubeClient) Create(resources kube.ResourceList) (*kube.Result, error) {
|
||||
if f.CreateError != nil {
|
||||
return f.CreateError
|
||||
return nil, f.CreateError
|
||||
}
|
||||
return f.PrintingKubeClient.Create(r)
|
||||
return f.PrintingKubeClient.Create(resources)
|
||||
}
|
||||
|
||||
// Wait returns the configured error if set or prints
|
||||
func (f *FailingKubeClient) Wait(r io.Reader, d time.Duration) error {
|
||||
func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration) error {
|
||||
if f.WaitError != nil {
|
||||
return f.WaitError
|
||||
}
|
||||
return f.PrintingKubeClient.Wait(r, d)
|
||||
}
|
||||
|
||||
// Create returns the configured error if set or prints
|
||||
func (f *FailingKubeClient) Get(r io.Reader) (string, error) {
|
||||
if f.GetError != nil {
|
||||
return "", f.GetError
|
||||
}
|
||||
return f.PrintingKubeClient.Get(r)
|
||||
return f.PrintingKubeClient.Wait(resources, d)
|
||||
}
|
||||
|
||||
// Delete returns the configured error if set or prints
|
||||
func (f *FailingKubeClient) Delete(r io.Reader) error {
|
||||
func (f *FailingKubeClient) Delete(resources kube.ResourceList) (*kube.Result, []error) {
|
||||
if f.DeleteError != nil {
|
||||
return f.DeleteError
|
||||
return nil, []error{f.DeleteError}
|
||||
}
|
||||
return f.PrintingKubeClient.Delete(r)
|
||||
return f.PrintingKubeClient.Delete(resources)
|
||||
}
|
||||
|
||||
// WatchUntilReady returns the configured error if set or prints
|
||||
func (f *FailingKubeClient) WatchUntilReady(r io.Reader, d time.Duration) error {
|
||||
func (f *FailingKubeClient) WatchUntilReady(resources kube.ResourceList, d time.Duration) error {
|
||||
if f.WatchUntilReadyError != nil {
|
||||
return f.WatchUntilReadyError
|
||||
}
|
||||
return f.PrintingKubeClient.WatchUntilReady(r, d)
|
||||
return f.PrintingKubeClient.WatchUntilReady(resources, d)
|
||||
}
|
||||
|
||||
// Update returns the configured error if set or prints
|
||||
func (f *FailingKubeClient) Update(r, modifiedReader io.Reader, not, needed bool) error {
|
||||
func (f *FailingKubeClient) Update(r, modified kube.ResourceList, ignoreMe bool) (*kube.Result, error) {
|
||||
if f.UpdateError != nil {
|
||||
return f.UpdateError
|
||||
return nil, f.UpdateError
|
||||
}
|
||||
return f.PrintingKubeClient.Update(r, modifiedReader, not, needed)
|
||||
return f.PrintingKubeClient.Update(r, modified, ignoreMe)
|
||||
}
|
||||
|
||||
// Build returns the configured error if set or prints
|
||||
|
@ -99,12 +91,9 @@ func (f *FailingKubeClient) Build(r io.Reader) (kube.ResourceList, error) {
|
|||
return f.PrintingKubeClient.Build(r)
|
||||
}
|
||||
|
||||
// BuildUnstructured returns the configured error if set or prints
|
||||
func (f *FailingKubeClient) BuildUnstructured(r io.Reader) (kube.ResourceList, error) {
|
||||
if f.BuildUnstructuredError != nil {
|
||||
return []*resource.Info{}, f.BuildUnstructuredError
|
||||
}
|
||||
return f.PrintingKubeClient.Build(r)
|
||||
// KubernetesClientSet implements the KubeClient interface
|
||||
func (f *FailingKubeClient) KubernetesClientSet() (kubernetes.Interface, error) {
|
||||
return f.PrintingKubeClient.KubernetesClientSet()
|
||||
}
|
||||
|
||||
// WaitAndGetCompletedPodPhase returns the configured error if set or prints
|
||||
|
|
|
@ -18,10 +18,13 @@ package fake
|
|||
|
||||
import (
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/cli-runtime/pkg/resource"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
|
||||
"helm.sh/helm/pkg/kube"
|
||||
)
|
||||
|
@ -33,40 +36,46 @@ type PrintingKubeClient struct {
|
|||
}
|
||||
|
||||
// Create prints the values of what would be created with a real KubeClient.
|
||||
func (p *PrintingKubeClient) Create(r io.Reader) error {
|
||||
_, err := io.Copy(p.Out, r)
|
||||
return err
|
||||
func (p *PrintingKubeClient) Create(resources kube.ResourceList) (*kube.Result, error) {
|
||||
_, err := io.Copy(p.Out, bufferize(resources))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &kube.Result{Created: resources}, nil
|
||||
}
|
||||
|
||||
func (p *PrintingKubeClient) Wait(r io.Reader, _ time.Duration) error {
|
||||
_, err := io.Copy(p.Out, r)
|
||||
func (p *PrintingKubeClient) Wait(resources kube.ResourceList, _ time.Duration) error {
|
||||
_, err := io.Copy(p.Out, bufferize(resources))
|
||||
return err
|
||||
}
|
||||
|
||||
// Get prints the values of what would be created with a real KubeClient.
|
||||
func (p *PrintingKubeClient) Get(r io.Reader) (string, error) {
|
||||
_, err := io.Copy(p.Out, r)
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Delete implements KubeClient delete.
|
||||
//
|
||||
// It only prints out the content to be deleted.
|
||||
func (p *PrintingKubeClient) Delete(r io.Reader) error {
|
||||
_, err := io.Copy(p.Out, r)
|
||||
return err
|
||||
func (p *PrintingKubeClient) Delete(resources kube.ResourceList) (*kube.Result, []error) {
|
||||
_, err := io.Copy(p.Out, bufferize(resources))
|
||||
if err != nil {
|
||||
return nil, []error{err}
|
||||
}
|
||||
return &kube.Result{Deleted: resources}, nil
|
||||
}
|
||||
|
||||
// WatchUntilReady implements KubeClient WatchUntilReady.
|
||||
func (p *PrintingKubeClient) WatchUntilReady(r io.Reader, _ time.Duration) error {
|
||||
_, err := io.Copy(p.Out, r)
|
||||
func (p *PrintingKubeClient) WatchUntilReady(resources kube.ResourceList, _ time.Duration) error {
|
||||
_, err := io.Copy(p.Out, bufferize(resources))
|
||||
return err
|
||||
}
|
||||
|
||||
// Update implements KubeClient Update.
|
||||
func (p *PrintingKubeClient) Update(_, modifiedReader io.Reader, _, _ bool) error {
|
||||
_, err := io.Copy(p.Out, modifiedReader)
|
||||
return err
|
||||
func (p *PrintingKubeClient) Update(_, modified kube.ResourceList, _ bool) (*kube.Result, error) {
|
||||
_, err := io.Copy(p.Out, bufferize(modified))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// TODO: This doesn't completely mock out have some that get created,
|
||||
// updated, and deleted. I don't think these are used in any unit tests, but
|
||||
// we may want to refactor a way to handle future tests
|
||||
return &kube.Result{Updated: modified}, nil
|
||||
}
|
||||
|
||||
// Build implements KubeClient Build.
|
||||
|
@ -74,11 +83,20 @@ func (p *PrintingKubeClient) Build(_ io.Reader) (kube.ResourceList, error) {
|
|||
return []*resource.Info{}, nil
|
||||
}
|
||||
|
||||
func (p *PrintingKubeClient) BuildUnstructured(_ io.Reader) (kube.ResourceList, error) {
|
||||
return p.Build(nil)
|
||||
}
|
||||
|
||||
// WaitAndGetCompletedPodPhase implements KubeClient WaitAndGetCompletedPodPhase.
|
||||
func (p *PrintingKubeClient) WaitAndGetCompletedPodPhase(_ string, _ time.Duration) (v1.PodPhase, error) {
|
||||
return v1.PodSucceeded, nil
|
||||
}
|
||||
|
||||
// KubernetesClientSet implements the KubeClient interface
|
||||
func (p *PrintingKubeClient) KubernetesClientSet() (kubernetes.Interface, error) {
|
||||
return fake.NewSimpleClientset(), nil
|
||||
}
|
||||
|
||||
func bufferize(resources kube.ResourceList) io.Reader {
|
||||
var builder strings.Builder
|
||||
for _, info := range resources {
|
||||
builder.WriteString(info.String() + "\n")
|
||||
}
|
||||
return strings.NewReader(builder.String())
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
)
|
||||
|
||||
// KubernetesClient represents a client capable of communicating with the Kubernetes API.
|
||||
|
@ -28,39 +29,36 @@ import (
|
|||
// A KubernetesClient must be concurrency safe.
|
||||
type Interface interface {
|
||||
// Create creates one or more resources.
|
||||
//
|
||||
// reader must contain a YAML stream (one or more YAML documents separated
|
||||
// by "\n---\n").
|
||||
Create(reader io.Reader) error
|
||||
Create(resources ResourceList) (*Result, error)
|
||||
|
||||
Wait(r io.Reader, timeout time.Duration) error
|
||||
Wait(resources ResourceList, timeout time.Duration) error
|
||||
|
||||
// Delete destroys one or more resources.
|
||||
//
|
||||
// reader must contain a YAML stream (one or more YAML documents separated
|
||||
// by "\n---\n").
|
||||
Delete(io.Reader) error
|
||||
Delete(resources ResourceList) (*Result, []error)
|
||||
|
||||
// Watch the resource in reader until it is "ready".
|
||||
// Watch the resource in reader until it is "ready". This method
|
||||
//
|
||||
// For Jobs, "ready" means the job ran to completion (excited without error).
|
||||
// For all other kinds, it means the kind was created or modified without
|
||||
// error.
|
||||
WatchUntilReady(reader io.Reader, timeout time.Duration) error
|
||||
WatchUntilReady(resources ResourceList, timeout time.Duration) error
|
||||
|
||||
// Update updates one or more resources or creates the resource
|
||||
// if it doesn't exist.
|
||||
Update(original, target ResourceList, force bool) (*Result, error)
|
||||
|
||||
// Build creates a resource list from a Reader
|
||||
//
|
||||
// reader must contain a YAML stream (one or more YAML documents separated
|
||||
// by "\n---\n").
|
||||
Update(originalReader, modifiedReader io.Reader, force bool, recreate bool) error
|
||||
|
||||
// by "\n---\n")
|
||||
Build(reader io.Reader) (ResourceList, error)
|
||||
BuildUnstructured(reader io.Reader) (ResourceList, error)
|
||||
|
||||
// WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase
|
||||
// and returns said phase (PodSucceeded or PodFailed qualify).
|
||||
WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error)
|
||||
|
||||
// KubernetesClientSet returns the underlying kubernetes clientset
|
||||
KubernetesClientSet() (kubernetes.Interface, error)
|
||||
}
|
||||
|
||||
var _ Interface = (*Client)(nil)
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
Copyright The Helm Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package kube
|
||||
|
||||
// Result contains the information of created, updated, and deleted resources
|
||||
// for various kube API calls along with helper methods for using those
|
||||
// resources
|
||||
type Result struct {
|
||||
Created ResourceList
|
||||
Updated ResourceList
|
||||
Deleted ResourceList
|
||||
}
|
||||
|
||||
// If needed, we can add methods to the Result type for things like diffing
|
|
@ -56,7 +56,7 @@ func (w *waiter) waitForResources(created ResourceList) error {
|
|||
ok = true
|
||||
err error
|
||||
)
|
||||
switch value := asVersioned(v).(type) {
|
||||
switch value := AsVersioned(v).(type) {
|
||||
case *corev1.Pod:
|
||||
pod, err := w.c.CoreV1().Pods(value.Namespace).Get(value.Name, metav1.GetOptions{})
|
||||
if err != nil || !w.isPodReady(pod) {
|
||||
|
@ -178,7 +178,7 @@ func (w *waiter) podsReadyForObject(namespace string, obj runtime.Object) (bool,
|
|||
}
|
||||
|
||||
func (w *waiter) podsforObject(namespace string, obj runtime.Object) ([]corev1.Pod, error) {
|
||||
selector, err := selectorsForObject(obj)
|
||||
selector, err := SelectorsForObject(obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -300,10 +300,10 @@ func getPods(client kubernetes.Interface, namespace, selector string) ([]corev1.
|
|||
return list.Items, err
|
||||
}
|
||||
|
||||
// selectorsForObject returns the pod label selector for a given object
|
||||
// SelectorsForObject returns the pod label selector for a given object
|
||||
//
|
||||
// Modified version of https://github.com/kubernetes/kubernetes/blob/v1.14.1/pkg/kubectl/polymorphichelpers/helpers.go#L84
|
||||
func selectorsForObject(object runtime.Object) (selector labels.Selector, err error) {
|
||||
func SelectorsForObject(object runtime.Object) (selector labels.Selector, err error) {
|
||||
switch t := object.(type) {
|
||||
case *extensionsv1beta1.ReplicaSet:
|
||||
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
|
||||
|
|
|
@ -37,8 +37,11 @@ type Environment struct {
|
|||
}
|
||||
|
||||
func (env *Environment) createTestPod(test *test) error {
|
||||
b := bytes.NewBufferString(test.manifest)
|
||||
if err := env.KubeClient.Create(b); err != nil {
|
||||
resources, err := env.KubeClient.Build(bytes.NewBufferString(test.manifest))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := env.KubeClient.Create(resources); err != nil {
|
||||
test.result.Info = err.Error()
|
||||
test.result.Status = release.TestRunFailure
|
||||
return err
|
||||
|
@ -112,9 +115,15 @@ func (env *Environment) streamMessage(msg string, status release.TestRunStatus)
|
|||
// DeleteTestPods deletes resources given in testManifests
|
||||
func (env *Environment) DeleteTestPods(testManifests []string) {
|
||||
for _, testManifest := range testManifests {
|
||||
err := env.KubeClient.Delete(bytes.NewBufferString(testManifest))
|
||||
resources, err := env.KubeClient.Build(bytes.NewBufferString(testManifest))
|
||||
if err != nil {
|
||||
env.streamError(err.Error())
|
||||
}
|
||||
_, errs := env.KubeClient.Delete(resources)
|
||||
if err != nil {
|
||||
for _, e := range errs {
|
||||
env.streamError(e.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,13 +17,14 @@ limitations under the License.
|
|||
package releasetesting
|
||||
|
||||
import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
|
||||
"helm.sh/helm/pkg/kube"
|
||||
"helm.sh/helm/pkg/kube/fake"
|
||||
"helm.sh/helm/pkg/release"
|
||||
)
|
||||
|
||||
|
@ -237,14 +238,14 @@ func testSuiteFixture(testManifests []string) *TestSuite {
|
|||
func testEnvFixture() *Environment {
|
||||
return &Environment{
|
||||
Namespace: "default",
|
||||
KubeClient: &mockKubeClient{},
|
||||
KubeClient: &mockKubeClient{PrintingKubeClient: fake.PrintingKubeClient{Out: ioutil.Discard}},
|
||||
Timeout: 1,
|
||||
Messages: make(chan *release.TestReleaseResponse, 1),
|
||||
}
|
||||
}
|
||||
|
||||
type mockKubeClient struct {
|
||||
kube.Interface
|
||||
fake.PrintingKubeClient
|
||||
podFail bool
|
||||
err error
|
||||
}
|
||||
|
@ -255,5 +256,4 @@ func (c *mockKubeClient) WaitAndGetCompletedPodPhase(_ string, _ time.Duration)
|
|||
}
|
||||
return v1.PodSucceeded, nil
|
||||
}
|
||||
func (c *mockKubeClient) Create(_ io.Reader) error { return c.err }
|
||||
func (c *mockKubeClient) Delete(_ io.Reader) error { return nil }
|
||||
func (c *mockKubeClient) Create(_ kube.ResourceList) (*kube.Result, error) { return nil, c.err }
|
||||
|
|
Loading…
Reference in New Issue