| 
									
										
										
										
											2021-10-20 16:15:54 +08:00
										 |  |  | // Copyright 2016 The Prometheus Authors
 | 
					
						
							|  |  |  | // Licensed under the Apache License, Version 2.0 (the "License");
 | 
					
						
							|  |  |  | // you may not use this file except in compliance with the License.
 | 
					
						
							|  |  |  | // You may obtain a copy of the License at
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // http://www.apache.org/licenses/LICENSE-2.0
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // Unless required by applicable law or agreed to in writing, software
 | 
					
						
							|  |  |  | // distributed under the License is distributed on an "AS IS" BASIS,
 | 
					
						
							|  |  |  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
					
						
							|  |  |  | // See the License for the specific language governing permissions and
 | 
					
						
							|  |  |  | // limitations under the License.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | package legacymanager | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"context" | 
					
						
							|  |  |  | 	"fmt" | 
					
						
							|  |  |  | 	"reflect" | 
					
						
							|  |  |  | 	"sync" | 
					
						
							|  |  |  | 	"time" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/go-kit/log" | 
					
						
							|  |  |  | 	"github.com/go-kit/log/level" | 
					
						
							|  |  |  | 	"github.com/prometheus/client_golang/prometheus" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/prometheus/prometheus/discovery" | 
					
						
							|  |  |  | 	"github.com/prometheus/prometheus/discovery/targetgroup" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type poolKey struct { | 
					
						
							|  |  |  | 	setName  string | 
					
						
							|  |  |  | 	provider string | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // provider holds a Discoverer instance, its configuration and its subscribers.
 | 
					
						
							|  |  |  | type provider struct { | 
					
						
							|  |  |  | 	name   string | 
					
						
							|  |  |  | 	d      discovery.Discoverer | 
					
						
							|  |  |  | 	subs   []string | 
					
						
							|  |  |  | 	config interface{} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // NewManager is the Discovery Manager constructor.
 | 
					
						
							| 
									
										
										
										
											2023-10-23 21:55:36 +08:00
										 |  |  | func NewManager(ctx context.Context, logger log.Logger, registerer prometheus.Registerer, metrics *discovery.Metrics, options ...func(*Manager)) *Manager { | 
					
						
							| 
									
										
										
										
											2021-10-20 16:15:54 +08:00
										 |  |  | 	if logger == nil { | 
					
						
							|  |  |  | 		logger = log.NewNopLogger() | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	mgr := &Manager{ | 
					
						
							|  |  |  | 		logger:         logger, | 
					
						
							|  |  |  | 		syncCh:         make(chan map[string][]*targetgroup.Group), | 
					
						
							|  |  |  | 		targets:        make(map[poolKey]map[string]*targetgroup.Group), | 
					
						
							|  |  |  | 		discoverCancel: []context.CancelFunc{}, | 
					
						
							|  |  |  | 		ctx:            ctx, | 
					
						
							|  |  |  | 		updatert:       5 * time.Second, | 
					
						
							|  |  |  | 		triggerSend:    make(chan struct{}, 1), | 
					
						
							| 
									
										
										
										
											2023-10-23 21:55:36 +08:00
										 |  |  | 		registerer:     registerer, | 
					
						
							|  |  |  | 		metrics:        metrics, | 
					
						
							| 
									
										
										
										
											2021-10-20 16:15:54 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	for _, option := range options { | 
					
						
							|  |  |  | 		option(mgr) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return mgr | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Name sets the name of the manager.
 | 
					
						
							|  |  |  | func Name(n string) func(*Manager) { | 
					
						
							|  |  |  | 	return func(m *Manager) { | 
					
						
							|  |  |  | 		m.mtx.Lock() | 
					
						
							|  |  |  | 		defer m.mtx.Unlock() | 
					
						
							|  |  |  | 		m.name = n | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Manager maintains a set of discovery providers and sends each update to a map channel.
 | 
					
						
							|  |  |  | // Targets are grouped by the target set name.
 | 
					
						
							|  |  |  | type Manager struct { | 
					
						
							|  |  |  | 	logger         log.Logger | 
					
						
							|  |  |  | 	name           string | 
					
						
							|  |  |  | 	mtx            sync.RWMutex | 
					
						
							|  |  |  | 	ctx            context.Context | 
					
						
							|  |  |  | 	discoverCancel []context.CancelFunc | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Some Discoverers(eg. k8s) send only the updates for a given target group
 | 
					
						
							|  |  |  | 	// so we use map[tg.Source]*targetgroup.Group to know which group to update.
 | 
					
						
							|  |  |  | 	targets map[poolKey]map[string]*targetgroup.Group | 
					
						
							|  |  |  | 	// providers keeps track of SD providers.
 | 
					
						
							|  |  |  | 	providers []*provider | 
					
						
							|  |  |  | 	// The sync channel sends the updates as a map where the key is the job value from the scrape config.
 | 
					
						
							|  |  |  | 	syncCh chan map[string][]*targetgroup.Group | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// How long to wait before sending updates to the channel. The variable
 | 
					
						
							|  |  |  | 	// should only be modified in unit tests.
 | 
					
						
							|  |  |  | 	updatert time.Duration | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// The triggerSend channel signals to the manager that new updates have been received from providers.
 | 
					
						
							|  |  |  | 	triggerSend chan struct{} | 
					
						
							| 
									
										
										
										
											2023-10-23 21:55:36 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// A registerer for all service discovery metrics.
 | 
					
						
							|  |  |  | 	registerer prometheus.Registerer | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	metrics *discovery.Metrics | 
					
						
							| 
									
										
										
										
											2021-10-20 16:15:54 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-10-04 04:09:25 +08:00
										 |  |  | // Run starts the background processing.
 | 
					
						
							| 
									
										
										
										
											2021-10-20 16:15:54 +08:00
										 |  |  | func (m *Manager) Run() error { | 
					
						
							|  |  |  | 	go m.sender() | 
					
						
							| 
									
										
										
										
											2022-04-09 00:03:54 +08:00
										 |  |  | 	<-m.ctx.Done() | 
					
						
							|  |  |  | 	m.cancelDiscoverers() | 
					
						
							|  |  |  | 	return m.ctx.Err() | 
					
						
							| 
									
										
										
										
											2021-10-20 16:15:54 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // SyncCh returns a read only channel used by all the clients to receive target updates.
 | 
					
						
							|  |  |  | func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group { | 
					
						
							|  |  |  | 	return m.syncCh | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // ApplyConfig removes all running discovery providers and starts new ones using the provided config.
 | 
					
						
							|  |  |  | func (m *Manager) ApplyConfig(cfg map[string]discovery.Configs) error { | 
					
						
							|  |  |  | 	m.mtx.Lock() | 
					
						
							|  |  |  | 	defer m.mtx.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for pk := range m.targets { | 
					
						
							|  |  |  | 		if _, ok := cfg[pk.setName]; !ok { | 
					
						
							| 
									
										
										
										
											2023-10-23 21:55:36 +08:00
										 |  |  | 			m.metrics.DiscoveredTargets.DeleteLabelValues(m.name, pk.setName) | 
					
						
							| 
									
										
										
										
											2021-10-20 16:15:54 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	m.cancelDiscoverers() | 
					
						
							|  |  |  | 	m.targets = make(map[poolKey]map[string]*targetgroup.Group) | 
					
						
							|  |  |  | 	m.providers = nil | 
					
						
							|  |  |  | 	m.discoverCancel = nil | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	failedCount := 0 | 
					
						
							|  |  |  | 	for name, scfg := range cfg { | 
					
						
							|  |  |  | 		failedCount += m.registerProviders(scfg, name) | 
					
						
							| 
									
										
										
										
											2023-10-23 21:55:36 +08:00
										 |  |  | 		m.metrics.DiscoveredTargets.WithLabelValues(m.name, name).Set(0) | 
					
						
							| 
									
										
										
										
											2021-10-20 16:15:54 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2023-10-23 21:55:36 +08:00
										 |  |  | 	m.metrics.FailedConfigs.WithLabelValues(m.name).Set(float64(failedCount)) | 
					
						
							| 
									
										
										
										
											2021-10-20 16:15:54 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	for _, prov := range m.providers { | 
					
						
							|  |  |  | 		m.startProvider(m.ctx, prov) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // StartCustomProvider is used for sdtool. Only use this if you know what you're doing.
 | 
					
						
							|  |  |  | func (m *Manager) StartCustomProvider(ctx context.Context, name string, worker discovery.Discoverer) { | 
					
						
							|  |  |  | 	p := &provider{ | 
					
						
							|  |  |  | 		name: name, | 
					
						
							|  |  |  | 		d:    worker, | 
					
						
							|  |  |  | 		subs: []string{name}, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	m.providers = append(m.providers, p) | 
					
						
							|  |  |  | 	m.startProvider(ctx, p) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (m *Manager) startProvider(ctx context.Context, p *provider) { | 
					
						
							|  |  |  | 	level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs)) | 
					
						
							|  |  |  | 	ctx, cancel := context.WithCancel(ctx) | 
					
						
							|  |  |  | 	updates := make(chan []*targetgroup.Group) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	m.discoverCancel = append(m.discoverCancel, cancel) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	go p.d.Run(ctx, updates) | 
					
						
							|  |  |  | 	go m.updater(ctx, p, updates) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) { | 
					
						
							|  |  |  | 	for { | 
					
						
							|  |  |  | 		select { | 
					
						
							|  |  |  | 		case <-ctx.Done(): | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		case tgs, ok := <-updates: | 
					
						
							| 
									
										
										
										
											2023-10-23 21:55:36 +08:00
										 |  |  | 			m.metrics.ReceivedUpdates.WithLabelValues(m.name).Inc() | 
					
						
							| 
									
										
										
										
											2021-10-20 16:15:54 +08:00
										 |  |  | 			if !ok { | 
					
						
							|  |  |  | 				level.Debug(m.logger).Log("msg", "Discoverer channel closed", "provider", p.name) | 
					
						
							|  |  |  | 				return | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 			for _, s := range p.subs { | 
					
						
							|  |  |  | 				m.updateGroup(poolKey{setName: s, provider: p.name}, tgs) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 			select { | 
					
						
							|  |  |  | 			case m.triggerSend <- struct{}{}: | 
					
						
							|  |  |  | 			default: | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (m *Manager) sender() { | 
					
						
							|  |  |  | 	ticker := time.NewTicker(m.updatert) | 
					
						
							|  |  |  | 	defer ticker.Stop() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for { | 
					
						
							|  |  |  | 		select { | 
					
						
							|  |  |  | 		case <-m.ctx.Done(): | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker.
 | 
					
						
							|  |  |  | 			select { | 
					
						
							|  |  |  | 			case <-m.triggerSend: | 
					
						
							| 
									
										
										
										
											2023-10-23 21:55:36 +08:00
										 |  |  | 				m.metrics.SentUpdates.WithLabelValues(m.name).Inc() | 
					
						
							| 
									
										
										
										
											2021-10-20 16:15:54 +08:00
										 |  |  | 				select { | 
					
						
							|  |  |  | 				case m.syncCh <- m.allGroups(): | 
					
						
							|  |  |  | 				default: | 
					
						
							| 
									
										
										
										
											2023-10-23 21:55:36 +08:00
										 |  |  | 					m.metrics.DelayedUpdates.WithLabelValues(m.name).Inc() | 
					
						
							| 
									
										
										
										
											2021-10-20 16:15:54 +08:00
										 |  |  | 					level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle") | 
					
						
							|  |  |  | 					select { | 
					
						
							|  |  |  | 					case m.triggerSend <- struct{}{}: | 
					
						
							|  |  |  | 					default: | 
					
						
							|  |  |  | 					} | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			default: | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (m *Manager) cancelDiscoverers() { | 
					
						
							|  |  |  | 	for _, c := range m.discoverCancel { | 
					
						
							|  |  |  | 		c() | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) { | 
					
						
							|  |  |  | 	m.mtx.Lock() | 
					
						
							|  |  |  | 	defer m.mtx.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if _, ok := m.targets[poolKey]; !ok { | 
					
						
							|  |  |  | 		m.targets[poolKey] = make(map[string]*targetgroup.Group) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	for _, tg := range tgs { | 
					
						
							|  |  |  | 		if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics.
 | 
					
						
							| 
									
										
										
										
											2023-08-15 05:28:23 +08:00
										 |  |  | 			m.targets[poolKey][tg.Source] = tg | 
					
						
							| 
									
										
										
										
											2021-10-20 16:15:54 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (m *Manager) allGroups() map[string][]*targetgroup.Group { | 
					
						
							|  |  |  | 	m.mtx.RLock() | 
					
						
							|  |  |  | 	defer m.mtx.RUnlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	tSets := map[string][]*targetgroup.Group{} | 
					
						
							|  |  |  | 	n := map[string]int{} | 
					
						
							|  |  |  | 	for pkey, tsets := range m.targets { | 
					
						
							|  |  |  | 		for _, tg := range tsets { | 
					
						
							|  |  |  | 			// Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
 | 
					
						
							|  |  |  | 			// to signal that it needs to stop all scrape loops for this target set.
 | 
					
						
							|  |  |  | 			tSets[pkey.setName] = append(tSets[pkey.setName], tg) | 
					
						
							|  |  |  | 			n[pkey.setName] += len(tg.Targets) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	for setName, v := range n { | 
					
						
							| 
									
										
										
										
											2023-10-23 21:55:36 +08:00
										 |  |  | 		m.metrics.DiscoveredTargets.WithLabelValues(m.name, setName).Set(float64(v)) | 
					
						
							| 
									
										
										
										
											2021-10-20 16:15:54 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	return tSets | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // registerProviders returns a number of failed SD config.
 | 
					
						
							|  |  |  | func (m *Manager) registerProviders(cfgs discovery.Configs, setName string) int { | 
					
						
							|  |  |  | 	var ( | 
					
						
							|  |  |  | 		failed int | 
					
						
							|  |  |  | 		added  bool | 
					
						
							|  |  |  | 	) | 
					
						
							|  |  |  | 	add := func(cfg discovery.Config) { | 
					
						
							|  |  |  | 		for _, p := range m.providers { | 
					
						
							|  |  |  | 			if reflect.DeepEqual(cfg, p.config) { | 
					
						
							|  |  |  | 				p.subs = append(p.subs, setName) | 
					
						
							|  |  |  | 				added = true | 
					
						
							|  |  |  | 				return | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		typ := cfg.Name() | 
					
						
							|  |  |  | 		d, err := cfg.NewDiscoverer(discovery.DiscovererOptions{ | 
					
						
							| 
									
										
										
										
											2023-10-23 21:55:36 +08:00
										 |  |  | 			Logger:     log.With(m.logger, "discovery", typ, "config", setName), | 
					
						
							|  |  |  | 			Registerer: m.registerer, | 
					
						
							| 
									
										
										
										
											2021-10-20 16:15:54 +08:00
										 |  |  | 		}) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							| 
									
										
										
										
											2022-12-22 19:19:06 +08:00
										 |  |  | 			level.Error(m.logger).Log("msg", "Cannot create service discovery", "err", err, "type", typ, "config", setName) | 
					
						
							| 
									
										
										
										
											2021-10-20 16:15:54 +08:00
										 |  |  | 			failed++ | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		m.providers = append(m.providers, &provider{ | 
					
						
							|  |  |  | 			name:   fmt.Sprintf("%s/%d", typ, len(m.providers)), | 
					
						
							|  |  |  | 			d:      d, | 
					
						
							|  |  |  | 			config: cfg, | 
					
						
							|  |  |  | 			subs:   []string{setName}, | 
					
						
							|  |  |  | 		}) | 
					
						
							|  |  |  | 		added = true | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	for _, cfg := range cfgs { | 
					
						
							|  |  |  | 		add(cfg) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if !added { | 
					
						
							|  |  |  | 		// Add an empty target group to force the refresh of the corresponding
 | 
					
						
							|  |  |  | 		// scrape pool and to notify the receiver that this target set has no
 | 
					
						
							|  |  |  | 		// current targets.
 | 
					
						
							|  |  |  | 		// It can happen because the combined set of SD configurations is empty
 | 
					
						
							|  |  |  | 		// or because we fail to instantiate all the SD configurations.
 | 
					
						
							|  |  |  | 		add(discovery.StaticConfig{{}}) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return failed | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // StaticProvider holds a list of target groups that never change.
 | 
					
						
							|  |  |  | type StaticProvider struct { | 
					
						
							|  |  |  | 	TargetGroups []*targetgroup.Group | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Run implements the Worker interface.
 | 
					
						
							|  |  |  | func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { | 
					
						
							|  |  |  | 	// We still have to consider that the consumer exits right away in which case
 | 
					
						
							|  |  |  | 	// the context will be canceled.
 | 
					
						
							|  |  |  | 	select { | 
					
						
							|  |  |  | 	case ch <- sd.TargetGroups: | 
					
						
							|  |  |  | 	case <-ctx.Done(): | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	close(ch) | 
					
						
							|  |  |  | } |