class K8S:  # pragma: no cover
    """Kubernetes utility class.

    Lazily loads the kube config exactly once (see pre_load) and wraps the
    common pod/deployment/job/service operations used by this service.
    """

    # Guards the one-time kube-config load (double-checked locking in pre_load).
    load_lock = Lock()
    # Set to True once config.load_kube_config has run successfully.
    kube_config_loaded = False
    # Lazily-cached cluster host info parsed from the kube config (see get_host).
    host_info: Optional[dict] = None

    @staticmethod
    def pre_load():
        """Load the kube config at most once.

        Double-checked locking: the unlocked fast path skips the lock after
        the first load; the second check inside the lock prevents concurrent
        callers from loading twice. Called before every K8S operation.
        """
        if not K8S.kube_config_loaded:
            with K8S.load_lock:
                if not K8S.kube_config_loaded:
                    config.load_kube_config(KUBE_CONFIG_PATH)
                    K8S.kube_config_loaded = True
暂时没再使用了。
@FlaskCache.cached(timeout=10)
def list_all_pods() -> list[V1Pod]:
K8S.pre_load()
v1 = client.CoreV1Api()
try:
return v1.list_namespaced_pod(K8S_NAMESPACE).items
except Exception:
LOG.exception("list_all_pods exception")
return []list_pods
使用场景:
- K8S 后台任务,获得当前正在运行的 pods,进而对 job 对象的状态进行更新。
- get-job-log API,通过 label 筛选对应的 pod,进而获得对应日志
- get-sut-info API,获取任务拉起的服务的状态
def list_pods(filter_labels: list[str] = []) -> list[V1Pod]:
"""
filter_labels:List[str], eq. "contest.4pd.io/leaderboard-resource-type=sut"
"""
K8S.pre_load()
v1 = client.CoreV1Api()
try:
return v1.list_namespaced_pod(K8S_NAMESPACE, label_selector=",".join(filter_labels)).items
except Exception:
LOG.exception(filter_labels)
return []list_pods_metrics
使用场景:get_job_metric_info API,获取任务拉起的服务的资源信息
25 年 1 月添加的接口,不清楚用于什么地方。
@staticmethod
def list_pods_metrics(filter_labels: list[str] = []) -> list[PodMetric]:
K8S.pre_load()
v1 = client.CustomObjectsApi()
try:
items = v1.list_namespaced_custom_object(
"metrics.k8s.io", "v1beta1", K8S_NAMESPACE, "pods", label_selector=",".join(filter_labels)
).get("items", [])
return list(map(lambda x: PodMetric.model_validate(x), items))
except Exception:
LOG.exception(filter_labels)
return []list_deployments
使用场景:K8S 后台任务,对于 14 天仍在运行的 deployments 进行清理。
def list_deployments(filter_labels: list[str] = []) -> list[V1Deployment]:
K8S.pre_load()
v1 = client.AppsV1Api()
try:
return v1.list_namespaced_deployment(K8S_NAMESPACE, label_selector=",".join(filter_labels)).items
except Exception:
LOG.exception(filter_labels)
return []list_jobs
使用场景:K8S 后台任务,对于 14 天仍在运行的 jobs 进行清理。
def list_jobs(filter_labels: list[str] = []) -> list[V1Job]:
K8S.pre_load()
v1 = client.BatchV1Api()
try:
return v1.list_namespaced_job(K8S_NAMESPACE, label_selector=",".join(filter_labels)).items
except Exception:
LOG.exception(filter_labels)
return []list_services
使用场景:K8S 后台任务,对于 14 天仍在运行的 services 进行清理。
def list_services(filter_labels: list[str] = []) -> list[V1Service]:
K8S.pre_load()
v1 = client.CoreV1Api()
try:
return v1.list_namespaced_service(K8S_NAMESPACE, label_selector=",".join(filter_labels)).items
except Exception:
LOG.exception(filter_labels)
return []delete_pod
使用场景:无。
def delete_pod(name: str, force: bool = False):
K8S.pre_load()
v1 = client.CoreV1Api()
try:
return v1.delete_namespaced_pod(name, K8S_NAMESPACE, grace_period_seconds=0 if force else None)
except Exception:
LOG.exception(name)
return Nonedelete_deployment
使用场景:K8S 后台任务,对于 14 天仍在运行的 deployments 进行清理。
def delete_deployment(name: str, force: bool = False):
K8S.pre_load()
v1 = client.AppsV1Api()
try:
return v1.delete_namespaced_deployment(name, K8S_NAMESPACE, grace_period_seconds=0 if force else None)
except Exception:
LOG.exception(name)
return Nonedelete_job
使用场景:K8S 后台任务,对于 14 天仍在运行的 job 进行清理。
def delete_job(name: str, force: bool = False):
K8S.pre_load()
v1 = client.BatchV1Api()
try:
return v1.delete_namespaced_job(
name,
K8S_NAMESPACE,
grace_period_seconds=0 if force else None,
propagation_policy="Background",
)
except Exception:
LOG.exception(name)
return Nonedelete_service
使用场景:K8S 后台任务,对于 14 天仍在运行的 service 进行清理。
def delete_service(name: str, force: bool = False):
K8S.pre_load()
v1 = client.CoreV1Api()
try:
return v1.delete_namespaced_service(name, K8S_NAMESPACE, grace_period_seconds=0 if force else None)
except Exception:
LOG.exception(name)
return Noneget_pod_log
    def get_pod_log(name: str, tail_lines: Optional[int] = None) -> str:
        """Return the log text of pod *name*, including pre-restart logs.

        Used when packaging submit-run output and by the temporary log API.
        tail_lines caps the amount of log fetched per container; the
        previous=True fetch preserves the log from before the last restart.

        Single-container pods return one log; multi-container pods return the
        per-container logs concatenated, each preceded by a separator header.
        On total failure a Chinese error message containing the exception is
        returned instead of raising.
        """
        K8S.pre_load()
        v1 = client.CoreV1Api()
        try:
            pod: V1Pod = v1.read_namespaced_pod(name, K8S_NAMESPACE)
            containers = pod.spec.containers
            if len(containers) <= 1:
                previous_log = ""
                try:
                    # Build header + pre-restart log; on any failure (e.g. the
                    # container presumably never restarted — TODO confirm) the
                    # whole prefix is discarded.
                    previous_log = "*" * 10 + "以下为最后一次重启前的日志" + "*" * 10 + "\n\n"
                    previous_log += (
                        v1.read_namespaced_pod_log(name, K8S_NAMESPACE, tail_lines=tail_lines, previous=True) + "\n"
                    )
                except Exception:
                    previous_log = ""
                return previous_log + v1.read_namespaced_pod_log(name, K8S_NAMESPACE, tail_lines=tail_lines)
            else:  # more than one container: concatenate the per-container logs
                log = ""
                sep = "-" * 20
                for container in containers:
                    cname = container.name
                    # Separator header identifying which container follows.
                    log += "{0}container.name={1}{0}\n\n".format(sep, cname)
                    try:
                        previous_log = ""
                        try:
                            # Same pre-restart prefix logic as the
                            # single-container branch, per container.
                            previous_log = (
                                "*" * 10 + f"以下为container.name={cname}最后一次重启前的日志" + "*" * 10 + "\n\n"
                            )
                            previous_log += (
                                v1.read_namespaced_pod_log(
                                    name,
                                    K8S_NAMESPACE,
                                    container=cname,
                                    tail_lines=tail_lines,
                                    previous=True,
                                )
                                + "\n"
                            )
                        except Exception:
                            previous_log = ""
                        current_log = previous_log + v1.read_namespaced_pod_log(
                            name,
                            K8S_NAMESPACE,
                            container=cname,
                            tail_lines=tail_lines,
                        )
                    except Exception as e:
                        # One container failing does not abort the others; an
                        # inline error message takes its place in the output.
                        LOG.exception("name=%s, container_name=%s", name, cname)
                        current_log = "获取container_name=%s日志失败, 原因如下:\n%s" % (
                            cname,
                            e,
                        )
                    log += current_log + "\n"
                return log
        except Exception as e:
            LOG.exception("name=%s, tail_lines=%s", name, tail_lines)
            return "获取日志失败, 原因如下:\n%s" % e
