mirror of https://github.com/helm/helm.git
				
				
				
			ref(pkg/kube): extract wait logic from install/update
This change adds a new method for waiting for kubernetes resources. Signed-off-by: Adam Reese <adam@reese.io>
This commit is contained in:
		
							parent
							
								
									b8bced2649
								
							
						
					
					
						commit
						2dd4744d23
					
				|  | @ -68,7 +68,7 @@ func newReleaseTestRunCmd(cfg *action.Configuration, out io.Writer) *cobra.Comma | |||
| 	} | ||||
| 
 | ||||
| 	f := cmd.Flags() | ||||
| 	f.Int64Var(&client.Timeout, "timeout", 300, "time in seconds to wait for any individual Kubernetes operation (like Jobs for hooks)") | ||||
| 	f.DurationVar(&client.Timeout, "timeout", 300, "time in seconds to wait for any individual Kubernetes operation (like Jobs for hooks)") | ||||
| 	f.BoolVar(&client.Cleanup, "cleanup", false, "delete test pods upon completion") | ||||
| 
 | ||||
| 	return cmd | ||||
|  |  | |||
|  | @ -184,6 +184,6 @@ type hookFailingKubeClient struct { | |||
| 	kube.PrintingKubeClient | ||||
| } | ||||
| 
 | ||||
