Helm 工具类
前提补充
- helm install 和 helm uninstall 可能会访问 K8S 网络波动问题,并且因为更为重要,所以需要重试来避免失败。
- helm install 有可能成功但是没有发觉,从而触发
cannot re-use a name that is still in use因此需要更高 backoff 内容。
def helm_predicate(*ret) -> bool: # pragma: no cover
SPEC_MSG = "Error: INSTALLATION FAILED: cannot re-use a name that is still in use"
return not ret[0] and not re.match(f".*{SPEC_MSG}.*", ret[1])
@staticmethod
@backoff.on_predicate(backoff.constant, predicate=helm_predicate, max_tries=3, interval=1, logger=LOG)
def helm_install(
name: str, chart_name: str, chart_version: Optional[str] = None, value_path: Optional[str] = None
, k8s_namespace: Optional[str] = K8S_NAMESPACE, kube_config_path: Optional[str] = KUBE_CONFIG_PATH
) -> CmdBase:
# ...helm install
helm install release-name chart-name, 根据需要增加 namespace/kubeconfig/value/version
with Helm.mutex:
cmd = ["helm", "install", name, chart_name, "--namespace", k8s_namespace, "--kubeconfig", kube_config_path]
if chart_version is not None:
cmd.extend(["--version", chart_version])
if value_path is not None:
cmd.extend(["-f", value_path])
proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return proc.returncode == 0, proc.stderr.decode()helm template
helm template 和 helm install --dry-run 的效果差不多,都是用来对要部署的 chart 进行预演。不同的是,前者不需要连接 K8S 而后者需要。同时,helm template 会输出 yaml 格式的资源清单,方便后续处理。
helm template test chart-name, 根据需要增加 value/version。
读取输出内容的表达式为:list(yaml.full_load_all(proc.stdout.decode()))。
如下所示,输出的 yaml 被转换为 python 对象。
apiVersion: v1
kind: Pod
metadata:
name: my-pod
---
apiVersion: v1
kind: Service
metadata:
name: my-service[
{
'apiVersion': 'v1',
'kind': 'Pod',
'metadata': {'name': 'my-pod'}
},
{
'apiVersion': 'v1',
'kind': 'Service',
'metadata': {'name': 'my-service'}
}
]helm gpu check
一个脚本工具,能够检查 helm chart 中 gpu 使用。略。
helm uninstall
helm uninstall release-name, 根据需要增加 namespace/kubeconfig
helm list
helm list 列出指定命名空间下的 helm 版本,选择第一列,即 helm name。
@staticmethod
def list_helms(name_pattern: Optional[str] = HELM_PATTERN) -> list[str]:
with Helm.mutex:
helms = (
subprocess.run(
f"helm list --max 20000 --namespace {K8S_NAMESPACE} "
+ f"--kubeconfig {KUBE_CONFIG_PATH} | awk '{{print $1}}'",
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
shell=True,
)
.stdout.decode()
.split(os.linesep)[1:-1]
)
if name_pattern is not None:
helms = [helm for helm in helms if re.match(name_pattern, helm)]
return helmshelm repo add
helm repo list | awk '{print $1, $2}' 列出当前的 helm 仓库,存在则返回 repo name
helm repo add repo-name repo-url 添加 helm 仓库
为了方便,在处理的时候,repo-name 设置为基于 url 进行替换,从而也方便后续互相转换。
@staticmethod
def helm_add_repo(url: str) -> CmdFull:
"""helm add repo, repo已存在则直接返回"""
with Helm.mutex:
cmd = ["helm repo list | awk '{print $1, $2}'"]
proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
if proc.returncode == 0:
for repo in proc.stdout.decode().splitlines()[1:]:
repo_name, repo_url = repo.split(" ", 1)
if repo_url == url:
return True, repo_name, ""
name = url.replace("://", "-").replace("/", "-")
proc = subprocess.run(["helm", "repo", "add", name, url], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return proc.returncode == 0, name, proc.stderr.decode()helm repo update
helm repo update,更新 Helm 客户端本地的仓库索引文件,这些索引文件包含了各个 Helm 仓库中可用的 Chart 列表及其元数据。当 Helm 仓库中的 Chart 有更新时,通过执行该命令可以让本地的 Helm 客户端获取到最新的 Chart 信息。
helm show values
helm show values chart_name,可以选择增加 version。功能是,展示指定 chart 的 values.yaml。
helm pull
执行 helm pull 拉取 chart 之前,需要先进行 helm repo update。
helm pull chart-name --destination,根据需要增加 version。
需要注意的是,应该先将 chart 保存到临时文件夹,以避免出现故障导致拉取多个文件。
Helm.helm_repo_update()
with Helm.mutex:
with tempfile.TemporaryDirectory() as temp_dir:
cmd = ["helm", "pull", chart_name, "--destination", temp_dir]
if chart_version is not None:
cmd.extend(["--version", chart_version])
proc = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
if len(os.listdir(temp_dir)) != 1:
return False
copyfile(os.path.join(temp_dir, os.listdir(temp_dir)[0]), destination)
return proc.returncode == 0helm show chart
helm show chart chart-name,或者再增加 version,判断 helm chart 是否存在。
Helm services 服务
chart2info
将 chart 链接转换为 chart-name, chart-version, chart-values.
使用位置:拉起 chart 的时候需要提取上面信息,以及导出 benchmark、打包 submit 的时候需要用到。
helm_add_repo校验 helm repo 是否存在helm_repo_update更新 helm repo- 解析 chart-name, chart-version
validate_helm判断 repo 中是否存在 chartget_helm_value获得 repo 中 values
if "/" not in chart:
raise exc.ArgumentError("chart地址格式错误")
ok, chart_repo, msg = Helm.helm_add_repo(chart[: chart.rindex("/")])
if not ok:
raise exc.ArgumentError("解析repo失败: %s" % msg)
ok, _, msg = Helm.helm_repo_update()
if not ok:
raise exc.ArgumentError("更新repo失败: %s" % msg)
repo_chart = chart_repo + chart[chart.rindex("/") :]
if ":" in repo_chart:
chart_name, chart_version = repo_chart.rsplit(":", 1)
else:
chart_name, chart_version = repo_chart, None
if not Helm.validate_helm(chart_name, chart_version):
raise exc.ArgumentError("chart地址不存在")
chart_values = get_helm_value(chart_name, chart_version)
return chart_name, chart_version, chart_valuesget_helm_value
使用位置:拉取 chart 的时候,获得 chart 内部的 value 信息,补充的 value 信息进行 merge 操作。
一般是直接在 chart2info 中调用,也存在 load-sut 直接提供 chart 文件时直接进行解析。
sut_name_prune
使用位置:拉起 sut 的 release 时,提供的 release name 不满足 helm 要求。
def sut_name_prune(resource_name: str) -> str:
"""将sut名字改为helm可用的名字"""
resource_name = resource_name.lower()
resource_name = re.sub("_", "-", resource_name)
resource_name = re.sub("[^-a-z0-9]", "", resource_name)
return resource_nameHelm 命名规范:
- 字符限制:名称只能包含小写字母、数字、连字符(
-),不能以连字符开头或结尾。例如,my-chart是有效的,而-mychart或my-chart-是无效的。 - 长度限制:没有严格的固定长度限制,但应尽量保持名称简洁明了,便于识别和使用。一般来说,不建议名称过长,以免在命令行操作或配置文件中造成不便。
- 唯一性:在一个 Helm 仓库中,每个 chart 的名称必须是唯一的。这有助于确保在安装、升级和管理 charts 时能够准确地引用和区分不同的应用程序或服务。
除了 sut_name_prune 外,实际 helm-name 会再增加 job-id,来保证唯一性。
SUT_HELM_NAME = "job-{}-sut-{}"
helm_name = SUT_HELM_NAME.format(job_id, resource_name)chart2images
使用场景:获取所有的 images,在 load-sut 或 benchmark-export 时,对 chart 中 image 进行 retag。
def chart2images(chart_name: str, chart_version: Optional[str], value_path: Optional[str]) -> set[str]:
ok, workloads, msg = Helm.helm_template(chart_name, chart_version, value_path)
if not ok:
LOG.error(msg)
return set()
def pod_spec2images(pod_spec) -> list[str]:
return [container["image"] for container in pod_spec["containers"]]
def pod_template_spec2images(pod_template_spec) -> list[str]:
return pod_spec2images(pod_template_spec["template"]["spec"])
def object2image(obj) -> str:
return obj["image"]
def dagtask2image(dagtask) -> list[str]:
if "inline" in dagtask:
return list(chain(*[template2images(template) for template in dagtask["inline"]]))
return []
def template2images(template) -> list[str]:
images = []
if "container" in template:
images.append(object2image(template["container"]))
if "script" in template:
images.append(object2image(template["script"]))
if "initContainers" in template:
images.extend(object2image(obj) for obj in template["initContainers"])
if "sidecars" in template:
images.extend(object2image(obj) for obj in template["sidecars"])
if "containerSet" in template:
containerSet = template["containerSet"]
if "containers" in containerSet:
images.extend(object2image(obj) for obj in containerSet["containers"])
if "dag" in template:
dag = template["dag"]
if "tasks" in dag:
for task in dag["tasks"]:
images.extend(dagtask2image(task))
if "steps" in template:
for step in template["steps"]:
for workflowstep in step:
images.extend(workflowstep2images(workflowstep))
return images
def workflowstep2images(workflowstep) -> list[str]:
if "inline" in workflowstep:
return list(chain(*[template2images(template) for template in workflowstep["inline"]]))
return []
def workflowspec2images(workflowspec) -> list[str]:
images = []
if "templates" in workflowspec:
for template in workflowspec["templates"]:
images.extend(template2images(template))
if "templateDefaults" in workflowspec:
for template in workflowspec["templates"]:
images.extend(template2images(template))
return images
def workflowstatus2images(workflowstatus) -> list[str]:
images = []
if "storedTemplates" in workflowstatus:
for template in workflowstatus["storedTemplates"]:
images.extend(template2images(template))
if "storedWorkflowTemplateSpec" in workflowstatus:
images.extend(workflowspec2images(workflowstatus["storedWorkflowTemplateSpec"]))
return images
def workflow2images(workflow) -> list[str]:
images = []
if "status" in workflow:
images.extend(workflowstatus2images(workflow["status"]))
if "spec" in workflow:
images.extend(workflowspec2images(workflow["spec"]))
return images
def workflowtemplate2images(workflowtemplate) -> list[str]:
if "spec" in workflowtemplate:
return workflowspec2images(workflowtemplate["spec"])
return []
def cronworkflowspec2images(cronworkflowspec) -> list[str]:
if "workflowSpec" in cronworkflowspec:
return workflowspec2images(cronworkflowspec["workflowSpec"])
return []
def cronworkflow2images(cronworkflow) -> list[str]:
if "spec" in cronworkflow:
return cronworkflowspec2images(cronworkflow["spec"])
return []
images = []
for workload in workloads:
if "kind" not in workload:
continue
kind = workload["kind"]
if kind == "Pod":
images.extend(pod_spec2images(workload["spec"]))
elif kind == "PodTemplate":
images.extend(pod_template_spec2images(workload))
elif kind in (
"ReplicationController",
"ReplicaSet",
"Deployment",
"StatefulSet",
"ControllerRevision",
"DaemonSet",
"Job",
"CronJob",
):
images.extend(pod_template_spec2images(workload["spec"]))
elif kind == "Workflow":
images.extend(workflow2images(workload))
elif kind == "WorkflowTemplate":
images.extend(workflowtemplate2images(workload))
elif kind == "CronWorkflow":
images.extend(cronworkflow2images(workload))
return set(images) - set(["busybox"])