| 
									
										
										
										
											2016-04-13 22:08:22 +08:00
										 |  |  | // Copyright 2016 The Prometheus Authors
 | 
					
						
							|  |  |  | // Licensed under the Apache License, Version 2.0 (the "License");
 | 
					
						
							|  |  |  | // you may not use this file except in compliance with the License.
 | 
					
						
							|  |  |  | // You may obtain a copy of the License at
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | //     http://www.apache.org/licenses/LICENSE-2.0
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // Unless required by applicable law or agreed to in writing, software
 | 
					
						
							|  |  |  | // distributed under the License is distributed on an "AS IS" BASIS,
 | 
					
						
							|  |  |  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
					
						
							|  |  |  | // See the License for the specific language governing permissions and
 | 
					
						
							|  |  |  | // limitations under the License.
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | package treecache | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"bytes" | 
					
						
							| 
									
										
										
										
											2022-06-28 00:16:58 +08:00
										 |  |  | 	"errors" | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | 	"fmt" | 
					
						
							|  |  |  | 	"strings" | 
					
						
							| 
									
										
										
										
											2019-06-21 20:21:13 +08:00
										 |  |  | 	"sync" | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | 	"time" | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-06-12 00:17:59 +08:00
										 |  |  | 	"github.com/go-kit/log" | 
					
						
							|  |  |  | 	"github.com/go-kit/log/level" | 
					
						
							| 
									
										
										
										
											2020-10-23 03:16:09 +08:00
										 |  |  | 	"github.com/go-zookeeper/zk" | 
					
						
							| 
									
										
										
										
											2016-09-09 06:27:23 +08:00
										 |  |  | 	"github.com/prometheus/client_golang/prometheus" | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-09-09 06:27:23 +08:00
										 |  |  | var ( | 
					
						
							|  |  |  | 	failureCounter = prometheus.NewCounter(prometheus.CounterOpts{ | 
					
						
							|  |  |  | 		Namespace: "prometheus", | 
					
						
							|  |  |  | 		Subsystem: "treecache", | 
					
						
							|  |  |  | 		Name:      "zookeeper_failures_total", | 
					
						
							|  |  |  | 		Help:      "The total number of ZooKeeper failures.", | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | 	numWatchers = prometheus.NewGauge(prometheus.GaugeOpts{ | 
					
						
							|  |  |  | 		Namespace: "prometheus", | 
					
						
							|  |  |  | 		Subsystem: "treecache", | 
					
						
							|  |  |  | 		Name:      "watcher_goroutines", | 
					
						
							|  |  |  | 		Help:      "The current number of watcher goroutines.", | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func init() { | 
					
						
							|  |  |  | 	prometheus.MustRegister(failureCounter) | 
					
						
							|  |  |  | 	prometheus.MustRegister(numWatchers) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-10-23 21:57:30 +08:00
										 |  |  | // ZookeeperLogger wraps a log.Logger into a zk.Logger.
 | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | type ZookeeperLogger struct { | 
					
						
							| 
									
										
										
										
											2017-06-16 18:22:44 +08:00
										 |  |  | 	logger log.Logger | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-10-23 21:57:30 +08:00
										 |  |  | // NewZookeeperLogger is a constructor for ZookeeperLogger.
 | 
					
						
							| 
									
										
										
										
											2017-10-19 00:02:20 +08:00
										 |  |  | func NewZookeeperLogger(logger log.Logger) ZookeeperLogger { | 
					
						
							|  |  |  | 	return ZookeeperLogger{logger: logger} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-10-23 21:57:30 +08:00
										 |  |  | // Printf implements zk.Logger.
 | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | func (zl ZookeeperLogger) Printf(s string, i ...interface{}) { | 
					
						
							| 
									
										
										
										
											2017-08-12 02:45:52 +08:00
										 |  |  | 	level.Info(zl.logger).Log("msg", fmt.Sprintf(s, i...)) | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-10-23 21:57:30 +08:00
										 |  |  | // A ZookeeperTreeCache keeps data from all children of a Zookeeper path
 | 
					
						
							|  |  |  | // locally cached and updated according to received events.
 | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | type ZookeeperTreeCache struct { | 
					
						
							| 
									
										
										
										
											2019-01-04 21:47:38 +08:00
										 |  |  | 	conn   *zk.Conn | 
					
						
							|  |  |  | 	prefix string | 
					
						
							|  |  |  | 	events chan ZookeeperTreeCacheEvent | 
					
						
							|  |  |  | 	stop   chan struct{} | 
					
						
							| 
									
										
										
										
											2019-06-21 20:21:13 +08:00
										 |  |  | 	wg     *sync.WaitGroup | 
					
						
							| 
									
										
										
										
											2019-01-04 21:47:38 +08:00
										 |  |  | 	head   *zookeeperTreeCacheNode | 
					
						
							| 
									
										
										
										
											2017-06-16 18:22:44 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	logger log.Logger | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-10-23 21:57:30 +08:00
										 |  |  | // A ZookeeperTreeCacheEvent models a Zookeeper event for a path.
 | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | type ZookeeperTreeCacheEvent struct { | 
					
						
							|  |  |  | 	Path string | 
					
						
							|  |  |  | 	Data *[]byte | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type zookeeperTreeCacheNode struct { | 
					
						
							|  |  |  | 	data     *[]byte | 
					
						
							|  |  |  | 	events   chan zk.Event | 
					
						
							|  |  |  | 	done     chan struct{} | 
					
						
							|  |  |  | 	stopped  bool | 
					
						
							|  |  |  | 	children map[string]*zookeeperTreeCacheNode | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-10-23 21:57:30 +08:00
										 |  |  | // NewZookeeperTreeCache creates a new ZookeeperTreeCache for a given path.
 | 
					
						
							| 
									
										
										
										
											2017-06-16 18:22:44 +08:00
										 |  |  | func NewZookeeperTreeCache(conn *zk.Conn, path string, events chan ZookeeperTreeCacheEvent, logger log.Logger) *ZookeeperTreeCache { | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | 	tc := &ZookeeperTreeCache{ | 
					
						
							|  |  |  | 		conn:   conn, | 
					
						
							|  |  |  | 		prefix: path, | 
					
						
							|  |  |  | 		events: events, | 
					
						
							|  |  |  | 		stop:   make(chan struct{}), | 
					
						
							| 
									
										
										
										
											2019-06-21 20:21:13 +08:00
										 |  |  | 		wg:     &sync.WaitGroup{}, | 
					
						
							| 
									
										
										
										
											2017-06-16 18:22:44 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 		logger: logger, | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	tc.head = &zookeeperTreeCacheNode{ | 
					
						
							|  |  |  | 		events:   make(chan zk.Event), | 
					
						
							|  |  |  | 		children: map[string]*zookeeperTreeCacheNode{}, | 
					
						
							| 
									
										
										
										
											2019-06-21 20:21:13 +08:00
										 |  |  | 		done:     make(chan struct{}, 1), | 
					
						
							|  |  |  | 		stopped:  true, // Set head's stop to be true so that recursiveDelete will not stop the head node.
 | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2019-06-21 20:21:13 +08:00
										 |  |  | 	tc.wg.Add(1) | 
					
						
							| 
									
										
										
										
											2017-03-04 06:51:13 +08:00
										 |  |  | 	go tc.loop(path) | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | 	return tc | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-10-23 21:57:30 +08:00
										 |  |  | // Stop stops the tree cache.
 | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | func (tc *ZookeeperTreeCache) Stop() { | 
					
						
							|  |  |  | 	tc.stop <- struct{}{} | 
					
						
							| 
									
										
										
										
											2019-06-21 20:21:13 +08:00
										 |  |  | 	go func() { | 
					
						
							|  |  |  | 		// Drain tc.head.events so that go routines can make progress and exit.
 | 
					
						
							|  |  |  | 		for range tc.head.events { | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	}() | 
					
						
							|  |  |  | 	go func() { | 
					
						
							|  |  |  | 		tc.wg.Wait() | 
					
						
							|  |  |  | 		// Close the tc.head.events after all members of the wait group have exited.
 | 
					
						
							|  |  |  | 		// This makes the go routine above exit.
 | 
					
						
							|  |  |  | 		close(tc.head.events) | 
					
						
							|  |  |  | 		close(tc.events) | 
					
						
							|  |  |  | 	}() | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-03-04 06:51:13 +08:00
										 |  |  | func (tc *ZookeeperTreeCache) loop(path string) { | 
					
						
							| 
									
										
										
										
											2019-06-21 20:21:13 +08:00
										 |  |  | 	defer tc.wg.Done() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-03-04 06:51:13 +08:00
										 |  |  | 	failureMode := false | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | 	retryChan := make(chan struct{}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	failure := func() { | 
					
						
							| 
									
										
										
										
											2016-09-09 06:27:23 +08:00
										 |  |  | 		failureCounter.Inc() | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | 		failureMode = true | 
					
						
							|  |  |  | 		time.AfterFunc(time.Second*10, func() { | 
					
						
							|  |  |  | 			retryChan <- struct{}{} | 
					
						
							|  |  |  | 		}) | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2017-03-04 06:51:13 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	err := tc.recursiveNodeUpdate(path, tc.head) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2017-08-12 02:45:52 +08:00
										 |  |  | 		level.Error(tc.logger).Log("msg", "Error during initial read of Zookeeper", "err", err) | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | 		failure() | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for { | 
					
						
							|  |  |  | 		select { | 
					
						
							|  |  |  | 		case ev := <-tc.head.events: | 
					
						
							| 
									
										
										
										
											2017-08-12 02:45:52 +08:00
										 |  |  | 			level.Debug(tc.logger).Log("msg", "Received Zookeeper event", "event", ev) | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | 			if failureMode { | 
					
						
							|  |  |  | 				continue | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 			if ev.Type == zk.EventNotWatching { | 
					
						
							| 
									
										
										
										
											2017-08-12 02:45:52 +08:00
										 |  |  | 				level.Info(tc.logger).Log("msg", "Lost connection to Zookeeper.") | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | 				failure() | 
					
						
							|  |  |  | 			} else { | 
					
						
							|  |  |  | 				path := strings.TrimPrefix(ev.Path, tc.prefix) | 
					
						
							|  |  |  | 				parts := strings.Split(path, "/") | 
					
						
							|  |  |  | 				node := tc.head | 
					
						
							|  |  |  | 				for _, part := range parts[1:] { | 
					
						
							|  |  |  | 					childNode := node.children[part] | 
					
						
							|  |  |  | 					if childNode == nil { | 
					
						
							|  |  |  | 						childNode = &zookeeperTreeCacheNode{ | 
					
						
							|  |  |  | 							events:   tc.head.events, | 
					
						
							|  |  |  | 							children: map[string]*zookeeperTreeCacheNode{}, | 
					
						
							|  |  |  | 							done:     make(chan struct{}, 1), | 
					
						
							|  |  |  | 						} | 
					
						
							|  |  |  | 						node.children[part] = childNode | 
					
						
							|  |  |  | 					} | 
					
						
							|  |  |  | 					node = childNode | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 				err := tc.recursiveNodeUpdate(ev.Path, node) | 
					
						
							|  |  |  | 				if err != nil { | 
					
						
							| 
									
										
										
										
											2017-08-12 02:45:52 +08:00
										 |  |  | 					level.Error(tc.logger).Log("msg", "Error during processing of Zookeeper event", "err", err) | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | 					failure() | 
					
						
							|  |  |  | 				} else if tc.head.data == nil { | 
					
						
							| 
									
										
										
										
											2017-08-12 02:45:52 +08:00
										 |  |  | 					level.Error(tc.logger).Log("msg", "Error during processing of Zookeeper event", "err", "path no longer exists", "path", tc.prefix) | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | 					failure() | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		case <-retryChan: | 
					
						
							| 
									
										
										
										
											2017-08-12 02:45:52 +08:00
										 |  |  | 			level.Info(tc.logger).Log("msg", "Attempting to resync state with Zookeeper") | 
					
						
							| 
									
										
										
										
											2016-09-09 06:27:23 +08:00
										 |  |  | 			previousState := &zookeeperTreeCacheNode{ | 
					
						
							|  |  |  | 				children: tc.head.children, | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | 			// Reset root child nodes before traversing the Zookeeper path.
 | 
					
						
							|  |  |  | 			tc.head.children = make(map[string]*zookeeperTreeCacheNode) | 
					
						
							| 
									
										
										
										
											2016-09-09 06:27:23 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 			if err := tc.recursiveNodeUpdate(tc.prefix, tc.head); err != nil { | 
					
						
							| 
									
										
										
										
											2017-08-12 02:45:52 +08:00
										 |  |  | 				level.Error(tc.logger).Log("msg", "Error during Zookeeper resync", "err", err) | 
					
						
							| 
									
										
										
										
											2016-09-09 06:27:23 +08:00
										 |  |  | 				// Revert to our previous state.
 | 
					
						
							|  |  |  | 				tc.head.children = previousState.children | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | 				failure() | 
					
						
							|  |  |  | 			} else { | 
					
						
							| 
									
										
										
										
											2016-09-09 06:27:23 +08:00
										 |  |  | 				tc.resyncState(tc.prefix, tc.head, previousState) | 
					
						
							| 
									
										
										
										
											2017-08-12 02:45:52 +08:00
										 |  |  | 				level.Info(tc.logger).Log("Zookeeper resync successful") | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | 				failureMode = false | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		case <-tc.stop: | 
					
						
							| 
									
										
										
										
											2019-06-21 20:21:13 +08:00
										 |  |  | 			// Stop head as well.
 | 
					
						
							|  |  |  | 			tc.head.done <- struct{}{} | 
					
						
							| 
									
										
										
										
											2017-05-03 07:21:37 +08:00
										 |  |  | 			tc.recursiveStop(tc.head) | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (tc *ZookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTreeCacheNode) error { | 
					
						
							|  |  |  | 	data, _, dataWatcher, err := tc.conn.GetW(path) | 
					
						
							| 
									
										
										
										
											2022-06-28 00:16:58 +08:00
										 |  |  | 	if errors.Is(err, zk.ErrNoNode) { | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | 		tc.recursiveDelete(path, node) | 
					
						
							|  |  |  | 		if node == tc.head { | 
					
						
							| 
									
										
										
										
											2022-06-28 00:16:58 +08:00
										 |  |  | 			return fmt.Errorf("path %s does not exist", path) | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 		return nil | 
					
						
							|  |  |  | 	} else if err != nil { | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if node.data == nil || !bytes.Equal(*node.data, data) { | 
					
						
							|  |  |  | 		node.data = &data | 
					
						
							|  |  |  | 		tc.events <- ZookeeperTreeCacheEvent{Path: path, Data: node.data} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	children, _, childWatcher, err := tc.conn.ChildrenW(path) | 
					
						
							| 
									
										
										
										
											2022-06-28 00:16:58 +08:00
										 |  |  | 	if errors.Is(err, zk.ErrNoNode) { | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | 		tc.recursiveDelete(path, node) | 
					
						
							|  |  |  | 		return nil | 
					
						
							|  |  |  | 	} else if err != nil { | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	currentChildren := map[string]struct{}{} | 
					
						
							|  |  |  | 	for _, child := range children { | 
					
						
							|  |  |  | 		currentChildren[child] = struct{}{} | 
					
						
							|  |  |  | 		childNode := node.children[child] | 
					
						
							|  |  |  | 		// Does not already exists or we previous had a watch that
 | 
					
						
							|  |  |  | 		// triggered.
 | 
					
						
							|  |  |  | 		if childNode == nil || childNode.stopped { | 
					
						
							|  |  |  | 			node.children[child] = &zookeeperTreeCacheNode{ | 
					
						
							|  |  |  | 				events:   node.events, | 
					
						
							|  |  |  | 				children: map[string]*zookeeperTreeCacheNode{}, | 
					
						
							|  |  |  | 				done:     make(chan struct{}, 1), | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			err = tc.recursiveNodeUpdate(path+"/"+child, node.children[child]) | 
					
						
							|  |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				return err | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Remove nodes that no longer exist
 | 
					
						
							|  |  |  | 	for name, childNode := range node.children { | 
					
						
							|  |  |  | 		if _, ok := currentChildren[name]; !ok || node.data == nil { | 
					
						
							|  |  |  | 			tc.recursiveDelete(path+"/"+name, childNode) | 
					
						
							|  |  |  | 			delete(node.children, name) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-06-21 20:21:13 +08:00
										 |  |  | 	tc.wg.Add(1) | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | 	go func() { | 
					
						
							| 
									
										
										
										
											2016-09-09 06:27:23 +08:00
										 |  |  | 		numWatchers.Inc() | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | 		// Pass up zookeeper events, until the node is deleted.
 | 
					
						
							|  |  |  | 		select { | 
					
						
							|  |  |  | 		case event := <-dataWatcher: | 
					
						
							|  |  |  | 			node.events <- event | 
					
						
							|  |  |  | 		case event := <-childWatcher: | 
					
						
							|  |  |  | 			node.events <- event | 
					
						
							|  |  |  | 		case <-node.done: | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2016-09-09 06:27:23 +08:00
										 |  |  | 		numWatchers.Dec() | 
					
						
							| 
									
										
										
										
											2019-06-21 20:21:13 +08:00
										 |  |  | 		tc.wg.Done() | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | 	}() | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-09-09 06:27:23 +08:00
										 |  |  | func (tc *ZookeeperTreeCache) resyncState(path string, currentState, previousState *zookeeperTreeCacheNode) { | 
					
						
							|  |  |  | 	for child, previousNode := range previousState.children { | 
					
						
							|  |  |  | 		if currentNode, present := currentState.children[child]; present { | 
					
						
							|  |  |  | 			tc.resyncState(path+"/"+child, currentNode, previousNode) | 
					
						
							|  |  |  | 		} else { | 
					
						
							|  |  |  | 			tc.recursiveDelete(path+"/"+child, previousNode) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-01-10 07:34:32 +08:00
										 |  |  | func (tc *ZookeeperTreeCache) recursiveDelete(path string, node *zookeeperTreeCacheNode) { | 
					
						
							|  |  |  | 	if !node.stopped { | 
					
						
							|  |  |  | 		node.done <- struct{}{} | 
					
						
							|  |  |  | 		node.stopped = true | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if node.data != nil { | 
					
						
							|  |  |  | 		tc.events <- ZookeeperTreeCacheEvent{Path: path, Data: nil} | 
					
						
							|  |  |  | 		node.data = nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	for name, childNode := range node.children { | 
					
						
							|  |  |  | 		tc.recursiveDelete(path+"/"+name, childNode) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							| 
									
										
										
										
											2017-05-03 07:21:37 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | func (tc *ZookeeperTreeCache) recursiveStop(node *zookeeperTreeCacheNode) { | 
					
						
							|  |  |  | 	if !node.stopped { | 
					
						
							|  |  |  | 		node.done <- struct{}{} | 
					
						
							|  |  |  | 		node.stopped = true | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	for _, childNode := range node.children { | 
					
						
							|  |  |  | 		tc.recursiveStop(childNode) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } |