
Kubernetes Source Code: the Controller Leader Election Mechanism

Implementing leader election for a highly available custom controller, and figuring out how the election mechanism actually works.

Preface

  • Recently, while writing a custom controller, I ran into a problem: for high availability I have to run multiple pods, but if the controller contains cron-style tasks, each task gets executed once per pod, or the same event gets watched by every pod and handled repeatedly.

  • I don't want things to run more than once, so I thought of kube-controller-manager and kube-scheduler. They also run as multiple pods, yet only one pod ever acts as the leader while the others stand by. Out of curiosity, let's see how they implement this, and then copy the pattern into our own controller.

Entry Function

Assuming we are already familiar with the kube-controller-manager source code, we can find the following snippet in its startup function.

Let's first walk through its execution steps:

  1. If leader election is not enabled, call run() directly to start the controllers.

  2. Build the resource lock parameters, such as the lock id and the resource lock type.

  3. Start leader election and run the controllers.

  4. The key part is the leaderElectAndRun() function.

    // No leader election, run directly
    if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
        run(context.TODO(), saTokenControllerInitFunc, NewControllerInitializers)
        panic("unreachable")
    }

    id, err := os.Hostname()
    if err != nil {
        return err
    }

    // add a uniquifier so that two processes on the same host don't accidentally both become active
    id = id + "_" + string(uuid.NewUUID())

    // leaderMigrator will be non-nil if and only if Leader Migration is enabled.
    var leaderMigrator *leadermigration.LeaderMigrator = nil

    // startSATokenController will be original saTokenControllerInitFunc if leader migration is not enabled.
    startSATokenController := saTokenControllerInitFunc

    // If leader migration is enabled, create the LeaderMigrator and prepare for migration
    if leadermigration.Enabled(&c.ComponentConfig.Generic) {
        klog.Infof("starting leader migration")

        leaderMigrator = leadermigration.NewLeaderMigrator(&c.ComponentConfig.Generic.LeaderMigration,
            "kube-controller-manager")

        // Wrap saTokenControllerInitFunc to signal readiness for migration after starting
        //  the controller.
        startSATokenController = func(ctx ControllerContext) (http.Handler, bool, error) {
            defer close(leaderMigrator.MigrationReady)
            return saTokenControllerInitFunc(ctx)
        }
    }

    // Start the main lock
    go leaderElectAndRun(c, id, electionChecker,
        c.ComponentConfig.Generic.LeaderElection.ResourceLock,
        c.ComponentConfig.Generic.LeaderElection.ResourceName,
        leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                initializersFunc := NewControllerInitializers
                if leaderMigrator != nil {
                    // If leader migration is enabled, we should start only non-migrated controllers
                    //  for the main lock.
                    initializersFunc = createInitializersFunc(leaderMigrator.FilterFunc, leadermigration.ControllerNonMigrated)
                    klog.Info("leader migration: starting main controllers.")
                }
                run(ctx, startSATokenController, initializersFunc)
            },
            OnStoppedLeading: func() {
                klog.Fatalf("leaderelection lost")
            },
        })

    // If Leader Migration is enabled, proceed to attempt the migration lock.
    if leaderMigrator != nil {
        // Wait for Service Account Token Controller to start before acquiring the migration lock.
        // At this point, the main lock must have already been acquired, or the KCM process already exited.
        // We wait for the main lock before acquiring the migration lock to prevent the situation
        //  where KCM instance A holds the main lock while KCM instance B holds the migration lock.
        <-leaderMigrator.MigrationReady

        // Start the migration lock.
        go leaderElectAndRun(c, id, electionChecker,
            c.ComponentConfig.Generic.LeaderMigration.ResourceLock,
            c.ComponentConfig.Generic.LeaderMigration.LeaderName,
            leaderelection.LeaderCallbacks{
                OnStartedLeading: func(ctx context.Context) {
                    klog.Info("leader migration: starting migrated controllers.")
                    // DO NOT start saTokenController under migration lock
                    run(ctx, nil, createInitializersFunc(leaderMigrator.FilterFunc, leadermigration.ControllerMigrated))
                },
                OnStoppedLeading: func() {
                    klog.Fatalf("migration leaderelection lost")
                },
            })
    }

Code Analysis

OK, we know the key lies in the leaderElectAndRun() function, so let's take a closer look at what it does.

Parameters

leaderElectAndRun() takes 6 parameters:

  • The first is the controller's completed configuration.

  • The second is the resource lock id, which defaults to the hostname plus a random UUID.

  • The third is the health check adaptor.

  • The fourth is the resource lock type.

  • The fifth is the leader (lease) name.

  • The sixth is the set of leader callbacks.

Logic

Let's look at the actual logic of leaderElectAndRun():

  1. Call resourcelock.NewFromKubeconfig() to initialize a resource lock, passing in the lock type, namespace, lease name, the clientset used to create the lease, the identity id, and so on.

  2. Call leaderelection.RunOrDie() to start leader election.

  3. The leaderelection package comes from client-go/tools/leaderelection/leaderelection.go.

  4. Let's jump over and read the leaderelection source.

func leaderElectAndRun(c *config.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks) {
    rl, err := resourcelock.NewFromKubeconfig(resourceLock,
        c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
        leaseName,
        resourcelock.ResourceLockConfig{
            Identity:      lockIdentity,
            EventRecorder: c.EventRecorder,
        },
        c.Kubeconfig,
        c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration)
    if err != nil {
        klog.Fatalf("error creating lock: %v", err)
    }

    leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
        Lock:          rl,
        LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
        RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
        RetryPeriod:   c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
        Callbacks:     callbacks,
        WatchDog:      electionChecker,
        Name:          leaseName,
    })

    panic("unreachable")
}

Lock Types

In the leaderelection/resourcelock package, client-go defines an abstract resource lock.

There are 4 different lock implementations. Since version 1.20 the default lock type is leases, which considerably reduces overhead.

  • EndpointsLock // the default before 1.17

  • ConfigMapLock

  • LeaseLock

  • MultiLock

Anyone who has studied object-oriented programming will recognize the pattern: all four lock types implement the interface below, and the election code is written against this abstraction.

NewFromKubeconfig() delegates to New() for the actual construction; as you can see, New() switches on the lock type and builds the corresponding lock object.

// Interface offers a common interface for locking on arbitrary
// resources used in leader election.  The Interface is used
// to hide the details on specific implementations in order to allow
// them to change over time.  This interface is strictly for use
// by the leaderelection code.
type Interface interface {
    // Get returns the LeaderElectionRecord
    Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)

    // Create attempts to create a LeaderElectionRecord
    Create(ctx context.Context, ler LeaderElectionRecord) error

    // Update will update and existing LeaderElectionRecord
    Update(ctx context.Context, ler LeaderElectionRecord) error

    // RecordEvent is used to record events
    RecordEvent(string)

    // Identity will return the locks Identity
    Identity() string

    // Describe is used to convert details on current resource lock
    // into a string
    Describe() string
}

// Manufacture will create a lock of a given type according to the input parameters
func New(lockType string, ns string, name string, coreClient corev1.CoreV1Interface, coordinationClient coordinationv1.CoordinationV1Interface, rlc ResourceLockConfig) (Interface, error) {
    endpointsLock := &EndpointsLock{
        EndpointsMeta: metav1.ObjectMeta{
            Namespace: ns,
            Name:      name,
        },
        Client:     coreClient,
        LockConfig: rlc,
    }
    configmapLock := &ConfigMapLock{
        ConfigMapMeta: metav1.ObjectMeta{
            Namespace: ns,
            Name:      name,
        },
        Client:     coreClient,
        LockConfig: rlc,
    }
    leaseLock := &LeaseLock{
        LeaseMeta: metav1.ObjectMeta{
            Namespace: ns,
            Name:      name,
        },
        Client:     coordinationClient,
        LockConfig: rlc,
    }
    switch lockType {
    case EndpointsResourceLock:
        return endpointsLock, nil
    case ConfigMapsResourceLock:
        return configmapLock, nil
    case LeasesResourceLock:
        return leaseLock, nil
    case EndpointsLeasesResourceLock:
        return &MultiLock{
            Primary:   endpointsLock,
            Secondary: leaseLock,
        }, nil
    case ConfigMapsLeasesResourceLock:
        return &MultiLock{
            Primary:   configmapLock,
            Secondary: leaseLock,
        }, nil
    default:
        return nil, fmt.Errorf("Invalid lock-type %s", lockType)
    }
}

// NewFromKubeconfig will create a lock of a given type according to the input parameters.
// Timeout set for a client used to contact to Kubernetes should be lower than
// RenewDeadline to keep a single hung request from forcing a leader loss.
// Setting it to max(time.Second, RenewDeadline/2) as a reasonable heuristic.
func NewFromKubeconfig(lockType string, ns string, name string, rlc ResourceLockConfig, kubeconfig *restclient.Config, renewDeadline time.Duration) (Interface, error) {
    // shallow copy, do not modify the kubeconfig
    config := *kubeconfig
    timeout := renewDeadline / 2
    if timeout < time.Second {
        timeout = time.Second
    }
    config.Timeout = timeout
    leaderElectionClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "leader-election"))
    return New(lockType, ns, name, leaderElectionClient.CoreV1(), leaderElectionClient.CoordinationV1(), rlc)
}

Default Values

Earlier we said the default resource lock type is leases. Where does that come from?

The kube-controller-manager defaults are registered in pkg/controller/apis/config/v1alpha1/defaults.go, where we can find the leader election related defaults. From the earlier analysis we know this configuration lives under the Generic field.

cmconfigv1alpha1.RecommendedDefaultGenericControllerManagerConfiguration(&obj.Generic)

Jumping to vendor/k8s.io/controller-manager/config/v1alpha1/defaults.go, we can see the default resource lock type is "leases".

    if len(obj.LeaderElection.ResourceLock) == 0 {
        // Use lease-based leader election to reduce cost.
        // We migrated for EndpointsLease lock in 1.17 and starting in 1.20 we
        // migrated to Lease lock.
        obj.LeaderElection.ResourceLock = "leases"
    }

    ...

    componentbaseconfigv1alpha1.RecommendedDefaultLeaderElectionConfiguration(&obj.LeaderElection)

Following the same approach, the remaining leader election defaults live in vendor/k8s.io/component-base/config/v1alpha1/defaults.go. Note that its EndpointsResourceLock fallback never takes effect for kube-controller-manager, because ResourceLock has already been set to "leases" above:

func RecommendedDefaultLeaderElectionConfiguration(obj *LeaderElectionConfiguration) {
    zero := metav1.Duration{}
    if obj.LeaseDuration == zero {
        obj.LeaseDuration = metav1.Duration{Duration: 15 * time.Second}
    }
    if obj.RenewDeadline == zero {
        obj.RenewDeadline = metav1.Duration{Duration: 10 * time.Second}
    }
    if obj.RetryPeriod == zero {
        obj.RetryPeriod = metav1.Duration{Duration: 2 * time.Second}
    }
    if obj.ResourceLock == "" {
        // TODO(#80289): Figure out how to migrate to LeaseLock at this point.
        //   This will most probably require going through EndpointsLease first.
        obj.ResourceLock = EndpointsResourceLock
    }
    if obj.LeaderElect == nil {
        obj.LeaderElect = utilpointer.BoolPtr(true)
    }
}

How It Works

From the earlier analysis we know the entry point is RunOrDie().

  1. It first creates an elector with NewLeaderElector(), wires the elector into the health check watchdog, and then calls Run() to start the loop.

  2. Run() calls acquire() to obtain the resource lock; once the lock is acquired it runs the OnStartedLeading() callback in a goroutine and then calls le.renew(ctx), which keeps renewing the lock until renewal fails or the context is cancelled. Whenever Run() returns, i.e. leadership is lost or the process is shutting down, the deferred OnStoppedLeading() callback fires.

func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
    if lec.LeaseDuration <= lec.RenewDeadline {
        return nil, fmt.Errorf("leaseDuration must be greater than renewDeadline")
    }
    if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) {
        return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor")
    }
    if lec.LeaseDuration < 1 {
        return nil, fmt.Errorf("leaseDuration must be greater than zero")
    }
    if lec.RenewDeadline < 1 {
        return nil, fmt.Errorf("renewDeadline must be greater than zero")
    }
    if lec.RetryPeriod < 1 {
        return nil, fmt.Errorf("retryPeriod must be greater than zero")
    }
    if lec.Callbacks.OnStartedLeading == nil {
        return nil, fmt.Errorf("OnStartedLeading callback must not be nil")
    }
    if lec.Callbacks.OnStoppedLeading == nil {
        return nil, fmt.Errorf("OnStoppedLeading callback must not be nil")
    }

    if lec.Lock == nil {
        return nil, fmt.Errorf("Lock must not be nil.")
    }
    le := LeaderElector{
        config:  lec,
        clock:   clock.RealClock{},
        metrics: globalMetricsFactory.newLeaderMetrics(),
    }
    le.metrics.leaderOff(le.config.Name)
    return &le, nil
}

// Run starts the leader election loop. Run will not return
// before leader election loop is stopped by ctx or it has
// stopped holding the leader lease
func (le *LeaderElector) Run(ctx context.Context) {
    defer runtime.HandleCrash()
    defer func() {
        le.config.Callbacks.OnStoppedLeading()
    }()

    if !le.acquire(ctx) {
        return // ctx signalled done
    }
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    go le.config.Callbacks.OnStartedLeading(ctx)
    le.renew(ctx)
}

// RunOrDie starts a client with the provided config or panics if the config
// fails to validate. RunOrDie blocks until leader election loop is
// stopped by ctx or it has stopped holding the leader lease
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
    le, err := NewLeaderElector(lec)
    if err != nil {
        panic(err)
    }
    if lec.WatchDog != nil {
        lec.WatchDog.SetLeaderElection(le)
    }
    le.Run(ctx)
}

Now let's look at the acquire() method and how it obtains the resource lock.

It uses wait.JitterUntil(), which runs the given function periodically with RetryPeriod as the base interval; when the jitter factor is greater than 0.0, each interval becomes a random value between duration and duration + maxFactor * duration.
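
To make the jitter concrete, here is a minimal standalone sketch of wait.JitterUntil() outside of leader election; the 2-second period, the 1.2 factor, and the 10-second stop timer are values made up for illustration, not taken from kube-controller-manager:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})
	// Close the stop channel after roughly 10 seconds to end the loop.
	go func() {
		time.Sleep(10 * time.Second)
		close(stopCh)
	}()

	// With period = 2s and jitterFactor = 1.2, each wait between runs is a
	// random duration in [2s, 2s + 1.2*2s), which is how acquire() spreads
	// out its retries around RetryPeriod.
	wait.JitterUntil(func() {
		fmt.Println("tick at", time.Now().Format(time.RFC3339))
	}, 2*time.Second, 1.2, true, stopCh)
}

In client-go's leaderelection package JitterFactor is 1.2, so consecutive acquire attempts land somewhere between RetryPeriod and 2.2 × RetryPeriod apart, which keeps all the candidates from hammering the API server in lockstep.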

// GetLeader returns the identity of the last observed leader or returns the empty string if
// no leader has yet been observed.
// This function is for informational purposes. (e.g. monitoring, logs, etc.)
func (le *LeaderElector) GetLeader() string {
    return le.getObservedRecord().HolderIdentity
}

// IsLeader returns true if the last observed leader was this client else returns false.
func (le *LeaderElector) IsLeader() bool {
    return le.getObservedRecord().HolderIdentity == le.config.Lock.Identity()
}

// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
// Returns false if ctx signals done.
func (le *LeaderElector) acquire(ctx context.Context) bool {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    succeeded := false
    desc := le.config.Lock.Describe()
    klog.Infof("attempting to acquire leader lease %v...", desc)
    wait.JitterUntil(func() {
        succeeded = le.tryAcquireOrRenew(ctx)
        le.maybeReportTransition()
        if !succeeded {
            klog.V(4).Infof("failed to acquire lease %v", desc)
            return
        }
        le.config.Lock.RecordEvent("became leader")
        le.metrics.leaderOn(le.config.Name)
        klog.Infof("successfully acquired lease %v", desc)
        cancel()
    }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
    return succeeded
}

Inside that periodic function, tryAcquireOrRenew(ctx) is called to try to acquire, or renew, the resource lock:

  1. Build a leaderElectionRecord describing this candidate's claim on the lock.

  2. le.config.Lock.Get(ctx) fetches the lease through the clientset and returns two values, the parsed record and its raw bytes. If the call fails with anything other than a NotFound error, give up for this round; if the lease simply does not exist yet, create it with Create() and cache it locally via setObservedRecord().

  3. Compare the record returned by le.config.Lock.Get(ctx) with the previously observed one; if they differ, replace the local cache with the freshly fetched value. Then check the holder and expiry: if another identity holds the lock and it has not expired, return false; if we are the current leader, carry the old AcquireTime and LeaderTransitions over into the new record, otherwise increment LeaderTransitions.

  4. le.config.Lock.Update() writes the record back through the API server (and ultimately to etcd), and setObservedRecord() refreshes the local cache.


// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
// else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false.
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
    now := metav1.Now()
    leaderElectionRecord := rl.LeaderElectionRecord{
        HolderIdentity:       le.config.Lock.Identity(),
        LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
        RenewTime:            now,
        AcquireTime:          now,
    }

    // 1. obtain or create the ElectionRecord
    oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
    if err != nil {
        if !errors.IsNotFound(err) {
            klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
            return false
        }
        if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
            klog.Errorf("error initially creating leader election record: %v", err)
            return false
        }

        le.setObservedRecord(&leaderElectionRecord)

        return true
    }

    // 2. Record obtained, check the Identity & Time
    if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
        le.setObservedRecord(oldLeaderElectionRecord)

        le.observedRawRecord = oldLeaderElectionRawRecord
    }
    if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
        le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
        !le.IsLeader() {
        klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
        return false
    }

    // 3. We're going to try to update. The leaderElectionRecord is set to it's default
    // here. Let's correct it before updating.
    if le.IsLeader() {
        leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
    } else {
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
    }

    // update the lock itself
    if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
        klog.Errorf("Failed to update lock: %v", err)
        return false
    }

    le.setObservedRecord(&leaderElectionRecord)
    return true
}

Next let's look at renew(). Note that renew() only runs after acquire() has succeeded, i.e. once this instance is already the leader; it keeps renewing the lock every RetryPeriod and only gives up, and returns, when a renewal cannot complete within RenewDeadline or the context is cancelled.

  • wait.PollImmediateUntil() runs the function passed as its second argument immediately and then once per interval, and only stops when that function returns true, returns an error, or the stop channel is closed (a short standalone sketch follows this list).

  • Here that function is le.tryAcquireOrRenew(timeoutCtx); after each attempt, maybeReportTransition() reports the new leader if the observed holder changed.
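
Here is an equally small sketch of wait.PollImmediateUntil() on its own, with a made-up condition that succeeds on the third attempt; it is only meant to show the stop conditions described above:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})
	attempts := 0

	// The condition function runs immediately, then once per second, and the
	// loop ends as soon as it returns true, returns an error, or stopCh closes.
	err := wait.PollImmediateUntil(time.Second, func() (bool, error) {
		attempts++
		fmt.Println("attempt", attempts)
		return attempts >= 3, nil // pretend the renewal succeeds on the 3rd try
	}, stopCh)

	fmt.Println("done, err =", err)
}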

// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
func (le *LeaderElector) renew(ctx context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    wait.Until(func() {
        timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
        defer timeoutCancel()
        err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
            return le.tryAcquireOrRenew(timeoutCtx), nil
        }, timeoutCtx.Done())

        le.maybeReportTransition()
        desc := le.config.Lock.Describe()
        if err == nil {
            klog.V(5).Infof("successfully renewed lease %v", desc)
            return
        }
        le.config.Lock.RecordEvent("stopped leading")
        le.metrics.leaderOff(le.config.Name)
        klog.Infof("failed to renew lease %v: %v", desc, err)
        cancel()
    }, le.config.RetryPeriod, ctx.Done())

    // if we hold the lease, give it up
    if le.config.ReleaseOnCancel {
        le.release()
    }
}

// release attempts to release the leader lease if we have acquired it.
func (le *LeaderElector) release() bool {
    if !le.IsLeader() {
        return true
    }
    now := metav1.Now()
    leaderElectionRecord := rl.LeaderElectionRecord{
        LeaderTransitions:    le.observedRecord.LeaderTransitions,
        LeaseDurationSeconds: 1,
        RenewTime:            now,
        AcquireTime:          now,
    }
    if err := le.config.Lock.Update(context.TODO(), leaderElectionRecord); err != nil {
        klog.Errorf("Failed to release lock: %v", err)
        return false
    }

    le.setObservedRecord(&leaderElectionRecord)
    return true
}

func (le *LeaderElector) maybeReportTransition() {
    if le.observedRecord.HolderIdentity == le.reportedLeader {
        return
    }
    le.reportedLeader = le.observedRecord.HolderIdentity
    if le.config.Callbacks.OnNewLeader != nil {
        go le.config.Callbacks.OnNewLeader(le.reportedLeader)
    }
}

// Check will determine if the current lease is expired by more than timeout.
func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error {
    if !le.IsLeader() {
        // Currently not concerned with the case that we are hot standby
        return nil
    }
    // If we are more than timeout seconds after the lease duration that is past the timeout
    // on the lease renew. Time to start reporting ourselves as unhealthy. We should have
    // died but conditions like deadlock can prevent this. (See #70819)
    if le.clock.Since(le.observedTime) > le.config.LeaseDuration+maxTolerableExpiredLease {
        return fmt.Errorf("failed election to renew leadership on lease %s", le.config.Name)
    }

    return nil
}

// setObservedRecord will set a new observedRecord and update observedTime to the current time.
// Protect critical sections with lock.
func (le *LeaderElector) setObservedRecord(observedRecord *rl.LeaderElectionRecord) {
    le.observedRecordLock.Lock()
    defer le.observedRecordLock.Unlock()

    le.observedRecord = *observedRecord
    le.observedTime = le.clock.Now()
}

// getObservedRecord returns observersRecord.
// Protect critical sections with lock.
func (le *LeaderElector) getObservedRecord() rl.LeaderElectionRecord {
    le.observedRecordLock.Lock()
    defer le.observedRecordLock.Unlock()

    return le.observedRecord
}

Coding

Now that we understand the election mechanism, we can add election code to our own controller.

client-go ships a relevant example at vendor/k8s.io/client-go/examples/leader-election/main.go; let's try it first.

/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
	"context"
	"flag"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/google/uuid"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog/v2"
)

// buildConfig builds a rest.Config from the given kubeconfig path, or falls back
// to the in-cluster (service account) config when running inside a pod
func buildConfig(kubeconfig string) (*rest.Config, error) {
	if kubeconfig != "" {
		cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
		if err != nil {
			return nil, err
		}
		return cfg, nil
	}

	cfg, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}
	return cfg, nil
}

func main() {
	klog.InitFlags(nil)

	var kubeconfig string
	var leaseLockName string
	var leaseLockNamespace string
	var id string

	// parse the command line flags
	flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
	flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
	flag.StringVar(&leaseLockName, "lease-lock-name", "", "the lease lock resource name")
	flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "", "the lease lock resource namespace")
	flag.Parse()

	if leaseLockName == "" {
		klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag).")
	}
	if leaseLockNamespace == "" {
		klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag).")
	}

	// build the clientset
	config, err := buildConfig(kubeconfig)
	if err != nil {
		klog.Fatal(err)
	}
	client := clientset.NewForConfigOrDie(config)

	// replace this with your own business logic
	run := func(ctx context.Context) {
		// complete your controller loop here
		klog.Info("Controller loop...")

		select {}
	}

	// use a Go context so we can tell the leaderelection code when we
	// want to step down
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// listen for termination signals and shut down gracefully
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-ch
		klog.Info("Received termination, signaling shutdown")
		cancel()
	}()

	// create the resource lock (a Lease object)
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      leaseLockName,
			Namespace: leaseLockNamespace,
		},
		Client: client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: id,
		},
	}

	// start the leader election loop
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock: lock,
		// IMPORTANT: the code guarded by the lease must finish before the leader
		// gives it up, otherwise it may conflict with whatever the next elected leader starts
		ReleaseOnCancel: true,
		LeaseDuration:   60 * time.Second,
		RenewDeadline:   15 * time.Second,
		RetryPeriod:     5 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				// we're notified when we start - this is where you would
				// usually put your code
				run(ctx)
			},
			OnStoppedLeading: func() {
				// we can do cleanup here
				klog.Infof("leader lost: %s", id)
				os.Exit(0)
			},
			OnNewLeader: func(identity string) {
				// we're notified when new leader elected
				if identity == id {
					// I just got the lock
					return
				}
				klog.Infof("new leader elected: %s", identity)
			},
		},
	})
}

You can run the code directly to verify it:

# first terminal
go run main.go -kubeconfig=/path/to/kubeconfig -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1

# second terminal
go run main.go -kubeconfig=/path/to/kubeconfig -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=2

# third terminal
go run main.go -kubeconfig=/path/to/kubeconfig -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=3

In my case I packaged it into a pod and tested it on a cluster.

leader-election-77456b57fc-2rzn5 I0602 06:26:37.307468       1 leaderelection.go:245] attempting to acquire leader lease default/leader-election-test...
leader-election-77456b57fc-2rzn5 I0602 06:26:37.338405       1 main.go:151] new leader elected: 6f321b5f-acac-4207-be63-ac2b3149245f
leader-election-77456b57fc-gqmw2 I0601 03:28:33.857738       1 leaderelection.go:245] attempting to acquire leader lease default/leader-election-test...
leader-election-77456b57fc-gqmw2 I0601 03:28:33.884765       1 main.go:151] new leader elected: 6f321b5f-acac-4207-be63-ac2b3149245f
leader-election-77456b57fc-chvqz I0601 03:27:15.064738       1 leaderelection.go:245] attempting to acquire leader lease default/leader-election-test...
leader-election-77456b57fc-chvqz I0601 03:27:15.090609       1 main.go:151] new leader elected: 70b51d3d-d0f5-45fc-a205-ea4445a495d7
leader-election-77456b57fc-chvqz I0601 03:27:56.005594       1 main.go:151] new leader elected: 8ef6f802-bae3-4064-8dc0-11915813c429
leader-election-77456b57fc-chvqz I0601 03:28:21.647649       1 leaderelection.go:255] successfully acquired lease default/leader-election-test
leader-election-77456b57fc-chvqz I0601 03:28:21.647937       1 main.go:87] Controller loop... 

Inspect the lease details:

kubectl get lease
NAME                      HOLDER                                 AGE
leader-election-test      6f321b5f-acac-4207-be63-ac2b3149245f   23h

kubectl describe lease leader-election-test 
Name:         leader-election-test
Namespace:    default
Labels:       <none>
Annotations:  <none>
API Version:  coordination.k8s.io/v1
Kind:         Lease
Metadata:
  Creation Timestamp:  2023-06-01T06:46:08Z
  Managed Fields:
    API Version:  coordination.k8s.io/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f:spec:
        f:acquireTime:
        f:holderIdentity:
        f:leaseDurationSeconds:
        f:leaseTransitions:
        f:renewTime:
    Manager:         leaderelection
    Operation:       Update
    Time:            2023-06-01T06:46:08Z
  Resource Version:  1705857
  UID:               c0b54f9e-ef7f-4c4a-ae60-e3ec9bb9999d
Spec:
  Acquire Time:            2023-06-01T06:46:08.502664Z
  Holder Identity:         6f321b5f-acac-4207-be63-ac2b3149245f
  Lease Duration Seconds:  60
  Lease Transitions:       0
  Renew Time:              2023-06-02T06:29:04.583031Z
Events:                    <none>

Summary

  • Since version 1.20 leader election is implemented with Lease objects. The earlier Endpoints- and ConfigMap-based locks were expensive because other controllers also watch those resources and the lock is updated frequently, so the implementation switched to Lease.

  • Leader election gives the controller high availability while ensuring that only one instance acts as leader at a time and the rest stand by, which prevents the same event from being watched and the same business logic from being executed multiple times.

  • client-go ships a simple resource lock example that we can adapt for our own controllers, condensed in the sketch below.
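
As a recap, here is a rough sketch of the minimal wiring needed to graft leader election onto an existing controller. runMyController, the lease name "my-controller", and the default namespace are placeholders for your own code, and the sketch assumes the process runs in-cluster with a service account allowed to get, create, and update Leases in the coordination.k8s.io API group:

package main

import (
	"context"
	"os"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog/v2"
)

// runMyController stands in for your controller's existing Run/Start entrypoint.
func runMyController(ctx context.Context) {
	klog.Info("started leading, running controller loops")
	<-ctx.Done()
}

func main() {
	cfg, err := rest.InClusterConfig() // assumes we run inside the cluster
	if err != nil {
		klog.Fatal(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// Hostname as the identity; add a UUID suffix if several replicas can
	// share a node, as kube-controller-manager does.
	id, _ := os.Hostname()

	leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
		Lock: &resourcelock.LeaseLock{
			LeaseMeta:  metav1.ObjectMeta{Name: "my-controller", Namespace: "default"},
			Client:     client.CoordinationV1(),
			LockConfig: resourcelock.ResourceLockConfig{Identity: id},
		},
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: runMyController,
			OnStoppedLeading: func() {
				klog.Fatal("lost leadership, exiting so the next leader starts clean")
			},
		},
	})
}

The timings mirror the kube-controller-manager defaults (15s/10s/2s); tune them to your own tolerance for failover latency.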