目录

prometheus源码(二)-服务发现管理器

prometheus 的服务发现,源码为 prometheus 的 v2.45.0 分支 .

写在前面

Prometheus 已经内置了 consul 、 kubernetes 、 zookeeper 、 eureka 及 dns 等十几种的服务发现方式。在深入研究之前,我们先看看 Prometheus 的服务发现机制到底是如何工作的。

配置解析

服务发现的第一步是先解析配置文件,得到一份包含所有服务发现的映射,再根据映射逐个启动监听。

我们在前面的阅读中可以看到 reloaders 里面有这么一个片段:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
 {
            // The Scrape and notifier managers need to reload before the Discovery manager as
            // they need to read the most updated config when receiving the new targets list.
            name:     "scrape",
            reloader: scrapeManager.ApplyConfig,
        }, {
            name: "scrape_sd",
            reloader: func(cfg *config.Config) error {
                c := make(map[string]discovery.Configs)
                scfgs, err := cfg.GetScrapeConfigs()
                if err != nil {
                    return err
                }
                for _, v := range scfgs {
                    c[v.JobName] = v.ServiceDiscoveryConfigs
                }
                return discoveryManagerScrape.ApplyConfig(c)
            },
        }
  • reloadConfig() 加载配置文件。

  • 应用配置信息到指标采集管理器

  • 应用配置信息到服务发现管理器

服务发现

我们先看看服务发现的配置应用:

  • 在 Load 文件的时候,程序把 yaml 配置文件里的内容都 load 到 c.ScrapeConfigs 对应的字段里面。

  • cfg.GetScrapeConfigs() ,获取所有的服务发现部分的配置信息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
func (c *Config) GetScrapeConfigs() ([]*ScrapeConfig, error) {
    scfgs := make([]*ScrapeConfig, len(c.ScrapeConfigs))

    jobNames := map[string]string{}
    for i, scfg := range c.ScrapeConfigs {
        // We do these checks for library users that would not call Validate in
        // Unmarshal.
        if err := scfg.Validate(c.GlobalConfig); err != nil {
            return nil, err
        }

        if _, ok := jobNames[scfg.JobName]; ok {
            return nil, fmt.Errorf("found multiple scrape configs with job name %q", scfg.JobName)
        }
        jobNames[scfg.JobName] = "main config file"
        scfgs[i] = scfg
    }
    for _, pat := range c.ScrapeConfigFiles {
        fs, err := filepath.Glob(pat)
        if err != nil {
            // The only error can be a bad pattern.
            return nil, fmt.Errorf("error retrieving scrape config files for %q: %w", pat, err)
        }
        for _, filename := range fs {
            cfg := ScrapeConfigs{}
            content, err := os.ReadFile(filename)
            if err != nil {
                return nil, fileErr(filename, err)
            }
            err = yaml.UnmarshalStrict(content, &cfg)
            if err != nil {
                return nil, fileErr(filename, err)
            }
            for _, scfg := range cfg.ScrapeConfigs {
                if err := scfg.Validate(c.GlobalConfig); err != nil {
                    return nil, fileErr(filename, err)
                }

                if f, ok := jobNames[scfg.JobName]; ok {
                    return nil, fileErr(filename, fmt.Errorf("found multiple scrape configs with job name %q, first found in %s", scfg.JobName, f))
                }
                jobNames[scfg.JobName] = fmt.Sprintf("%q", filePath(filename))

                scfg.SetDirectory(filepath.Dir(filename))
                scfgs = append(scfgs, scfg)
            }
        }
    }
    return scfgs, nil
}
  • discoveryManagerScrape.ApplyConfig(c) 将配置信息应用到服务发现管理器。

