Helm 工具类

前提补充

  1. helm install 和 helm uninstall 可能会访问 K8S 网络波动问题,并且因为更为重要,所以需要重试来避免失败。
  2. helm install 有可能成功但是没有发觉,从而触发 cannot re-use a name that is still in use 因此需要更高 backoff 内容。
def helm_predicate(*ret) -> bool:  # pragma: no cover
    SPEC_MSG = "Error: INSTALLATION FAILED: cannot re-use a name that is still in use"
    return not ret[0] and not re.match(f".*{SPEC_MSG}.*", ret[1])
 
 
@staticmethod
@backoff.on_predicate(backoff.constant, predicate=helm_predicate, max_tries=3, interval=1, logger=LOG)
def helm_install(
    name: str, chart_name: str, chart_version: Optional[str] = None, value_path: Optional[str] = None
        , k8s_namespace: Optional[str] = K8S_NAMESPACE, kube_config_path: Optional[str] = KUBE_CONFIG_PATH
) -> CmdBase:
	# ...

helm install

helm install release-name chart-name, 根据需要增加 namespace/kubeconfig/value/version

with Helm.mutex:
    cmd = ["helm", "install", name, chart_name, "--namespace", k8s_namespace, "--kubeconfig", kube_config_path]
    if chart_version is not None:
        cmd.extend(["--version", chart_version])
    if value_path is not None:
        cmd.extend(["-f", value_path])
    proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return proc.returncode == 0, proc.stderr.decode()

helm template

helm templatehelm install --dry-run 的效果差不多,都是用来对要部署的 chart 进行预演。不同的是,前者不需要连接 K8S 而后者需要。同时,helm template 会输出 yaml 格式的资源清单,方便后续处理。

helm template test chart-name, 根据需要增加 value/version。

读取输出内容的表达式为:list(yaml.full_load_all(proc.stdout.decode()))

如下所示,输出的 yaml 被转换为 python 对象。

apiVersion: v1
kind: Pod
metadata:
  name: my-pod
---
apiVersion: v1
kind: Service
metadata:
  name: my-service
[
    {
        'apiVersion': 'v1',
        'kind': 'Pod',
        'metadata': {'name': 'my-pod'}
    },
    {
        'apiVersion': 'v1',
        'kind': 'Service',
        'metadata': {'name': 'my-service'}
    }
]

helm gpu check

一个脚本工具,能够检查 helm chart 中 gpu 使用。略。

helm uninstall

helm uninstall release-name, 根据需要增加 namespace/kubeconfig

helm list

helm list 列出指定命名空间下的 helm 版本,选择第一列,即 helm name。

@staticmethod
def list_helms(name_pattern: Optional[str] = HELM_PATTERN) -> list[str]:
	with Helm.mutex:
	    helms = (
	        subprocess.run(
	            f"helm list --max 20000 --namespace {K8S_NAMESPACE} "
	            + f"--kubeconfig {KUBE_CONFIG_PATH} | awk '{{print $1}}'",
	            stdout=subprocess.PIPE,
	            stderr=subprocess.DEVNULL,
	            shell=True,
	        )
	        .stdout.decode()
	        .split(os.linesep)[1:-1]
	    )
	    if name_pattern is not None:
	        helms = [helm for helm in helms if re.match(name_pattern, helm)]
	    return helms

helm repo add

helm repo list | awk '{print $1, $2}' 列出当前的 helm 仓库,存在则返回 repo name

helm repo add repo-name repo-url 添加 helm 仓库

为了方便,在处理的时候,repo-name 设置为基于 url 进行替换,从而也方便后续互相转换。