使用场景:打包日志的时候,顺带打包 events。但是由于当前没有持久化存储,所以 event 只有 1h。
从 Kubernetes 1.19 版本开始,引入了 Event TTL 机制,用于自动清理旧的事件。默认配置下:
- Normal 事件:
Normal类型的事件默认保存时间是 1 小时。Normal事件通常表示系统中的正常操作,例如 Pod 成功调度、容器正常启动等。- Warning 事件:
Warning类型的事件默认保存时间是 24 小时。Warning事件一般表示系统中出现了一些可能需要关注的问题,如 Pod 调度失败、容器崩溃等。
def get_pod_event(podname: str) -> list[CoreV1Event]:
K8S.pre_load()
v1 = client.CoreV1Api()
try:
return v1.list_namespaced_event(
K8S_NAMESPACE,
field_selector="involvedObject.name={}".format(podname),
).items
except Exception:
LOG.exception(podname)
return []get_host
使用场景:获取 judgeflow 的端口信息,还是之前用于 copilot 的,现在无使用场景。
def get_host() -> dict:
"""获取pod的地址信息"""
if K8S.host_info is None:
K8S.host_info = load_yaml(KUBE_CONFIG_PATH)["clusters"][0]["cluster"]
return K8S.host_infodef get_job_service(job_id: int) -> Optional[HostPorts]:
"""获取job的端口 若不存在返回空dict"""
svcs = K8S.list_services([FILTER_LABEL_JUDGE_FLOW, f"{HELM_LABEL_JOB_ID}={job_id}"])
if len(svcs) == 0:
return None
if len(svcs) != 1:
LOG.error("job_id=%d的svc数量为%d", job_id, len(svcs))
svc = svcs[-1]
ports = {str(port.port): str(port.node_port) for port in svc.spec.ports}
host_info = K8S.get_host()
host = host_info["server"]
host = host[: host.rfind(":")]
return HostPorts(host=host, ports=ports)
@blue.get(
    "/get_job_service_info/<int:job_id>",
    summary="获取任务裁判流的端口ip信息",
    security=[{"bearerAuth": []}],
)
def get_job_service_info(path: JobId):
    """API endpoint: host/port info of the judge-flow service for a job."""
    service_info = get_job_service(path.job_id)
    if service_info is not None:
        return Resp(True, data=service_info)
    return Resp(False, msg="找不到job_id的svc")