// ConcurrencyPolicy describes how concurrent executions of a backup Job
// are treated. Only one of the following policies may be specified; the
// controller defaults to AllowConcurrent when none is set (see Default()).
type ConcurrencyPolicy string

const (
	// AllowConcurrent allows CronJobs to run concurrently.
	AllowConcurrent ConcurrencyPolicy = "Allow"

	// ForbidConcurrent forbids concurrent runs, skipping next run if previous
	// hasn't finished yet.
	ForbidConcurrent ConcurrencyPolicy = "Forbid"

	// ReplaceConcurrent cancels currently running job and replaces it with a new one.
	ReplaceConcurrent ConcurrencyPolicy = "Replace"
)

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.

// MysqlBackupSpec defines the desired state of MysqlBackup.
type MysqlBackupSpec struct {
	// +kubebuilder:validation:MinLength=0
	// The schedule in Cron format, see https://en.wikipedia.org/wiki/Cron.
	Schedule string `json:"schedule"`

	// +kubebuilder:validation:Minimum=0
	// Optional deadline in seconds for starting the job if it misses scheduled
	// time for any reason. Missed jobs executions will be counted as failed ones.
	// +optional
	StartingDeadlineSeconds *int64 `json:"startingDeadlineSeconds,omitempty"`

	// Specifies how to treat concurrent executions of a Job.
	// Valid values are:
	// - "Allow" (default): allows CronJobs to run concurrently;
	// - "Forbid": forbids concurrent runs, skipping next run if previous run hasn't finished yet;
	// - "Replace": cancels currently running job and replaces it with a new one
	// +optional
	ConcurrencyPolicy ConcurrencyPolicy `json:"concurrencyPolicy,omitempty"`

	// This flag tells the controller to suspend subsequent executions, it does
	// not apply to already started executions. Defaults to false.
	// +optional
	Suspend *bool `json:"suspend,omitempty"`

	// +kubebuilder:validation:Minimum=0
	// The number of successful finished jobs to retain.
	// This is a pointer to distinguish between explicit zero and not specified.
	// +optional
	SuccessfulJobsHistoryLimit *int32 `json:"successfulJobsHistoryLimit,omitempty"`

	// +kubebuilder:validation:Minimum=0
	// The number of failed finished jobs to retain.
	// This is a pointer to distinguish between explicit zero and not specified.
	// +optional
	FailedJobsHistoryLimit *int32 `json:"failedJobsHistoryLimit,omitempty"`

	// Restart policy applied when the backup container fails to start.
	// +optional
	RestartPolicy corev1.RestartPolicy `json:"restartPolicy,omitempty"`

	// Environment variables passed to the backup container (backup configuration parameters).
	// +optional
	BackupEnvs []corev1.EnvVar `json:"backupEnvs,omitempty"`

	// Container image used to run the backup.
	// +optional
	Image string `json:"image,omitempty"`

	// Arguments passed to the backup container.
	// +optional
	Args []string `json:"args,omitempty"`

	// Image pull policy for the backup container.
	// +optional
	ImagePullPolicy string `json:"imagePullPolicy,omitempty"`

	// Volumes declared for the backup pod.
	// +optional
	Volumes []corev1.Volume `json:"volumes,omitempty" patchStrategy:"merge,retainKeys" patchMergeKey:"name"`

	// Volume mounts for the backup container.
	// +optional
	VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty" patchStrategy:"merge" patchMergeKey:"mountPath"`

	// Node selector used to schedule the backup pod.
	// +optional
	NodeSelector map[string]string `json:"nodeSelector,omitempty"`
}
// MysqlBackupStatus defines the observed state of MysqlBackup.
type MysqlBackupStatus struct {
	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
	// Important: Run "make" to regenerate code after modifying this file

	// A list of pointers to currently running jobs.
	// +optional
	Active []corev1.ObjectReference `json:"active,omitempty"`

	// Information when was the last time the job was successfully scheduled.
	// +optional
	LastScheduleTime *metav1.Time `json:"lastScheduleTime,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status

// MysqlBackup is the Schema for the mysqlbackups API.
type MysqlBackup struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	// Spec holds the desired backup schedule and pod template settings.
	Spec MysqlBackupSpec `json:"spec,omitempty"`
	// Status reflects the most recently observed state of the backup jobs.
	Status MysqlBackupStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true
// MysqlBackupList contains a list of MysqlBackup.
type MysqlBackupList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []MysqlBackup `json:"items"`
}
	// Classify every child Job as active / successful / failed so the status
	// subresource can be kept up to date.
	// NOTE(review): this fragment runs inside Reconcile — childJobs, mysqlBackup,
	// r, ctx and log come from the enclosing scope (not visible here).
	var activeJobs []*kbatch.Job
	var successfulJobs []*kbatch.Job
	var failedJobs []*kbatch.Job
	var mostRecentTime *time.Time // find the last run so we can update the status

	// isJobFinished reports whether the Job has a Complete or Failed condition
	// with status True; a finished Job never transitions back.
	isJobFinished := func(job *kbatch.Job) (bool, kbatch.JobConditionType) {
		for _, c := range job.Status.Conditions {
			if (c.Type == kbatch.JobComplete || c.Type == kbatch.JobFailed) && c.Status == corev1.ConditionTrue {
				return true, c.Type
			}
		}
		return false, ""
	}

	// getScheduledTimeForJob reads the launch time back out of the annotation
	// the controller stamped on the Job at creation. A missing annotation is
	// not an error (nil, nil).
	getScheduledTimeForJob := func(job *kbatch.Job) (*time.Time, error) {
		timeRaw := job.Annotations[scheduledTimeAnnotation]
		if len(timeRaw) == 0 {
			return nil, nil
		}
		timeParsed, err := time.Parse(time.RFC3339, timeRaw)
		if err != nil {
			return nil, err
		}
		return &timeParsed, nil
	}

	for i, job := range childJobs.Items {
		_, finishedType := isJobFinished(&job)
		switch finishedType {
		case "": // ongoing
			// append &childJobs.Items[i], not &job: the loop variable is reused.
			activeJobs = append(activeJobs, &childJobs.Items[i])
		case kbatch.JobFailed:
			failedJobs = append(failedJobs, &childJobs.Items[i])
		case kbatch.JobComplete:
			successfulJobs = append(successfulJobs, &childJobs.Items[i])
		}

		// We'll store the launch time in an annotation, so we'll reconstitute that from
		// the active jobs themselves.
		scheduledTimeForJob, err := getScheduledTimeForJob(&job)
		if err != nil {
			log.Error(err, "unable to parse schedule time for child job", "job", &job)
			continue
		}
		if scheduledTimeForJob != nil {
			if mostRecentTime == nil {
				mostRecentTime = scheduledTimeForJob
			} else if mostRecentTime.Before(*scheduledTimeForJob) {
				mostRecentTime = scheduledTimeForJob
			}
		}
	}

	// Record the most recent launch time (or clear it when no job carried one).
	if mostRecentTime != nil {
		mysqlBackup.Status.LastScheduleTime = &metav1.Time{Time: *mostRecentTime}
	} else {
		mysqlBackup.Status.LastScheduleTime = nil
	}

	// Rebuild the Active list from scratch on every reconcile.
	mysqlBackup.Status.Active = nil
	for _, activeJob := range activeJobs {
		jobRef, err := ref.GetReference(r.Scheme, activeJob)
		if err != nil {
			log.Error(err, "unable to make reference to active job", "job", activeJob)
			continue
		}
		mysqlBackup.Status.Active = append(mysqlBackup.Status.Active, *jobRef)
	}

	log.V(1).Info("job count", "active jobs", len(activeJobs), "successful jobs", len(successfulJobs), "failed jobs", len(failedJobs))

	if err := r.Status().Update(ctx, &mysqlBackup); err != nil {
		log.Error(err, "unable to update MysqlBackup status")
		return ctrl.Result{}, err
	}

	// Clean up old jobs according to the history limit
// NB: deleting these are "best effort" -- if we fail on a particular one,
// we won't requeue just to finish the deleting.
ifmysqlBackup.Spec.FailedJobsHistoryLimit!=nil{sort.Slice(failedJobs,func(i,jint)bool{iffailedJobs[i].Status.StartTime==nil{returnfailedJobs[j].Status.StartTime!=nil}returnfailedJobs[i].Status.StartTime.Before(failedJobs[j].Status.StartTime)})fori,job:=rangefailedJobs{ifint32(i)>=int32(len(failedJobs))-*mysqlBackup.Spec.FailedJobsHistoryLimit{break}iferr:=r.Delete(ctx,job,client.PropagationPolicy(metav1.DeletePropagationBackground));client.IgnoreNotFound(err)!=nil{log.Error(err,"unable to delete old failed job","job",job)}else{log.V(0).Info("deleted old failed job","job",job)}}}ifmysqlBackup.Spec.SuccessfulJobsHistoryLimit!=nil{sort.Slice(successfulJobs,func(i,jint)bool{ifsuccessfulJobs[i].Status.StartTime==nil{returnsuccessfulJobs[j].Status.StartTime!=nil}returnsuccessfulJobs[i].Status.StartTime.Before(successfulJobs[j].Status.StartTime)})fori,job:=rangesuccessfulJobs{ifint32(i)>=int32(len(successfulJobs))-*mysqlBackup.Spec.SuccessfulJobsHistoryLimit{break}iferr:=r.Delete(ctx,job,client.PropagationPolicy(metav1.DeletePropagationBackground));(err)!=nil{log.Error(err,"unable to delete old successful job","job",job)}else{log.V(0).Info("deleted old successful job","job",job)}}}
	// Helper closures that translate MysqlBackup spec fields into the pieces of
	// a Job pod template.
	// NOTE(review): this fragment runs inside Reconcile — r, ctx, log,
	// mysqlBackup, missedRun and scheduledResult come from the enclosing scope.
	newContainers := func(app *batchv1.MysqlBackup) []corev1.Container {
		return []corev1.Container{
			{
				Name:            app.Name,
				Env:             app.Spec.BackupEnvs,
				Image:           app.Spec.Image,
				Args:            app.Spec.Args,
				ImagePullPolicy: corev1.PullPolicy(app.Spec.ImagePullPolicy),
				// DeepCopy so the Job owns its own VolumeMounts slice.
				VolumeMounts: app.Spec.DeepCopy().VolumeMounts,
			},
		}
	}
	newVolumes := func(app *batchv1.MysqlBackup) []corev1.Volume { return app.Spec.Volumes }
	newRestartPolicy := func(app *batchv1.MysqlBackup) corev1.RestartPolicy { return app.Spec.RestartPolicy }
	newNodeSelector := func(app *batchv1.MysqlBackup) map[string]string { return app.Spec.NodeSelector }

	// constructJobForMysqlBackup builds the Job object for one scheduled run and
	// sets the owner reference so garbage collection and watches work.
	constructJobForMysqlBackup := func(mysqlBackup *batchv1.MysqlBackup, scheduledTime time.Time) (*kbatch.Job, error) {
		// We want job names for a given nominal start time to have a deterministic name to avoid the same job being created twice
		name := fmt.Sprintf("%s-%d", mysqlBackup.Name, scheduledTime.Unix())

		job := &kbatch.Job{
			ObjectMeta: metav1.ObjectMeta{
				Labels:      make(map[string]string),
				Annotations: make(map[string]string),
				Name:        name,
				Namespace:   mysqlBackup.Namespace,
			},
			// Spec: *mysqlBackup.Spec.JobTemplate.Spec.DeepCopy(),
		}
		// build container
		job.Spec.Template.Spec.Containers = newContainers(mysqlBackup)
		// build volumes
		job.Spec.Template.Spec.Volumes = newVolumes(mysqlBackup)
		// build restartPolicy
		job.Spec.Template.Spec.RestartPolicy = newRestartPolicy(mysqlBackup)
		// build nodeSelector
		job.Spec.Template.Spec.NodeSelector = newNodeSelector(mysqlBackup)
		// Stamp the nominal start time; read back by getScheduledTimeForJob.
		job.Annotations[scheduledTimeAnnotation] = scheduledTime.Format(time.RFC3339)
		if err := ctrl.SetControllerReference(mysqlBackup, job, r.Scheme); err != nil {
			return nil, err
		}
		return job, nil
	}

	// actually make the job...
	job, err := constructJobForMysqlBackup(&mysqlBackup, missedRun)
	if err != nil {
		log.Error(err, "unable to construct job from template")
		// don't bother requeuing until we get a change to the spec
		return scheduledResult, nil
	}

	// ...and create it on the cluster
	if err := r.Create(ctx, job); err != nil {
		log.Error(err, "unable to create Job for MysqlBackup", "job", job)
		return ctrl.Result{}, err
	}
	log.V(1).Info("created Job for MysqlBackup run", "job", job)

	return scheduledResult, nil
// Default implements webhook.Defaulter so a webhook will be registered for the type
func(r*MysqlBackup)Default(){mysqlbackuplog.Info("default","name",r.Name)ifr.Spec.ConcurrencyPolicy==""{r.Spec.ConcurrencyPolicy=AllowConcurrent}ifr.Spec.Suspend==nil{r.Spec.Suspend=new(bool)}ifr.Spec.SuccessfulJobsHistoryLimit==nil{r.Spec.SuccessfulJobsHistoryLimit=new(int32)*r.Spec.SuccessfulJobsHistoryLimit=3}ifr.Spec.FailedJobsHistoryLimit==nil{r.Spec.FailedJobsHistoryLimit=new(int32)*r.Spec.FailedJobsHistoryLimit=1}}// TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation.
//+kubebuilder:webhook:path=/validate-batch-isekiro-com-v1-mysqlbackup,mutating=false,failurePolicy=fail,sideEffects=None,groups=batch.isekiro.com,resources=mysqlbackups,verbs=create;update,versions=v1,name=vmysqlbackup.kb.io,admissionReviewVersions=v1
// Compile-time assertion that *MysqlBackup satisfies webhook.Validator.
var _ webhook.Validator = &MysqlBackup{}

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type.
func (r *MysqlBackup) ValidateCreate() error {
	mysqlbackuplog.Info("validate create", "name", r.Name)
	return r.validateMysqlBackup()
}

// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type.
// The previous object state (old) is ignored; the same checks as create apply.
func (r *MysqlBackup) ValidateUpdate(old runtime.Object) error {
	mysqlbackuplog.Info("validate update", "name", r.Name)
	return r.validateMysqlBackup()
}

// ValidateDelete implements webhook.Validator so a webhook will be registered for the type
func(r*MysqlBackup)ValidateDelete()error{mysqlbackuplog.Info("validate delete","name",r.Name)// TODO(user): fill in your validation logic upon object deletion.
returnnil}func(r*MysqlBackup)validateMysqlBackup()error{varallErrsfield.ErrorListiferr:=r.validateMysqlBackupName();err!=nil{allErrs=append(allErrs,err)}iferr:=r.validateMysqlBackupSpec();err!=nil{allErrs=append(allErrs,err)}iflen(allErrs)==0{returnnil}returnapierrors.NewInvalid(schema.GroupKind{Group:"batch.tutorial.kubebuilder.io",Kind:"MysqlBackup"},r.Name,allErrs)}func(r*MysqlBackup)validateMysqlBackupSpec()*field.Error{// kubernetes API machinery 的字段助手会帮助我们很好地返回结构化的验证错误。
returnvalidateScheduleFormat(r.Spec.Schedule,field.NewPath("spec").Child("schedule"))}funcvalidateScheduleFormat(schedulestring,fldPath*field.Path)*field.Error{if_,err:=cron.ParseStandard(schedule);err!=nil{returnfield.Invalid(fldPath,schedule,err.Error())}returnnil}func(r*MysqlBackup)validateMysqlBackupName()*field.Error{iflen(r.ObjectMeta.Name)>validationutils.DNS1035LabelMaxLength-11{// The job name length is 63 character like all Kubernetes objects
// (which must fit in a DNS subdomain). The cronjob controller appends
// a 11-character suffix to the cronjob (`-$TIMESTAMP`) when creating
// a job. The job name length limit is 63 characters. Therefore cronjob
// names must have length <= 63-11=52. If we don't validate this here,
// then job creation will fail later.
returnfield.Invalid(field.NewPath("metadata").Child("name"),r.Name,"must be no more than 52 characters")}returnnil}
准入控制分 2 种:一种是变更(mutating)准入控制,一种是校验(validating)准入控制。
变更准入控制:这里我们只对并发策略、失败 Job 保留个数和成功 Job 保留个数做了默认值填充。
校验准入控制:我们只校验名称长度不超过 52 个字符和对 Schedule 调度字符串的校验。
创建完 webhook 如需要在本地 make run ,需要在 /tmp/k8s-webhook-server/serving-certs 目录下有 {tls.crt , tls.key} 文件。
# 打开 bases 以下选项
bases:
  # ...
  - ../webhook
  - ../certmanager
patchesStrategicMerge:
  # ...
  - manager_webhook_patch.yaml
  - webhookcainjection_patch.yaml
# vars 全取消注释
vars:
  # [CERTMANAGER] To enable cert-manager, uncomment all sections with 'CERTMANAGER' prefix.
  - name: CERTIFICATE_NAMESPACE # namespace of the certificate CR
    objref:
      kind: Certificate
      group: cert-manager.io
      version: v1
      name: serving-cert # this name should match the one in certificate.yaml
    fieldref:
      fieldpath: metadata.namespace
  - name: CERTIFICATE_NAME
    objref:
      kind: Certificate
      group: cert-manager.io
      version: v1
      name: serving-cert # this name should match the one in certificate.yaml
  - name: SERVICE_NAMESPACE # namespace of the service
    objref:
      kind: Service
      version: v1
      name: webhook-service
    fieldref:
      fieldpath: metadata.namespace
  - name: SERVICE_NAME
    objref:
      kind: Service
      version: v1
      name: webhook-service