
Prometheus Source Code (Part 3): The Scrape Manager

This post covers Prometheus's scrape (metrics collection) management. The source code referenced is the Prometheus v2.45.0 branch.

Preface

After Prometheus discovers monitoring targets through service discovery, the next step is to collect metrics from those targets. This time we will look at the metric collection source code.

Let's first take a rough look at the startup call flow (not every function is listed, only the major ones).

[Figure: rough flow diagram of the scrape startup path]

Having a rough picture of the flow helps with the source-code sections that follow, because the code jumps around quite a bit; without a mental model up front it is hard going.

Even with that rough picture, some parts will still be unclear. At that point it helps to look at the architecture diagram (image from the web).

[Figure: Prometheus architecture diagram]

The Entry Function

The entry point lives in cmd/prometheus/main.go, which we have already analyzed before.

  1. The run function blocks until the configuration file has finished loading, then Run() starts the scrape manager and begins scraping the targets.

  2. The interrupt function, which takes care of shutdown ordering before exit (the scrape manager must stop before the local TSDB is closed, so no samples are written to a closed storage).
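These two functions form the execute/interrupt pair of one actor in the oklog/run group that main.go assembles. Below is a minimal standalone sketch of that pattern using the github.com/oklog/run API; it is an illustration, not Prometheus code.

// Minimal sketch of the oklog/run actor pattern used in cmd/prometheus/main.go:
// every g.Add() registers an execute function and an interrupt function, and
// g.Run() interrupts all actors as soon as the first one returns.
package main

import (
	"fmt"
	"time"

	"github.com/oklog/run"
)

func main() {
	var g run.Group
	stop := make(chan struct{})

	g.Add(
		func() error {
			// Execute: block here doing the real work (e.g. scrapeManager.Run).
			<-stop
			fmt.Println("worker stopped")
			return nil
		},
		func(err error) {
			// Interrupt: called when any other actor exits; unblock execute.
			close(stop)
		},
	)

	// A second actor that returns after one second and shuts the group down.
	g.Add(
		func() error { time.Sleep(time.Second); return nil },
		func(error) {},
	)

	fmt.Println(g.Run())
}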

...
    {
        // Scrape manager.
        g.Add(
            func() error {
                // When the scrape manager receives a new targets list
                // it needs to read a valid config for each job.
                // It depends on the config being in sync with the discovery manager so
                // we wait until the config is fully loaded.
                <-reloadReady.C

                err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
                level.Info(logger).Log("msg", "Scrape manager stopped")
                return err
            },
            func(err error) {
                // Scrape manager needs to be stopped before closing the local TSDB
                // so that it doesn't try to write samples to a closed storage.
                // We should also wait for rule manager to be fully stopped to ensure
                // we don't trigger any false positive alerts for rules using absent().
                level.Info(logger).Log("msg", "Stopping scrape manager...")
                scrapeManager.Stop()
            },
        )
    }
...

Run()

The Run() method lives in scrape/manager.go, which also contains the manager's constructor and its other methods.

Run() receives and stores target set updates and signals the reloader, which triggers reload() to reload the scrape loops.

The reloader runs in the background, so receiving targets and target updates is never blocked.

The core of reload() is sp.Sync(groups): one scrapePool is built per job, and the pool's Sync() method drives metric collection for that job's targets.
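One detail worth calling out before reading the code: updateTsets() only stores the newest target sets, and the non-blocking send into the capacity-1 triggerReload channel coalesces bursts of updates, which reloader() then consumes at most once per tick. Below is a standalone sketch of that coalescing pattern; it is an illustration, not Prometheus code.

// Standalone sketch of the coalescing pattern Run() and reloader() use:
// target updates may arrive at any rate, but a reload is triggered at most
// once per tick, and pending triggers collapse into one.
package main

import (
	"fmt"
	"time"
)

func main() {
	updates := make(chan string)
	triggerReload := make(chan struct{}, 1) // capacity 1: extra triggers are dropped

	// Receiver: remember the latest update and request a reload without blocking.
	go func() {
		for u := range updates {
			fmt.Println("received update:", u)
			select {
			case triggerReload <- struct{}{}:
			default: // a reload is already pending, coalesce
			}
		}
	}()

	// Reloader: at most one reload per tick, and only if something changed.
	go func() {
		ticker := time.NewTicker(200 * time.Millisecond)
		defer ticker.Stop()
		for range ticker.C {
			select {
			case <-triggerReload:
				fmt.Println("reload()")
			default:
			}
		}
	}()

	for i := 0; i < 5; i++ {
		updates <- fmt.Sprintf("tset-%d", i)
	}
	time.Sleep(500 * time.Millisecond)
}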

// Run receives and saves target set updates and triggers the scraping loops reloading.
// Reloading happens in the background so that it doesn't block receiving targets updates.
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
    go m.reloader()
    for {
        select {
        case ts := <-tsets:
            m.updateTsets(ts)

            select {
            case m.triggerReload <- struct{}{}:
            default:
            }

        case <-m.graceShut:
            return nil
        }
    }
}

func (m *Manager) reloader() {
    reloadIntervalDuration := m.opts.DiscoveryReloadInterval
    if reloadIntervalDuration < model.Duration(5*time.Second) {
        reloadIntervalDuration = model.Duration(5 * time.Second)
    }

    ticker := time.NewTicker(time.Duration(reloadIntervalDuration))

    defer ticker.Stop()

    for {
        select {
        case <-m.graceShut:
            return
        case <-ticker.C:
            select {
            case <-m.triggerReload:
                m.reload()
            case <-m.graceShut:
                return
            }
        }
    }
}

func (m *Manager) reload() {
    m.mtxScrape.Lock()
    var wg sync.WaitGroup
    for setName, groups := range m.targetSets {
        if _, ok := m.scrapePools[setName]; !ok {
            scrapeConfig, ok := m.scrapeConfigs[setName]
            if !ok {
                level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
                continue
            }
            sp, err := newScrapePool(scrapeConfig, m.append, m.offsetSeed, log.With(m.logger, "scrape_pool", setName), m.opts)
            if err != nil {
                level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName)
                continue
            }
            m.scrapePools[setName] = sp
        }

        wg.Add(1)
        // Run the sync in parallel as these take a while and at high load can't catch up.
        go func(sp *scrapePool, groups []*targetgroup.Group) {
            sp.Sync(groups)
            wg.Done()
        }(m.scrapePools[setName], groups)

    }
    m.mtxScrape.Unlock()
    wg.Wait()
}

The discovered objects are grouped in the structure shown below: setName is the job name, and groups holds the targets.

scrape_configs:
  - job_name: "monitor"
    static_configs:
    - targets: ['192.168.101.9:11504']

  - job_name: 'node-exporter'
    static_configs:
    - targets: ['10.21.1.74:9100', '192.168.101.9:9100']


{
    "monitor": [
        {
            "Targets": [
                {
                    "__address__": "192.168.101.9:11504"
                }
            ],
            "Labels": null,
            "Source": "0"
        }
    ],
    "node-exporter": [
        {
            "Targets": [
                {
                    "__address__": "10.21.1.74:9100"
                },
                {
                    "__address__": "192.168.101.9:9100"
                }
            ],
            "Labels": null,
            "Source": "0"
        }
    ]
}

Fetching the Metrics

Sync() converts target groups into actual scrape targets and synchronizes the result set with the currently running scraper, recording both the targets to be scraped and the dropped targets that should no longer be scraped.

  1. It iterates over the target groups and, for each target, calls TargetsFromGroup() to gather the target's original labels into a label set. PopulateLabels() is called there as well, attaching metadata labels to the target and applying a round of relabeling, which filters the targets.

  2. The relabeling here refers to the filtering rules configured under relabel_configs: each target's labels are checked against the rules, and the corresponding action is applied.

  3. Because the code jumps between files quite a lot, the TargetsFromGroup() function itself is not pasted here; a simplified sketch follows the list of relabel actions below.

relabel_action supports the following operations:

  • replace: match the regex against the value of the source labels and write the result to the target label; if replacement is set, the target label is set to replacement (with regex capture groups substituted).

  • keep: drop the target if the regex does not match the source labels.

  • drop: drop the target if the regex matches the source labels.

  • hashmod: set the target label to the hash of the source label values, modulo modulus.

  • labelmap: match the regex against all label names and copy the values of the matching labels to the label names given by replacement.

  • labeldrop: match the regex against all label names; matching labels are removed.

  • labelkeep: match the regex against all label names; labels that do not match are removed.
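Since TargetsFromGroup()/PopulateLabels() are not shown above, here is a much simplified, self-contained sketch of the flow they implement: group-level labels are merged underneath each target's own labels, relabeling is applied, and targets whose label set comes back empty are dropped. relabelAndFilter below is a hypothetical stand-in for the real relabel step; this is not the actual scrape package code.

// Simplified sketch of the TargetsFromGroup/PopulateLabels flow, not the real
// scrape package code. relabelAndFilter is a hypothetical stand-in for
// applying relabel_configs.
package main

import (
	"fmt"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/discovery/targetgroup"
)

// relabelAndFilter stands in for relabel_configs processing: it returns the
// (possibly rewritten) label set, or nil if the target should be dropped.
func relabelAndFilter(lset model.LabelSet) model.LabelSet {
	// Example rule: keep only targets whose __address__ is set.
	if _, ok := lset[model.AddressLabel]; !ok {
		return nil
	}
	lset["instance"] = lset[model.AddressLabel]
	return lset
}

func targetsFromGroup(tg *targetgroup.Group) []model.LabelSet {
	var targets []model.LabelSet
	for _, tlset := range tg.Targets {
		lset := model.LabelSet{}
		// Target-level labels win over group-level labels.
		for ln, lv := range tg.Labels {
			lset[ln] = lv
		}
		for ln, lv := range tlset {
			lset[ln] = lv
		}
		if final := relabelAndFilter(lset); final != nil {
			targets = append(targets, final)
		}
		// In the real code, dropped targets are additionally kept in
		// sp.droppedTargets so they still show up on the Service Discovery page.
	}
	return targets
}

func main() {
	tg := &targetgroup.Group{
		Targets: []model.LabelSet{{model.AddressLabel: "192.168.101.9:9100"}},
		Labels:  model.LabelSet{"env": "prod"},
	}
	fmt.Println(targetsFromGroup(tg))
}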

Next, let's look at the logic in sync().

  1. The target list obtained may contain duplicates. After deduplication, new scrape loops are started for new targets, and the scrape loops of targets that have disappeared are stopped (see the reconcile sketch after this list).

  2. l.run() is an interface implementation; we will look at its logic in the next section.
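The dedup-and-reconcile pattern mentioned in point 1 boils down to the sketch below; this is a plain illustration, not the real scrapePool code.

// Illustration of the reconcile-by-hash pattern used by scrapePool.sync():
// start a loop for every new unique target, stop the loop of every target
// that disappeared. The real code runs loops as goroutines and waits for
// stopped loops to terminate; this sketch runs them inline for simplicity.
package main

import "fmt"

type loop struct{ target string }

func (l *loop) run()  { fmt.Println("start scraping", l.target) }
func (l *loop) stop() { fmt.Println("stop scraping", l.target) }

func reconcile(active map[uint64]*loop, desired map[uint64]string) {
	for hash, tgt := range desired {
		if _, ok := active[hash]; !ok {
			l := &loop{target: tgt}
			active[hash] = l
			l.run()
		}
	}
	for hash, l := range active {
		if _, ok := desired[hash]; !ok {
			l.stop()
			delete(active, hash)
		}
	}
}

func main() {
	active := map[uint64]*loop{}
	reconcile(active, map[uint64]string{1: "10.0.0.1:9100", 2: "10.0.0.2:9100"})
	// Target 1 disappears, target 3 is new; duplicates would collapse onto one hash.
	reconcile(active, map[uint64]string{2: "10.0.0.2:9100", 3: "10.0.0.3:9100"})
}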

// Sync converts target groups into actual scrape targets and synchronizes
// the currently running scraper with the resulting set and returns all scraped and dropped targets.
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
    sp.mtx.Lock()
    defer sp.mtx.Unlock()
    start := time.Now()

    sp.targetMtx.Lock()
    var all []*Target
    var targets []*Target
    lb := labels.NewBuilder(labels.EmptyLabels())
    sp.droppedTargets = []*Target{}
    for _, tg := range tgs {
        targets, failures := TargetsFromGroup(tg, sp.config, sp.noDefaultPort, targets, lb)
        for _, err := range failures {
            level.Error(sp.logger).Log("msg", "Creating target failed", "err", err)
        }
        targetSyncFailed.WithLabelValues(sp.config.JobName).Add(float64(len(failures)))
        for _, t := range targets {
            // Replicate .Labels().IsEmpty() with a loop here to avoid generating garbage.
            nonEmpty := false
            t.LabelsRange(func(l labels.Label) { nonEmpty = true })
            switch {
            case nonEmpty:
                all = append(all, t)
            case !t.discoveredLabels.IsEmpty():
                sp.droppedTargets = append(sp.droppedTargets, t)
            }
        }
    }
    sp.targetMtx.Unlock()
    sp.sync(all)

    targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
        time.Since(start).Seconds(),
    )
    targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
}

// sync takes a list of potentially duplicated targets, deduplicates them, starts
// scrape loops for new targets, and stops scrape loops for disappeared targets.
// It returns after all stopped scrape loops terminated.
func (sp *scrapePool) sync(targets []*Target) {
    var (
        uniqueLoops   = make(map[uint64]loop)
        interval      = time.Duration(sp.config.ScrapeInterval)
        timeout       = time.Duration(sp.config.ScrapeTimeout)
        bodySizeLimit = int64(sp.config.BodySizeLimit)
        sampleLimit   = int(sp.config.SampleLimit)
        bucketLimit   = int(sp.config.NativeHistogramBucketLimit)
        labelLimits   = &labelLimits{
            labelLimit:            int(sp.config.LabelLimit),
            labelNameLengthLimit:  int(sp.config.LabelNameLengthLimit),
            labelValueLengthLimit: int(sp.config.LabelValueLengthLimit),
        }
        honorLabels             = sp.config.HonorLabels
        honorTimestamps         = sp.config.HonorTimestamps
        mrc                     = sp.config.MetricRelabelConfigs
        scrapeClassicHistograms = sp.config.ScrapeClassicHistograms
    )

    sp.targetMtx.Lock()
    for _, t := range targets {
        hash := t.hash()

        if _, ok := sp.activeTargets[hash]; !ok {
            // The scrape interval and timeout labels are set to the config's values initially,
            // so whether changed via relabeling or not, they'll exist and hold the correct values
            // for every target.
            var err error
            interval, timeout, err = t.intervalAndTimeout(interval, timeout)
            acceptHeader := scrapeAcceptHeader
            if sp.enableProtobufNegotiation {
                acceptHeader = scrapeAcceptHeaderWithProtobuf
            }
            s := &targetScraper{Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit, acceptHeader: acceptHeader}
            l := sp.newLoop(scrapeLoopOptions{
                target:                  t,
                scraper:                 s,
                sampleLimit:             sampleLimit,
                bucketLimit:             bucketLimit,
                labelLimits:             labelLimits,
                honorLabels:             honorLabels,
                honorTimestamps:         honorTimestamps,
                mrc:                     mrc,
                interval:                interval,
                timeout:                 timeout,
                scrapeClassicHistograms: scrapeClassicHistograms,
            })
            if err != nil {
                l.setForcedError(err)
            }

            sp.activeTargets[hash] = t
            sp.loops[hash] = l

            uniqueLoops[hash] = l
        } else {
            // This might be a duplicated target.
            if _, ok := uniqueLoops[hash]; !ok {
                uniqueLoops[hash] = nil
            }
            // Need to keep the most updated labels information
            // for displaying it in the Service Discovery web page.
            sp.activeTargets[hash].SetDiscoveredLabels(t.DiscoveredLabels())
        }
    }

    var wg sync.WaitGroup

    // Stop and remove old targets and scraper loops.
    for hash := range sp.activeTargets {
        if _, ok := uniqueLoops[hash]; !ok {
            wg.Add(1)
            go func(l loop) {
                l.stop()
                wg.Done()
            }(sp.loops[hash])

            delete(sp.loops, hash)
            delete(sp.activeTargets, hash)
        }
    }

    sp.targetMtx.Unlock()

    targetScrapePoolTargetsAdded.WithLabelValues(sp.config.JobName).Set(float64(len(uniqueLoops)))
    forcedErr := sp.refreshTargetLimitErr()
    for _, l := range sp.loops {
        l.setForcedError(forcedErr)
    }
    for _, l := range uniqueLoops {
        if l != nil {
            go l.run(nil)
        }
    }
    // Wait for all potentially stopped scrapers to terminate.
    // This covers the case of flapping targets. If the server is under high load, a new scraper
    // may be active and tries to insert. The old scraper that didn't terminate yet could still
    // be inserting a previous sample set.
    wg.Wait()
}

l.run()

run() starts a loop that exits as soon as a stop signal is received on the corresponding channel.

Each scrapeLoop runs once per scrape interval; the core logic lives in sl.scrapeAndReport().
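Before the loop starts, the first select waits for sl.scraper.offset(sl.interval, sl.offsetSeed), which spreads targets across the scrape interval so they don't all fire at the same instant. Roughly, the offset is derived deterministically from the target hash and the seed; the sketch below captures the idea and is an assumption-laden illustration, not copied from scrape/target.go.

// Rough sketch of how a per-target start offset can be derived from a hash,
// in the spirit of scraper.offset(): the same target always gets the same
// offset inside the interval, so scrapes are spread out deterministically.
package main

import (
	"fmt"
	"time"
)

func scrapeOffset(interval time.Duration, targetHash, offsetSeed uint64) time.Duration {
	now := time.Now().UnixNano()
	// Time until the next interval boundary.
	base := int64(interval) - now%int64(interval)
	// Stable per-target offset inside the interval.
	offset := int64((targetHash ^ offsetSeed) % uint64(interval))
	next := base + offset
	if next > int64(interval) {
		next -= int64(interval)
	}
	return time.Duration(next)
}

func main() {
	fmt.Println(scrapeOffset(15*time.Second, 0xdeadbeef, 42))
}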

func (sl *scrapeLoop) run(errc chan<- error) {
    select {
    case <-time.After(sl.scraper.offset(sl.interval, sl.offsetSeed)):
        // Continue after a scraping offset.
    case <-sl.ctx.Done():
        close(sl.stopped)
        return
    }

    var last time.Time

    alignedScrapeTime := time.Now().Round(0)
    ticker := time.NewTicker(sl.interval)
    defer ticker.Stop()

mainLoop:
    for {
        select {
        case <-sl.parentCtx.Done():
            close(sl.stopped)
            return
        case <-sl.ctx.Done():
            break mainLoop
        default:
        }

        // Temporary workaround for a jitter in go timers that causes disk space
        // increase in TSDB.
        // See https://github.com/prometheus/prometheus/issues/7846
        // Calling Round ensures the time used is the wall clock, as otherwise .Sub
        // and .Add on time.Time behave differently (see time package docs).
        scrapeTime := time.Now().Round(0)
        if AlignScrapeTimestamps && sl.interval > 100*ScrapeTimestampTolerance {
            // For some reason, a tick might have been skipped, in which case we
            // would call alignedScrapeTime.Add(interval) multiple times.
            for scrapeTime.Sub(alignedScrapeTime) >= sl.interval {
                alignedScrapeTime = alignedScrapeTime.Add(sl.interval)
            }
            // Align the scrape time if we are in the tolerance boundaries.
            if scrapeTime.Sub(alignedScrapeTime) <= ScrapeTimestampTolerance {
                scrapeTime = alignedScrapeTime
            }
        }

        last = sl.scrapeAndReport(last, scrapeTime, errc)

        select {
        case <-sl.parentCtx.Done():
            close(sl.stopped)
            return
        case <-sl.ctx.Done():
            break mainLoop
        case <-ticker.C:
        }
    }

    close(sl.stopped)

    if !sl.disabledEndOfRunStalenessMarkers {
        sl.endOfRunStaleness(last, ticker, sl.interval)
    }
}

scrapeAndReport()

This is where the metrics are scraped and filtered.

  1. sl.scraper.scrape() fetches the metric data.

  2. sl.append() writes to the underlying storage; before the samples are written, metric_relabel_configs filtering is applied here, using the same set of actions as relabel_configs.

  3. Finally, sl.report() appends the scrape report series (up, scrape_duration_seconds, and so on) for this scrape. Because deferred functions run last-in-first-out, the report defer (registered second) executes before the commit defer, so the report samples go through the same appender before it is committed.

func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- error) time.Time {
    start := time.Now()

    // Only record after the first scrape.
    if !last.IsZero() {
        targetIntervalLength.WithLabelValues(sl.interval.String()).Observe(
            time.Since(last).Seconds(),
        )
    }

    b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
    defer sl.buffers.Put(b)
    buf := bytes.NewBuffer(b)

    var total, added, seriesAdded, bytes int
    var err, appErr, scrapeErr error

    app := sl.appender(sl.appenderCtx)
    defer func() {
        if err != nil {
            app.Rollback()
            return
        }
        err = app.Commit()
        if err != nil {
            level.Error(sl.l).Log("msg", "Scrape commit failed", "err", err)
        }
    }()

    defer func() {
        if err = sl.report(app, appendTime, time.Since(start), total, added, seriesAdded, bytes, scrapeErr); err != nil {
            level.Warn(sl.l).Log("msg", "Appending scrape report failed", "err", err)
        }
    }()

    if forcedErr := sl.getForcedError(); forcedErr != nil {
        scrapeErr = forcedErr
        // Add stale markers.
        if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil {
            app.Rollback()
            app = sl.appender(sl.appenderCtx)
            level.Warn(sl.l).Log("msg", "Append failed", "err", err)
        }
        if errc != nil {
            errc <- forcedErr
        }

        return start
    }

    var contentType string
    scrapeCtx, cancel := context.WithTimeout(sl.parentCtx, sl.timeout)
    contentType, scrapeErr = sl.scraper.scrape(scrapeCtx, buf)
    cancel()

    if scrapeErr == nil {
        b = buf.Bytes()
        // NOTE: There were issues with misbehaving clients in the past
        // that occasionally returned empty results. We don't want those
        // to falsely reset our buffer size.
        if len(b) > 0 {
            sl.lastScrapeSize = len(b)
        }
        bytes = len(b)
    } else {
        level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr)
        if errc != nil {
            errc <- scrapeErr
        }
        if errors.Is(scrapeErr, errBodySizeLimit) {
            bytes = -1
        }
    }

    // A failed scrape is the same as an empty scrape,
    // we still call sl.append to trigger stale markers.
    total, added, seriesAdded, appErr = sl.append(app, b, contentType, appendTime)
    if appErr != nil {
        app.Rollback()
        app = sl.appender(sl.appenderCtx)
        level.Debug(sl.l).Log("msg", "Append failed", "err", appErr)
        // The append failed, probably due to a parse error or sample limit.
        // Call sl.append again with an empty scrape to trigger stale markers.
        if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil {
            app.Rollback()
            app = sl.appender(sl.appenderCtx)
            level.Warn(sl.l).Log("msg", "Append failed", "err", err)
        }
    }

    if scrapeErr == nil {
        scrapeErr = appErr
    }

    return start
}

sl.scraper.scrape()

This is the code path that actually fetches the metrics.

  1. The standard net/http library is used to issue the request to the target and scrape the metrics.

  2. The request carries an "Accept-Encoding: gzip" header so the response body can be compressed, which matters when scraping a large number of targets because it saves bandwidth on the scrape traffic. When the target responds with a "Content-Encoding: gzip" header, Prometheus decompresses the body itself.

  3. The scraped metrics are written into the buffer passed in as the writer, where they wait for the next stage of processing.
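Below is a standalone sketch of the same request pattern (the URL and the 10 MiB limit are placeholders): ask for gzip explicitly, decompress only when the target answers with Content-Encoding: gzip, and cap the body with io.LimitReader. It illustrates the technique; it is not the targetScraper code itself.

// Standalone sketch of the HTTP pattern targetScraper.scrape() uses:
// Accept-Encoding: gzip on the request, optional gzip decompression of the
// response, and a body size limit enforced with io.LimitReader.
package main

import (
	"compress/gzip"
	"errors"
	"fmt"
	"io"
	"net/http"
	"os"
)

var errBodySizeLimit = errors.New("body size limit exceeded")

func fetch(url string, bodySizeLimit int64, w io.Writer) (string, error) {
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return "", err
	}
	// Setting the header ourselves disables the transport's transparent
	// decompression, so we handle gzip explicitly below.
	req.Header.Add("Accept-Encoding", "gzip")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	var body io.Reader = resp.Body
	if resp.Header.Get("Content-Encoding") == "gzip" {
		gz, err := gzip.NewReader(resp.Body)
		if err != nil {
			return "", err
		}
		defer gz.Close()
		body = gz
	}

	// Copy at most bodySizeLimit bytes; hitting the limit is treated as an error.
	n, err := io.Copy(w, io.LimitReader(body, bodySizeLimit))
	if err != nil {
		return "", err
	}
	if n >= bodySizeLimit {
		return "", errBodySizeLimit
	}
	return resp.Header.Get("Content-Type"), nil
}

func main() {
	ct, err := fetch("http://localhost:9100/metrics", 10<<20, os.Stdout) // placeholder URL, 10 MiB limit
	fmt.Println(ct, err)
}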

func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
    if s.req == nil {
        req, err := http.NewRequest("GET", s.URL().String(), nil)
        if err != nil {
            return "", err
        }
        req.Header.Add("Accept", s.acceptHeader)
        req.Header.Add("Accept-Encoding", "gzip")
        req.Header.Set("User-Agent", UserAgent)
        req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", strconv.FormatFloat(s.timeout.Seconds(), 'f', -1, 64))

        s.req = req
    }

    resp, err := s.client.Do(s.req.WithContext(ctx))
    if err != nil {
        return "", err
    }
    defer func() {
        io.Copy(io.Discard, resp.Body)
        resp.Body.Close()
    }()

    if resp.StatusCode != http.StatusOK {
        return "", errors.Errorf("server returned HTTP status %s", resp.Status)
    }

    if s.bodySizeLimit <= 0 {
        s.bodySizeLimit = math.MaxInt64
    }
    if resp.Header.Get("Content-Encoding") != "gzip" {
        n, err := io.Copy(w, io.LimitReader(resp.Body, s.bodySizeLimit))
        if err != nil {
            return "", err
        }
        if n >= s.bodySizeLimit {
            targetScrapeExceededBodySizeLimit.Inc()
            return "", errBodySizeLimit
        }
        return resp.Header.Get("Content-Type"), nil
    }

    if s.gzipr == nil {
        s.buf = bufio.NewReader(resp.Body)
        s.gzipr, err = gzip.NewReader(s.buf)
        if err != nil {
            return "", err
        }
    } else {
        s.buf.Reset(resp.Body)
        if err = s.gzipr.Reset(s.buf); err != nil {
            return "", err
        }
    }

    n, err := io.Copy(w, io.LimitReader(s.gzipr, s.bodySizeLimit))
    s.gzipr.Close()
    if err != nil {
        return "", err
    }
    if n >= s.bodySizeLimit {
        targetScrapeExceededBodySizeLimit.Inc()
        return "", errBodySizeLimit
    }
    return resp.Header.Get("Content-Type"), nil
}

sl.append()

This is the code path that processes the scraped metrics.

  1. A for loop walks the scraped payload entry by entry; p.Next() pulls out the next entry (a minimal parser example follows this list).

  2. The entry type is inspected, and sl.sampleMutator() is called to apply relabeling and filtering to the sample's label set.

  3. Finally, the sample is written through the appender into the in-memory storage layer and the series is cached, waiting for the later stages to persist it to disk.
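Below is a minimal standalone example of driving the parser with the same calls the loop relies on (textparse.New, p.Next, p.Series, p.Metric). It assumes the v2.45.0 textparse API as used in the source below and only handles plain series entries.

// Minimal sketch of iterating a scraped payload with the textparse package,
// using the same calls sl.append() relies on. Assumes the v2.45.0 API.
package main

import (
	"errors"
	"fmt"
	"io"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/textparse"
)

func main() {
	payload := []byte("# TYPE http_requests_total counter\nhttp_requests_total{code=\"200\"} 1027\n")

	p, err := textparse.New(payload, "text/plain; version=0.0.4", false)
	if err != nil {
		fmt.Println("falling back to the prometheus parser:", err)
	}

	for {
		et, err := p.Next()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			fmt.Println("parse error:", err)
			break
		}
		if et != textparse.EntrySeries {
			continue // skip TYPE/HELP/UNIT/comment entries in this sketch
		}
		_, ts, val := p.Series()
		var lset labels.Labels
		p.Metric(&lset)
		if ts != nil {
			fmt.Println(lset.String(), *ts, val)
		} else {
			fmt.Println(lset.String(), val)
		}
	}
}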

func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
    p, err := textparse.New(b, contentType, sl.scrapeClassicHistograms)
    if err != nil {
        level.Debug(sl.l).Log(
            "msg", "Invalid content type on scrape, using prometheus parser as fallback.",
            "content_type", contentType,
            "err", err,
        )
    }

    var (
        defTime         = timestamp.FromTime(ts)
        appErrs         = appendErrors{}
        sampleLimitErr  error
        bucketLimitErr  error
        e               exemplar.Exemplar // escapes to heap so hoisted out of loop
        meta            metadata.Metadata
        metadataChanged bool
    )

    // updateMetadata updates the current iteration's metadata object and the
    // metadataChanged value if we have metadata in the scrape cache AND the
    // labelset is for a new series or the metadata for this series has just
    // changed. It returns a boolean based on whether the metadata was updated.
    updateMetadata := func(lset labels.Labels, isNewSeries bool) bool {
        if !sl.appendMetadataToWAL {
            return false
        }

        sl.cache.metaMtx.Lock()
        defer sl.cache.metaMtx.Unlock()
        metaEntry, metaOk := sl.cache.metadata[lset.Get(labels.MetricName)]
        if metaOk && (isNewSeries || metaEntry.lastIterChange == sl.cache.iter) {
            metadataChanged = true
            meta.Type = metaEntry.Type
            meta.Unit = metaEntry.Unit
            meta.Help = metaEntry.Help
            return true
        }
        return false
    }

    // Take an appender with limits.
    app = appender(app, sl.sampleLimit, sl.bucketLimit)

    defer func() {
        if err != nil {
            return
        }
        // Only perform cache cleaning if the scrape was not empty.
        // An empty scrape (usually) is used to indicate a failed scrape.
        sl.cache.iterDone(len(b) > 0)
    }()

loop:
    for {
        var (
            et                       textparse.Entry
            sampleAdded, isHistogram bool
            met                      []byte
            parsedTimestamp          *int64
            val                      float64
            h                        *histogram.Histogram
            fh                       *histogram.FloatHistogram
        )
        if et, err = p.Next(); err != nil {
            if errors.Is(err, io.EOF) {
                err = nil
            }
            break
        }
        switch et {
        case textparse.EntryType:
            sl.cache.setType(p.Type())
            continue
        case textparse.EntryHelp:
            sl.cache.setHelp(p.Help())
            continue
        case textparse.EntryUnit:
            sl.cache.setUnit(p.Unit())
            continue
        case textparse.EntryComment:
            continue
        case textparse.EntryHistogram:
            isHistogram = true
        default:
        }
        total++

        t := defTime
        if isHistogram {
            met, parsedTimestamp, h, fh = p.Histogram()
        } else {
            met, parsedTimestamp, val = p.Series()
        }
        if !sl.honorTimestamps {
            parsedTimestamp = nil
        }
        if parsedTimestamp != nil {
            t = *parsedTimestamp
        }

        // Zero metadata out for current iteration until it's resolved.
        meta = metadata.Metadata{}
        metadataChanged = false

        if sl.cache.getDropped(met) {
            continue
        }
        ce, ok := sl.cache.get(met)
        var (
            ref  storage.SeriesRef
            lset labels.Labels
            hash uint64
        )

        if ok {
            ref = ce.ref
            lset = ce.lset

            // Update metadata only if it changed in the current iteration.
            updateMetadata(lset, false)
        } else {
            p.Metric(&lset)
            hash = lset.Hash()

            // Hash label set as it is seen local to the target. Then add target labels
            // and relabeling and store the final label set.
            lset = sl.sampleMutator(lset)

            // The label set may be set to empty to indicate dropping.
            if lset.IsEmpty() {
                sl.cache.addDropped(met)
                continue
            }

            if !lset.Has(labels.MetricName) {
                err = errNameLabelMandatory
                break loop
            }
            if !lset.IsValid() {
                err = fmt.Errorf("invalid metric name or label names: %s", lset.String())
                break loop
            }

            // If any label limits is exceeded the scrape should fail.
            if err = verifyLabelLimits(lset, sl.labelLimits); err != nil {
                targetScrapePoolExceededLabelLimits.Inc()
                break loop
            }

            // Append metadata for new series if they were present.
            updateMetadata(lset, true)
        }

        if isHistogram {
            if h != nil {
                ref, err = app.AppendHistogram(ref, lset, t, h, nil)
            } else {
                ref, err = app.AppendHistogram(ref, lset, t, nil, fh)
            }
        } else {
            ref, err = app.Append(ref, lset, t, val)
        }
        sampleAdded, err = sl.checkAddError(ce, met, parsedTimestamp, err, &sampleLimitErr, &bucketLimitErr, &appErrs)
        if err != nil {
            if err != storage.ErrNotFound {
                level.Debug(sl.l).Log("msg", "Unexpected error", "series", string(met), "err", err)
            }
            break loop
        }

        if !ok {
            if parsedTimestamp == nil {
                // Bypass staleness logic if there is an explicit timestamp.
                sl.cache.trackStaleness(hash, lset)
            }
            sl.cache.addRef(met, ref, lset, hash)
            if sampleAdded && sampleLimitErr == nil && bucketLimitErr == nil {
                seriesAdded++
            }
        }

        // Increment added even if there's an error so we correctly report the
        // number of samples remaining after relabeling.
        added++

        if hasExemplar := p.Exemplar(&e); hasExemplar {
            if !e.HasTs {
                e.Ts = t
            }
            _, exemplarErr := app.AppendExemplar(ref, lset, e)
            exemplarErr = sl.checkAddExemplarError(exemplarErr, e, &appErrs)
            if exemplarErr != nil {
                // Since exemplar storage is still experimental, we don't fail the scrape on ingestion errors.
                level.Debug(sl.l).Log("msg", "Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr)
            }
            e = exemplar.Exemplar{} // reset for next time round loop
        }

        if sl.appendMetadataToWAL && metadataChanged {
            if _, merr := app.UpdateMetadata(ref, lset, meta); merr != nil {
                // No need to fail the scrape on errors appending metadata.
                level.Debug(sl.l).Log("msg", "Error when appending metadata in scrape loop", "ref", fmt.Sprintf("%d", ref), "metadata", fmt.Sprintf("%+v", meta), "err", merr)
            }
        }
    }
    if sampleLimitErr != nil {
        if err == nil {
            err = sampleLimitErr
        }
        // We only want to increment this once per scrape, so this is Inc'd outside the loop.
        targetScrapeSampleLimit.Inc()
    }
    if bucketLimitErr != nil {
        if err == nil {
            err = bucketLimitErr // If sample limit is hit, that error takes precedence.
        }
        // We only want to increment this once per scrape, so this is Inc'd outside the loop.
        targetScrapeNativeHistogramBucketLimit.Inc()
    }
    if appErrs.numOutOfOrder > 0 {
        level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", appErrs.numOutOfOrder)
    }
    if appErrs.numDuplicates > 0 {
        level.Warn(sl.l).Log("msg", "Error on ingesting samples with different value but same timestamp", "num_dropped", appErrs.numDuplicates)
    }
    if appErrs.numOutOfBounds > 0 {
        level.Warn(sl.l).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "num_dropped", appErrs.numOutOfBounds)
    }
    if appErrs.numExemplarOutOfOrder > 0 {
        level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", appErrs.numExemplarOutOfOrder)
    }
    if err == nil {
        sl.cache.forEachStale(func(lset labels.Labels) bool {
            // Series no longer exposed, mark it stale.
            _, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN))
            switch errors.Cause(err) {
            case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
                // Do not count these in logging, as this is expected if a target
                // goes away and comes back again with a new scrape loop.
                err = nil
            }
            return err == nil
        })
    }
    return
}

Summary

  1. The idea of reading the Prometheus source came up while I was doing metrics optimization on our self-hosted Prometheus.

  2. To be honest, the Prometheus source is fairly difficult. For this part I only really understood the broad strokes; there are still plenty of places I'm not clear on, so I focused on metric scraping and filtering.

  3. As I said above, this was not easy for me; there may well be mistakes in this article, so please read it with a critical eye.

  4. It really helps to read the code alongside the component architecture diagram. One thing I had not fully grasped, for example, is the Appender interface, which has a pile of implementations; only after looking at the architecture diagram did I realize that writes go through the fanout storage layer.