| func (h *hookFailingKubeClient) WatchUntilReady(ns string, r io.Reader, timeout int64, shouldWait bool) error { | ||||
| func (h *hookFailingKubeClient) WatchUntilReady(r io.Reader, timeout int64) error { | ||||
| 	return errors.New("Failed watch") | ||||
| } | ||||
|  |  | |||
|  | @ -184,12 +184,21 @@ func (i *Install) Run(chrt *chart.Chart) (*release.Release, error) { | |||
| 	// do an update, but it's not clear whether we WANT to do an update if the re-use is set
 | ||||
| 	// to true, since that is basically an upgrade operation.
 | ||||
| 	buf := bytes.NewBufferString(rel.Manifest) | ||||
| 	if err := i.cfg.KubeClient.Create(i.Namespace, buf, i.Timeout, i.Wait); err != nil { | ||||
| 	if err := i.cfg.KubeClient.Create(buf); err != nil { | ||||
| 		rel.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", i.ReleaseName, err.Error())) | ||||
| 		i.recordRelease(rel) // Ignore the error, since we have another error to deal with.
 | ||||
| 		return rel, errors.Wrapf(err, "release %s failed", i.ReleaseName) | ||||
| 	} | ||||
| 
 | ||||
| 	if i.Wait { | ||||
| 		if err := i.cfg.KubeClient.Wait(buf, i.Timeout); err != nil { | ||||
| 			rel.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", i.ReleaseName, err.Error())) | ||||
| 			i.recordRelease(rel) // Ignore the error, since we have another error to deal with.
 | ||||
| 			return rel, errors.Wrapf(err, "release %s failed", i.ReleaseName) | ||||
| 		} | ||||
| 
 | ||||
| 	} | ||||
| 
 | ||||
| 	if !i.DisableHooks { | ||||
| 		if err := i.execHook(rel.Hooks, hooks.PostInstall); err != nil { | ||||
| 			rel.SetStatus(release.StatusFailed, "failed post-install: "+err.Error()) | ||||
|  | @ -362,15 +371,12 @@ func (c *Configuration) renderResources(ch *chart.Chart, values chartutil.Values | |||
| 
 | ||||
| // validateManifest checks to see whether the given manifest is valid for the current Kubernetes
 | ||||
| func (i *Install) validateManifest(manifest io.Reader) error { | ||||
| 	_, err := i.cfg.KubeClient.BuildUnstructured(i.Namespace, manifest) | ||||
| 	_, err := i.cfg.KubeClient.BuildUnstructured(manifest) | ||||
| 	return err | ||||
| } | ||||
| 
 | ||||
| // execHook executes all of the hooks for the given hook event.
 | ||||
| func (i *Install) execHook(hs []*release.Hook, hook string) error { | ||||
| 	name := i.ReleaseName | ||||
| 	namespace := i.Namespace | ||||
| 	timeout := i.Timeout | ||||
| 	executingHooks := []*release.Hook{} | ||||
| 
 | ||||
| 	for _, h := range hs { | ||||
|  | @ -384,21 +390,21 @@ func (i *Install) execHook(hs []*release.Hook, hook string) error { | |||
| 	sort.Sort(hookByWeight(executingHooks)) | ||||
| 
 | ||||
| 	for _, h := range executingHooks { | ||||
| 		if err := deleteHookByPolicy(i.cfg, i.Namespace, h, hooks.BeforeHookCreation, hook); err != nil { | ||||
| 		if err := deleteHookByPolicy(i.cfg, h, hooks.BeforeHookCreation); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		b := bytes.NewBufferString(h.Manifest) | ||||
| 		if err := i.cfg.KubeClient.Create(namespace, b, timeout, false); err != nil { | ||||
| 			return errors.Wrapf(err, "warning: Release %s %s %s failed", name, hook, h.Path) | ||||
| 		if err := i.cfg.KubeClient.Create(b); err != nil { | ||||
| 			return errors.Wrapf(err, "warning: Release %s %s %s failed", i.ReleaseName, hook, h.Path) | ||||
| 		} | ||||
| 		b.Reset() | ||||
| 		b.WriteString(h.Manifest) | ||||
| 
 | ||||
| 		if err := i.cfg.KubeClient.WatchUntilReady(namespace, b, timeout, false); err != nil { | ||||
| 		if err := i.cfg.KubeClient.WatchUntilReady(b, i.Timeout); err != nil { | ||||
| 			// If a hook is failed, checkout the annotation of the hook to determine whether the hook should be deleted
 | ||||
| 			// under failed condition. If so, then clear the corresponding resource object in the hook
 | ||||
| 			if err := deleteHookByPolicy(i.cfg, i.Namespace, h, hooks.HookFailed, hook); err != nil { | ||||
| 			if err := deleteHookByPolicy(i.cfg, h, hooks.HookFailed); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
| 			return err | ||||
|  | @ -408,7 +414,7 @@ func (i *Install) execHook(hs []*release.Hook, hook string) error { | |||
| 	// If all hooks are succeeded, checkout the annotation of each hook to determine whether the hook should be deleted
 | ||||
| 	// under succeeded condition. If so, then clear the corresponding resource object in each hook
 | ||||
| 	for _, h := range executingHooks { | ||||
| 		if err := deleteHookByPolicy(i.cfg, i.Namespace, h, hooks.HookSucceeded, hook); err != nil { | ||||
| 		if err := deleteHookByPolicy(i.cfg, h, hooks.HookSucceeded); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		h.LastRun = time.Now() | ||||
|  |  | |||
|  | @ -17,6 +17,8 @@ limitations under the License. | |||
| package action | ||||
| 
 | ||||
| import ( | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/pkg/errors" | ||||
| 
 | ||||
| 	"helm.sh/helm/pkg/release" | ||||
|  | @ -29,7 +31,7 @@ import ( | |||
| type ReleaseTesting struct { | ||||
| 	cfg *Configuration | ||||
| 
 | ||||
| 	Timeout int64 | ||||
| 	Timeout time.Duration | ||||
| 	Cleanup bool | ||||
| } | ||||
| 
 | ||||
|  |  | |||
|  | @ -58,11 +58,11 @@ func filterManifestsToKeep(manifests []releaseutil.Manifest) ([]releaseutil.Mani | |||
| 	return keep, remaining | ||||
| } | ||||
| 
 | ||||
| func summarizeKeptManifests(manifests []releaseutil.Manifest, kubeClient kube.KubernetesClient, namespace string) string { | ||||
| func summarizeKeptManifests(manifests []releaseutil.Manifest, kubeClient kube.KubernetesClient) string { | ||||
| 	var message string | ||||
| 	for _, m := range manifests { | ||||
| 		// check if m is in fact present from k8s client's POV.
 | ||||
| 		output, err := kubeClient.Get(namespace, bytes.NewBufferString(m.Content)) | ||||
| 		output, err := kubeClient.Get(bytes.NewBufferString(m.Content)) | ||||
| 		if err != nil || strings.Contains(output, kube.MissingGetHeader) { | ||||
| 			continue | ||||
| 		} | ||||
|  |  | |||
|  | @ -140,7 +140,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas | |||
| 
 | ||||
| 	// pre-rollback hooks
 | ||||
| 	if !r.DisableHooks { | ||||
| 		if err := r.execHook(targetRelease.Hooks, targetRelease.Namespace, hooks.PreRollback); err != nil { | ||||
| 		if err := r.execHook(targetRelease.Hooks, hooks.PreRollback); err != nil { | ||||
| 			return targetRelease, err | ||||
| 		} | ||||
| 	} else { | ||||
|  | @ -149,7 +149,8 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas | |||
| 
 | ||||
| 	cr := bytes.NewBufferString(currentRelease.Manifest) | ||||
| 	tr := bytes.NewBufferString(targetRelease.Manifest) | ||||
| 	if err := r.cfg.KubeClient.Update(targetRelease.Namespace, cr, tr, r.Force, r.Recreate, r.Timeout, r.Wait); err != nil { | ||||
| 	// TODO add wait
 | ||||
| 	if err := r.cfg.KubeClient.Update(cr, tr, r.Force, r.Recreate); err != nil { | ||||
| 		msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err) | ||||
| 		r.cfg.Log("warning: %s", msg) | ||||
| 		currentRelease.Info.Status = release.StatusSuperseded | ||||
|  | @ -162,7 +163,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas | |||
| 
 | ||||
| 	// post-rollback hooks
 | ||||
| 	if !r.DisableHooks { | ||||
| 		if err := r.execHook(targetRelease.Hooks, targetRelease.Namespace, hooks.PostRollback); err != nil { | ||||
| 		if err := r.execHook(targetRelease.Hooks, hooks.PostRollback); err != nil { | ||||
| 			return targetRelease, err | ||||
| 		} | ||||
| 	} | ||||
|  | @ -184,7 +185,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas | |||
| } | ||||
| 
 | ||||
| // execHook executes all of the hooks for the given hook event.
 | ||||
| func (r *Rollback) execHook(hs []*release.Hook, namespace, hook string) error { | ||||
| func (r *Rollback) execHook(hs []*release.Hook, hook string) error { | ||||
| 	timeout := r.Timeout | ||||
| 	executingHooks := []*release.Hook{} | ||||
| 
 | ||||
|  | @ -199,21 +200,21 @@ func (r *Rollback) execHook(hs []*release.Hook, namespace, hook string) error { | |||
| 	sort.Sort(hookByWeight(executingHooks)) | ||||
| 
 | ||||
| 	for _, h := range executingHooks { | ||||
| 		if err := deleteHookByPolicy(r.cfg, namespace, h, hooks.BeforeHookCreation, hook); err != nil { | ||||
| 		if err := deleteHookByPolicy(r.cfg, h, hooks.BeforeHookCreation); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		b := bytes.NewBufferString(h.Manifest) | ||||
| 		if err := r.cfg.KubeClient.Create(namespace, b, timeout, false); err != nil { | ||||
| 		if err := r.cfg.KubeClient.Create(b); err != nil { | ||||
| 			return errors.Wrapf(err, "warning: Hook %s %s failed", hook, h.Path) | ||||
| 		} | ||||
| 		b.Reset() | ||||
| 		b.WriteString(h.Manifest) | ||||
| 
 | ||||
| 		if err := r.cfg.KubeClient.WatchUntilReady(namespace, b, timeout, false); err != nil { | ||||
| 		if err := r.cfg.KubeClient.WatchUntilReady(b, timeout); err != nil { | ||||
| 			// If a hook is failed, checkout the annotation of the hook to determine whether the hook should be deleted
 | ||||
| 			// under failed condition. If so, then clear the corresponding resource object in the hook
 | ||||
| 			if err := deleteHookByPolicy(r.cfg, namespace, h, hooks.HookFailed, hook); err != nil { | ||||
| 			if err := deleteHookByPolicy(r.cfg, h, hooks.HookFailed); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
| 			return err | ||||
|  | @ -223,7 +224,7 @@ func (r *Rollback) execHook(hs []*release.Hook, namespace, hook string) error { | |||
| 	// If all hooks are succeeded, checkout the annotation of each hook to determine whether the hook should be deleted
 | ||||
| 	// under succeeded condition. If so, then clear the corresponding resource object in each hook
 | ||||
| 	for _, h := range executingHooks { | ||||
| 		if err := deleteHookByPolicy(r.cfg, namespace, h, hooks.HookSucceeded, hook); err != nil { | ||||
| 		if err := deleteHookByPolicy(r.cfg, h, hooks.HookSucceeded); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		h.LastRun = time.Now() | ||||
|  | @ -233,10 +234,10 @@ func (r *Rollback) execHook(hs []*release.Hook, namespace, hook string) error { | |||
| } | ||||
| 
 | ||||
| // deleteHookByPolicy deletes a hook if the hook policy instructs it to
 | ||||
| func deleteHookByPolicy(cfg *Configuration, namespace string, h *release.Hook, policy, hook string) error { | ||||
| func deleteHookByPolicy(cfg *Configuration, h *release.Hook, policy string) error { | ||||
| 	b := bytes.NewBufferString(h.Manifest) | ||||
| 	if hookHasDeletePolicy(h, policy) { | ||||
| 		if errHookDelete := cfg.KubeClient.Delete(namespace, b); errHookDelete != nil { | ||||
| 		if errHookDelete := cfg.KubeClient.Delete(b); errHookDelete != nil { | ||||
| 			return errHookDelete | ||||
| 		} | ||||
| 	} | ||||
|  |  | |||
|  | @ -94,7 +94,7 @@ func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error) | |||
| 	res := &release.UninstallReleaseResponse{Release: rel} | ||||
| 
 | ||||
| 	if !u.DisableHooks { | ||||
| 		if err := u.execHook(rel.Hooks, rel.Namespace, hooks.PreDelete); err != nil { | ||||
| 		if err := u.execHook(rel.Hooks, hooks.PreDelete); err != nil { | ||||
| 			return res, err | ||||
| 		} | ||||
| 	} else { | ||||
|  | @ -111,7 +111,7 @@ func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error) | |||
| 	res.Info = kept | ||||
| 
 | ||||
| 	if !u.DisableHooks { | ||||
| 		if err := u.execHook(rel.Hooks, rel.Namespace, hooks.PostDelete); err != nil { | ||||
| 		if err := u.execHook(rel.Hooks, hooks.PostDelete); err != nil { | ||||
| 			errs = append(errs, err) | ||||
| 		} | ||||
| 	} | ||||
|  | @ -153,8 +153,7 @@ func joinErrors(errs []error) string { | |||
| } | ||||
| 
 | ||||
| // execHook executes all of the hooks for the given hook event.
 | ||||
| func (u *Uninstall) execHook(hs []*release.Hook, namespace, hook string) error { | ||||
| 	timeout := u.Timeout | ||||
| func (u *Uninstall) execHook(hs []*release.Hook, hook string) error { | ||||
| 	executingHooks := []*release.Hook{} | ||||
| 
 | ||||
| 	for _, h := range hs { | ||||
|  | @ -168,21 +167,21 @@ func (u *Uninstall) execHook(hs []*release.Hook, namespace, hook string) error { | |||
| 	sort.Sort(hookByWeight(executingHooks)) | ||||
| 
 | ||||
| 	for _, h := range executingHooks { | ||||
| 		if err := deleteHookByPolicy(u.cfg, namespace, h, hooks.BeforeHookCreation, hook); err != nil { | ||||
| 		if err := deleteHookByPolicy(u.cfg, h, hooks.BeforeHookCreation); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		b := bytes.NewBufferString(h.Manifest) | ||||
| 		if err := u.cfg.KubeClient.Create(namespace, b, timeout, false); err != nil { | ||||
| 		if err := u.cfg.KubeClient.Create(b); err != nil { | ||||
| 			return errors.Wrapf(err, "warning: Hook %s %s failed", hook, h.Path) | ||||
| 		} | ||||
| 		b.Reset() | ||||
| 		b.WriteString(h.Manifest) | ||||
| 
 | ||||
| 		if err := u.cfg.KubeClient.WatchUntilReady(namespace, b, timeout, false); err != nil { | ||||
| 		if err := u.cfg.KubeClient.WatchUntilReady(b, u.Timeout); err != nil { | ||||
| 			// If a hook is failed, checkout the annotation of the hook to determine whether the hook should be deleted
 | ||||
| 			// under failed condition. If so, then clear the corresponding resource object in the hook
 | ||||
| 			if err := deleteHookByPolicy(u.cfg, namespace, h, hooks.HookFailed, hook); err != nil { | ||||
| 			if err := deleteHookByPolicy(u.cfg, h, hooks.HookFailed); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
| 			return err | ||||
|  | @ -192,7 +191,7 @@ func (u *Uninstall) execHook(hs []*release.Hook, namespace, hook string) error { | |||
| 	// If all hooks are succeeded, checkout the annotation of each hook to determine whether the hook should be deleted
 | ||||
| 	// under succeeded condition. If so, then clear the corresponding resource object in each hook
 | ||||
| 	for _, h := range executingHooks { | ||||
| 		if err := deleteHookByPolicy(u.cfg, namespace, h, hooks.HookSucceeded, hook); err != nil { | ||||
| 		if err := deleteHookByPolicy(u.cfg, h, hooks.HookSucceeded); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		h.LastRun = time.Now() | ||||
|  | @ -220,7 +219,7 @@ func (u *Uninstall) deleteRelease(rel *release.Release) (kept string, errs []err | |||
| 
 | ||||
| 	filesToKeep, filesToDelete := filterManifestsToKeep(files) | ||||
| 	if len(filesToKeep) > 0 { | ||||
| 		kept = summarizeKeptManifests(filesToKeep, u.cfg.KubeClient, rel.Namespace) | ||||
| 		kept = summarizeKeptManifests(filesToKeep, u.cfg.KubeClient) | ||||
| 	} | ||||
| 
 | ||||
| 	for _, file := range filesToDelete { | ||||
|  | @ -228,7 +227,7 @@ func (u *Uninstall) deleteRelease(rel *release.Release) (kept string, errs []err | |||
| 		if b.Len() == 0 { | ||||
| 			continue | ||||
| 		} | ||||
| 		if err := u.cfg.KubeClient.Delete(rel.Namespace, b); err != nil { | ||||
| 		if err := u.cfg.KubeClient.Delete(b); err != nil { | ||||
| 			u.cfg.Log("uninstall: Failed deletion of %q: %s", rel.Name, err) | ||||
| 			if err == kube.ErrNoObjectsVisited { | ||||
| 				// Rewrite the message from "no objects visited"
 | ||||
|  |  | |||
|  | @ -183,7 +183,7 @@ func (u *Upgrade) prepareUpgrade(name string, chart *chart.Chart) (*release.Rele | |||
| 	if len(notesTxt) > 0 { | ||||
| 		upgradedRelease.Info.Notes = notesTxt | ||||
| 	} | ||||
| 	err = validateManifest(u.cfg.KubeClient, currentRelease.Namespace, manifestDoc.Bytes()) | ||||
| 	err = validateManifest(u.cfg.KubeClient, manifestDoc.Bytes()) | ||||
| 	return currentRelease, upgradedRelease, err | ||||
| } | ||||
| 
 | ||||
|  | @ -232,7 +232,8 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea | |||
| func (u *Upgrade) upgradeRelease(current, target *release.Release) error { | ||||
| 	cm := bytes.NewBufferString(current.Manifest) | ||||
| 	tm := bytes.NewBufferString(target.Manifest) | ||||
| 	return u.cfg.KubeClient.Update(target.Namespace, cm, tm, u.Force, u.Recreate, u.Timeout, u.Wait) | ||||
| 	// TODO add wait
 | ||||
| 	return u.cfg.KubeClient.Update(cm, tm, u.Force, u.Recreate) | ||||
| } | ||||
| 
 | ||||
| // reuseValues copies values from the current release to a new release if the
 | ||||
|  | @ -295,21 +296,21 @@ func (u *Upgrade) execHook(hs []*release.Hook, hook string) error { | |||
| 	sort.Sort(hookByWeight(executingHooks)) | ||||
| 
 | ||||
| 	for _, h := range executingHooks { | ||||
| 		if err := deleteHookByPolicy(u.cfg, u.Namespace, h, hooks.BeforeHookCreation, hook); err != nil { | ||||
| 		if err := deleteHookByPolicy(u.cfg, h, hooks.BeforeHookCreation); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		b := bytes.NewBufferString(h.Manifest) | ||||
| 		if err := u.cfg.KubeClient.Create(u.Namespace, b, timeout, false); err != nil { | ||||
| 		if err := u.cfg.KubeClient.Create(b); err != nil { | ||||
| 			return errors.Wrapf(err, "warning: Hook %s %s failed", hook, h.Path) | ||||
| 		} | ||||
| 		b.Reset() | ||||
| 		b.WriteString(h.Manifest) | ||||
| 
 | ||||
| 		if err := u.cfg.KubeClient.WatchUntilReady(u.Namespace, b, timeout, false); err != nil { | ||||
| 		if err := u.cfg.KubeClient.WatchUntilReady(b, timeout); err != nil { | ||||
| 			// If a hook is failed, checkout the annotation of the hook to determine whether the hook should be deleted
 | ||||
| 			// under failed condition. If so, then clear the corresponding resource object in the hook
 | ||||
| 			if err := deleteHookByPolicy(u.cfg, u.Namespace, h, hooks.HookFailed, hook); err != nil { | ||||
| 			if err := deleteHookByPolicy(u.cfg, h, hooks.HookFailed); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
| 			return err | ||||
|  | @ -319,7 +320,7 @@ func (u *Upgrade) execHook(hs []*release.Hook, hook string) error { | |||
| 	// If all hooks are succeeded, checkout the annotation of each hook to determine whether the hook should be deleted
 | ||||
| 	// under succeeded condition. If so, then clear the corresponding resource object in each hook
 | ||||
| 	for _, h := range executingHooks { | ||||
| 		if err := deleteHookByPolicy(u.cfg, u.Namespace, h, hooks.HookSucceeded, hook); err != nil { | ||||
| 		if err := deleteHookByPolicy(u.cfg, h, hooks.HookSucceeded); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		h.LastRun = time.Now() | ||||
|  |  | |||
|  | @ -83,26 +83,38 @@ func (c *Client) KubernetesClientSet() (*kubernetes.Clientset, error) { | |||
| 
 | ||||
| var nopLogger = func(_ string, _ ...interface{}) {} | ||||
| 
 | ||||
| // ResourceActorFunc performs an action on a single resource.
 | ||||
| type ResourceActorFunc func(*resource.Info) error | ||||
| // resourceActorFunc performs an action on a single resource.
 | ||||
| type resourceActorFunc func(*resource.Info) error | ||||
| 
 | ||||
| // Create creates Kubernetes resources from an io.reader.
 | ||||
| //
 | ||||
| // Namespace will set the namespace.
 | ||||
| func (c *Client) Create(namespace string, reader io.Reader, timeout int64, shouldWait bool) error { | ||||
| func (c *Client) Create(reader io.Reader) error { | ||||
| 	c.Log("building resources from manifest") | ||||
| 	infos, err := c.BuildUnstructured(namespace, reader) | ||||
| 	infos, err := c.BuildUnstructured(reader) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	c.Log("creating %d resource(s)", len(infos)) | ||||
| 	if err := perform(infos, createResource); err != nil { | ||||
| 	err = perform(infos, createResource) | ||||
| 	return err | ||||
| } | ||||
| 
 | ||||
| func (c *Client) Wait(reader io.Reader, timeout int64) error { | ||||
| 	infos, err := c.BuildUnstructured(reader) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if shouldWait { | ||||
| 		return c.waitForResources(time.Duration(timeout)*time.Second, infos) | ||||
| 	cs, err := c.KubernetesClientSet() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	return nil | ||||
| 	w := waiter{ | ||||
| 		c:       cs, | ||||
| 		log:     c.Log, | ||||
| 		timeout: time.Duration(timeout), | ||||
| 	} | ||||
| 	return w.waitForResources(infos) | ||||
| } | ||||
| 
 | ||||
| func (c *Client) namespace() string { | ||||
|  | @ -131,7 +143,7 @@ func (c *Client) validator() resource.ContentValidator { | |||
| } | ||||
| 
 | ||||
| // BuildUnstructured validates for Kubernetes objects and returns unstructured infos.
 | ||||
| func (c *Client) BuildUnstructured(namespace string, reader io.Reader) (Result, error) { | ||||
| func (c *Client) BuildUnstructured(reader io.Reader) (Result, error) { | ||||
| 	var result Result | ||||
| 
 | ||||
| 	result, err := c.newBuilder(). | ||||
|  | @ -142,7 +154,7 @@ func (c *Client) BuildUnstructured(namespace string, reader io.Reader) (Result, | |||
| } | ||||
| 
 | ||||
| // Build validates for Kubernetes objects and returns resource Infos from a io.Reader.
 | ||||
| func (c *Client) Build(namespace string, reader io.Reader) (Result, error) { | ||||
| func (c *Client) Build(reader io.Reader) (Result, error) { | ||||
| 	var result Result | ||||
| 	result, err := c.newBuilder(). | ||||
| 		WithScheme(legacyscheme.Scheme). | ||||
|  | @ -156,11 +168,11 @@ func (c *Client) Build(namespace string, reader io.Reader) (Result, error) { | |||
| // Get gets Kubernetes resources as pretty-printed string.
 | ||||
| //
 | ||||
| // Namespace will set the namespace.
 | ||||
| func (c *Client) Get(namespace string, reader io.Reader) (string, error) { | ||||
| func (c *Client) Get(reader io.Reader) (string, error) { | ||||
| 	// Since we don't know what order the objects come in, let's group them by the types, so
 | ||||
| 	// that when we print them, they come out looking good (headers apply to subgroups, etc.).
 | ||||
| 	objs := make(map[string][]runtime.Object) | ||||
| 	infos, err := c.BuildUnstructured(namespace, reader) | ||||
| 	infos, err := c.BuildUnstructured(reader) | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
|  | @ -182,7 +194,7 @@ func (c *Client) Get(namespace string, reader io.Reader) (string, error) { | |||
| 		vk := gvk.Version + "/" + gvk.Kind | ||||
| 		objs[vk] = append(objs[vk], asVersioned(info)) | ||||
| 
 | ||||
| 		//Get the relation pods
 | ||||
| 		// Get the relation pods
 | ||||
| 		objPods, err = c.getSelectRelationPod(info, objPods) | ||||
| 		if err != nil { | ||||
| 			c.Log("Warning: get the relation pod is failed, err:%s", err) | ||||
|  | @ -194,7 +206,7 @@ func (c *Client) Get(namespace string, reader io.Reader) (string, error) { | |||
| 		return "", err | ||||
| 	} | ||||
| 
 | ||||
| 	//here, we will add the objPods to the objs
 | ||||
| 	// here, we will add the objPods to the objs
 | ||||
| 	for key, podItems := range objPods { | ||||
| 		for i := range podItems { | ||||
| 			objs[key+"(related)"] = append(objs[key+"(related)"], &podItems[i]) | ||||
|  | @ -235,14 +247,14 @@ func (c *Client) Get(namespace string, reader io.Reader) (string, error) { | |||
| // not present in the target configuration.
 | ||||
| //
 | ||||
| // Namespace will set the namespaces.
 | ||||
| func (c *Client) Update(namespace string, originalReader, targetReader io.Reader, force, recreate bool, timeout int64, shouldWait bool) error { | ||||
| 	original, err := c.BuildUnstructured(namespace, originalReader) | ||||
| func (c *Client) Update(originalReader, targetReader io.Reader, force, recreate bool) error { | ||||
| 	original, err := c.BuildUnstructured(originalReader) | ||||
| 	if err != nil { | ||||
| 		return goerrors.Wrap(err, "failed decoding reader into objects") | ||||
| 	} | ||||
| 
 | ||||
| 	c.Log("building resources from updated manifest") | ||||
| 	target, err := c.BuildUnstructured(namespace, targetReader) | ||||
| 	target, err := c.BuildUnstructured(targetReader) | ||||
| 	if err != nil { | ||||
| 		return goerrors.Wrap(err, "failed decoding reader into objects") | ||||
| 	} | ||||
|  | @ -298,17 +310,14 @@ func (c *Client) Update(namespace string, originalReader, targetReader io.Reader | |||
| 			c.Log("Failed to delete %q, err: %s", info.Name, err) | ||||
| 		} | ||||
| 	} | ||||
| 	if shouldWait { | ||||
| 		return c.waitForResources(time.Duration(timeout)*time.Second, target) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // Delete deletes Kubernetes resources from an io.reader.
 | ||||
| //
 | ||||
| // Namespace will set the namespace.
 | ||||
| func (c *Client) Delete(namespace string, reader io.Reader) error { | ||||
| 	infos, err := c.BuildUnstructured(namespace, reader) | ||||
| func (c *Client) Delete(reader io.Reader) error { | ||||
| 	infos, err := c.BuildUnstructured(reader) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | @ -327,7 +336,7 @@ func (c *Client) skipIfNotFound(err error) error { | |||
| 	return err | ||||
| } | ||||
| 
 | ||||
| func (c *Client) watchTimeout(t time.Duration) ResourceActorFunc { | ||||
| func (c *Client) watchTimeout(t time.Duration) resourceActorFunc { | ||||
| 	return func(info *resource.Info) error { | ||||
| 		return c.watchUntilReady(t, info) | ||||
| 	} | ||||
|  | @ -345,8 +354,8 @@ func (c *Client) watchTimeout(t time.Duration) ResourceActorFunc { | |||
| //   ascertained by watching the Status fields in a job's output.
 | ||||
| //
 | ||||
| // Handling for other kinds will be added as necessary.
 | ||||
| func (c *Client) WatchUntilReady(namespace string, reader io.Reader, timeout int64, shouldWait bool) error { | ||||
| 	infos, err := c.Build(namespace, reader) | ||||
| func (c *Client) WatchUntilReady(reader io.Reader, timeout int64) error { | ||||
| 	infos, err := c.Build(reader) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | @ -355,7 +364,7 @@ func (c *Client) WatchUntilReady(namespace string, reader io.Reader, timeout int | |||
| 	return perform(infos, c.watchTimeout(time.Duration(timeout)*time.Second)) | ||||
| } | ||||
| 
 | ||||
| func perform(infos Result, fn ResourceActorFunc) error { | ||||
| func perform(infos Result, fn resourceActorFunc) error { | ||||
| 	if len(infos) == 0 { | ||||
| 		return ErrNoObjectsVisited | ||||
| 	} | ||||
|  | @ -620,12 +629,12 @@ func scrubValidationError(err error) error { | |||
| 
 | ||||
| // WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase
 | ||||
| // and returns said phase (PodSucceeded or PodFailed qualify).
 | ||||
| func (c *Client) WaitAndGetCompletedPodPhase(namespace, name string, timeout int64) (v1.PodPhase, error) { | ||||
| func (c *Client) WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) { | ||||
| 	client, _ := c.KubernetesClientSet() | ||||
| 
 | ||||
| 	watcher, err := client.CoreV1().Pods(namespace).Watch(metav1.ListOptions{ | ||||
| 	to := int64(timeout) | ||||
| 	watcher, err := client.CoreV1().Pods(c.namespace()).Watch(metav1.ListOptions{ | ||||
| 		FieldSelector:  fmt.Sprintf("metadata.name=%s", name), | ||||
| 		TimeoutSeconds: &timeout, | ||||
| 		TimeoutSeconds: &to, | ||||
| 	}) | ||||
| 
 | ||||
| 	for event := range watcher.ResultChan() { | ||||
|  | @ -644,7 +653,7 @@ func (c *Client) WaitAndGetCompletedPodPhase(namespace, name string, timeout int | |||
| 	return v1.PodUnknown, err | ||||
| } | ||||
| 
 | ||||
| //get a kubernetes resources' relation pods
 | ||||
| // get a kubernetes resources' relation pods
 | ||||
| // kubernetes resource used select labels to relate pods
 | ||||
| func (c *Client) getSelectRelationPod(info *resource.Info, objPods map[string][]v1.Pod) (map[string][]v1.Pod, error) { | ||||
| 	if info == nil { | ||||
|  |  | |||
|  | @ -153,7 +153,7 @@ func TestUpdate(t *testing.T) { | |||
| 		Factory: tf, | ||||
| 		Log:     nopLogger, | ||||
| 	} | ||||
| 	if err := c.Update(v1.NamespaceDefault, objBody(&listA), objBody(&listB), false, false, 0, false); err != nil { | ||||
| 	if err := c.Update(objBody(&listA), objBody(&listB), false, false); err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	// TODO: Find a way to test methods that use Client Set
 | ||||
|  | @ -213,7 +213,7 @@ func TestBuild(t *testing.T) { | |||
| 			c.Cleanup() | ||||
| 
 | ||||
| 			// Test for an invalid manifest
 | ||||
| 			infos, err := c.Build(tt.namespace, tt.reader) | ||||
| 			infos, err := c.Build(tt.reader) | ||||
| 			if err != nil && !tt.err { | ||||
| 				t.Errorf("Got error message when no error should have occurred: %v", err) | ||||
| 			} else if err != nil && strings.Contains(err.Error(), "--validate=false") { | ||||
|  | @ -251,7 +251,7 @@ func TestGet(t *testing.T) { | |||
| 
 | ||||
| 	// Test Success
 | ||||
| 	data := strings.NewReader("kind: Pod\napiVersion: v1\nmetadata:\n  name: otter") | ||||
| 	o, err := c.Get("default", data) | ||||
| 	o, err := c.Get(data) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("Expected missing results, got %q", err) | ||||
| 	} | ||||
|  | @ -261,7 +261,7 @@ func TestGet(t *testing.T) { | |||
| 
 | ||||
| 	// Test failure
 | ||||
| 	data = strings.NewReader("kind: Pod\napiVersion: v1\nmetadata:\n  name: starfish") | ||||
| 	o, err = c.Get("default", data) | ||||
| 	o, err = c.Get(data) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("Expected missing results, got %q", err) | ||||
| 	} | ||||
|  | @ -301,7 +301,7 @@ func TestPerform(t *testing.T) { | |||
| 
 | ||||
| 			c := newTestClient() | ||||
| 			defer c.Cleanup() | ||||
| 			infos, err := c.Build("default", tt.reader) | ||||
| 			infos, err := c.Build(tt.reader) | ||||
| 			if err != nil && err.Error() != tt.errMessage { | ||||
| 				t.Errorf("Error while building manifests: %v", err) | ||||
| 			} | ||||
|  | @ -324,22 +324,22 @@ func TestPerform(t *testing.T) { | |||
| func TestReal(t *testing.T) { | ||||
| 	t.Skip("This is a live test, comment this line to run") | ||||
| 	c := New(nil) | ||||
| 	if err := c.Create("test", strings.NewReader(guestbookManifest), 300, false); err != nil { | ||||
| 	if err := c.Create(strings.NewReader(guestbookManifest)); err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 
 | ||||
| 	testSvcEndpointManifest := testServiceManifest + "\n---\n" + testEndpointManifest | ||||
| 	c = New(nil) | ||||
| 	if err := c.Create("test-delete", strings.NewReader(testSvcEndpointManifest), 300, false); err != nil { | ||||
| 	if err := c.Create(strings.NewReader(testSvcEndpointManifest)); err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 
 | ||||
| 	if err := c.Delete("test-delete", strings.NewReader(testEndpointManifest)); err != nil { | ||||
| 	if err := c.Delete(strings.NewReader(testEndpointManifest)); err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 
 | ||||
| 	// ensures that delete does not fail if a resource is not found
 | ||||
| 	if err := c.Delete("test-delete", strings.NewReader(testSvcEndpointManifest)); err != nil { | ||||
| 	if err := c.Delete(strings.NewReader(testSvcEndpointManifest)); err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| } | ||||
|  |  | |||
|  | @ -18,6 +18,7 @@ package kube | |||
| 
 | ||||
| import ( | ||||
| 	"io" | ||||
| 	"time" | ||||
| 
 | ||||
| 	v1 "k8s.io/api/core/v1" | ||||
| 	"k8s.io/cli-runtime/pkg/resource" | ||||
|  | @ -29,51 +30,45 @@ import ( | |||
| type KubernetesClient interface { | ||||
| 	// Create creates one or more resources.
 | ||||
| 	//
 | ||||
| 	// namespace must contain a valid existing namespace.
 | ||||
| 	//
 | ||||
| 	// reader must contain a YAML stream (one or more YAML documents separated
 | ||||
| 	// by "\n---\n").
 | ||||
| 	Create(namespace string, reader io.Reader, timeout int64, shouldWait bool) error | ||||
| 	Create(reader io.Reader) error | ||||
| 
 | ||||
| 	Wait(r io.Reader, timeout int64) error | ||||
| 
 | ||||
| 	// Get gets one or more resources. Returned string has the format like kubectl
 | ||||
| 	// provides with the column headers separating the resource types.
 | ||||
| 	//
 | ||||
| 	// namespace must contain a valid existing namespace.
 | ||||
| 	//
 | ||||
| 	// reader must contain a YAML stream (one or more YAML documents separated
 | ||||
| 	// by "\n---\n").
 | ||||
| 	Get(namespace string, reader io.Reader) (string, error) | ||||
| 	Get(reader io.Reader) (string, error) | ||||
| 
 | ||||
| 	// Delete destroys one or more resources.
 | ||||
| 	//
 | ||||
| 	// namespace must contain a valid existing namespace.
 | ||||
| 	//
 | ||||
| 	// reader must contain a YAML stream (one or more YAML documents separated
 | ||||
| 	// by "\n---\n").
 | ||||
| 	Delete(namespace string, reader io.Reader) error | ||||
| 	Delete(reader io.Reader) error | ||||
| 
 | ||||
| 	// Watch the resource in reader until it is "ready".
 | ||||
| 	//
 | ||||
| 	// For Jobs, "ready" means the job ran to completion (exited without error).
 | ||||
| 	// For all other kinds, it means the kind was created or modified without
 | ||||
| 	// error.
 | ||||
| 	WatchUntilReady(namespace string, reader io.Reader, timeout int64, shouldWait bool) error | ||||
| 	WatchUntilReady(reader io.Reader, timeout int64) error | ||||
| 
 | ||||
| 	// Update updates one or more resources or creates the resource
 | ||||
| 	// if it doesn't exist.
 | ||||
| 	//
 | ||||
| 	// namespace must contain a valid existing namespace.
 | ||||
| 	//
 | ||||
| 	// reader must contain a YAML stream (one or more YAML documents separated
 | ||||
| 	// by "\n---\n").
 | ||||
| 	Update(namespace string, originalReader, modifiedReader io.Reader, force bool, recreate bool, timeout int64, shouldWait bool) error | ||||
| 	Update(originalReader, modifiedReader io.Reader, force bool, recreate bool) error | ||||
| 
 | ||||
| 	Build(namespace string, reader io.Reader) (Result, error) | ||||
| 	BuildUnstructured(namespace string, reader io.Reader) (Result, error) | ||||
| 	Build(reader io.Reader) (Result, error) | ||||
| 	BuildUnstructured(reader io.Reader) (Result, error) | ||||
| 
 | ||||
| 	// WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase
 | ||||
| 	// and returns said phase (PodSucceeded or PodFailed qualify).
 | ||||
| 	WaitAndGetCompletedPodPhase(namespace, name string, timeout int64) (v1.PodPhase, error) | ||||
| 	WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) | ||||
| } | ||||
| 
 | ||||
| // PrintingKubeClient implements KubeClient, but simply prints the reader to
 | ||||
|  | @ -83,13 +78,18 @@ type PrintingKubeClient struct { | |||
| } | ||||
| 
 | ||||
| // Create prints the values of what would be created with a real KubeClient.
 | ||||
| func (p *PrintingKubeClient) Create(ns string, r io.Reader, timeout int64, shouldWait bool) error { | ||||
| func (p *PrintingKubeClient) Create(r io.Reader) error { | ||||
| 	_, err := io.Copy(p.Out, r) | ||||
| 	return err | ||||
| } | ||||
| 
 | ||||
| func (p *PrintingKubeClient) Wait(r io.Reader, timeout int64) error { | ||||
| 	_, err := io.Copy(p.Out, r) | ||||
| 	return err | ||||
| } | ||||
| 
 | ||||
| // Get prints the values of what would be created with a real KubeClient.
 | ||||
| func (p *PrintingKubeClient) Get(ns string, r io.Reader) (string, error) { | ||||
| func (p *PrintingKubeClient) Get(r io.Reader) (string, error) { | ||||
| 	_, err := io.Copy(p.Out, r) | ||||
| 	return "", err | ||||
| } | ||||
|  | @ -97,34 +97,34 @@ func (p *PrintingKubeClient) Get(ns string, r io.Reader) (string, error) { | |||
| // Delete implements KubeClient delete.
 | ||||
| //
 | ||||
| // It only prints out the content to be deleted.
 | ||||
| func (p *PrintingKubeClient) Delete(ns string, r io.Reader) error { | ||||
| func (p *PrintingKubeClient) Delete(r io.Reader) error { | ||||
| 	_, err := io.Copy(p.Out, r) | ||||
| 	return err | ||||
| } | ||||
| 
 | ||||
| // WatchUntilReady implements KubeClient WatchUntilReady.
 | ||||
| func (p *PrintingKubeClient) WatchUntilReady(ns string, r io.Reader, timeout int64, shouldWait bool) error { | ||||
| func (p *PrintingKubeClient) WatchUntilReady(r io.Reader, timeout int64) error { | ||||
| 	_, err := io.Copy(p.Out, r) | ||||
| 	return err | ||||
| } | ||||
| 
 | ||||
| // Update implements KubeClient Update.
 | ||||
| func (p *PrintingKubeClient) Update(ns string, currentReader, modifiedReader io.Reader, force, recreate bool, timeout int64, shouldWait bool) error { | ||||
| func (p *PrintingKubeClient) Update(currentReader, modifiedReader io.Reader, force, recreate bool) error { | ||||
| 	_, err := io.Copy(p.Out, modifiedReader) | ||||
| 	return err | ||||
| } | ||||
| 
 | ||||
| // Build implements KubeClient Build.
 | ||||
| func (p *PrintingKubeClient) Build(ns string, reader io.Reader) (Result, error) { | ||||
| func (p *PrintingKubeClient) Build(reader io.Reader) (Result, error) { | ||||
| 	return []*resource.Info{}, nil | ||||
| } | ||||
| 
 | ||||
| // BuildUnstructured implements KubeClient BuildUnstructured.
 | ||||
| func (p *PrintingKubeClient) BuildUnstructured(ns string, reader io.Reader) (Result, error) { | ||||
| func (p *PrintingKubeClient) BuildUnstructured(reader io.Reader) (Result, error) { | ||||
| 	return []*resource.Info{}, nil | ||||
| } | ||||
| 
 | ||||
| // WaitAndGetCompletedPodPhase implements KubeClient WaitAndGetCompletedPodPhase.
 | ||||
| func (p *PrintingKubeClient) WaitAndGetCompletedPodPhase(namespace, name string, timeout int64) (v1.PodPhase, error) { | ||||
| func (p *PrintingKubeClient) WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) { | ||||
| 	return v1.PodSucceeded, nil | ||||
| } | ||||
|  | @ -28,35 +28,34 @@ import ( | |||
| 
 | ||||
| type mockKubeClient struct{} | ||||
| 
 | ||||
| func (k *mockKubeClient) Create(ns string, r io.Reader, timeout int64, shouldWait bool) error { | ||||
| func (k *mockKubeClient) Wait(r io.Reader, _ int64) error { | ||||
| 	return nil | ||||
| } | ||||
| func (k *mockKubeClient) Get(ns string, r io.Reader) (string, error) { | ||||
| func (k *mockKubeClient) Create(r io.Reader) error { | ||||
| 	return nil | ||||
| } | ||||
| func (k *mockKubeClient) Get(r io.Reader) (string, error) { | ||||
| 	return "", nil | ||||
| } | ||||
| func (k *mockKubeClient) Delete(ns string, r io.Reader) error { | ||||
| func (k *mockKubeClient) Delete(r io.Reader) error { | ||||
| 	return nil | ||||
| } | ||||
| func (k *mockKubeClient) Update(ns string, currentReader, modifiedReader io.Reader, force, recreate bool, timeout int64, shouldWait bool) error { | ||||
| func (k *mockKubeClient) Update(currentReader, modifiedReader io.Reader, force, recreate bool) error { | ||||
| 	return nil | ||||
| } | ||||
| func (k *mockKubeClient) WatchUntilReady(ns string, r io.Reader, timeout int64, shouldWait bool) error { | ||||
| func (k *mockKubeClient) WatchUntilReady(r io.Reader, timeout int64) error { | ||||
| 	return nil | ||||
| } | ||||
| func (k *mockKubeClient) Build(ns string, reader io.Reader) (Result, error) { | ||||
| func (k *mockKubeClient) Build(reader io.Reader) (Result, error) { | ||||
| 	return []*resource.Info{}, nil | ||||
| } | ||||
| func (k *mockKubeClient) BuildUnstructured(ns string, reader io.Reader) (Result, error) { | ||||
| func (k *mockKubeClient) BuildUnstructured(reader io.Reader) (Result, error) { | ||||
| 	return []*resource.Info{}, nil | ||||
| } | ||||
| func (k *mockKubeClient) WaitAndGetCompletedPodPhase(namespace, name string, timeout int64) (v1.PodPhase, error) { | ||||
| func (k *mockKubeClient) WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) { | ||||
| 	return v1.PodUnknown, nil | ||||
| } | ||||
| 
 | ||||
| func (k *mockKubeClient) WaitAndGetCompletedPodStatus(namespace string, reader io.Reader, timeout time.Duration) (v1.PodPhase, error) { | ||||
| 	return "", nil | ||||
| } | ||||
| 
 | ||||
| var _ KubernetesClient = &mockKubeClient{} | ||||
| var _ KubernetesClient = &PrintingKubeClient{} | ||||
| 
 | ||||
|  | @ -74,7 +73,7 @@ func TestKubeClient(t *testing.T) { | |||
| 		b.WriteString(content) | ||||
| 	} | ||||
| 
 | ||||
| 	if err := kc.Create("sharry-bobbins", b, 300, false); err != nil { | ||||
| 	if err := kc.Create(b); err != nil { | ||||
| 		t.Errorf("Kubeclient failed: %s", err) | ||||
| 	} | ||||
| } | ||||
|  | @ -38,16 +38,18 @@ type deployment struct { | |||
| 	deployment  *appsv1.Deployment | ||||
| } | ||||
| 
 | ||||
| type waiter struct { | ||||
| 	c       kubernetes.Interface | ||||
| 	timeout time.Duration | ||||
| 	log     func(string, ...interface{}) | ||||
| } | ||||
| 
 | ||||
| // waitForResources polls to get the current status of all pods, PVCs, and Services
 | ||||
| // until all are ready or a timeout is reached
 | ||||
| func (c *Client) waitForResources(timeout time.Duration, created Result) error { | ||||
| 	c.Log("beginning wait for %d resources with timeout of %v", len(created), timeout) | ||||
| func (w *waiter) waitForResources(created Result) error { | ||||
| 	w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout) | ||||
| 
 | ||||
| 	kcs, err := c.KubernetesClientSet() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	return wait.Poll(2*time.Second, timeout, func() (bool, error) { | ||||
| 	return wait.Poll(2*time.Second, w.timeout, func() (bool, error) { | ||||
| 		var ( | ||||
| 			pods        []v1.Pod | ||||
| 			services    []v1.Service | ||||
|  | @ -57,24 +59,24 @@ func (c *Client) waitForResources(timeout time.Duration, created Result) error { | |||
| 		for _, v := range created { // BUG in original: `created[:0]` ranges over an empty slice, so no resource was ever checked | ||||
| 			switch value := asVersioned(v).(type) { | ||||
| 			case *v1.ReplicationController: | ||||
| 				list, err := getPods(kcs, value.Namespace, value.Spec.Selector) | ||||
| 				list, err := getPods(w.c, value.Namespace, value.Spec.Selector) | ||||
| 				if err != nil { | ||||
| 					return false, err | ||||
| 				} | ||||
| 				pods = append(pods, list...) | ||||
| 			case *v1.Pod: | ||||
| 				pod, err := kcs.CoreV1().Pods(value.Namespace).Get(value.Name, metav1.GetOptions{}) | ||||
| 				pod, err := w.c.CoreV1().Pods(value.Namespace).Get(value.Name, metav1.GetOptions{}) | ||||
| 				if err != nil { | ||||
| 					return false, err | ||||
| 				} | ||||
| 				pods = append(pods, *pod) | ||||
| 			case *appsv1.Deployment: | ||||
| 				currentDeployment, err := kcs.AppsV1().Deployments(value.Namespace).Get(value.Name, metav1.GetOptions{}) | ||||
| 				currentDeployment, err := w.c.AppsV1().Deployments(value.Namespace).Get(value.Name, metav1.GetOptions{}) | ||||
| 				if err != nil { | ||||
| 					return false, err | ||||
| 				} | ||||
| 				// Find RS associated with deployment
 | ||||
| 				newReplicaSet, err := deploymentutil.GetNewReplicaSet(currentDeployment, kcs.AppsV1()) | ||||
| 				newReplicaSet, err := deploymentutil.GetNewReplicaSet(currentDeployment, w.c.AppsV1()) | ||||
| 				if err != nil || newReplicaSet == nil { | ||||
| 					return false, err | ||||
| 				} | ||||
|  | @ -84,12 +86,12 @@ func (c *Client) waitForResources(timeout time.Duration, created Result) error { | |||
| 				} | ||||
| 				deployments = append(deployments, newDeployment) | ||||
| 			case *appsv1beta1.Deployment: | ||||
| 				currentDeployment, err := kcs.AppsV1().Deployments(value.Namespace).Get(value.Name, metav1.GetOptions{}) | ||||
| 				currentDeployment, err := w.c.AppsV1().Deployments(value.Namespace).Get(value.Name, metav1.GetOptions{}) | ||||
| 				if err != nil { | ||||
| 					return false, err | ||||
| 				} | ||||
| 				// Find RS associated with deployment
 | ||||
| 				newReplicaSet, err := deploymentutil.GetNewReplicaSet(currentDeployment, kcs.AppsV1()) | ||||
| 				newReplicaSet, err := deploymentutil.GetNewReplicaSet(currentDeployment, w.c.AppsV1()) | ||||
| 				if err != nil || newReplicaSet == nil { | ||||
| 					return false, err | ||||
| 				} | ||||
|  | @ -99,12 +101,12 @@ func (c *Client) waitForResources(timeout time.Duration, created Result) error { | |||
| 				} | ||||
| 				deployments = append(deployments, newDeployment) | ||||
| 			case *appsv1beta2.Deployment: | ||||
| 				currentDeployment, err := kcs.AppsV1().Deployments(value.Namespace).Get(value.Name, metav1.GetOptions{}) | ||||
| 				currentDeployment, err := w.c.AppsV1().Deployments(value.Namespace).Get(value.Name, metav1.GetOptions{}) | ||||
| 				if err != nil { | ||||
| 					return false, err | ||||
| 				} | ||||
| 				// Find RS associated with deployment
 | ||||
| 				newReplicaSet, err := deploymentutil.GetNewReplicaSet(currentDeployment, kcs.AppsV1()) | ||||
| 				newReplicaSet, err := deploymentutil.GetNewReplicaSet(currentDeployment, w.c.AppsV1()) | ||||
| 				if err != nil || newReplicaSet == nil { | ||||
| 					return false, err | ||||
| 				} | ||||
|  | @ -114,12 +116,12 @@ func (c *Client) waitForResources(timeout time.Duration, created Result) error { | |||
| 				} | ||||
| 				deployments = append(deployments, newDeployment) | ||||
| 			case *extensions.Deployment: | ||||
| 				currentDeployment, err := kcs.AppsV1().Deployments(value.Namespace).Get(value.Name, metav1.GetOptions{}) | ||||
| 				currentDeployment, err := w.c.AppsV1().Deployments(value.Namespace).Get(value.Name, metav1.GetOptions{}) | ||||
| 				if err != nil { | ||||
| 					return false, err | ||||
| 				} | ||||
| 				// Find RS associated with deployment
 | ||||
| 				newReplicaSet, err := deploymentutil.GetNewReplicaSet(currentDeployment, kcs.AppsV1()) | ||||
| 				newReplicaSet, err := deploymentutil.GetNewReplicaSet(currentDeployment, w.c.AppsV1()) | ||||
| 				if err != nil || newReplicaSet == nil { | ||||
| 					return false, err | ||||
| 				} | ||||
|  | @ -129,82 +131,82 @@ func (c *Client) waitForResources(timeout time.Duration, created Result) error { | |||
| 				} | ||||
| 				deployments = append(deployments, newDeployment) | ||||
| 			case *extensions.DaemonSet: | ||||
| 				list, err := getPods(kcs, value.Namespace, value.Spec.Selector.MatchLabels) | ||||
| 				list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels) | ||||
| 				if err != nil { | ||||
| 					return false, err | ||||
| 				} | ||||
| 				pods = append(pods, list...) | ||||
| 			case *appsv1.DaemonSet: | ||||
| 				list, err := getPods(kcs, value.Namespace, value.Spec.Selector.MatchLabels) | ||||
| 				list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels) | ||||
| 				if err != nil { | ||||
| 					return false, err | ||||
| 				} | ||||
| 				pods = append(pods, list...) | ||||
| 			case *appsv1beta2.DaemonSet: | ||||
| 				list, err := getPods(kcs, value.Namespace, value.Spec.Selector.MatchLabels) | ||||
| 				list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels) | ||||
| 				if err != nil { | ||||
| 					return false, err | ||||
| 				} | ||||
| 				pods = append(pods, list...) | ||||
| 			case *appsv1.StatefulSet: | ||||
| 				list, err := getPods(kcs, value.Namespace, value.Spec.Selector.MatchLabels) | ||||
| 				list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels) | ||||
| 				if err != nil { | ||||
| 					return false, err | ||||
| 				} | ||||
| 				pods = append(pods, list...) | ||||
| 			case *appsv1beta1.StatefulSet: | ||||
| 				list, err := getPods(kcs, value.Namespace, value.Spec.Selector.MatchLabels) | ||||
| 				list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels) | ||||
| 				if err != nil { | ||||
| 					return false, err | ||||
| 				} | ||||
| 				pods = append(pods, list...) | ||||
| 			case *appsv1beta2.StatefulSet: | ||||
| 				list, err := getPods(kcs, value.Namespace, value.Spec.Selector.MatchLabels) | ||||
| 				list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels) | ||||
| 				if err != nil { | ||||
| 					return false, err | ||||
| 				} | ||||
| 				pods = append(pods, list...) | ||||
| 			case *extensions.ReplicaSet: | ||||
| 				list, err := getPods(kcs, value.Namespace, value.Spec.Selector.MatchLabels) | ||||
| 				list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels) | ||||
| 				if err != nil { | ||||
| 					return false, err | ||||
| 				} | ||||
| 				pods = append(pods, list...) | ||||
| 			case *appsv1beta2.ReplicaSet: | ||||
| 				list, err := getPods(kcs, value.Namespace, value.Spec.Selector.MatchLabels) | ||||
| 				list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels) | ||||
| 				if err != nil { | ||||
| 					return false, err | ||||
| 				} | ||||
| 				pods = append(pods, list...) | ||||
| 			case *appsv1.ReplicaSet: | ||||
| 				list, err := getPods(kcs, value.Namespace, value.Spec.Selector.MatchLabels) | ||||
| 				list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels) | ||||
| 				if err != nil { | ||||
| 					return false, err | ||||
| 				} | ||||
| 				pods = append(pods, list...) | ||||
| 			case *v1.PersistentVolumeClaim: | ||||
| 				claim, err := kcs.CoreV1().PersistentVolumeClaims(value.Namespace).Get(value.Name, metav1.GetOptions{}) | ||||
| 				claim, err := w.c.CoreV1().PersistentVolumeClaims(value.Namespace).Get(value.Name, metav1.GetOptions{}) | ||||
| 				if err != nil { | ||||
| 					return false, err | ||||
| 				} | ||||
| 				pvc = append(pvc, *claim) | ||||
| 			case *v1.Service: | ||||
| 				svc, err := kcs.CoreV1().Services(value.Namespace).Get(value.Name, metav1.GetOptions{}) | ||||
| 				svc, err := w.c.CoreV1().Services(value.Namespace).Get(value.Name, metav1.GetOptions{}) | ||||
| 				if err != nil { | ||||
| 					return false, err | ||||
| 				} | ||||
| 				services = append(services, *svc) | ||||
| 			} | ||||
| 		} | ||||
| 		isReady := c.podsReady(pods) && c.servicesReady(services) && c.volumesReady(pvc) && c.deploymentsReady(deployments) | ||||
| 		isReady := w.podsReady(pods) && w.servicesReady(services) && w.volumesReady(pvc) && w.deploymentsReady(deployments) | ||||
| 		return isReady, nil | ||||
| 	}) | ||||
| } | ||||
| 
 | ||||
| func (c *Client) podsReady(pods []v1.Pod) bool { | ||||
| func (w *waiter) podsReady(pods []v1.Pod) bool { | ||||
| 	for _, pod := range pods { | ||||
| 		if !IsPodReady(&pod) { | ||||
| 			c.Log("Pod is not ready: %s/%s", pod.GetNamespace(), pod.GetName()) | ||||
| 			w.log("Pod is not ready: %s/%s", pod.GetNamespace(), pod.GetName()) | ||||
| 			return false | ||||
| 		} | ||||
| 	} | ||||
|  | @ -221,7 +223,7 @@ func IsPodReady(pod *v1.Pod) bool { | |||
| 	return false | ||||
| } | ||||
| 
 | ||||
| func (c *Client) servicesReady(svc []v1.Service) bool { | ||||
| func (w *waiter) servicesReady(svc []v1.Service) bool { | ||||
| 	for _, s := range svc { | ||||
| 		// ExternalName Services are external to cluster so helm shouldn't be checking to see if they're 'ready' (i.e. have an IP Set)
 | ||||
| 		if s.Spec.Type == v1.ServiceTypeExternalName { | ||||
|  | @ -230,12 +232,12 @@ func (c *Client) servicesReady(svc []v1.Service) bool { | |||
| 
 | ||||
| 		// Make sure the service is not explicitly set to "None" before checking the IP
 | ||||
| 		if s.Spec.ClusterIP != v1.ClusterIPNone && !IsServiceIPSet(&s) { | ||||
| 			c.Log("Service is not ready: %s/%s", s.GetNamespace(), s.GetName()) | ||||
| 			w.log("Service is not ready: %s/%s", s.GetNamespace(), s.GetName()) | ||||
| 			return false | ||||
| 		} | ||||
| 		// This checks if the service has a LoadBalancer and that balancer has an Ingress defined
 | ||||
| 		if s.Spec.Type == v1.ServiceTypeLoadBalancer && s.Status.LoadBalancer.Ingress == nil { | ||||
| 			c.Log("Service is not ready: %s/%s", s.GetNamespace(), s.GetName()) | ||||
| 			w.log("Service is not ready: %s/%s", s.GetNamespace(), s.GetName()) | ||||
| 			return false | ||||
| 		} | ||||
| 	} | ||||
|  | @ -248,20 +250,20 @@ func IsServiceIPSet(service *v1.Service) bool { | |||
| 	return service.Spec.ClusterIP != v1.ClusterIPNone && service.Spec.ClusterIP != "" | ||||
| } | ||||
| 
 | ||||
| func (c *Client) volumesReady(vols []v1.PersistentVolumeClaim) bool { | ||||
| func (w *waiter) volumesReady(vols []v1.PersistentVolumeClaim) bool { | ||||
| 	for _, v := range vols { | ||||
| 		if v.Status.Phase != v1.ClaimBound { | ||||
| 			c.Log("PersistentVolumeClaim is not ready: %s/%s", v.GetNamespace(), v.GetName()) | ||||
| 			w.log("PersistentVolumeClaim is not ready: %s/%s", v.GetNamespace(), v.GetName()) | ||||
| 			return false | ||||
| 		} | ||||
| 	} | ||||
| 	return true | ||||
| } | ||||
| 
 | ||||
| func (c *Client) deploymentsReady(deployments []deployment) bool { | ||||
| func (w *waiter) deploymentsReady(deployments []deployment) bool { | ||||
| 	for _, v := range deployments { | ||||
| 		if !(v.replicaSets.Status.ReadyReplicas >= *v.deployment.Spec.Replicas-deploymentutil.MaxUnavailable(*v.deployment)) { | ||||
| 			c.Log("Deployment is not ready: %s/%s", v.deployment.GetNamespace(), v.deployment.GetName()) | ||||
| 			w.log("Deployment is not ready: %s/%s", v.deployment.GetNamespace(), v.deployment.GetName()) | ||||
| 			return false | ||||
| 		} | ||||
| 	} | ||||
|  |  | |||
|  | @ -20,6 +20,7 @@ import ( | |||
| 	"bytes" | ||||
| 	"fmt" | ||||
| 	"log" | ||||
| 	"time" | ||||
| 
 | ||||
| 	v1 "k8s.io/api/core/v1" | ||||
| 
 | ||||
|  | @ -32,12 +33,12 @@ type Environment struct { | |||
| 	Namespace  string | ||||
| 	KubeClient kube.KubernetesClient | ||||
| 	Messages   chan *release.TestReleaseResponse | ||||
| 	Timeout    int64 | ||||
| 	Timeout    time.Duration | ||||
| } | ||||
| 
 | ||||
| func (env *Environment) createTestPod(test *test) error { | ||||
| 	b := bytes.NewBufferString(test.manifest) | ||||
| 	if err := env.KubeClient.Create(env.Namespace, b, env.Timeout, false); err != nil { | ||||
| 	if err := env.KubeClient.Create(b); err != nil { | ||||
| 		test.result.Info = err.Error() | ||||
| 		test.result.Status = release.TestRunFailure | ||||
| 		return err | ||||
|  | @ -47,7 +48,7 @@ func (env *Environment) createTestPod(test *test) error { | |||
| } | ||||
| 
 | ||||
| func (env *Environment) getTestPodStatus(test *test) (v1.PodPhase, error) { | ||||
| 	status, err := env.KubeClient.WaitAndGetCompletedPodPhase(env.Namespace, test.name, env.Timeout) | ||||
| 	status, err := env.KubeClient.WaitAndGetCompletedPodPhase(test.name, env.Timeout) | ||||
| 	if err != nil { | ||||
| 		log.Printf("Error getting status for pod %s: %s", test.result.Name, err) | ||||
| 		test.result.Info = err.Error() | ||||
|  | @ -111,7 +112,7 @@ func (env *Environment) streamMessage(msg string, status release.TestRunStatus) | |||
| // DeleteTestPods deletes resources given in testManifests
 | ||||
| func (env *Environment) DeleteTestPods(testManifests []string) { | ||||
| 	for _, testManifest := range testManifests { | ||||
| 		err := env.KubeClient.Delete(env.Namespace, bytes.NewBufferString(testManifest)) | ||||
| 		err := env.KubeClient.Delete(bytes.NewBufferString(testManifest)) | ||||
| 		if err != nil { | ||||
| 			env.streamError(err.Error()) | ||||
| 		} | ||||
|  |  | |||
|  | @ -19,6 +19,7 @@ package releasetesting | |||
| import ( | ||||
| 	"io" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| 
 | ||||
| 	v1 "k8s.io/api/core/v1" | ||||
| 
 | ||||
|  | @ -248,18 +249,18 @@ type mockKubeClient struct { | |||
| 	err     error | ||||
| } | ||||
| 
 | ||||
| func (c *mockKubeClient) WaitAndGetCompletedPodPhase(_ string, _ string, _ int64) (v1.PodPhase, error) { | ||||
| func (c *mockKubeClient) WaitAndGetCompletedPodPhase(_ string, _ time.Duration) (v1.PodPhase, error) { | ||||
| 	if c.podFail { | ||||
| 		return v1.PodFailed, nil | ||||
| 	} | ||||
| 	return v1.PodSucceeded, nil | ||||
| } | ||||
| func (c *mockKubeClient) Get(_ string, _ io.Reader) (string, error) { | ||||
| func (c *mockKubeClient) Get(_ io.Reader) (string, error) { | ||||
| 	return "", nil | ||||
| } | ||||
| func (c *mockKubeClient) Create(_ string, _ io.Reader, _ int64, _ bool) error { | ||||
| func (c *mockKubeClient) Create(_ io.Reader) error { | ||||
| 	return c.err | ||||
| } | ||||
| func (c *mockKubeClient) Delete(_ string, _ io.Reader) error { | ||||
| func (c *mockKubeClient) Delete(_ io.Reader) error { | ||||
| 	return nil | ||||
| } | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue