K8S 工具类

pre-load

通过双检锁来避免重复 load K8S config,在每个 K8S 指令前都会执行一次。

class K8S:  # pragma: no cover
    load_lock = Lock()
    kube_config_loaded = False
    host_info: Optional[dict] = None
 
    @staticmethod
    def pre_load():
        if not K8S.kube_config_loaded:
            with K8S.load_lock:
                if not K8S.kube_config_loaded:
                    config.load_kube_config(KUBE_CONFIG_PATH)
                    K8S.kube_config_loaded = True

list_all_pods

暂时没再使用了。

@FlaskCache.cached(timeout=10)
def list_all_pods() -> list[V1Pod]:
    K8S.pre_load()
    v1 = client.CoreV1Api()
    try:
        return v1.list_namespaced_pod(K8S_NAMESPACE).items
    except Exception:
        LOG.exception("list_all_pods exception")
        return []

list_pods

使用场景:

  • K8S 后台任务,获得当前正在运行的 pods,进而对 job 对象的状态进行更新。
  • get-job-log API,通过 label 筛选对应的 pod,进而获得对应日志
  • get-sut-info API,获取任务拉起的服务的状态
def list_pods(filter_labels: list[str] = []) -> list[V1Pod]:
	"""
	filter_labels:List[str], eq. "contest.4pd.io/leaderboard-resource-type=sut"
	"""
    K8S.pre_load()
    v1 = client.CoreV1Api()
    try:
        return v1.list_namespaced_pod(K8S_NAMESPACE, label_selector=",".join(filter_labels)).items
    except Exception:
        LOG.exception(filter_labels)
        return []

list_pods_metrics

使用场景:get_job_metric_info API,获取任务拉起的服务的资源信息

25 年 1 月添加的接口,不清楚用于什么地方。

@staticmethod
def list_pods_metrics(filter_labels: list[str] = []) -> list[PodMetric]:
    K8S.pre_load()
    v1 = client.CustomObjectsApi()
    try:
        items = v1.list_namespaced_custom_object(
            "metrics.k8s.io", "v1beta1", K8S_NAMESPACE, "pods", label_selector=",".join(filter_labels)
        ).get("items", [])
        return list(map(lambda x: PodMetric.model_validate(x), items))
    except Exception:
        LOG.exception(filter_labels)
        return []

list_deployments

使用场景:K8S 后台任务,对于 14 天仍在运行的 deployments 进行清理。

def list_deployments(filter_labels: list[str] = []) -> list[V1Deployment]:
    K8S.pre_load()
    v1 = client.AppsV1Api()
    try:
        return v1.list_namespaced_deployment(K8S_NAMESPACE, label_selector=",".join(filter_labels)).items
    except Exception:
        LOG.exception(filter_labels)
        return []

list_jobs

使用场景:K8S 后台任务,对于 14 天仍在运行的 jobs 进行清理。

def list_jobs(filter_labels: list[str] = []) -> list[V1Job]:
	K8S.pre_load()
	v1 = client.BatchV1Api()
	try:
		return v1.list_namespaced_job(K8S_NAMESPACE, label_selector=",".join(filter_labels)).items
	except Exception:
		LOG.exception(filter_labels)
		return []

list_services

使用场景:K8S 后台任务,对于 14 天仍在运行的 services 进行清理。

def list_services(filter_labels: list[str] = []) -> list[V1Service]:
    K8S.pre_load()
    v1 = client.CoreV1Api()
    try:
        return v1.list_namespaced_service(K8S_NAMESPACE, label_selector=",".join(filter_labels)).items
    except Exception:
        LOG.exception(filter_labels)
        return []

delete_pod

使用场景:无。

def delete_pod(name: str, force: bool = False):
    K8S.pre_load()
    v1 = client.CoreV1Api()
    try:
        return v1.delete_namespaced_pod(name, K8S_NAMESPACE, grace_period_seconds=0 if force else None)
    except Exception:
        LOG.exception(name)
        return None

delete_deployment

使用场景:K8S 后台任务,对于 14 天仍在运行的 deployments 进行清理。

def delete_deployment(name: str, force: bool = False):
    K8S.pre_load()
    v1 = client.AppsV1Api()
    try:
        return v1.delete_namespaced_deployment(name, K8S_NAMESPACE, grace_period_seconds=0 if force else None)
    except Exception:
        LOG.exception(name)
        return None

delete_job

使用场景:K8S 后台任务,对于 14 天仍在运行的 job 进行清理。

def delete_job(name: str, force: bool = False):
    K8S.pre_load()
    v1 = client.BatchV1Api()
    try:
        return v1.delete_namespaced_job(
            name,
            K8S_NAMESPACE,
            grace_period_seconds=0 if force else None,
            propagation_policy="Background",
        )
    except Exception:
        LOG.exception(name)
        return None

delete_service

使用场景:K8S 后台任务,对于 14 天仍在运行的 service 进行清理。

def delete_service(name: str, force: bool = False):
    K8S.pre_load()
    v1 = client.CoreV1Api()
    try:
        return v1.delete_namespaced_service(name, K8S_NAMESPACE, grace_period_seconds=0 if force else None)
    except Exception:
        LOG.exception(name)
        return None

get_pod_log

使用场景:submit 运行后的打包 + 临时日志查看 API。

此处配置了 tail_lines 和 previous,来保障日志内容不过分多、重启前日志保留。

def get_pod_log(name: str, tail_lines: Optional[int] = None) -> str:
    K8S.pre_load()
    v1 = client.CoreV1Api()
    try:
        pod: V1Pod = v1.read_namespaced_pod(name, K8S_NAMESPACE)
        containers = pod.spec.containers
        if len(containers) <= 1:
            previous_log = ""
            try:
                previous_log = "*" * 10 + "以下为最后一次重启前的日志" + "*" * 10 + "\n\n"
                previous_log += (
                    v1.read_namespaced_pod_log(name, K8S_NAMESPACE, tail_lines=tail_lines, previous=True) + "\n"
                )
            except Exception:
                previous_log = ""
            return previous_log + v1.read_namespaced_pod_log(name, K8S_NAMESPACE, tail_lines=tail_lines)
        else:  # containers数量大于1 拼接日志
            log = ""
            sep = "-" * 20
            for container in containers:
                cname = container.name
                log += "{0}container.name={1}{0}\n\n".format(sep, cname)
                try:
                    previous_log = ""
                    try:
                        previous_log = (
                            "*" * 10 + f"以下为container.name={cname}最后一次重启前的日志" + "*" * 10 + "\n\n"
                        )
                        previous_log += (
                            v1.read_namespaced_pod_log(
                                name,
                                K8S_NAMESPACE,
                                container=cname,
                                tail_lines=tail_lines,
                                previous=True,
                            )
                            + "\n"
                        )
                    except Exception:
                        previous_log = ""
                    current_log = previous_log + v1.read_namespaced_pod_log(
                        name,
                        K8S_NAMESPACE,
                        container=cname,
                        tail_lines=tail_lines,
                    )
                except Exception as e:
                    LOG.exception("name=%s, container_name=%s", name, cname)
                    current_log = "获取container_name=%s日志失败, 原因如下:\n%s" % (
                        cname,
                        e,
                    )
 
                log += current_log + "\n"
            return log
    except Exception as e:
        LOG.exception("name=%s, tail_lines=%s", name, tail_lines)
        return "获取日志失败, 原因如下:\n%s" % e

get_pod_event

使用场景:打包日志的时候,顺带打包 events。但是由于当前没有持久化存储,所以 event 只有 1h。

从 Kubernetes 1.19 版本开始,引入了 Event TTL 机制,用于自动清理旧的事件。默认配置下:

  • Normal 事件Normal 类型的事件默认保存时间是 1 小时。Normal 事件通常表示系统中的正常操作,例如 Pod 成功调度、容器正常启动等。
  • Warning 事件Warning 类型的事件默认保存时间是 24 小时。Warning 事件一般表示系统中出现了一些可能需要关注的问题,如 Pod 调度失败、容器崩溃等。
def get_pod_event(podname: str) -> list[CoreV1Event]:
    K8S.pre_load()
    v1 = client.CoreV1Api()
    try:
        return v1.list_namespaced_event(
            K8S_NAMESPACE,
            field_selector="involvedObject.name={}".format(podname),
        ).items
    except Exception:
        LOG.exception(podname)
        return []

get_host

使用场景:获取 judgeflow 的端口信息,还是之前用于 copilot 的,现在无使用场景。

def get_host() -> dict:
    """获取pod的地址信息"""
    if K8S.host_info is None:
        K8S.host_info = load_yaml(KUBE_CONFIG_PATH)["clusters"][0]["cluster"]
    return K8S.host_info
def get_job_service(job_id: int) -> Optional[HostPorts]:
    """获取job的端口 若不存在返回空dict"""
    svcs = K8S.list_services([FILTER_LABEL_JUDGE_FLOW, f"{HELM_LABEL_JOB_ID}={job_id}"])
    if len(svcs) == 0:
        return None
    if len(svcs) != 1:
        LOG.error("job_id=%d的svc数量为%d", job_id, len(svcs))
    svc = svcs[-1]
    ports = {str(port.port): str(port.node_port) for port in svc.spec.ports}
 
    host_info = K8S.get_host()
    host = host_info["server"]
    host = host[: host.rfind(":")]
    return HostPorts(host=host, ports=ports)
    
    
@blue.get(
    "/get_job_service_info/<int:job_id>",
    summary="获取任务裁判流的端口ip信息",
    security=[{"bearerAuth": []}],
)
def get_job_service_info(path: JobId):
    info = get_job_service(path.job_id)
    if info is None:
        return Resp(False, msg="找不到job_id的svc")
    return Resp(True, data=info)