配置信息应用到服务发现管理器的逻辑我们看下。

  • m.registerProviders(scfg, name) ,注册 Providers ,即一个个服务发现的目标。

  • Providers 有 2 个字段比较重要,subs 和 newSubs ,subs 存放当前配置信息, newSubs 存放每次新 reload 出来的配置信息,它们每次都会做一个对比,如果新 reload 的配置目前已存在且在运行,则 newSubs 里面存放该配置的 job 名,表示后续还继续使用该服务发现,如果重新加载的配置文件里面以及没有该服务发现了,则调用 prov.cancel() 停止发现。

  • 等配置文件加载完成后,检查服务发现是否已经启动过了,没有则启动新的服务发现。

  • 通过触发 m.triggerSend 通道通知服务发现对目标进行更新。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
    m.mtx.Lock()
    defer m.mtx.Unlock()

    var failedCount int
    for name, scfg := range cfg {
        failedCount += m.registerProviders(scfg, name)
    }
    failedConfigs.WithLabelValues(m.name).Set(float64(failedCount))

    var (
        wg sync.WaitGroup
        // keep shows if we keep any providers after reload.
        keep         bool
        newProviders []*Provider
    )
    for _, prov := range m.providers {
        // 通过 m.registerProviders() 注册后,newSubs 还为空的肯定就是不需要监听的
        if len(prov.newSubs) == 0 {
            wg.Add(1)
            prov.done = func() {
                wg.Done()
            }
            prov.cancel()
            continue
        }
        newProviders = append(newProviders, prov)
        // refTargets keeps reference targets used to populate new subs' targets
        var refTargets map[string]*targetgroup.Group
        prov.mu.Lock()

        m.targetsMtx.Lock()
        for s := range prov.subs {
            keep = true
            refTargets = m.targets[poolKey{s, prov.name}]
            // Remove obsolete subs' targets.
            if _, ok := prov.newSubs[s]; !ok {
                delete(m.targets, poolKey{s, prov.name})
                discoveredTargets.DeleteLabelValues(m.name, s)
            }
        }
        // Set metrics and targets for new subs.
        for s := range prov.newSubs {
            if _, ok := prov.subs[s]; !ok {
                discoveredTargets.WithLabelValues(m.name, s).Set(0)
            }
            if l := len(refTargets); l > 0 {
                m.targets[poolKey{s, prov.name}] = make(map[string]*targetgroup.Group, l)
                for k, v := range refTargets {
                    m.targets[poolKey{s, prov.name}][k] = v
                }
            }
        }
        m.targetsMtx.Unlock()

        prov.subs = prov.newSubs
        prov.newSubs = map[string]struct{}{}
        prov.mu.Unlock()
        if !prov.IsStarted() {
            m.startProvider(m.ctx, prov)
        }
    }
    // Currently downstream managers expect full target state upon config reload, so we must oblige.
    // While startProvider does pull the trigger, it may take some time to do so, therefore
    // we pull the trigger as soon as possible so that downstream managers can populate their state.
    // See https://github.com/prometheus/prometheus/pull/8639 for details.
    if keep {
        select {
        case m.triggerSend <- struct{}{}:
        default:
        }
    }
    m.providers = newProviders
    wg.Wait()

    return nil
}

func (m *Manager) registerProviders(cfgs Configs, setName string) int {
    var (
        failed int
        added  bool
    )
    add := func(cfg Config) {
        for _, p := range m.providers {
            if reflect.DeepEqual(cfg, p.config) {
                p.newSubs[setName] = struct{}{}
                added = true
                return
            }
        }
        typ := cfg.Name()
        d, err := cfg.NewDiscoverer(DiscovererOptions{
            Logger:            log.With(m.logger, "discovery", typ, "config", setName),
            HTTPClientOptions: m.httpOpts,
        })
        if err != nil {
            level.Error(m.logger).Log("msg", "Cannot create service discovery", "err", err, "type", typ, "config", setName)
            failed++
            return
        }
        m.providers = append(m.providers, &Provider{
            name:   fmt.Sprintf("%s/%d", typ, m.lastProvider),
            d:      d,
            config: cfg,
            newSubs: map[string]struct{}{
                setName: {},
            },
        })
        m.lastProvider++
        added = true
    }
    for _, cfg := range cfgs {
        add(cfg)
    }
    if !added {
        // Add an empty target group to force the refresh of the corresponding
        // scrape pool and to notify the receiver that this target set has no
        // current targets.
        // It can happen because the combined set of SD configurations is empty
        // or because we fail to instantiate all the SD configurations.
        add(StaticConfig{{}})
    }
    return failed
}

