Operator Basics: Hands-on Operator Development

Building an Operator of my own, purely for fun; in actual production a CronJob can be used instead.

Background

MySQL needs scheduled remote backups. Normally we use xtrabackup to back up locally, but for off-site disaster recovery we need a program that backs up the MySQL databases remotely. Doing this with scripts means maintaining Crontab entries on the machines, and if a single backup machine isn't powerful enough, the work has to be spread over several machines that are scattered and hard to manage. The idea here is to deploy the backup program to the cluster in the form of a CRD, so that all the backup instances can be managed uniformly through the cluster.

I originally planned to use CephFS for storage, but our in-house Ceph cluster is too old to be mounted via ceph-csi and external-provisioner (testing this even broke the CephFS filesystem), so NFS is used instead, which is not highly available.

[Figure: OPS-Operator architecture diagram]

The overall design is to have the CRD resource create Jobs that run the backup program as batch tasks, modeled on how CronJob works. It still differs from CronJob in places, and if you really want to do this, just use the built-in Kubernetes CronJob and save yourself the trouble.

Project Initialization

Following the Demo workflow we went through earlier, we first initialize the project directory and then create the API.

Initialize the project:

kubebuilder init --domain isekiro.com --owner isekiro --repo github.com/isekiro/ops-operator

Create the API:

kubebuilder create api --group batch --version v1 --kind MysqlBackup

API Fields

With the built-in Kubernetes CronJob, creating an application means configuring a lot of Pod-related content. Here we don't use the built-in CronJob template; instead we define a few fields of our own and then assemble our own container in the controller logic.

type ConcurrencyPolicy string

const (
	// AllowConcurrent allows CronJobs to run concurrently.
	AllowConcurrent ConcurrencyPolicy = "Allow"

	// ForbidConcurrent forbids concurrent runs, skipping next run if previous
	// hasn't finished yet.
	ForbidConcurrent ConcurrencyPolicy = "Forbid"

	// ReplaceConcurrent cancels currently running job and replaces it with a new one.
	ReplaceConcurrent ConcurrencyPolicy = "Replace"
)

// EDIT THIS FILE!  THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required.  Any new fields you add must have json tags for the fields to be serialized.

// MysqlBackupSpec defines the desired state of MysqlBackup
type MysqlBackupSpec struct {
	// +kubebuilder:validation:MinLength=0

	// The schedule in Cron format, see https://en.wikipedia.org/wiki/Cron.
	Schedule string `json:"schedule"`

	// +kubebuilder:validation:Minimum=0

	// Optional deadline in seconds for starting the job if it misses scheduled
	// time for any reason.  Missed jobs executions will be counted as failed ones.
	// +optional
	StartingDeadlineSeconds *int64 `json:"startingDeadlineSeconds,omitempty"`

	// Specifies how to treat concurrent executions of a Job.
	// Valid values are:
	// - "Allow" (default): allows CronJobs to run concurrently;
	// - "Forbid": forbids concurrent runs, skipping next run if previous run hasn't finished yet;
	// - "Replace": cancels currently running job and replaces it with a new one
	// +optional
	ConcurrencyPolicy ConcurrencyPolicy `json:"concurrencyPolicy,omitempty"`

	// This flag tells the controller to suspend subsequent executions, it does
	// not apply to already started executions.  Defaults to false.
	// +optional
	Suspend *bool `json:"suspend,omitempty"`

	// +kubebuilder:validation:Minimum=0

	// The number of successful finished jobs to retain.
	// This is a pointer to distinguish between explicit zero and not specified.
	// +optional
	SuccessfulJobsHistoryLimit *int32 `json:"successfulJobsHistoryLimit,omitempty"`

	// +kubebuilder:validation:Minimum=0

	// The number of failed finished jobs to retain.
	// This is a pointer to distinguish between explicit zero and not specified.
	// +optional
	FailedJobsHistoryLimit *int32 `json:"failedJobsHistoryLimit,omitempty"`

	// Restart policy when the container fails to run
	// +optional
	RestartPolicy corev1.RestartPolicy `json:"restartPolicy,omitempty"`

	// Backup configuration parameters, injected into the container as env vars
	// +optional
	BackupEnvs []corev1.EnvVar `json:"backupEnvs,omitempty"`

	// Container image for the backup job
	// +optional
	Image string `json:"image,omitempty"`

	// Arguments passed to the container
	// +optional
	Args []string `json:"args,omitempty"`

	// Image pull policy
	// +optional
	ImagePullPolicy string `json:"imagePullPolicy,omitempty"`

	// Volumes declared on the Job's Pod
	// +optional
	Volumes []corev1.Volume `json:"volumes,omitempty" patchStrategy:"merge,retainKeys" patchMergeKey:"name"`

	// Volume mounts inside the container
	// +optional
	VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty" patchStrategy:"merge" patchMergeKey:"mountPath"`

	// Node selector
	// +optional
	NodeSelector map[string]string `json:"nodeSelector,omitempty"`
}

// MysqlBackupStatus defines the observed state of MysqlBackup
type MysqlBackupStatus struct {
	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
	// Important: Run "make" to regenerate code after modifying this file

	// A list of pointers to currently running jobs.
	// +optional
	Active []corev1.ObjectReference `json:"active,omitempty"`

	// Information when was the last time the job was successfully scheduled.
	// +optional
	LastScheduleTime *metav1.Time `json:"lastScheduleTime,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status

// MysqlBackup is the Schema for the mysqlbackups API
type MysqlBackup struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   MysqlBackupSpec   `json:"spec,omitempty"`
	Status MysqlBackupStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// MysqlBackupList contains a list of MysqlBackup
type MysqlBackupList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []MysqlBackup `json:"items"`
}

We pass the configuration into the container's env via backupEnvs; the backup program then reads this configuration and runs the backup task. The remaining fields control the Job's behavior and scheduling.
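
The backup image itself is not part of the Operator, but to make the mechanism concrete, here is a minimal sketch of how such a program might consume these variables. The names HOST, PORT, USERNAME, PASSWORD and SQLPATH match the sample CR later in this post; the mysqldump invocation and the output file naming are purely illustrative assumptions:

package main

import (
	"fmt"
	"log"
	"os"
	"os/exec"
	"path/filepath"
	"time"
)

func main() {
	// Read connection settings injected through spec.backupEnvs.
	host := os.Getenv("HOST")
	port := os.Getenv("PORT")
	user := os.Getenv("USERNAME")
	pass := os.Getenv("PASSWORD")
	dir := os.Getenv("SQLPATH") // mount point of the PVC, e.g. /mnt

	// Write the dump onto the NFS-backed volume with a timestamped name.
	out := filepath.Join(dir, fmt.Sprintf("all-%s.sql", time.Now().Format("20060102-150405")))
	f, err := os.Create(out)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Dump all databases from the remote MySQL instance.
	cmd := exec.Command("mysqldump",
		"-h", host, "-P", port, "-u", user, "-p"+pass, "--all-databases")
	cmd.Stdout = f
	cmd.Stderr = os.Stderr
	if err := cmd.Run(); err != nil {
		log.Fatalf("backup failed: %v", err)
	}
	log.Printf("backup written to %s", out)
}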

Business Logic

The business logic controls how our API fields are assembled into a Job, and how the Job behaves.

Define three slices to hold Jobs: they track the currently active Jobs and are used to enforce how many successful and failed Jobs are retained.

  // find the active list of jobs
	var activeJobs []*kbatch.Job
	var successfulJobs []*kbatch.Job
	var failedJobs []*kbatch.Job
	var mostRecentTime *time.Time // find the last run so we can update the status

	isJobFinished := func(job *kbatch.Job) (bool, kbatch.JobConditionType) {
		for _, c := range job.Status.Conditions {
			if (c.Type == kbatch.JobComplete || c.Type == kbatch.JobFailed) && c.Status == corev1.ConditionTrue {
				return true, c.Type
			}
		}

		return false, ""
	}

	getScheduledTimeForJob := func(job *kbatch.Job) (*time.Time, error) {
		timeRaw := job.Annotations[scheduledTimeAnnotation]
		if len(timeRaw) == 0 {
			return nil, nil
		}

		timeParsed, err := time.Parse(time.RFC3339, timeRaw)
		if err != nil {
			return nil, err
		}
		return &timeParsed, nil
	}

	for i, job := range childJobs.Items {
		_, finishedType := isJobFinished(&job)
		switch finishedType {
		case "": // ongoing
			activeJobs = append(activeJobs, &childJobs.Items[i])
		case kbatch.JobFailed:
			failedJobs = append(failedJobs, &childJobs.Items[i])
		case kbatch.JobComplete:
			successfulJobs = append(successfulJobs, &childJobs.Items[i])
		}

		// We'll store the launch time in an annotation, so we'll reconstitute that from
		// the active jobs themselves.
		scheduledTimeForJob, err := getScheduledTimeForJob(&job)
		if err != nil {
			log.Error(err, "unable to parse schedule time for child job", "job", &job)
			continue
		}
		if scheduledTimeForJob != nil {
			if mostRecentTime == nil {
				mostRecentTime = scheduledTimeForJob
			} else if mostRecentTime.Before(*scheduledTimeForJob) {
				mostRecentTime = scheduledTimeForJob
			}
		}
	}

	if mostRecentTime != nil {
		mysqlBackup.Status.LastScheduleTime = &metav1.Time{Time: *mostRecentTime}
	} else {
		mysqlBackup.Status.LastScheduleTime = nil
	}
	mysqlBackup.Status.Active = nil
	for _, activeJob := range activeJobs {
		jobRef, err := ref.GetReference(r.Scheme, activeJob)
		if err != nil {
			log.Error(err, "unable to make reference to active job", "job", activeJob)
			continue
		}
		mysqlBackup.Status.Active = append(mysqlBackup.Status.Active, *jobRef)
	}

	log.V(1).Info("job count", "active jobs", len(activeJobs), "successful jobs", len(successfulJobs), "failed jobs", len(failedJobs))

	if err := r.Status().Update(ctx, &mysqlBackup); err != nil {
		log.Error(err, "unable to update MysqlBackup status")
		return ctrl.Result{}, err
	}

	// Clean up old jobs according to the history limit
	// NB: deleting these are "best effort" -- if we fail on a particular one,
	// we won't requeue just to finish the deleting.
	if mysqlBackup.Spec.FailedJobsHistoryLimit != nil {
		sort.Slice(failedJobs, func(i, j int) bool {
			if failedJobs[i].Status.StartTime == nil {
				return failedJobs[j].Status.StartTime != nil
			}
			return failedJobs[i].Status.StartTime.Before(failedJobs[j].Status.StartTime)
		})
		for i, job := range failedJobs {
			if int32(i) >= int32(len(failedJobs))-*mysqlBackup.Spec.FailedJobsHistoryLimit {
				break
			}
			if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
				log.Error(err, "unable to delete old failed job", "job", job)
			} else {
				log.V(0).Info("deleted old failed job", "job", job)
			}
		}
	}

	if mysqlBackup.Spec.SuccessfulJobsHistoryLimit != nil {
		sort.Slice(successfulJobs, func(i, j int) bool {
			if successfulJobs[i].Status.StartTime == nil {
				return successfulJobs[j].Status.StartTime != nil
			}
			return successfulJobs[i].Status.StartTime.Before(successfulJobs[j].Status.StartTime)
		})
		for i, job := range successfulJobs {
			if int32(i) >= int32(len(successfulJobs))-*mysqlBackup.Spec.SuccessfulJobsHistoryLimit {
				break
			}
			if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
				log.Error(err, "unable to delete old successful job", "job", job)
			} else {
				log.V(0).Info("deleted old successful job", "job", job)
			}
		}
	}

The series of logic above deals with Jobs that already exist; if the Job does not exist yet, we construct a new Job and its Pod container.

  newContainers := func(app *batchv1.MysqlBackup) []corev1.Container {
		return []corev1.Container{
			{
				Name:            app.Name,
				Env:             app.Spec.BackupEnvs,
				Image:           app.Spec.Image,
				Args:            app.Spec.Args,
				ImagePullPolicy: corev1.PullPolicy(app.Spec.ImagePullPolicy),
				VolumeMounts:    app.Spec.DeepCopy().VolumeMounts,
			},
		}
	}

	newVolumes := func(app *batchv1.MysqlBackup) []corev1.Volume {
		return app.Spec.Volumes
	}

	newRestartPolicy := func(app *batchv1.MysqlBackup) corev1.RestartPolicy {
		return app.Spec.RestartPolicy
	}

	newNodeSelector := func(app *batchv1.MysqlBackup) map[string]string {
		return app.Spec.NodeSelector
	}

	constructJobForMysqlBackup := func(mysqlBackup *batchv1.MysqlBackup, scheduledTime time.Time) (*kbatch.Job, error) {
		// We want job names for a given nominal start time to have a deterministic name to avoid the same job being created twice
		name := fmt.Sprintf("%s-%d", mysqlBackup.Name, scheduledTime.Unix())

		job := &kbatch.Job{
			ObjectMeta: metav1.ObjectMeta{
				Labels:      make(map[string]string),
				Annotations: make(map[string]string),
				Name:        name,
				Namespace:   mysqlBackup.Namespace,
			},
			// Spec: *mysqlBackup.Spec.JobTemplate.Spec.DeepCopy(),
		}

		// build container
		job.Spec.Template.Spec.Containers = newContainers(mysqlBackup)
		// build volumes
		job.Spec.Template.Spec.Volumes = newVolumes(mysqlBackup)
		// build restartPolicy
		job.Spec.Template.Spec.RestartPolicy = newRestartPolicy(mysqlBackup)
		// build nodeSelector
		job.Spec.Template.Spec.NodeSelector = newNodeSelector(mysqlBackup)

		job.Annotations[scheduledTimeAnnotation] = scheduledTime.Format(time.RFC3339)
		
		if err := ctrl.SetControllerReference(mysqlBackup, job, r.Scheme); err != nil {
			return nil, err
		}

		return job, nil
	}

	// actually make the job...
	job, err := constructJobForMysqlBackup(&mysqlBackup, missedRun)
	if err != nil {
		log.Error(err, "unable to construct job from template")
		// don't bother requeuing until we get a change to the spec
		return scheduledResult, nil
	}

	// ...and create it on the cluster
	if err := r.Create(ctx, job); err != nil {
		log.Error(err, "unable to create Job for MysqlBackup", "job", job)
		return ctrl.Result{}, err
	}

	log.V(1).Info("created Job for MysqlBackup run", "job", job)

	return scheduledResult, nil
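
The snippet above uses missedRun and scheduledResult, which are computed earlier in Reconcile and are not shown here. That part follows the kubebuilder CronJob tutorial; a condensed sketch (omitting the guard against too many missed runs, and assuming the controller imports github.com/robfig/cron) looks roughly like this:

	getNextSchedule := func(mysqlBackup *batchv1.MysqlBackup, now time.Time) (lastMissed time.Time, next time.Time, err error) {
		sched, err := cron.ParseStandard(mysqlBackup.Spec.Schedule)
		if err != nil {
			return time.Time{}, time.Time{}, fmt.Errorf("unparseable schedule %q: %v", mysqlBackup.Spec.Schedule, err)
		}

		// Count from the last recorded run, or from the object's creation time.
		var earliestTime time.Time
		if mysqlBackup.Status.LastScheduleTime != nil {
			earliestTime = mysqlBackup.Status.LastScheduleTime.Time
		} else {
			earliestTime = mysqlBackup.ObjectMeta.CreationTimestamp.Time
		}
		if mysqlBackup.Spec.StartingDeadlineSeconds != nil {
			schedulingDeadline := now.Add(-time.Second * time.Duration(*mysqlBackup.Spec.StartingDeadlineSeconds))
			if schedulingDeadline.After(earliestTime) {
				earliestTime = schedulingDeadline
			}
		}
		if earliestTime.After(now) {
			return time.Time{}, sched.Next(now), nil
		}

		// Walk the schedule forward to find the most recent run we missed.
		for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
			lastMissed = t
		}
		return lastMissed, sched.Next(now), nil
	}

	now := time.Now()
	missedRun, nextRun, err := getNextSchedule(&mysqlBackup, now)
	if err != nil {
		log.Error(err, "unable to figure out MysqlBackup schedule")
		return ctrl.Result{}, nil
	}
	// Requeue at the next scheduled tick even if nothing needs to run right now.
	scheduledResult := ctrl.Result{RequeueAfter: nextRun.Sub(now)}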

Admission Webhook

With the business logic in place, we also need to validate the YAML we submit, to keep bad configuration from reaching the api-server.

Create the validation webhook:

kubebuilder create webhook --group batch --version v1 --kind MysqlBackup --defaulting --programmatic-validation

This generates the file api/v1/mysqlbackup_webhook.go, where the webhook's business logic is configured.

// Default implements webhook.Defaulter so a webhook will be registered for the type
func (r *MysqlBackup) Default() {
	mysqlbackuplog.Info("default", "name", r.Name)

	if r.Spec.ConcurrencyPolicy == "" {
		r.Spec.ConcurrencyPolicy = AllowConcurrent
	}
	if r.Spec.Suspend == nil {
		r.Spec.Suspend = new(bool)
	}
	if r.Spec.SuccessfulJobsHistoryLimit == nil {
		r.Spec.SuccessfulJobsHistoryLimit = new(int32)
		*r.Spec.SuccessfulJobsHistoryLimit = 3
	}
	if r.Spec.FailedJobsHistoryLimit == nil {
		r.Spec.FailedJobsHistoryLimit = new(int32)
		*r.Spec.FailedJobsHistoryLimit = 1
	}
}

// TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation.
//+kubebuilder:webhook:path=/validate-batch-isekiro-com-v1-mysqlbackup,mutating=false,failurePolicy=fail,sideEffects=None,groups=batch.isekiro.com,resources=mysqlbackups,verbs=create;update,versions=v1,name=vmysqlbackup.kb.io,admissionReviewVersions=v1

var _ webhook.Validator = &MysqlBackup{}

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
func (r *MysqlBackup) ValidateCreate() error {
	mysqlbackuplog.Info("validate create", "name", r.Name)

	return r.validateMysqlBackup()
}

// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type
func (r *MysqlBackup) ValidateUpdate(old runtime.Object) error {
	mysqlbackuplog.Info("validate update", "name", r.Name)

	return r.validateMysqlBackup()
}

// ValidateDelete implements webhook.Validator so a webhook will be registered for the type
func (r *MysqlBackup) ValidateDelete() error {
	mysqlbackuplog.Info("validate delete", "name", r.Name)

	// TODO(user): fill in your validation logic upon object deletion.
	return nil
}

func (r *MysqlBackup) validateMysqlBackup() error {
	var allErrs field.ErrorList
	if err := r.validateMysqlBackupName(); err != nil {
		allErrs = append(allErrs, err)
	}
	if err := r.validateMysqlBackupSpec(); err != nil {
		allErrs = append(allErrs, err)
	}
	if len(allErrs) == 0 {
		return nil
	}

	return apierrors.NewInvalid(
		schema.GroupKind{Group: "batch.isekiro.com", Kind: "MysqlBackup"},
		r.Name, allErrs)
}

func (r *MysqlBackup) validateMysqlBackupSpec() *field.Error {
	// The field helpers from kubernetes API machinery help us return nicely structured validation errors.
	return validateScheduleFormat(
		r.Spec.Schedule,
		field.NewPath("spec").Child("schedule"))
}

func validateScheduleFormat(schedule string, fldPath *field.Path) *field.Error {
	if _, err := cron.ParseStandard(schedule); err != nil {
		return field.Invalid(fldPath, schedule, err.Error())
	}
	return nil
}

func (r *MysqlBackup) validateMysqlBackupName() *field.Error {
	if len(r.ObjectMeta.Name) > validationutils.DNS1035LabelMaxLength-11 {
		// The Job name length is 63 characters like all Kubernetes objects
		// (which must fit in a DNS subdomain). Our controller appends an
		// 11-character suffix (`-$TIMESTAMP`) to the MysqlBackup name when
		// creating a Job, so MysqlBackup names must be no longer than
		// 63-11=52 characters. If we don't validate this here, Job creation
		// will fail later.
		return field.Invalid(field.NewPath("metadata").Child("name"), r.Name, "must be no more than 52 characters")
	}
	return nil
}

Admission control comes in two kinds: mutating admission and validating admission.

Mutating admission: here we only set defaults for the concurrency policy and for the number of failed and successful Jobs to keep.

Validating admission: we only check that the name is no longer than 52 characters and that the Schedule cron string is valid.
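
For completeness, the generated scaffolding also registers the webhooks with the manager and adds the mutating-webhook marker for the Default() method shown above; it looks roughly like this (the marker path and webhook name follow kubebuilder's naming convention for this group/version/kind):

// SetupWebhookWithManager registers the webhooks for MysqlBackup with the manager.
func (r *MysqlBackup) SetupWebhookWithManager(mgr ctrl.Manager) error {
	return ctrl.NewWebhookManagedBy(mgr).
		For(r).
		Complete()
}

//+kubebuilder:webhook:path=/mutate-batch-isekiro-com-v1-mysqlbackup,mutating=true,failurePolicy=fail,sideEffects=None,groups=batch.isekiro.com,resources=mysqlbackups,verbs=create;update,versions=v1,name=mmysqlbackup.kb.io,admissionReviewVersions=v1

// Default (shown above) implements webhook.Defaulter, so a mutating webhook
// is registered for the type.
var _ webhook.Defaulter = &MysqlBackup{}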

After creating the webhook, if you want to do a local make run, the files {tls.crt, tls.key} must exist in the /tmp/k8s-webhook-server/serving-certs directory.

openssl req -new -x509 -days 365 -nodes -out /tmp/k8s-webhook-server/serving-certs/tls.crt -keyout /tmp/k8s-webhook-server/serving-certs/tls.key

Deployment and Running

We already got familiar with the deployment process in the earlier Demo.

Because the webhook is enabled here, cert-manager (or an equivalent) must be installed in the cluster to issue certificates for the controller.

helm repo add jetstack https://charts.jetstack.io

helm repo update

# versions above 1.10.0 do not support Kubernetes 1.20.x
helm install cert-manager jetstack/cert-manager --namespace cert-manager --version v1.10.0 --set installCRDs=true

Then edit the config/default/kustomization.yaml configuration file:

# enable the following entries under bases
bases:
...
- ../webhook
- ../certmanager

patchesStrategicMerge:
...
- manager_webhook_patch.yaml
- webhookcainjection_patch.yaml

# uncomment the entire vars section
vars:
# [CERTMANAGER] To enable cert-manager, uncomment all sections with 'CERTMANAGER' prefix.
- name: CERTIFICATE_NAMESPACE # namespace of the certificate CR
  objref:
    kind: Certificate
    group: cert-manager.io
    version: v1
    name: serving-cert # this name should match the one in certificate.yaml
  fieldref:
    fieldpath: metadata.namespace
- name: CERTIFICATE_NAME
  objref:
    kind: Certificate
    group: cert-manager.io
    version: v1
    name: serving-cert # this name should match the one in certificate.yaml
- name: SERVICE_NAMESPACE # namespace of the service
  objref:
    kind: Service
    version: v1
    name: webhook-service
  fieldref:
    fieldpath: metadata.namespace
- name: SERVICE_NAME
  objref:
    kind: Service
    version: v1
    name: webhook-service

Next, configure config/crd/kustomization.yaml:

resources:
- bases/batch.isekiro.com_mysqlbackups.yaml

patchesStrategicMerge:
- patches/webhook_in_mysqlbackups.yaml
- patches/cainjection_in_mysqlbackups.yaml
configurations:
- kustomizeconfig.yaml

Build the Operator image:

make docker-build IMG=ops-operator:v0.1

Deploy the Operator to the cluster:

make deploy IMG=ops-operator:v0.1

Sample YAML:

apiVersion: batch.isekiro.com/v1
kind: MysqlBackup
metadata:
  name: mysqlbackup-sample
spec:
  nodeSelector:
    kubernetes.io/appRole: mysqlbackup-v2
  successfulJobsHistoryLimit: 1
  restartPolicy: OnFailure
  image: mysqlbackup-v2:alpha6
  imagePullPolicy: IfNotPresent
  volumes:
  - name: data
    persistentVolumeClaim:
      claimName: mysqlbackup-v2-pvc
  volumeMounts:         
  - name: data
    mountPath: /mnt
  backupEnvs:
  - name: TZ
    value: Asia/Shanghai
  - name: HOST
    value: "192.168.21.101"
  - name: PORT
    value: "3306"
  - name: USERNAME
    value: "root"
  - name: PASSWORD
    value: "xxxxx"
  - name: EXPIREAT
    value: "24h"
  - name: SQLPATH
    value: "/mnt"
  - name: APIURL
    value: "https://open.feishu.cn/open-apis/bot/v2/hook/xxx"
  # the unit is hours; readability to be improved later
  - name: TIMEOUT
    value: "9"
  schedule: "0 0 * * *"
  startingDeadlineSeconds: 60
  concurrencyPolicy: Allow

Note: the NFS server must be set up and the PVC created in advance.
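
For reference only, an NFS-backed PV/PVC pair could look roughly like the following; the server address, export path, capacity and PV name are assumptions, and only the claim name has to match claimName in the sample above:

apiVersion: v1
kind: PersistentVolume
metadata:
  name: mysqlbackup-v2-pv
spec:
  capacity:
    storage: 50Gi
  accessModes:
    - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain
  nfs:
    server: 192.168.21.200   # assumed NFS server address
    path: /data/mysqlbackup  # assumed export path
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: mysqlbackup-v2-pvc
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: ""       # bind to a statically provisioned PV such as the one above
  resources:
    requests:
      storage: 50Gi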

Create the resource and see the result:

kubectl create -f config/samples/batch_v1_mysqlbackup.yaml

As the YAML fields above show, the backup program also has a notification feature. If I find the time later, I may post the backup program's code; it is a thin wrapper around mysqldump that gives it the ability to back up MySQL databases remotely.

Wrapping Up

With that, our OPS-Operator is built. It is a practice project; my idea is that any future tasks that need to run inside the cluster, or other kinds of deployments, could be integrated into it, so we no longer have to maintain assorted scripts or simple one-off controllers. That feels more elegant, and keeps things fresh.