项目介绍
算法竞赛平台后端建设
项目背景:公司现有的算法效果评估工具存在扩展性差、耦合严重的问题,同时为增强算法人员的竞争机制,急需构建一个类似 Kaggle 的竞赛平台。该平台支持多赛事并行开展,实现算法团队打榜 PK 功能。
主要职责:负责从 0 到 1 搭建竞赛平台的后端服务,与公司内部多个平台进行对接,确保竞赛平台能够适应并处理各种复杂的算法策略评估场景。
工作内容:
- 平台架构设计与核心功能开发:完成竞赛平台后端服务的整体架构设计与核心流程搭建。进行数据库建模,接口设计,并完成核心流程的开发工作。通过集成 K8S 和 Ceph 实现容器化隔离与存储共享,为算法团队提供稳定的策略提交和评估环境,确保评估程序能够准确评估算法策略。
- 复杂场景评估方案实现:针对音视频生成等难以用程序直接效果评估的竞赛场景,与内部标注平台进行对接,采用异步评估的方式,有效解决了复杂场景下的策略效果评估难题,满足了平台多样化的评估需求。
- 用户体验与系统性能优化:针对前端响应缓慢、K8S 资源抢占导致任务乱序、服务 Pod 异常重启导致日志丢失、算法提交策略完成时间的无目标等待以及榜首服务策略开源化等问题和需求,通过优化前端请求、调整运行机制、增加平台集群状态面板等手段,提升用户体验。
项目成果:竞赛平台成功上线并稳定运行,截至目前,已累计承载 23 类共 100 场内部算法竞赛,为 150 名算法同学提供服务,处理策略提交次数达 10 万次,有效促进了算法团队之间的竞争与交流。
竞赛平台评测程序开发
主要职责:根据产品需求开发算法竞赛平台的各类算法应用场景的评测程序,包括 AutoML 机器学习、ASR 语音流式识别、TTS 文本转语音、算子模型训练与预测阶段优化等不同场景。
工作内容:
- 讨论并设计各应用场景下评测程序的评估流程、与算法策略服务的交互接口与评估指标。
- 为算法工程师提供满足交互接口的 starting-kit 服务代码,供工程师基于策略进行进一步完善。
- 针对算法工程师服务运行中错误问题,通过 Pod 日志、状态等辅助算法工程师排查代码 Bug。
项目成果:共完成 9 类应用场景共 30 余次的需求迭代。
面试问题准备
现有的算法效果评估工具是什么样的?
主要内容:评估工具的介绍、存在的问题
算法竞赛平台的架构设计
最开始的项目目标是,针对解决方案的几个业务场景,去重新设计一个算法评估平台。
考虑到需要评估的业务场景相对比较少,同时由于评估的策略也是解决方案团队产生的,因此提交的策略也较少。因此平台第一版的设计中,
-
架构设计逻辑是什么样的?
-
基于上面的设计逻辑,技术栈选择依据是:在技术栈上,采用了 Flask 框架 + MySQL 作为数据库存储,使用公司 ftp 作为文件存储(包括数据集、日志、bad-case、策略的指标结果等),使用 Docker 进行程序运行隔离,使用 Maxwell + Kafka + 后台定时任务异步处理。
对于业务场景的评估逻辑,则从平台侧独立出来,成为一个单独的程序,启动 docker 服务进行。对于后端服务运行,采用 Docker 的形式,即策略提供方提交一个可以运行的 Docker。
为了避免服务器上同时运行 Docker 过多导致服务器卡顿,采取了异步拉起运行的评测程序。中间通过 kafka 进行桥接,并为了服务解耦,选择了读取 mysql binlog 写入 kafka 的方式。
同时配置了一系列的后台定时任务,分别为:读取 kafka 的 submit 数据,根据当前运行程序数目判断是否启动一定数量新的评估程序;
之前的 job 结果是怎么处理的?
在当前方案运行不到一个月,公司想要将算法团队整合到一起进行内部竞争,打算将这个平台做大。
公司的运维也同时开始辅助进行平台建设,开始引入 K8S 集群进行大规模的一个算法效果评估流程。整个的组件也进行了更新。
在服务运行机制上,将 Docker 运行更改为 K8S 的 Pod 运行。算法工程师仍然是提交一个包含 Docker 镜像的 yaml 格式的文本内容,由评测程序根据设计,将其包装成 helm chart 提交给平台,平台部署成 K8S pod 形式,并将对应的 service name 返回。同样,评测程序也从 Docker 更新成了 Helm chart 形式运行在 K8S 上。
在文件存储上,从 FTP 更换成了 NFS 服务在三个 K8S 节点上进行文件的共享。后面为啥换成了 ceph?再后面模型比较大,ceph 网络带宽不够,对接了 modelhub。再后面,为了解决带宽问题,运维在 GPU 机房内的机器上增加了 juicefs,近距离带宽提高镜像拷贝速度。
考虑看一下历史的群里聊天记录,从而看看反馈的问题。
权限设计反而是相对靠后设计的,因为迭代的比较快,平台没有对外的 API 文档,所以接口方面的权限不是特别重要。
后来运维更换为了 ceph 作为文件存储,存储包括比赛数据集、每个策略的日志信息、比赛程序记录的每个策略的指标结果、策略的处理结果等。
在设计 ceph 的文件结构的时候,分成了 dataset, submit, customer, leaderboard 等几个目录。dataset 为数据集存放,下一级针对 ID 进行了取模 100,以避免该层级文件夹过多。submit 由于增长比较快,是根据策略的提交时间来作为下一级目录,再之后为当天的 submit-id。customer 则是提供给算法工程师进行模型存放的,运维提供了将 customer 挂载到机器上的命令。这一个目录是按照公司邮箱用户名的。leaderboard 则是一些比赛平台相关的。
后台任务
alarm
- alarm_unmarked_job,每天 9 点标注平台超过 xxx 工作日仍然没有标注完成的 job
- alarm_ceph_quota,每 10 分钟完成 CEPH 使用率检查
- alarm_unassigned_issue,每 2 分钟从 jira 中判断是否有“产品技术设施” xxx 时间没有响应的工单。
cal_submit_result
3 分钟尝试合并。
list_to_merge_submit()filter_orm(SubmitResult, submit_id=submit.id, only_one="none")尝试获取 submitResult,没有为 None。filter_orm(Job, id=list(map(int, job_ids_str.split(","))))any(job.status not in FINISHED_STATUS and job.status != JobStatus.EVALUATING for job in jobs)passbenchmark.dataset_ids=set(job.dataset_id.ids)calc_submit_result(merge_method, jobs)bundle_submit的sut-chart,log,bad-case,detail-caseif not submit_succeeded,clear_submit_extra_resource
@exiter.continuous
def update_submit_results():
LOG.log(15, "打包submit_result")
finished_submits = list_to_merge_submit()
submit_results: list[SubmitResult] = []
for finished_submit in finished_submits:
try:
submit_id, job_ids_str = finished_submit
submit = get_orm(Submit, submit_id)
submit_result = filter_orm(SubmitResult, submit_id=submit.id, only_one="none")
add_submit_result = False
if submit_result is None:
submit_result = SubmitResult(submit_id=submit_id)
add_submit_result = True
jobs = filter_orm(Job, id=list(map(int, job_ids_str.split(","))))
if any(job.status not in FINISHED_STATUS and job.status != JobStatus.EVALUATING for job in jobs):
continue
if submit is None: # pragma: no cover
continue
benchmark = get_orm(Benchmark, submit.benchmark_id)
if benchmark is None:
continue
dataset_ids_set = set(benchmark.dataset_ids)
submit_succeeded = False
if (
all(job.status == JobStatus.SUCCEEDED or job.status == JobStatus.EVALUATING for job in jobs)
and set([job.dataset_id for job in jobs]) == dataset_ids_set
):
# 全部成功计算结果
LOG.info("正在合并submit_id=%d的结果", submit_id)
try:
merge_method = benchmark.benchmark_config.get("merge_commit_result_method")
merge_method = BenchmarkMergeCommitResultMethod.model_validate(merge_method)
submit_result.result = calc_submit_result(merge_method, jobs)
submit_succeeded = True
except exc.ResultMergeError:
LOG.exception("submit_id=%d无法合并结果", submit_id)
except ValidationError:
LOG.exception("benchmark_id=%d的合并方法格式错误", submit.benchmark_id)
LOG.info("正在打包submit_id=%d的被测服务chart包", submit_id)
submit_result.bundled_sut_chart_uri = bundle_submit(submit_id, jobs, "sut_chart")
if benchmark.description.get("is_closed_book") == 1:
LOG.info("benchmark为闭卷考试,跳过打包submit_id=%d的错例", submit_id)
else:
LOG.info("正在打包submit_id=%d的错例", submit_id)
submit_result.bundled_bad_case_uri = bundle_submit(submit_id, jobs, "bad_case")
LOG.info("正在打包submit_id=%d的详情", submit_id)
submit_result.bundled_detail_case_uri = bundle_submit(submit_id, jobs, "detail_case")
LOG.info("正在打包submit_id=%d的日志", submit_id)
submit_result.bundled_log_uri = bundle_submit(submit_id, jobs, "log")
if add_submit_result:
add_orm(submit_result)
else:
update_orm(
SubmitResult,
submit_result.id,
{
SubmitResult.result: submit_result.result if submit_result.result else {},
SubmitResult.bundled_sut_chart_uri: submit_result.bundled_sut_chart_uri,
SubmitResult.bundled_bad_case_uri: submit_result.bundled_bad_case_uri,
SubmitResult.bundled_detail_case_uri: submit_result.bundled_detail_case_uri,
SubmitResult.bundled_log_uri: submit_result.bundled_log_uri,
},
)
# 如果没跑成功,直接清理retag的镜像和copy的存储
if not submit_succeeded:
clear_submit_extra_resource(submit)
except Exception: # pragma: no cover
LOG.exception(finished_submit)
LOG.log(15, "submit_result打包完成")
scheduler = BlockingScheduler()
scheduler.add_job(update_submit_results, "interval", seconds=180, next_run_time=datetime.now())
catch_scheduler_exc_callback(scheduler)
scheduler.start()gc_submit_extra_resource
每天执行依次,清理 14 天的 submit extra resource。
calculate_credit()获得每个用户的每个 submit 的 credit 值,选择没有 credit 的 submitlist_to_cleaned_submit_extra_resource获得 submit-result 合并在 14 天前的 submitsclear_submit_extra_resource清理 submit extra resource
@exiter.continuous
def submit_extra_resource_garbage_collection(IS_DELETE: bool, EXPIRE_DAY: int):
before_date = date.today() - timedelta(days=EXPIRE_DAY)
credit_submits = set()
user_credit_info = calculate_credit()[0]
for user in user_credit_info:
for user_credit_record in user_credit_info[user][0]:
if user_credit_record.submit_credit > 0:
credit_submits.add(user_credit_record.submit_id)
submits = list_to_cleaned_submit_extra_resource(merged_at_before=before_date, include_all_failed_result=True)
to_clean_submits = list()
to_clean_submit_ids = list()
for submit in submits:
if submit.id not in credit_submits:
benchmark = get_orm(Benchmark, submit.benchmark_id)
if benchmark is None:
continue
to_clean_submits.append(submit)
to_clean_submit_ids.append(submit.id)
LOG.info("[submit extra resource gc] 可被gc的submit ids: %s", to_clean_submit_ids)
if IS_DELETE:
clear_submit_extra_resource(to_clean_submits)
LOG.log(15, "对submit extra resource的gc结束")
SUBMIT_EXTRA_RESOURCE_DELETE = True
SUBMIT_EXTRA_RESOURCE_EXPIRE_DAY = 14
scheduler = BlockingScheduler()
scheduler.add_job(
submit_extra_resource_garbage_collection, "cron",
args=(SUBMIT_EXTRA_RESOURCE_DELETE, SUBMIT_EXTRA_RESOURCE_EXPIRE_DAY), hour=0, minute=0, second=0
)
catch_scheduler_exc_callback(scheduler)
scheduler.start()gc
job
针对 Job 数据
JOB_SUBMIT_PERIOD_TIME = 2
def handle_job_insert(job: Job):
# check submit, benchmark exists, benchmark submit-mode
# ...
try:
if not wait_limit(job.id):
return
handle_job(job.id)
time.sleep(JOB_SUBMIT_PERIOD_TIME)
except exiter.RequestExit as e: # pragma: no cover
raise e
except Exception as e: # pragma: no cover
LOG.exception(job)
bundle_error_log(job.id, str(e))
try:
update_orm(Job, job.id,
{Job.status: JobStatus.SUBMIT_ERROR, Job.finish_at: func.now(), Job.progress: 1}
)
kill_submit(job.submit_id)
except Exception as e:
LOG.exception(e)
IN_K8S_STATUS = [JobStatus.SUBMITTED, JobStatus.PENDING, JobStatus.RUNNING]
def wait_limit(job_id: int) -> bool:
"""等待资源放行 限制同时job运行数 禁止在有job处于正在拉起状态中添加新job"""
while get_orm(Job, job_id).status == JobStatus.INITIALIZED:
if exiter.exited: # pragma: no cover
raise exiter.RequestExit
jobs = filter_orm(Job, status=IN_K8S_STATUS)
if len(jobs) >= int(get_system_setting_by_name("job_limit").value):
LOG.warning("运行job数量过多... 正在等待")
else:
return True
time.sleep(JOB_STATUS_PERIOD_TIME)
return False处理 Job 的逻辑
def handle_job(job_id: int):
# get-orm, assert job + dataset + benchmark + submit is not None
# ...
try:
submit_config = load_yaml(CFile.SUBMIT_CONFIG(submit.id))
config = SubmitConfig.model_validate(submit_config)
# modelhub 校验
if benchmark.benchmark_config.get("moduleHub_disabled", False):
if len(config.leaderboard_options.modelhub) > 0:
raise Exception("榜单不允许使用moduleHub!")
if job.original_job_id is None:
if len(config.leaderboard_options.nfs) > 0:
# 同一submit的job同时处理
jobs = filter_orm(Job, submit_id=submit.id, status=JobStatus.INITIALIZED)
for j in jobs:
if len(config.leaderboard_options.modelhub) > 0:
LOG.info("正在为 job {} load_modulehub_model".format(j.id))
load_model_result = load_modulehub_model(j, submit, benchmark, config)
if not load_model_result:
continue
update_orm(Job, j.id, status=JobStatus.PREPARING)
return
if len(config.leaderboard_options.modelhub) > 0:
load_model_result = load_modulehub_model(job, submit, benchmark, config)
if not load_model_result:
return
update_orm(Job, job.id, status=JobStatus.PREPARING)
return
if ceph2volume(dataset.uri) == SECRET_DATAFILE:
update_orm(Job, job.id, status=JobStatus.PREPARING)
return
submit_job2k8s(job_id)
except Exception as e:
LOG.exception("")
bundle_error_log(job_id, str(e))
update_orm(Job, job_id, {Job.status: JobStatus.SUBMIT_ERROR, Job.finish_at: func.now(), Job.progress: 1})
kill_submit(job.submit_id)提交 Job 到 k8s 中
@exc_wrap
def submit_job2k8s(job_id: int):
if copy_job_result(job_id):
return
job = get_orm(Job, job_id)
if job.status in FINISHED_STATUS+IN_K8S_STATUS:
return
submit = get_orm(Submit, job.submit_id)
dataset = get_orm(Dataset, job.dataset_id)
benchmark = get_orm(Benchmark, submit.benchmark_id)
judge_flow = get_orm(JudgeFlow, benchmark.judge_flow_id)
judge_flow_config = benchmark.judge_flow_config or {}
value_path = CFile.JOB_VALUE(job_id, automake=True)
# chart2info(judge_flow.chart) 解析 judgeflow chart,异常则 kill-submit,return
chart_name, chart_version, value = chart2info(judge_flow.chart)
# 处理 command
if "command" in judge_flow_config:
if not is_json(judge_flow_config["command"]):
judge_flow_config["command"] = json.dumps(judge_flow_config["command"].split(" "))
# 将榜单 judgeflow-config 与 chart 中 value 进行合并
value = merge_values(value, judge_flow_config)
# 根据数据集 uri 将数据从 ceph 拷贝到磁盘
dataset_filepath = ceph2disk(dataset.uri)
dataset_filename = os.path.basename(dataset_filepath)
st_config_filename = os.path.basename(CFile.SUBMIT_CONFIG(submit.id))
# 解析榜单中 workspace + datafile + submit 信息的挂载路径
container_workspace = find_in_dictlist(value["volumeMounts"], "name", "workspace")["mountPath"]
container_datafile = find_in_dictlist(value["volumeMounts"], "name", "datafile")["mountPath"]
if ceph2volume(dataset.uri) == SECRET_DATAFILE:
dataset_filename = os.path.basename(get_job_dataset_decode_path(job.id))
container_submit = find_in_dictlist(value["volumeMounts"], "name", "submit")["mountPath"]
# 综合 value
leaderboard_value: dict = {}
environment = {
"TZ": "Asia/Shanghai",
"JOB_ID": job_id,
"SUBMIT_ID": submit.id,
"DATASET_ID": dataset.id,
"LOAD_SUT_URL": f"{BASE_URL}/resource/load_sut",
"UNLOAD_SUT_URL": f"{BASE_URL}/resource/unload_sut",
"GET_JOB_SERVICE_INFO_URL": f"{BASE_URL}/resource/get_job_service_info",
"GET_JOB_SUT_INFO_URL": f"{BASE_URL}/resource/get_job_sut_info",
"GET_SUBMIT_INFO_URL": f"{BASE_URL}/submit/no_secret_info",
"GET_CEPH_DOWNLOAD_URL": f"{BASE_URL}/ceph/uri/download",
"GET_CEPH_FILEBROWSER_URL": f"{BASE_URL}/ceph/uri/filebrowser",
"REGISTER_MARK_TASK_URL": f"{BASE_URL}/job/register_mark_task",
"GET_IMAGE_HASH_URL": f"{BASE_URL}/resource/image/hash",
"LOAD_MODEL_HUB_URL": f"{BASE_URL}/resource/modelhub/load",
"GET_MODEL_HUB_STATUS_URL": f"{BASE_URL}/resource/modelhub/status",
"UPDATE_SUBMIT_URL": f"{BASE_URL}/submit/update",
"MODEL_HUB_ENTRYPOINT": ModelHubConfig.ENTRYPOINT,
"BENCHMARK_NAME": benchmark.name,
"DATASET_NUM": len(benchmark.dataset_ids),
"DATASET_NAME": dataset.name,
"DATASET_FILEPATH": os.path.join(container_datafile, dataset_filename),
"SUBMIT_CONFIG_FILEPATH": os.path.join(container_submit, st_config_filename),
"RESULT_FILEPATH": os.path.join(container_workspace, CFile.WORK_RESULT.value[0]),
"BAD_CASES_FILEPATH": os.path.join(container_workspace, CFile.WORK_BAD_CASE.value[0]),
"DETAILED_CASES_FILEPATH": os.path.join(container_workspace, CFile.WORK_DETAIL_CASE.value[0]),
"INTERNAL_CEPH_PATH": disk2ceph(get_benchmark_internal_space(benchmark.id)),
"WORKSPACE_CEPH_PATH": disk2ceph(get_job_workspace(job_id)),
"CUSTOMER_CEPH_PATH": disk2ceph(get_customer_path()),
"SUBMIT_PRIVATE_CEPH_PATH": disk2ceph(get_submit_private_space(job.submit_id)),
}
leaderboard_value["env"] = [{"name": k, "value": v} for k, v in environment.items()]
leaderboard_value["env"].append(
{
"name": "LEADERBOARD_API_TOKEN",
"valueFrom": {"secretKeyRef": {"name": "leaderboard-secret", "key": "leaderboard-api-token"}},
}
)
priorityclassvalue = "0"
script_dir = os.path.dirname(os.path.abspath(__file__))
if not os.access(script_dir + "/../cmds/get_priority", os.X_OK):
os.chmod(script_dir + "/../cmds/get_priority", 0o755)
go_executable_path = script_dir + "/../cmds/get_priority"
args = [
"--dbHost",
DBConfig.HOST,
"--dbPort",
str(DBConfig.PORT),
"--dbUser",
DBConfig.USER,
"--dbPassword",
unquote(DBConfig.PASSWD),
"--dbName",
"k8sconfig",
]
try:
result = subprocess.run([go_executable_path] + args, capture_output=True, text=True, check=True)
output = result.stdout.strip()
except subprocess.CalledProcessError as e:
print(f"Error executing the Go program: {e}")
output = str(random.randint(1, 100000000))
try:
tmp = int(output)
tmp
except (ValueError, TypeError):
output = str(random.randint(1, 100000000))
priorityclassname = "priority" + output
priorityclassvalue = output
# todo: priorityclassname
if PriorityServer.Button != "on":
priorityclassvalue = "0"
leaderboard_value["env"].append(
{
"name": "priorityclassname",
"value": priorityclassname,
}
)
leaderboard_value["env"].append(
{
"name": "priorityclassvalue",
"value": priorityclassvalue,
}
)
if job.original_job_id:
leaderboard_value["env"].append(
{
"name": "MARK_TASK_ID",
"value": get_orm(Job, job.original_job_id).mark_task_id,
}
)
leaderboard_value["podLabels"] = {}
leaderboard_value["podLabels"][HELM_LABEL_JOB_ID] = str(job_id)
leaderboard_value["podLabels"][HELM_LABEL_SUBMIT_ID] = str(submit.id)
leaderboard_value["priorityclassname"] = priorityclassname
leaderboard_value["priorityclassvalue"] = priorityclassvalue
value = merge_values(value, leaderboard_value)
fill_volumes(job_id, value)
if job.original_job_id:
copy_from_original_job(job.id, job.original_job_id)
value["affinity"] = value.get("affinity") or {}
value["affinity"]["nodeAffinity"] = {
"preferredDuringSchedulingIgnoredDuringExecution": [
{
"weight": 1,
"preference": {
"matchExpressions": [{"key": "contest.4pd.io/gpu", "operator": "NotIn", "values": ["on"]}]
},
}
]
}
value = format_values(value)
dump_yaml(value_path, value)
helm_name = JUDGE_FLOW_HELM_NAME.format(job_id)
ok, msg = Helm.helm_install(helm_name, chart_name, chart_version, value_path)
if not ok:
bundle_error_log(job_id, msg)
update_orm(Job, job_id, status=JobStatus.SUBMIT_ERROR, submit_at=func.now(), finish_at=func.now(), progress=1)
kill_submit(job.submit_id)
else:
update_no_finish_job(job_id, status=JobStatus.SUBMITTED, submit_at=func.now())除此之外,Job 有一个 prepare 状态,该状态适用于提交文本中包含 leaderboard_options.nfs 内容。这个内容有两个选择,一个是 ceph 挂载,一个是 juicefs 挂载。
该文件中还包含了 ceph 挂载、juicefs 挂载的处理,有点多。
k8s
- k8s-clean 和 k8s-watcher 两个定时任务。
- k8s-clean 1 分钟执行一次
list_deployments+list_jobs+list_servicesgetcattr(workload, "metadata.labels", {})尝试从 label 中读取 job-idname = workload.metadata.name获得资源 name 后续 deleteif job exist + db job.status 结束状态 + db.job.update_at < 14 days before,非强制 delete func
- k8s-watcher 5 秒钟执行一次
handle_pod功能:若为 judgeflow-pod 更新 job 状态,否则返回 job-id- 根据 label 获得 job-id,若 db.job.status 结束状态 or 策略 pod,返回 job-id
- 否则,当前 pod 为 judgeflow pod,更新 job 状态。
- phase=failed,kill-submit,handle-finish(failed)
- phase=pending, when job.status != pending, update-pending (后续 job 阶段用)
- phase=running, when job.status != running, update-running, elif update-at 超过 2 分钟, update-update-at
- phase=succeeded, handle-finish(success)
- 在
handle_pod之后,获得当前 K8S 中存在的 job-ids,和 DB 中 IN_K8S_STATUS 状态的 Job 进行比较 - if
not in K8S + update delay 10min, 认为寻找 pod 超时,update_no_finish_job(job.id, KILLED) - elif
running time > 7days, running too long,kill_submit
@exiter.continuous
def k8s_cleaner():
k8s_clean(K8S.list_deployments(), K8S.delete_deployment)
k8s_clean(K8S.list_jobs(), K8S.delete_job)
k8s_clean(K8S.list_services(), K8S.delete_service)
def k8s_clean(workloads: list, delete_func: Callable[[str, bool], Any]):
for workload in workloads:
name = workload.metadata.name
labels = getcattr(workload, "metadata.labels", {})
job_id = intNone(labels.get(HELM_LABEL_JOB_ID))
if job_id is None:
continue
job = get_orm(Job, job_id)
if (
job is not None
and job.status in (FINISHED_STATUS + EVALUATE_STATUS)
and job.updated_at + FTDT < datetime.now()
):
submit = get_orm(Submit, job.submit_id)
if submit is not None and not submit.reserve_resource:
LOG.info("HELM UNINSTALL 10分钟后未生效, 调用K8S命令清理 JOB %s 的资源 %s", job.id, name)
delete_func(name, False)
@exiter.continuous
def k8s_watcher():
handled_jobs = set()
# 提前list避免处理pod时间过长导致 新的submitted的job被认为是找不到pod
k8s_jobs = filter_orm(Job, status=IN_K8S_STATUS)
pods = K8S.list_pods()
for pod in pods:
try:
job_id = handle_pod(pod)
if job_id is not None:
handled_jobs.add(job_id)
except Exception: # pragma: no cover
LOG.exception(pod.metadata.name)
if len(pods) == 0:
return
for job in k8s_jobs:
if job.id not in handled_jobs and job.updated_at + NFT < datetime.now():
LOG.error("寻找pod超时, job_id=%d", job.id)
bundle_error_log(job.id, "寻找pod超时")
update_no_finish_job(job.id, status=JobStatus.KILLED)
else:
if job.status == JobStatus.RUNNING:
try:
job_running_timedelta = get_job_running_timedelta(job.id)
if job_running_timedelta > timedelta(days=BaseConfig.JOB_RUNNING_THRESHOLD_DAYS):
job = get_orm(Job, job.id)
if job.status == JobStatus.RUNNING:
LOG.warning("实际运行时间超过限制, job_id={}, 即将kill submit_id={}".format(job.id, job.submit_id))
kill_submit(job.submit_id)
except Exception as e:
LOG.exception(e)
LOG.error("获取job实际运行时间失败, job_id={}".format(job.id))
def handle_pod(pod: V1Pod) -> Optional[int]:
labels = getcattr(pod, "metadata.labels", {})
job_id = intNone(labels.get(HELM_LABEL_JOB_ID))
if job_id is None or HELM_LABEL_TYPE not in labels:
return None
job = get_orm(Job, job_id)
if job is None:
return None
if job.status in (FINISHED_STATUS + EVALUATE_STATUS) or labels[HELM_LABEL_TYPE] != HELM_LABEL_TYPE_JUDGE_FLOW:
return job_id
phase = getcattr(pod, "status.phase", "Error")
identity = "name=%s, job=%s" % (pod.metadata.name, job)
if phase == "Failed":
LOG.info("%s: pod状态Failed, 正在处理", identity)
kill_submit(job.submit_id)
handle_finished(job, JobStatus.FAILED)
elif phase == "Pending":
if job.status != JobStatus.PENDING:
LOG.info("%s: pod状态第一次Pending, 修改状态", identity)
update_no_finish_job(job_id, status=JobStatus.PENDING)
elif phase == "Running":
if job.status not in [JobStatus.RUNNING, JobStatus.EVALUATING]:
LOG.info("%s: pod状态第一次Running, 修改状态", identity)
update_no_finish_job(job_id, status=JobStatus.RUNNING, running_at=func.now())
elif job.updated_at + RUP < datetime.now():
update_no_finish_job(job_id, updated_at=func.now())
elif phase == "Succeeded":
LOG.info("%s: pod状态Succeeded, 正在处理", identity)
handle_finished(job, JobStatus.SUCCEEDED)
else:
# phase == "Unknown"
LOG.error("%s: pod状态Unknown", identity)
return job_id
def handle_finished(job: Job, status: JobStatus):
"""处理结束的任务"""
result = fetch_result(job.id)
if result == {}:
status = JobStatus.FAILED
else:
# job = get_orm(Job, job.id)
if job.mark_task_id is not None:
status = JobStatus.EVALUATING
bundle_log(job.id)
if job.running_at is None:
update_orm(Job, job.id, running_at=Job.submit_at)
dataset = get_orm(Dataset, job.dataset_id)
if dataset is not None and ceph2volume(dataset.uri) == SECRET_DATAFILE:
decode_path = get_job_dataset_decode_path(job.id)
try:
os.remove(decode_path)
os.removedirs(os.path.dirname(decode_path))
except Exception:
LOG.exception("删除文件夹加密数据集文件夹失败, job_id: %d", job.id)
update_orm(Job, job.id, status=status, finish_at=func.now(), result=result, progress=1)
if job.original_job_id is not None:
update_orm(Job, job.original_job_id, status=JobStatus.EVALUATED)
submit = get_orm(Submit, job.submit_id)
if submit is not None and not submit.reserve_resource:
clear_job(job)
scheduler = BlockingScheduler()
scheduler.add_job(k8s_watcher, "interval", seconds=5, next_run_time=datetime.now())
scheduler.add_job(k8s_cleaner, "interval", seconds=60, next_run_time=datetime.now())monitor
- report_contest_status,15s 一次
- report_total_monitoring_data,15s 一次
- report_current_running_duration_monitoring_data,15s 一次
- report_last1h_duration_monitoring_data,10min 一次
scheduled_task
- 每 5min 执行一次,判断是否有定时任务要执行。
list_to_run_scheduled_task,ScheduledTask.scheduled_at < func.now()执行时间小于当前时间。task exists+deleted none+status != pending排除不需要执行的任务- 更新 task 状态,执行 task。
res = requests.request(method, url, header, data, params)- 根据 res 中的 success 更新 task 状态,success or failed。
@exiter.continuous
def handle_scheduled_tasks():
scheduled_tasks = list_to_run_scheduled_task()
for scheduled_task_id in scheduled_tasks:
try:
scheduled_task = get_orm(ScheduledTask, scheduled_task_id)
if not scheduled_task:
LOG.info("找不到定时任务{}!".format(scheduled_task_id))
continue
if scheduled_task.deleted_at:
LOG.info("定时任务{}已经被删除, 跳过".format(scheduled_task_id))
continue
if scheduled_task.status != ScheduledTaskStatus.PENDING:
LOG.info("定时任务{}状态不是等待中, 跳过".format(scheduled_task_id))
continue
update_orm(ScheduledTask, scheduled_task_id,
{ScheduledTask.run_at: func.now(), ScheduledTask.status: ScheduledTaskStatus.RUNNING})
LOG.info("执行定时任务: {}".format(scheduled_task_id))
request_url = "{}{}".format(BASE_URL, scheduled_task.request_url)
payload = None
args = None
if "json" in scheduled_task.request_detail:
payload = json.dumps(scheduled_task.request_detail["json"])
elif "form" in scheduled_task.request_detail:
payload = scheduled_task.request_detail["form"]
if "args" in scheduled_task.request_detail:
args = scheduled_task.request_detail["args"]
response = requests.request(method=scheduled_task.request_method, url=request_url,
headers=scheduled_task.request_detail["headers"],
data=payload, params=args).json()
final_status = ScheduledTaskStatus.SUCCEEDED if response["success"] else ScheduledTaskStatus.FAILED
update_orm(ScheduledTask, scheduled_task_id, {ScheduledTask.finish_at: func.now(),
ScheduledTask.status: final_status,
ScheduledTask.result: response})
except Exception as e:
LOG.exception(e)
update_orm(ScheduledTask, scheduled_task_id, {ScheduledTask.finish_at: func.now(),
ScheduledTask.status: ScheduledTaskStatus.FAILED})
LOG.log(15, "定时任务检查完成")
if __name__ == "__main__":
scheduler = BlockingScheduler()
scheduler.add_job(handle_scheduled_tasks, "interval", seconds=300, next_run_time=datetime.now())
catch_scheduler_exc_callback(scheduler)
scheduler.start()submit_package
submit
- 读取每一条 submit,获取对应的 benchmark,若是外部导入,则 pass 掉。
- 将 submit config 保存到 ceph 上
- 根据数据集创建对应的 Job 插入数据库中
consumer = KafkaConsumer(
KafkaConfig.SUBMIT_TOPIC,
bootstrap_servers=KafkaConfig.SERVER,
group_id=KafkaConfig.SUBMIT_CONSUMER_GROUP_ID,
# 当没有初始偏移量或当前偏移量无效时的重置策略,"earliest"表示从最早的消息开始消费
auto_offset_reset="earliest",
# 禁用自动提交偏移量,需要手动提交
enable_auto_commit=False,
# 最大轮询间隔,单位为毫秒,这里设置为1小时
max_poll_interval_ms=60 * 60 * 1000,
value_deserializer=lambda value: json.loads(value),
security_protocol=KafkaConfig.SECURITY_PROTOCOL if hasattr(KafkaConfig, "SECURITY_PROTOCOL") else "PLAINTEXT",
sasl_mechanism=KafkaConfig.SASL_MECHANISM if hasattr(KafkaConfig, "SASL_MECHANISM") else None,
sasl_plain_username=KafkaConfig.SASL_PLAIN_USERNAME if hasattr(KafkaConfig, "SASL_PLAIN_USERNAME") else None,
sasl_plain_password=KafkaConfig.SASL_PLAIN_PASSWORD if hasattr(KafkaConfig, "SASL_PLAIN_PASSWORD") else None,
)
exiter.register_exit_func(consumer.close)
for consumer_record in consumer:
with exiter as ok:
if not ok:
raise exiter.FailedStart
try:
msg = consumer_record.value
LOG.info("insert, %s", msg["data"])
if db_conn_retry(Submit, msg["data"]["id"]):
handle_submit_insert(get_orm(Submit, msg["data"]["id"]))
else:
LOG.error("获取不到submit, %s", msg["data"])
except Exception:
LOG.exception(consumer_record)
consumer.commit()
def handle_submit_insert(submit: Submit):
benchmark = get_orm(Benchmark, submit.benchmark_id)
if benchmark is None:
LOG.error("提交id=%d的榜单id=%d不存在", submit.id, submit.benchmark_id)
return
if benchmark.benchmark_config.get("submit_mode") in ["import", "sync"]:
return
config_path = CFile.SUBMIT_CONFIG(submit.id, automake=True)
write_bfile(config_path, submit.config)
jobs = [Job(submit_id=submit.id, dataset_id=dataset_id) for dataset_id in benchmark.dataset_ids]
add_orms(jobs)
LOG.info("处理完成, %s", submit)算法竞赛平台中复杂场景有哪些?
最开始有 text2video 文本转视频, text2music 文本转音乐,后面业务需要的 tts 文本转音频等,这种直接用程序无法评估,因此需要标注人员进行标注。
主打一个阶段拆分 + 异步避免资源浪费。
首先,在结果评估方面的方案为,由产品去确定如何评估策略产生的视频、音频等,比如视频中动作是否流畅,文字中关键元素是否都包含之类,这些由标注平台去生成标注配置。产品再给出基于这些标注结果,评估指标的生成逻辑。(简写)
其次,关键的就是如何对接标注平台。最简单的方案是,竞赛平台不做任何改动,评测服务在获得算法策略服务结果后,将待标注数据发送给标注平台,然后一直等待标注平台给出 callback。但是这种会导致资源浪费严重,因为标注工作并非当日能完成,也存在周五提供数据、下周一才标注完成的可能。因此,为了资源过度浪费问题,将评估流程拆分为两个阶段。
由于避免服务运行中不断增加的 CPU、内存资源,导致超过资源上限问题,所以在配置的时候,强制 requests 和 limits 是一致的。
第一阶段依旧和过往一样,等待策略服务启动后进行接口交互获得待标注结果。获得交互结果后,将数据 + job-id 发送给标注平台后,就正常结束运行。
待标注平台标注完成后,调用平台的 /job/rerun/id???? 接口,重新拉起评测服务。评测程序可以根据挂载的文件目录中配置的标识信息识别出当前处于二阶段,从而请求标注平台标注结果,根据标注结果计算得出 metric 指标结果,保存下来,再结束运行。
这种反复拉起机制提供了极大的扩展机制,使得评估流程可以拆分到多个阶段,评测程序在挂载的目录下记录所处阶段,并控制每个阶段需要执行的逻辑,只需要在最后一个阶段提供指标结果即可。
Job 的状态如何配置的?比如后面展示未运行完成。
考虑采用异步机制,也就是评估程序在获得算法服务产生的结果后,请求平台将算法服务 pod 关闭。然后评估程序将服务结果,连同 submit-id、dataset-id,pod-name 等信息发送给标注平台,然后结束自己的服务。标注平台在处理完标注结果后,会请求竞赛平台会重新拉起对应评估程序 pod,评估程序请求标注平台标注结果,进行最后的指标计算逻辑。
同样后面增加了 automl,对接了线上服务。
接口权限是如何设计的
权限模型有:RBAC 和 ABAC。基于角色的访问控制(Role-based access control,简称 RBAC),指的是通过用户的角色(Role)授权其相关权限,这实现了更灵活的访问控制,相比直接授予用户权限,要更加简单、高效、可扩展。基于属性的访问控制(Attribute-Based Access Control,简称 ABAC)是一种非常灵活的授权模型,不同于 RBAC,ABAC 则是通过各种属性来动态判断一个操作是否可以被允许。
在设计上,最开始采用经典 RBAC(用户-角色-权限) 的权限模型,平台使用方可以分为:平台管理者、比赛程序开发者、算法工程师等,平台管理者能访问所有接口,算法工程师访问的接口最少。但是后来发现这种设计在某些接口上存在一定局限性,比如算法工程师可以终止自己的策略运行,但是不能终止其他人的策略,管理人员又可以终止所有人的策略。为解决这样问题,为权限表增加了两个关键字段:规则表达式和优先级,进化成能根据请求内容动态判断权限的 ABAC 模型。
通过自定义的权限规则表达式,可以实现非常细粒度的权限控制,灵活性更高。引入优先级机制,使得多条权限的处理顺序更加鲜明,确保更严格或更重要的权限规则优先生效。
在实现上,利用 flask 的拦截器机制,基于请求 header 中的 token 获得请求用户 ID,之后解析出当前请求的 URL 链接、请求方法、path 参数、查询条件、请求体等。验证时,先获取到当前用户的所有权限,并根据权限优先级进行排序,再依次基于权限的规则表达式,判断能否通过校验。
基于拦截器的实现,使得接口的权限判断与接口的处理逻辑完全隔离开。而每个接口的权限规则又是存放在数据库中,使得无需修改业务代码就可以动态的更新接口权限规则。
需要注意的是,
before_request_authenticate在鉴权的时候,会先过滤一些不需鉴权的接口,("/openapi", "/favicon.ico", "/health")接口。
def endpoint_validate(endpoint: str, user_id: int, *, path=None, query=None, body=None, form=None) -> bool:
"""校验权限"""
permissions = list_permissions(user_id, endpoint)
validated = False
permissions = sorted(permissions, key=lambda permission: (permission.priority, permission.updated_at))
for permission in permissions:
if rule_validate(permission.rule, user_id, path=path, query=query, body=body, form=form):
validated = permission.allow
return validated
def rule_validate(rule: str, user_id: int, *, path=None, query=None, body=None, form=None) -> bool:
"""校验权限规则"""
try:
tables = Base.__subclasses__()
table2get = {table.__tablename__: lambda id, table=table: get_orm(table, id) for table in tables}
validated = eval(
rule, {"user_id": user_id, **table2get, "path": path, "query": query, "body": body, "form": form}
)
return validated
except Exception:
LOG.exception("校验规则错误")
return False
@expunge_commit
def get_orm(orm_class: type[ORM], id: int) -> Optional[ORM]:
"""根据id获取orm对象"""
return session.get(orm_class, id, execution_options={"ignore_soft_delete": True})class Permission(Base, SoftDelete, Updatable):
endpoint: Mapped[str] = mapped_column(VARCHAR(1000), nullable=False, comment="接口的endpoint")
allow: Mapped[bool] = mapped_column(BOOLEAN, nullable=False, server_default="1", comment="允许/禁止")
priority: Mapped[int] = mapped_column(INTEGER, nullable=False, server_default="0", comment="优先级, 越大优先级越高")
rule: Mapped[str] = mapped_column(TEXT, nullable=False, default=text("1"), comment="匹配规则")
@app.before_request
def before_request_authenticate():
if request.path.startswith(ignore_prefix_path):
return None
token = request.headers.get("Authorization")
if token and token.startswith("Bearer "):
user = filter_orm(User, only_one="none", api_token=token[7:])
if user is not None:
g.user_id = user.id
@app.before_request
def before_request_permission():
if request.path.startswith(ignore_prefix_path):
return None
endpoint = request.method + " " + request.url_rule.rule
if len(filter_orm(Permission, endpoint=endpoint)) == 0:
return None
if not g.get("user_id"):
return Resp(False, msg="用户未登录")
header, cookie, path, query, form, body, raw = parse_parameters(
app.view_functions[request.endpoint], doc_ui=False
)
func_kwargs = validate_request(header, cookie, path, query, form, body, raw, request.view_args)
path, query, form, body = (
func_kwargs.get("path"),
func_kwargs.get("query"),
func_kwargs.get("form"),
func_kwargs.get("body"),
)
if not endpoint_validate(endpoint, g.user_id, path=path, query=query, body=body, form=form):
return Resp(False, msg="无访问权限")用户模块是如何设计的
- 最开始内部使用,是通过公司内部的 LDAP 来进行登录,注册或登录成功后,会将 token 提供给前端。后续前端会携带这个 token 来获得当前用户信息。
- 后续,公司尝试用这个系统来作为算法实习生招聘。于是增加了普通用户功能,用户、密码、邮箱注册后,会生成一个 token,然后使用公司邮箱向该用户提供的邮箱中发送验证链接,该验证链接对应竞赛平台的验证 API 接口。验证 API 判断 token 存在后,会将该用户状态从待验证用户变为普通用户。后续就可以正常使用。
- 如果 token 被盗用,也可以账号密码登录后重新生成新 token。
- 其中关于密码方面,注册和登录阶段采用加密传输,前端生成一个密钥对密码进行加密,并将两者发送到后端,后端解密出密钥。至于数据库方面,公司员工并不存储密码,外部用户则存储 md5 编码后的密码,从而避免密码的泄露。
- 除此之外,默认对于所有的接口都会进行日志,打印出当前的 path, query, body, form 等属性,但是会过滤掉用户相关的接口,从而避免密码通过日志途径进行泄露。
- 后续又增加了邀请码机制,一个邀请码只能给一人使用,从而避免用户注册。
当然当前多平台的服务在设计用户模块的时候,可能会提供多注册方式兼容(手机号、邮箱、第三方 OAuth),验证码防刷、登录态管理等机制。在注销的时候,也会有软删除机制,清理关联资源等。但是这些在当前的系统中并不是必要的,比如该平台主要还是面向内部成员,因此在设计上采取了非必要不增加复杂实现的思路。
比如 refresh token 可以进行登录态管理,也可以避免用户的 token 被盗,进而做一些其他事情。但是当前的系统很难出现这种情况,内部的成员成员如果做了这种事情,那可能会一定的处罚措施。
当前常见的用户设计方案:
-
身份生命周期管理(从注册到注销)
注册:多方式兼容(手机号 / 邮箱 / 第三方 OAuth),验证码防刷(滑动验证 + 频率限制),邀请码体系(分层注册权限)
登录:多端支持(Web/APP/ 小程序),密码策略(BCrypt 加密 + 盐值,强制复杂度),登录态管理(JWT+Refresh Token,设备指纹绑定)
注销:软删除机制(保留历史数据 + 不可恢复标记),关联资源清理(如发布内容、订单),注销后数据匿名化处理 -
数据安全与隐私
敏感字段分级:手机号 / 身份证号加密存储(AES + 密钥管理服务),头像 / 昵称等公开字段单独表
隐私控制:字段级权限(如允许部分用户查看手机号),操作审计(每次敏感信息访问记录 IP + 时间)
合规响应:支持用户数据导出( GDPR 请求),72 小时内响应删除请求(物理删除或逻辑标记) -
扩展性设计
字段可扩展:预留 JSON 扩展字段(如用户标签、第三方平台 ID),避免频繁改表
分库分表:用户 ID 哈希分片(如按 1024 分片),读写分离(主库写 + 从库读),热点用户缓存(Redis 存最近活跃用户)
多租户支持:租户 ID 隔离(不同企业用户数据物理隔离),租户级配额管理(如每个租户最多 500 用户) -
风控与异常处理
登录保护:连续 5 次失败锁定账号(需短信验证解锁),异地登录预警(IP 白名单 + 二次验证)
操作风控:修改密码 / 换绑手机需原图验证码 + 短信双重验证,高危操作(如提现)强制人脸识别
限流降级:用户模块接口单独限流(如注册接口 QPS 限制 50),熔断机制(第三方短信服务超时自动降级) -
与周边模块的协同
权限系统:用户 - 角色 - 权限绑定(RBAC 基础上支持 ABAC,如用户等级决定 API 调用频次)
通知系统:注册 / 登录异常即时推送(短信 + 站内信双通道),用户行为触发通知(如积分变动)
支付系统:用户钱包账户体系(主账户 + 子账户),交易流水与用户 ID 强关联 -
性能优化
缓存策略:高频访问的用户信息(如昵称、头像)Redis 缓存(TTL 30 分钟),缓存穿透防护(布隆过滤器)
异步处理:用户注册后的初始化任务(创建默认角色、同步到搜索索引)放入消息队列(Kafka/RabbitMQ)
批量操作:用户批量导出(Excel 生成异步处理,避免阻塞主线程)
配置管理是怎么做的
- 接口权限的规则表达式配置 —— permission 表中
- system-setting 表中存放了一些配置
- 其他一些比如数据库连接、Kafka 等配置,则是通过 K8S 的 configmap 来进行配置的
分级管理:开发 / 测试 / 生产环境隔离,用 Nacos/Apollo 实现动态热更新(如临时调整接口限流阈值);
敏感配置加密(如 AWS KMS 加密数据库密码,代码中解密);
禁止配置写死在代码里,所有配置通过环境变量注入。
反例:曾遇线上事故,因某个功能开关硬编码在代码中,紧急关闭时被迫全量发布。
数据库优化是怎么做的
写 SQL 语句
- json 处理
- 时间处理
API 设计与校验
Flask 采用了 GitHub 中的 Flask_openapi3,该库可以基于 pydantic 进行校验,将请求的各属性转换为 path, query, body, form 等对象中。
GPT 生成的内容:
- 版本控制:URL 带版本号(/v1/users),兼容旧版本(至少维护 1 个月);
- 参数校验:前端 + 后端双重校验,用 Pydantic 定义接口输入(如手机号格式、金额范围);
- 错误码:统一 5 位错误码(如 10001 用户未登录,20003 参数错误),附带英文说明便于排查。
消息队列是怎么用的
后台任务优雅退出
class ExitHandler:
"""优雅退出 退出仅捕获信号不做退出"""
def __init__(self, signals: list[signal.Signals] = [signal.SIGTERM]) -> None:
self.to_exit = False
self.running_num = 0
self.lock = threading.Lock()
self.exit_func_list = [sys.exit]
self.exit_args_list = [()]
self.exit_kwargs_list: list[dict] = [{}]
for sig in signals:
signal.signal(sig, self.grace_handler)
def grace_handler(self, signum, frame):
with self.lock:
self.to_exit = True
if self.running_num == 0:
self.exit()
def start_task(self) -> bool:
with self.lock:
if self.exited:
return False
self.running_num += 1
return True
def stop_task(self):
with self.lock:
self.running_num -= 1
if self.exited and self.running_num == 0:
self.exit()
def register_exit_func(self, func, *args, **kwargs):
self.exit_func_list.append(func)
self.exit_args_list.append(args)
self.exit_kwargs_list.append(kwargs)
def exit(self):
n = len(self.exit_func_list)
for i in reversed(range(n)):
self.exit_func = self.exit_func_list[i]
self.exit_args = self.exit_args_list[i]
self.exit_kwargs = self.exit_kwargs_list[i]
self.exit_func(*self.exit_args, **self.exit_kwargs)
def continuous(self, func):
@wraps(func)
def continuous_wrap(*args, **kwargs):
if self.start_task():
try:
func(*args, **kwargs)
except Exception:
LOG.exception(func)
self.stop_task()
return continuous_wrap
class FailedStart(Exception):
"""Break out of the with statement"""
class RequestExit(Exception):
"""Break out requested by code"""
# 下面这个部分,是用于 submit 类似的,上下文管理器方式判断退出逻辑
def __enter__(self):
return self.start_task()
def __exit__(self, exc_type, exc_val, exc_tb):
if isinstance(exc_type, self.FailedStart) or isinstance(exc_type, self.RequestExit):
return
if exc_type:
LOG.error("%s %s %s", exc_type, exc_val, exc_tb)
self.stop_task()
@property
def exited(self):
return self.to_exit安全与合规
- 容易忽视的点:
- 数据脱敏:日志、接口返回中隐藏用户手机号(138****1234);
- 审计日志:记录敏感操作(如删除用户、修改权限),保存至少 6 个月;
- 防攻击:WAF 拦截 SQL 注入、XSS,接口加验证码防爬虫(如登录接口)。
- 合规:金融项目必须通过等保 2.0,所有接口走 HTTPS,密码用 BCrypt 加密(不用 MD5)。
部署与运维
效率与稳定性平衡:
容器化:Docker+K8s,秒级扩容(如活动期间临时增加 3 倍实例);
灰度发布:先推 1% 流量到新版本,监控无异常再全量(避免全量翻车);
监控报警:Prometheus+Grafana 监控 QPS、响应时间、内存使用率,设置多级报警(短信 + 电话 + 邮件)。
案例:某次发布因未灰度,新代码导致 CPU 飙升,10 分钟内自动回滚,避免了线上事故。
做了哪些废弃的功能吗
gitlab 开源代码校验,提交的时候需要提交 gitlab 链接,能够访问。
但是,实际上只能通过 gitlab API 判断能否访问,里面的内容是否和镜像内的内容是否一样就难以判断。
最后废弃掉了。
服务监控都做了哪些
有服务监控吗,怎么做的?
grafana 中有监控,同时日志中打印了错误信息
用户体验与系统性能优化做了哪些
前端响应缓慢
请求合并、前后端 cache 机制
K8S 资源抢占导致任务乱序
服务 Pod 异常重启导致日志丢失
算法提交策略完成时间的无目标等待
增加 cpu、gpu 面板,提供当前还没有运行完成的 submit 信息。
榜首服务策略开源化
- 最开始,每个人上传到 dockerhub 任意的位置,然后策略提交信息公开
- 但是策略公开后,有人会将镜像删除,或者用无关镜像进行强行覆盖,导致策略开源化并没有成效
- 针对此,就将每个人提交的镜像拷贝到比赛平台的目录下,具体拷贝的时间,是什么时候呢?
- 虽然不能修改,
- 最后变成 helm chart 的形式
困难和挑战
与其说是困难,不如说是一次挑战。从 0 开始的完成一个相对复杂的项目,同时要和其他部门、其他角色去做沟通、交流。
技术实现
重构前后的对比
重构前:服务器通过 python 启动评估程序。
重构后:评估程序放在 K8S 容器内进行,方便进行管理、监控、程序隔离。
重构前:评估程序逻辑与平台逻辑耦合在一个仓库,通过模版方法对三个阶段进行自定义。
重构后:平台后端代码与评估程序代码完全分开,每个评估程序也独立一个仓库,评估程序内逻辑完全自定义。
重构前:提交人员只能提交一个镜像信息、环境变量等数据
重构后:提交内容为 yaml 形式的文本片段,不同的评估程序自定义提交的格式,比如除了配置 docker 镜像外、还可以配置 K8S 中的多个字段,如环境变量、readiness、资源大小等等。极大的提高了整个流程的自由度。
还有很多新功能的开发与演进,放在下一节。