@staticmethod
def helm_add_repo(url: str) -> CmdFull:
    """helm add repo, repo已存在则直接返回"""
    with Helm.mutex:
        cmd = ["helm repo list | awk '{print $1, $2}'"]
        proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        if proc.returncode == 0:
            for repo in proc.stdout.decode().splitlines()[1:]:
                repo_name, repo_url = repo.split(" ", 1)
                if repo_url == url:
                    return True, repo_name, ""
 
        name = url.replace("://", "-").replace("/", "-")
        proc = subprocess.run(["helm", "repo", "add", name, url], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        return proc.returncode == 0, name, proc.stderr.decode()

helm repo update

helm repo update,更新 Helm 客户端本地的仓库索引文件,这些索引文件包含了各个 Helm 仓库中可用的 Chart 列表及其元数据。当 Helm 仓库中的 Chart 有更新时,通过执行该命令可以让本地的 Helm 客户端获取到最新的 Chart 信息。

helm show values

helm show values chart_name,可以选择增加 version。功能是,展示指定 chart 的 values.yaml。

helm pull

执行 helm pull 拉取 chart 之前,需要先进行 helm repo update

helm pull chart-name --destination,根据需要增加 version。

需要注意的是,应该先将 chart 保存到临时文件夹,以避免出现故障导致拉取多个文件。

Helm.helm_repo_update()
with Helm.mutex:
    with tempfile.TemporaryDirectory() as temp_dir:
        cmd = ["helm", "pull", chart_name, "--destination", temp_dir]
        if chart_version is not None:
            cmd.extend(["--version", chart_version])
        proc = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
 
        if len(os.listdir(temp_dir)) != 1:
            return False
        copyfile(os.path.join(temp_dir, os.listdir(temp_dir)[0]), destination)
    return proc.returncode == 0

helm show chart

helm show chart chart-name,或者再增加 version,判断 helm chart 是否存在。

Helm services 服务

chart2info

将 chart 链接转换为 chart-name, chart-version, chart-values.

使用位置:拉起 chart 的时候需要提取上面信息,以及导出 benchmark、打包 submit 的时候需要用到。

  • helm_add_repo 校验 helm repo 是否存在
  • helm_repo_update 更新 helm repo
  • 解析 chart-name, chart-version
  • validate_helm 判断 repo 中是否存在 chart
  • get_helm_value 获得 repo 中 values
if "/" not in chart:
    raise exc.ArgumentError("chart地址格式错误")
ok, chart_repo, msg = Helm.helm_add_repo(chart[: chart.rindex("/")])
if not ok:
    raise exc.ArgumentError("解析repo失败: %s" % msg)
ok, _, msg = Helm.helm_repo_update()
if not ok:
    raise exc.ArgumentError("更新repo失败: %s" % msg)
 
repo_chart = chart_repo + chart[chart.rindex("/") :]
if ":" in repo_chart:
    chart_name, chart_version = repo_chart.rsplit(":", 1)
else:
    chart_name, chart_version = repo_chart, None
 
if not Helm.validate_helm(chart_name, chart_version):
    raise exc.ArgumentError("chart地址不存在")
 
chart_values = get_helm_value(chart_name, chart_version)
return chart_name, chart_version, chart_values

get_helm_value

使用位置:拉取 chart 的时候,获得 chart 内部的 value 信息,补充的 value 信息进行 merge 操作。

一般是直接在 chart2info 中调用,也存在 load-sut 直接提供 chart 文件时直接进行解析。

sut_name_prune

使用位置:拉起 sut 的 release 时,提供的 release name 不满足 helm 要求。

def sut_name_prune(resource_name: str) -> str:
    """将sut名字改为helm可用的名字"""
    resource_name = resource_name.lower()
    resource_name = re.sub("_", "-", resource_name)
    resource_name = re.sub("[^-a-z0-9]", "", resource_name)
    return resource_name

Helm 命名规范:

  • 字符限制:名称只能包含小写字母、数字、连字符(-),不能以连字符开头或结尾。例如,my-chart 是有效的,而 -mychart 或 my-chart- 是无效的。
  • 长度限制:没有严格的固定长度限制,但应尽量保持名称简洁明了,便于识别和使用。一般来说,不建议名称过长,以免在命令行操作或配置文件中造成不便。
  • 唯一性:在一个 Helm 仓库中,每个 chart 的名称必须是唯一的。这有助于确保在安装、升级和管理 charts 时能够准确地引用和区分不同的应用程序或服务。

除了 sut_name_prune 外,实际 helm-name 会再增加 job-id,来保证唯一性。

SUT_HELM_NAME = "job-{}-sut-{}"
helm_name = SUT_HELM_NAME.format(job_id, resource_name)

chart2images

使用场景:获取所有的 images,在 load-sut 或 benchmark-export 时,对 chart 中 image 进行 retag。

def chart2images(chart_name: str, chart_version: Optional[str], value_path: Optional[str]) -> set[str]:
    ok, workloads, msg = Helm.helm_template(chart_name, chart_version, value_path)
    if not ok:
        LOG.error(msg)
        return set()
 
    def pod_spec2images(pod_spec) -> list[str]:
        return [container["image"] for container in pod_spec["containers"]]
 
    def pod_template_spec2images(pod_template_spec) -> list[str]:
        return pod_spec2images(pod_template_spec["template"]["spec"])
 
    def object2image(obj) -> str:
        return obj["image"]
 
    def dagtask2image(dagtask) -> list[str]:
        if "inline" in dagtask:
            return list(chain(*[template2images(template) for template in dagtask["inline"]]))
        return []
 
    def template2images(template) -> list[str]:
        images = []
        if "container" in template:
            images.append(object2image(template["container"]))
        if "script" in template:
            images.append(object2image(template["script"]))
        if "initContainers" in template:
            images.extend(object2image(obj) for obj in template["initContainers"])
        if "sidecars" in template:
            images.extend(object2image(obj) for obj in template["sidecars"])
        if "containerSet" in template:
            containerSet = template["containerSet"]
            if "containers" in containerSet:
                images.extend(object2image(obj) for obj in containerSet["containers"])
        if "dag" in template:
            dag = template["dag"]
            if "tasks" in dag:
                for task in dag["tasks"]:
                    images.extend(dagtask2image(task))
        if "steps" in template:
            for step in template["steps"]:
                for workflowstep in step:
                    images.extend(workflowstep2images(workflowstep))
        return images
 
    def workflowstep2images(workflowstep) -> list[str]:
        if "inline" in workflowstep:
            return list(chain(*[template2images(template) for template in workflowstep["inline"]]))
        return []
 
    def workflowspec2images(workflowspec) -> list[str]:
        images = []
        if "templates" in workflowspec:
            for template in workflowspec["templates"]:
                images.extend(template2images(template))
        if "templateDefaults" in workflowspec:
            for template in workflowspec["templates"]:
                images.extend(template2images(template))
        return images
 
    def workflowstatus2images(workflowstatus) -> list[str]:
        images = []
        if "storedTemplates" in workflowstatus:
            for template in workflowstatus["storedTemplates"]:
                images.extend(template2images(template))
        if "storedWorkflowTemplateSpec" in workflowstatus:
            images.extend(workflowspec2images(workflowstatus["storedWorkflowTemplateSpec"]))
        return images
 
    def workflow2images(workflow) -> list[str]:
        images = []
        if "status" in workflow:
            images.extend(workflowstatus2images(workflow["status"]))
        if "spec" in workflow:
            images.extend(workflowspec2images(workflow["spec"]))
        return images
 
    def workflowtemplate2images(workflowtemplate) -> list[str]:
        if "spec" in workflowtemplate:
            return workflowspec2images(workflowtemplate["spec"])
        return []
 
    def cronworkflowspec2images(cronworkflowspec) -> list[str]:
        if "workflowSpec" in cronworkflowspec:
            return workflowspec2images(cronworkflowspec["workflowSpec"])
        return []
 
    def cronworkflow2images(cronworkflow) -> list[str]:
        if "spec" in cronworkflow:
            return cronworkflowspec2images(cronworkflow["spec"])
        return []
 
    images = []
 
    for workload in workloads:
        if "kind" not in workload:
            continue
        kind = workload["kind"]
        if kind == "Pod":
            images.extend(pod_spec2images(workload["spec"]))
        elif kind == "PodTemplate":
            images.extend(pod_template_spec2images(workload))
        elif kind in (
            "ReplicationController",
            "ReplicaSet",
            "Deployment",
            "StatefulSet",
            "ControllerRevision",
            "DaemonSet",
            "Job",
            "CronJob",
        ):
            images.extend(pod_template_spec2images(workload["spec"]))
        elif kind == "Workflow":
            images.extend(workflow2images(workload))
        elif kind == "WorkflowTemplate":
            images.extend(workflowtemplate2images(workload))
        elif kind == "CronWorkflow":
            images.extend(cronworkflow2images(workload))
 
    return set(images) - set(["busybox"])