启动服务发现 m.startProvider(m.ctx, prov) 的逻辑我们有必要阅读一下。

  • 拿到 Provider 实例后,调用该实例的 Run() 接口运行。

  • 同时运行 m.updater() 等待接收新的服务发现目标,并将其更新到内存缓存 m.targets map 对象中,与 ApplyConfig() 接口逻辑中的 m.targets 数据读取的逻辑形成闭环。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (m *Manager) startProvider(ctx context.Context, p *Provider) {
    level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
    ctx, cancel := context.WithCancel(ctx)
    updates := make(chan []*targetgroup.Group)

    p.cancel = cancel

    go p.d.Run(ctx, updates)
    go m.updater(ctx, p, updates)
}

func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targetgroup.Group) {
    // Ensure targets from this provider are cleaned up.
    defer m.cleaner(p)
    for {
        select {
        case <-ctx.Done():
            return
        case tgs, ok := <-updates:
            receivedUpdates.WithLabelValues(m.name).Inc()
            if !ok {
                level.Debug(m.logger).Log("msg", "Discoverer channel closed", "provider", p.name)
                // Wait for provider cancellation to ensure targets are cleaned up when expected.
                <-ctx.Done()
                return
            }

            p.mu.RLock()
            for s := range p.subs {
                m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)
            }
            p.mu.RUnlock()

            select {
            case m.triggerSend <- struct{}{}:
            default:
            }
        }
    }
}

Configs 是如何被解析的?

  • 所有的服务发现方式,都有一个 SDConfig 结构体,实现了 Config 接口(Config 接口位于 discovery/discovery.go ),接口函数有 Name() 、NewDiscoverer() 以及 UnmarshalYAML() ,其中,UnmarshalYAML() 用来实现将 yaml 格式的配置文件数据序列化为结构体的。
1
2
3
4
5
6
7
8
type Config interface {
    // Name returns the name of the discovery mechanism.
    Name() string

    // NewDiscoverer returns a Discoverer for the Config
    // with the given DiscovererOptions.
    NewDiscoverer(DiscovererOptions) (Discoverer, error)
}

每种类型的服务发现方式,都有 init() 函数,即程序启动的时候进行初始化,把服务发现的结构体信息通过 discovery.RegisterConfig()(RegisterConfig() 位于 discovery/discovery.go ) 接口注册进去,方便后续对 yaml 文件进行数据格式化时数据结构的转换。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func init() {
    discovery.RegisterConfig(&SDConfig{})
    prometheus.MustRegister(rpcFailuresCount, rpcDuration)
}

func RegisterConfig(config Config) {
    // consul_sd_configs, consul.SDConfig, &SDConfig{}
    registerConfig(config.Name()+"_sd_configs", reflect.TypeOf(config), config)
}

func registerConfig(yamlKey string, elemType reflect.Type, config Config) {
    name := config.Name()
    if _, ok := configNames[name]; ok {
        panic(fmt.Sprintf("discovery: Config named %q is already registered", name))
    }
    configNames[name] = config

    fieldName := configFieldPrefix + yamlKey // Field must be exported.
    configFieldNames[elemType] = fieldName

    // Insert fields in sorted order.
    i := sort.Search(len(configFields), func(k int) bool {
        return fieldName < configFields[k].Name
    })
    configFields = append(configFields, reflect.StructField{}) // Add empty field at end.
    copy(configFields[i+1:], configFields[i:])                 // Shift fields to the right.
    // 以consul 为例, configFields 的 key 是 consul_sd_configs ,值是 consul 对应的 SDConfig 结构体切片
    configFields[i] = reflect.StructField{                     // Write new field in place.
        Name: fieldName,
        Type: reflect.SliceOf(elemType),
        // 这里给字段加了 yaml 的 tag ,在做解析的时候会将这个字段解析,这里 yamlKey 就是 consul_sd_configs 。
        Tag:  reflect.StructTag(`yaml:"` + yamlKey + `,omitempty"`),
    }
}

