
Prometheus Source Code (Part 1): Starting from the Startup Flow

This article walks through Prometheus's startup flow. The source code referenced is the v2.45.0 branch of Prometheus.

Preface

  1. Today we start a new series on the Prometheus source code. As is well known, Prometheus is a CNCF project: an open-source monitoring and alerting system for systems and services, built on a time-series database. Anyone familiar with Kubernetes knows it is one of the most common monitoring components for Kubernetes clusters, and working with it is an essential operations skill.

  2. I work with Prometheus regularly, and in the spirit of learning I wanted to understand how it actually runs and how the code is organized. I searched online for source-code write-ups, but material on its internals is scarce, so we might as well read it ourselves.

  3. With the code in hand it is hard to know where to begin, so, following the approach we used when reading the kubelet source, let's start from the startup function.

Directory Structure

Here are the main directories:

  1. cmd: entry files for the main programs

    • prometheus: entry point of the Prometheus server

    • promtool: entry point of the promtool validation utility

  2. config: default configuration parameters and config-file parsing

  3. discovery: service-discovery implementations, e.g. consul, kubernetes, dns

  4. notifier: building and sending alert notifications

  5. promql: query-language parsing

  6. scrape: metrics scraping

  7. rules: rule evaluation and recording rules

  8. tsdb: data storage

The full directory tree (two levels deep) looks like this:

└── prometheus
    ├── cmd
    │   ├── prometheus
    │   └── promtool
    ├── config
    │   └── testdata
    ├── console_libraries
    ├── consoles
    ├── discovery
    │   ├── aws
    │   ├── azure
    │   ├── consul
    │   ├── digitalocean
    │   ├── dns
    │   ├── eureka
    │   ├── file
    │   ├── gce
    │   ├── hetzner
    │   ├── http
    │   ├── install
    │   ├── ionos
    │   ├── kubernetes
    │   ├── legacymanager
    │   ├── linode
    │   ├── marathon
    │   ├── moby
    │   ├── nomad
    │   ├── openstack
    │   ├── ovhcloud
    │   ├── puppetdb
    │   ├── refresh
    │   ├── scaleway
    │   ├── targetgroup
    │   ├── triton
    │   ├── uyuni
    │   ├── vultr
    │   ├── xds
    │   └── zookeeper
    ├── docs
    │   ├── command-line
    │   ├── configuration
    │   ├── images
    │   └── querying
    ├── documentation
    │   ├── examples
    │   ├── images
    │   └── prometheus-mixin
    ├── model
    │   ├── exemplar
    │   ├── histogram
    │   ├── labels
    │   ├── metadata
    │   ├── relabel
    │   ├── rulefmt
    │   ├── textparse
    │   ├── timestamp
    │   └── value
    ├── notifier
    ├── plugins
    ├── prompb
    │   └── io
    ├── promql
    │   ├── fuzz-data
    │   ├── parser
    │   └── testdata
    ├── rules
    │   └── fixtures
    ├── scrape
    │   └── testdata
    ├── scripts
    ├── storage
    │   └── remote
    ├── template
    ├── tracing
    │   └── testdata
    ├── tsdb
    │   ├── agent
    │   ├── chunkenc
    │   ├── chunks
    │   ├── docs
    │   ├── encoding
    │   ├── errors
    │   ├── fileutil
    │   ├── goversion
    │   ├── index
    │   ├── record
    │   ├── testdata
    │   ├── tombstones
    │   ├── tsdbutil
    │   └── wlog
    ├── util
    │   ├── documentcli
    │   ├── gate
    │   ├── httputil
    │   ├── jsonutil
    │   ├── logging
    │   ├── osutil
    │   ├── pool
    │   ├── runtime
    │   ├── stats
    │   ├── strutil
    │   ├── teststorage
    │   ├── testutil
    │   ├── treecache
    │   └── zeropool
    └── web
        ├── api
        └── ui

Entry Point

  1. With the directory structure in mind, the target is clear: go straight to the cmd/prometheus/main.go file.

  2. The first thing main() does is parse the various startup flags and the configuration file. Along the way we keep running into checks on a variable named agentMode. What is it for?

    • Agent mode is a feature introduced in Prometheus v2.32.0. When it is enabled, the instance disables its UI query capability, alerting, and local long-term storage.

    • An agent-mode instance writes its data to remote storage, and that remote storage provides the global view. Because this relies on Prometheus Remote Write, you first need a "remote storage" backend for centralized metrics storage, for example Thanos; other options such as Cortex or InfluxDB work as well.
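As a concrete illustration, a minimal agent-mode setup might look like the following. This is only a sketch: the remote-write URL is a placeholder for whatever backend you run (e.g. a Thanos Receive or Cortex endpoint), and the scrape target is an example node_exporter.

```yaml
# prometheus.yml for an agent-mode instance (sketch)
scrape_configs:
  - job_name: "node"
    static_configs:
      - targets: ["localhost:9100"]

# Agent mode requires at least one remote_write target;
# this URL is a placeholder for your own backend.
remote_write:
  - url: "http://remote-storage.example.com/api/v1/receive"
```

The instance is then started with `prometheus --enable-feature=agent --config.file=prometheus.yml`.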

Startup Flow

From top to bottom, the following components are initialized and started:

  • Configuration file parsing

  • Termination handler

  • TSDB (the time-series database)

  • WAL storage

  • Initial configuration loading (it works by calling reloadConfig(); once loading succeeds it calls reloadReady.Close(), which unblocks the other managers)

  • Scrape discovery manager

  • Notify discovery manager

  • Rule manager

  • Scrape manager

  • Tracing manager

  • Reload handler

  • Web handler

  • Notifier

The source is below. Note that at this point the startup functions are only appended to a slice; they are then launched one by one in goroutines. Although some components are added to the slice earlier and therefore start earlier, a closer look shows that they block on a channel owned by the initial-configuration loader, and only begin their real work after configuration loading finishes and the channel signals them.

...
    var g run.Group
    {
        // Termination handler.
        term := make(chan os.Signal, 1)
        signal.Notify(term, os.Interrupt, syscall.SIGTERM)
        cancel := make(chan struct{})
        g.Add(
            func() error {
                // Don't forget to release the reloadReady channel so that waiting blocks can exit normally.
                select {
                case <-term:
                    level.Warn(logger).Log("msg", "Received SIGTERM, exiting gracefully...")
                    reloadReady.Close()
                case <-webHandler.Quit():
                    level.Warn(logger).Log("msg", "Received termination request via web service, exiting gracefully...")
                case <-cancel:
                    reloadReady.Close()
                }
                return nil
            },
            func(err error) {
                close(cancel)
                webHandler.SetReady(false)
            },
        )
    }
    {
        // Scrape discovery manager.
        g.Add(
            func() error {
                err := discoveryManagerScrape.Run()
                level.Info(logger).Log("msg", "Scrape discovery manager stopped")
                return err
            },
            func(err error) {
                level.Info(logger).Log("msg", "Stopping scrape discovery manager...")
                cancelScrape()
            },
        )
    }
    {
        // Notify discovery manager.
        g.Add(
            func() error {
                err := discoveryManagerNotify.Run()
                level.Info(logger).Log("msg", "Notify discovery manager stopped")
                return err
            },
            func(err error) {
                level.Info(logger).Log("msg", "Stopping notify discovery manager...")
                cancelNotify()
            },
        )
    }
    if !agentMode {
        // Rule manager.
        g.Add(
            func() error {
                <-reloadReady.C
                ruleManager.Run()
                return nil
            },
            func(err error) {
                ruleManager.Stop()
            },
        )
    }
    {
        // Scrape manager.
        g.Add(
            func() error {
                // When the scrape manager receives a new targets list
                // it needs to read a valid config for each job.
                // It depends on the config being in sync with the discovery manager so
                // we wait until the config is fully loaded.
                <-reloadReady.C

                err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
                level.Info(logger).Log("msg", "Scrape manager stopped")
                return err
            },
            func(err error) {
                // Scrape manager needs to be stopped before closing the local TSDB
                // so that it doesn't try to write samples to a closed storage.
                // We should also wait for rule manager to be fully stopped to ensure
                // we don't trigger any false positive alerts for rules using absent().
                level.Info(logger).Log("msg", "Stopping scrape manager...")
                scrapeManager.Stop()
            },
        )
    }
    {
        // Tracing manager.
        g.Add(
            func() error {
                <-reloadReady.C
                tracingManager.Run()
                return nil
            },
            func(err error) {
                tracingManager.Stop()
            },
        )
    }
    {
        // Reload handler.

        // Make sure that sighup handler is registered with a redirect to the channel before the potentially
        // long and synchronous tsdb init.
        hup := make(chan os.Signal, 1)
        signal.Notify(hup, syscall.SIGHUP)
        cancel := make(chan struct{})
        g.Add(
            func() error {
                <-reloadReady.C

                for {
                    select {
                    case <-hup:
                        if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, reloaders...); err != nil {
                            level.Error(logger).Log("msg", "Error reloading config", "err", err)
                        }
                    case rc := <-webHandler.Reload():
                        if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, reloaders...); err != nil {
                            level.Error(logger).Log("msg", "Error reloading config", "err", err)
                            rc <- err
                        } else {
                            rc <- nil
                        }
                    case <-cancel:
                        return nil
                    }
                }
            },
            func(err error) {
                // Wait for any in-progress reloads to complete to avoid
                // reloading things after they have been shutdown.
                cancel <- struct{}{}
            },
        )
    }
    {
        // Initial configuration loading.
        cancel := make(chan struct{})
        g.Add(
            func() error {
                select {
                case <-dbOpen:
                // In case a shutdown is initiated before the dbOpen is released
                case <-cancel:
                    reloadReady.Close()
                    return nil
                }

                if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, reloaders...); err != nil {
                    return fmt.Errorf("error loading config from %q: %w", cfg.configFile, err)
                }

                reloadReady.Close()

                webHandler.SetReady(true)
                level.Info(logger).Log("msg", "Server is ready to receive web requests.")
                <-cancel
                return nil
            },
            func(err error) {
                close(cancel)
            },
        )
    }
    if !agentMode {
        // TSDB.
        opts := cfg.tsdb.ToTSDBOptions()
        cancel := make(chan struct{})
        g.Add(
            func() error {
                level.Info(logger).Log("msg", "Starting TSDB ...")
                if cfg.tsdb.WALSegmentSize != 0 {
                    if cfg.tsdb.WALSegmentSize < 10*1024*1024 || cfg.tsdb.WALSegmentSize > 256*1024*1024 {
                        return errors.New("flag 'storage.tsdb.wal-segment-size' must be set between 10MB and 256MB")
                    }
                }
                if cfg.tsdb.MaxBlockChunkSegmentSize != 0 {
                    if cfg.tsdb.MaxBlockChunkSegmentSize < 1024*1024 {
                        return errors.New("flag 'storage.tsdb.max-block-chunk-segment-size' must be set over 1MB")
                    }
                }

                db, err := openDBWithMetrics(localStoragePath, logger, prometheus.DefaultRegisterer, &opts, localStorage.getStats())
                if err != nil {
                    return fmt.Errorf("opening storage failed: %w", err)
                }

                switch fsType := prom_runtime.Statfs(localStoragePath); fsType {
                case "NFS_SUPER_MAGIC":
                    level.Warn(logger).Log("fs_type", fsType, "msg", "This filesystem is not supported and may lead to data corruption and data loss. Please carefully read https://prometheus.io/docs/prometheus/latest/storage/ to learn more about supported filesystems.")
                default:
                    level.Info(logger).Log("fs_type", fsType)
                }

                level.Info(logger).Log("msg", "TSDB started")
                level.Debug(logger).Log("msg", "TSDB options",
                    "MinBlockDuration", cfg.tsdb.MinBlockDuration,
                    "MaxBlockDuration", cfg.tsdb.MaxBlockDuration,
                    "MaxBytes", cfg.tsdb.MaxBytes,
                    "NoLockfile", cfg.tsdb.NoLockfile,
                    "RetentionDuration", cfg.tsdb.RetentionDuration,
                    "WALSegmentSize", cfg.tsdb.WALSegmentSize,
                    "WALCompression", cfg.tsdb.WALCompression,
                )

                startTimeMargin := int64(2 * time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000)
                localStorage.Set(db, startTimeMargin)
                db.SetWriteNotified(remoteStorage)
                close(dbOpen)
                <-cancel
                return nil
            },
            func(err error) {
                if err := fanoutStorage.Close(); err != nil {
                    level.Error(logger).Log("msg", "Error stopping storage", "err", err)
                }
                close(cancel)
            },
        )
    }
    if agentMode {
        // WAL storage.
        opts := cfg.agent.ToAgentOptions()
        cancel := make(chan struct{})
        g.Add(
            func() error {
                level.Info(logger).Log("msg", "Starting WAL storage ...")
                if cfg.agent.WALSegmentSize != 0 {
                    if cfg.agent.WALSegmentSize < 10*1024*1024 || cfg.agent.WALSegmentSize > 256*1024*1024 {
                        return errors.New("flag 'storage.agent.wal-segment-size' must be set between 10MB and 256MB")
                    }
                }
                db, err := agent.Open(
                    logger,
                    prometheus.DefaultRegisterer,
                    remoteStorage,
                    localStoragePath,
                    &opts,
                )
                if err != nil {
                    return fmt.Errorf("opening storage failed: %w", err)
                }

                switch fsType := prom_runtime.Statfs(localStoragePath); fsType {
                case "NFS_SUPER_MAGIC":
                    level.Warn(logger).Log("fs_type", fsType, "msg", "This filesystem is not supported and may lead to data corruption and data loss. Please carefully read https://prometheus.io/docs/prometheus/latest/storage/ to learn more about supported filesystems.")
                default:
                    level.Info(logger).Log("fs_type", fsType)
                }

                level.Info(logger).Log("msg", "Agent WAL storage started")
                level.Debug(logger).Log("msg", "Agent WAL storage options",
                    "WALSegmentSize", cfg.agent.WALSegmentSize,
                    "WALCompression", cfg.agent.WALCompression,
                    "StripeSize", cfg.agent.StripeSize,
                    "TruncateFrequency", cfg.agent.TruncateFrequency,
                    "MinWALTime", cfg.agent.MinWALTime,
                    "MaxWALTime", cfg.agent.MaxWALTime,
                )

                localStorage.Set(db, 0)
                close(dbOpen)
                <-cancel
                return nil
            },
            func(e error) {
                if err := fanoutStorage.Close(); err != nil {
                    level.Error(logger).Log("msg", "Error stopping storage", "err", err)
                }
                close(cancel)
            },
        )
    }
    {
        // Web handler.
        g.Add(
            func() error {
                if err := webHandler.Run(ctxWeb, listener, *webConfig); err != nil {
                    return fmt.Errorf("error starting web server: %w", err)
                }
                return nil
            },
            func(err error) {
                cancelWeb()
            },
        )
    }
    {
        // Notifier.

        // Calling notifier.Stop() before ruleManager.Stop() will cause a panic if the ruleManager isn't running,
        // so keep this interrupt after the ruleManager.Stop().
        g.Add(
            func() error {
                // When the notifier manager receives a new targets list
                // it needs to read a valid config for each job.
                // It depends on the config being in sync with the discovery manager
                // so we wait until the config is fully loaded.
                <-reloadReady.C

                notifierManager.Run(discoveryManagerNotify.SyncCh())
                level.Info(logger).Log("msg", "Notifier manager stopped")
                return nil
            },
            func(err error) {
                notifierManager.Stop()
            },
        )
    }
...
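Several of the actors above block on `<-reloadReady.C` before doing any real work. The gate behind it is simply a channel that is closed exactly once: closing a channel wakes every goroutine receiving from it, which is how a single event (initial config load finished) can release many waiters at the same time. Below is a simplified, self-contained sketch of that pattern, modeled on the closeOnce helper in main.go (the field and method names here are our abbreviation, not the exact originals):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// closeOnce is a channel that can safely be closed more than once:
// sync.Once guarantees close() runs exactly one time.
type closeOnce struct {
	C    chan struct{}
	once sync.Once
}

func (c *closeOnce) Close() { c.once.Do(func() { close(c.C) }) }

func main() {
	reloadReady := &closeOnce{C: make(chan struct{})}

	var wg sync.WaitGroup
	for _, name := range []string{"rule manager", "scrape manager", "notifier"} {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			<-reloadReady.C // block until the initial config load finishes
			fmt.Println(name, "started")
		}(name)
	}

	// Simulate the initial-configuration-loading actor finishing its work.
	time.Sleep(10 * time.Millisecond)
	reloadReady.Close()
	wg.Wait()
}
```

The idempotent Close() matters because, as the source above shows, both the termination handler and the initial configuration loader may call reloadReady.Close().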

As mentioned, these startup functions are appended to a slice; a look at the `g.Add()` source makes this clear.

type actor struct {
    execute   func() error
    interrupt func(error)
}

func (g *Group) Add(execute func() error, interrupt func(error)) {
    g.actors = append(g.actors, actor{execute, interrupt})
}

After all actors have been added to the slice, they are started one by one:

    if err := g.Run(); err != nil {
        level.Error(logger).Log("err", err)
        os.Exit(1)
    }
    level.Info(logger).Log("msg", "See you next time!")

The source of `g.Run()` (Prometheus uses the `github.com/oklog/run` package for this) is as follows:

func (g *Group) Run() error {
    if len(g.actors) == 0 {
        return nil
    }

    // Run each actor.
    errors := make(chan error, len(g.actors))
    for _, a := range g.actors {
        go func(a actor) {
            errors <- a.execute()
        }(a)
    }

    // Wait for the first actor to stop.
    err := <-errors

    // Signal all actors to stop.
    for _, a := range g.actors {
        a.interrupt(err)
    }

    // Wait for all actors to stop.
    for i := 1; i < cap(errors); i++ {
        <-errors
    }

    // Return the original error.
    return err
}

Wrapping Up

Now that we understand the Prometheus startup flow, it is clear what to look at next. Following the flow, we will start with the Scrape discovery manager.