yaml 序列化逻辑位于 discovery/discovery.go 。

  • ScrapeConfig 结构体中没有给 Configs 类型的字段定义 yaml 的 tag ,通过上面反射的方式给添加上了。

  • Configs 就是 job 下的所有服务发现的集合。

  • getConfigType(configsType) 就是返回一个集合了所有服务发现类型字段的空的结构体。

  • unmarshal() 自定义解析接口,就是将配置文件的值映射到上面的空的结构体中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type Configs []Config

// SetDirectory joins any relative file paths with dir.
func (c *Configs) SetDirectory(dir string) {
    for _, c := range *c {
        if v, ok := c.(config.DirectorySetter); ok {
            v.SetDirectory(dir)
        }
    }
}

// UnmarshalYAML implements yaml.Unmarshaler.
func (c *Configs) UnmarshalYAML(unmarshal func(interface{}) error) error {
    cfgTyp := getConfigType(configsType)
    cfgPtr := reflect.New(cfgTyp)
    cfgVal := cfgPtr.Elem()

    if err := unmarshal(cfgPtr.Interface()); err != nil {
        return replaceYAMLTypeError(err, cfgTyp, configsType)
    }

    var err error
    *c, err = readConfigs(cfgVal, 0)
    return err
}

映射完成后,scrape_configs 下的 job 的结构如下(这里只是举例说明,根据你自己的配置生成的不一样):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
scrape_configs:
    - job_name: xxx
      consul_sd_config:
        - server: "localhost:1234"
          ...
        - server: "localhost:4567"
          ...
      files_sd_config:
        - - foo/*.slow.json
          ...
      kubernetes_sd_configs:
        - role: endpoints
          ...
      static_configs:
        - targets: ["localhost:9090", "localhost:9191"]
  • static_configs 是一种特殊的服务发现。

  • 拿 consul 来举例,每个 server 区块的服务发现就是一个 Config 实例。

定时更新

通过前面配置文件的应用会启动服务发现外,在 cmd/prometheus/main.go 中 Prometheus 还通过 discoveryManagerScrape.Run() 接口来启动服务发现管理器。

通过进一步查看 Run() 接口,我们可以看到,它是实际是一个 discoveryManager 接口类型的对象,有接口就有实现。

1
2
3
4
5
type discoveryManager interface {
    ApplyConfig(cfg map[string]discovery.Configs) error
    Run() error
    SyncCh() <-chan map[string][]*targetgroup.Group
}

我们查看 discoveryManager 接口的实现,可以发现它是在 discovery/manager.go 文件里面实现了该接口的。

  • goroutine 启动 m.sender()

  • 监听 m.ctx.Done() 通道,若收到信号则关停所有服务发现。

1
2
3
4
5
6
7
8
func (m *Manager) Run() error {
    go m.sender()
    for range m.ctx.Done() {
        m.cancelDiscoverers()
        return m.ctx.Err()
    }
    return nil
}

sender() 逻辑如下:

  • 新建定时器,每个一段时间更新服务发现的目标,注释说明是为了防止某些更新频率过快,定时器是用来限速的。

  • 接收到定时器的通道信息后,等待 m.triggerSend 通道的信息。

  • 收到 m.triggerSend 通道信息后,计算除所有 targetGroup 信息,发送给 m.syncCh 通道,交给采集管理器处理。

  • m.syncCh 通道若满了,则重新触发下一次循环。

  • select 都配置了 default 区块,说明这里的通道监听都是非阻塞的。如果 select 没有 default ,则会阻塞住,程序不往下执行。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (m *Manager) sender() {
    ticker := time.NewTicker(m.updatert)
    defer ticker.Stop()

    for {
        select {
        case <-m.ctx.Done():
            return
        case <-ticker.C: // Some discoverers send updates too often, so we throttle these with the ticker.
            select {
            case <-m.triggerSend:
                sentUpdates.WithLabelValues(m.name).Inc()
                select {
                case m.syncCh <- m.allGroups():
                default:
                    delayedUpdates.WithLabelValues(m.name).Inc()
                    level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle")
                    select {
                    case m.triggerSend <- struct{}{}:
                    default:
                    }
                }
            default:
            }
        }
    }
}

目标采集

m.allGroups() 是根据配置文件获取到的所有目标组。发送往指标采集管理器,交由他进行数据采集。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func (m *Manager) allGroups() map[string][]*targetgroup.Group {
    tSets := map[string][]*targetgroup.Group{}
    n := map[string]int{}

    m.targetsMtx.Lock()
    defer m.targetsMtx.Unlock()
    for pkey, tsets := range m.targets {
        for _, tg := range tsets {
            // Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
            // to signal that it needs to stop all scrape loops for this target set.
            tSets[pkey.setName] = append(tSets[pkey.setName], tg)
            n[pkey.setName] += len(tg.Targets)
        }
    }
    for setName, v := range n {
        discoveredTargets.WithLabelValues(m.name, setName).Set(float64(v))
    }
    return tSets
}

服务发现是怎么收集这些目标条目的呢?以 kuberbetes 为例。

kuberbetes 类型下又分很多种类型的发现,分别如下。

  • endpoints

  • endpointslice

  • ingress

  • node

  • pod

  • service

我们在这几样中拿 endpoints 出来为例,配置一般如下。

1
2
3
4
scrape_configs:
      kubernetes_sd_configs:
        - role: endpoints
          ...

又或者是 serviceMonitor:

1
2
3
4
5
6
7
8
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
...
spec:
  endpoints:
  - metricRelabelings:
    - action: drop
...

拿到配置文件信息后, dkubernetes类型的服务发现会检查 role 是哪种类型的发现,并执行相应的逻辑,我们拿 endpoints 为例,那这里对应就是 RoleEndpoints 下的逻辑。

  • 它的底层原理是利用 client-go ,根据我们给定的参数,构建自定义实现了 ListWatch 函数的 Informer 。

  • 再利用 discovery/kubernetes/endpoints.go 下的 NewEndpoints() 接口,构建成一个个服务发现实例的闭包函数。

  • 最后再通过调用 d.Run(ctx, ch) 启动 endpoints Informer 来进行服务发现,熟悉 kubernetes 的话,这一步的源码阅读不在话下。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
    d.Lock()
    namespaces := d.getNamespaces()

    switch d.role {
    ...
    case RoleEndpoint:
        for _, namespace := range namespaces {
            e := d.client.CoreV1().Endpoints(namespace)
            elw := &cache.ListWatch{
                ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                    options.FieldSelector = d.selectors.endpoints.field
                    options.LabelSelector = d.selectors.endpoints.label
                    return e.List(ctx, options)
                },
                WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                    options.FieldSelector = d.selectors.endpoints.field
                    options.LabelSelector = d.selectors.endpoints.label
                    return e.Watch(ctx, options)
                },
            }
            s := d.client.CoreV1().Services(namespace)
            slw := &cache.ListWatch{
                ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                    options.FieldSelector = d.selectors.service.field
                    options.LabelSelector = d.selectors.service.label
                    return s.List(ctx, options)
                },
                WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                    options.FieldSelector = d.selectors.service.field
                    options.LabelSelector = d.selectors.service.label
                    return s.Watch(ctx, options)
                },
            }
            p := d.client.CoreV1().Pods(namespace)
            plw := &cache.ListWatch{
                ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                    options.FieldSelector = d.selectors.pod.field
                    options.LabelSelector = d.selectors.pod.label
                    return p.List(ctx, options)
                },
                WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                    options.FieldSelector = d.selectors.pod.field
                    options.LabelSelector = d.selectors.pod.label
                    return p.Watch(ctx, options)
                },
            }
            var nodeInf cache.SharedInformer
            if d.attachMetadata.Node {
                nodeInf = d.newNodeInformer(ctx)
                go nodeInf.Run(ctx.Done())
            }

            eps := NewEndpoints(
                log.With(d.logger, "role", "endpoint"),
                d.newEndpointsByNodeInformer(elw),
                cache.NewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
                cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled),
                nodeInf,
            )
            d.discoverers = append(d.discoverers, eps)
            go eps.endpointsInf.Run(ctx.Done())
            go eps.serviceInf.Run(ctx.Done())
            go eps.podInf.Run(ctx.Done())
        }
    ...
    default:
        level.Error(d.logger).Log("msg", "unknown Kubernetes discovery kind", "role", d.role)
    }

    var wg sync.WaitGroup
    for _, dd := range d.discoverers {
        wg.Add(1)
        go func(d discovery.Discoverer) {
            defer wg.Done()
            d.Run(ctx, ch)
        }(dd)
    }

    d.Unlock()

    wg.Wait()
    <-ctx.Done()
}

这是 NewEndpoints() 的相关源码。

我们可以看到,它其实就跟实现控制器一样的原理,了解 kubernetes 源码的话,阅读起来很简单。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
func NewEndpoints(l log.Logger, eps cache.SharedIndexInformer, svc, pod, node cache.SharedInformer) *Endpoints {
    if l == nil {
        l = log.NewNopLogger()
    }
    e := &Endpoints{
        logger:           l,
        endpointsInf:     eps,
        endpointsStore:   eps.GetStore(),
        serviceInf:       svc,
        serviceStore:     svc.GetStore(),
        podInf:           pod,
        podStore:         pod.GetStore(),
        nodeInf:          node,
        withNodeMetadata: node != nil,
        queue:            workqueue.NewNamed("endpoints"),
    }

    _, err := e.endpointsInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(o interface{}) {
            epAddCount.Inc()
            e.enqueue(o)
        },
        UpdateFunc: func(_, o interface{}) {
            epUpdateCount.Inc()
            e.enqueue(o)
        },
        DeleteFunc: func(o interface{}) {
            epDeleteCount.Inc()
            e.enqueue(o)
        },
    })
    if err != nil {
        level.Error(l).Log("msg", "Error adding endpoints event handler.", "err", err)
    }

    serviceUpdate := func(o interface{}) {
        svc, err := convertToService(o)
        if err != nil {
            level.Error(e.logger).Log("msg", "converting to Service object failed", "err", err)
            return
        }

        ep := &apiv1.Endpoints{}
        ep.Namespace = svc.Namespace
        ep.Name = svc.Name
        obj, exists, err := e.endpointsStore.Get(ep)
        if exists && err == nil {
            e.enqueue(obj.(*apiv1.Endpoints))
        }

        if err != nil {
            level.Error(e.logger).Log("msg", "retrieving endpoints failed", "err", err)
        }
    }
    _, err = e.serviceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
        // TODO(fabxc): potentially remove add and delete event handlers. Those should
        // be triggered via the endpoint handlers already.
        AddFunc: func(o interface{}) {
            svcAddCount.Inc()
            serviceUpdate(o)
        },
        UpdateFunc: func(_, o interface{}) {
            svcUpdateCount.Inc()
            serviceUpdate(o)
        },
        DeleteFunc: func(o interface{}) {
            svcDeleteCount.Inc()
            serviceUpdate(o)
        },
    })
    if err != nil {
        level.Error(l).Log("msg", "Error adding services event handler.", "err", err)
    }
    if e.withNodeMetadata {
        _, err = e.nodeInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: func(o interface{}) {
                node := o.(*apiv1.Node)
                e.enqueueNode(node.Name)
            },
            UpdateFunc: func(_, o interface{}) {
                node := o.(*apiv1.Node)
                e.enqueueNode(node.Name)
            },
            DeleteFunc: func(o interface{}) {
                node := o.(*apiv1.Node)
                e.enqueueNode(node.Name)
            },
        })
        if err != nil {
            level.Error(l).Log("msg", "Error adding nodes event handler.", "err", err)
        }
    }

    return e
}

然后我们看看它的 Run() 接口实现。

就是 kubernetes 那一套东西了,第一次启动拿到所有发现的实例,后续再根据变更做对应的更新,然后通过 send() 接口把数据发送到 updates 通道,触发更新。

  • buildEndpoints() ,该接口是把一个个 endpoints 进行数据组合,每个 endpoints 组合一组数据。

  • send() ,该接口是将 endpoints 数据发送到 updates 通道,触发更新。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// Run implements the Discoverer interface.
func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
    defer e.queue.ShutDown()

    cacheSyncs := []cache.InformerSynced{e.endpointsInf.HasSynced, e.serviceInf.HasSynced, e.podInf.HasSynced}
    if e.withNodeMetadata {
        cacheSyncs = append(cacheSyncs, e.nodeInf.HasSynced)
    }

    if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
        if !errors.Is(ctx.Err(), context.Canceled) {
            level.Error(e.logger).Log("msg", "endpoints informer unable to sync cache")
        }
        return
    }

    go func() {
        for e.process(ctx, ch) {
        }
    }()

    // Block until the target provider is explicitly canceled.
    <-ctx.Done()
}

func (e *Endpoints) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool {
    keyObj, quit := e.queue.Get()
    if quit {
        return false
    }
    defer e.queue.Done(keyObj)
    key := keyObj.(string)

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        level.Error(e.logger).Log("msg", "splitting key failed", "key", key)
        return true
    }

    o, exists, err := e.endpointsStore.GetByKey(key)
    if err != nil {
        level.Error(e.logger).Log("msg", "getting object from store failed", "key", key)
        return true
    }
    if !exists {
        send(ctx, ch, &targetgroup.Group{Source: endpointsSourceFromNamespaceAndName(namespace, name)})
        return true
    }
    eps, err := convertToEndpoints(o)
    if err != nil {
        level.Error(e.logger).Log("msg", "converting to Endpoints object failed", "err", err)
        return true
    }
    send(ctx, ch, e.buildEndpoints(eps))
    return true
}

e.buildEndpoints(eps) 代码组装后,endpoints 里面其中一个端点的数据格式如下(我们拿 kubelet 的 serviceMonitor 来举例):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
__address__="192.168.21.122:10250"
__meta_kubernetes_endpoint_address_target_kind="Node"
__meta_kubernetes_endpoint_address_target_name="ops-k8s-node3"
__meta_kubernetes_endpoint_port_name="https-metrics"
__meta_kubernetes_endpoint_port_protocol="TCP"
__meta_kubernetes_endpoint_ready="true"
__meta_kubernetes_endpoints_label_app_kubernetes_io_managed_by="prometheus-operator"
__meta_kubernetes_endpoints_label_app_kubernetes_io_name="kubelet"
__meta_kubernetes_endpoints_label_k8s_app="kubelet"
__meta_kubernetes_endpoints_labelpresent_app_kubernetes_io_managed_by="true"
__meta_kubernetes_endpoints_labelpresent_app_kubernetes_io_name="true"
__meta_kubernetes_endpoints_labelpresent_k8s_app="true"
__meta_kubernetes_endpoints_name="rancher-monitoring-kubelet"
__meta_kubernetes_namespace="kube-system"
__meta_kubernetes_service_label_app_kubernetes_io_managed_by="prometheus-operator"
__meta_kubernetes_service_label_app_kubernetes_io_name="kubelet"
__meta_kubernetes_service_label_k8s_app="kubelet"
__meta_kubernetes_service_labelpresent_app_kubernetes_io_managed_by="true"
__meta_kubernetes_service_labelpresent_app_kubernetes_io_name="true"
__meta_kubernetes_service_labelpresent_k8s_app="true"
__meta_kubernetes_service_name="rancher-monitoring-kubelet"
__metrics_path__="/metrics"
__scheme__="https"
job="serviceMonitor/kube-system/rancher-monitoring-kubelet/0"

总结

怎么说呢,总结起来就是,Prometheus 的代码穿插频繁,阅读起来有一定的难度,集中在以下几个点。

  1. 配置解析,在解析 yaml 的过程中,它使用了自定义的解析接口去实现各种复杂的服务发现实例字段构建,刚开始看不熟悉确实遇到不少不理解的地方,花了一点时间去理解,而且这一步是启动 Prometheus 的第一步,我有点精神洁癖,理解不了就没办法继续往下。

  2. 服务发现实例的构建与启动,理解了 yaml 的解析后,这一步就相对简单很多了,根据不同的服务发现类型,去实现同一个启动接口。

  3. 目标数据更新,这一步对 golang 的 select 也需要有一定的理解才行,里面很多地方都用到 select 去等待通道信号,如果 select 有配置 default ,则代码是非阻塞的,不影响 select 代码块后面逻辑的运行,如果没有 default ,则会阻塞在 select 代码这一位置,直到收到通道的信号才继续往下执行。