高级代码
09-11 1f64679f14410634fea290c655edf7f760421c9c
开始尝试 SoftDeleteQuery、SimpleFormatter。
Debug 列表
技术向
MySQL 中 text, varchar, blob, longtext 怎么选择?
前后端的跨域问题如何解决?
最开始的数据库形式
benchmark_dataset
benchmark
name, version, desp, status, metrics(JSON), judger_id
dataset
name, desp, uri, owner
job_resource
job_id, role(master/slave), resource_id, container_id, occupied_resource
job
benchmark_id, strategy_id, dataset_id
status(initialized, pending, running, succeeded, failed, killed)
judger
name, image, owner_id, resource(该工具所需资源数)
resource
host, port, resource(json)
result
job_id, result
stragegy
name, owner_id, benchmark_id, config(BLOB), status
user
name, email, role, token
后面在发现对应关系后,进行融合
benchmark_dataset(deleted)
benchmark
add column: benchmark_config, judgeflow_config, dataset_ids
result(deleted)
job
add column: result
再更改
stragegy -> submit
submit
add column: log uri, copy_from_submit_id, benchmark_id
Docker 交互 + FTP 存放结果
9 月份
- b0f021e98bb0f950b40e851f6acc00792c2af7ad
- job-executor 雏形,先在 36 机器上运行,之后统一管理
- 最开始,各种文件还是先在本地进行管理,例如 dataset、策略 config、result file、bad case file
- 然后将文件 volume 映射到 judgeflow 容器内,也将对应的文件路径通过环境变量配置到容器内
- 启动后,每秒检查状态,检查 1440 秒,若一直出错,则更改为 killed 状态,stop 容器
def handler_job(job_id):
update_job(job_id, {Job.status: JobStatus.PENDING})
job_workspace = os.path.join(work_dir, str(job_id))
if not os.path.exists(job_workspace):
os.makedirs(job_workspace)
job = get_job(job_id)
submit_id = job.submit_id
dataset_id = job.dataset_id
submit = get_submit(submit_id)
submit_config = submit.config
benchmark_id = submit.benchmark_id
benchmark = get_benchmark(benchmark_id)
benchmark_name = benchmark.name
benchmark_config = benchmark.benchmark_config
judge_flow_config = benchmark.judge_flow_config
judge_flow_id = benchmark.judge_flow_id
judge_flow = get_judge_flow(judge_flow_id)
judge_flow_image = judge_flow.image
dataset = get_dataset(dataset_id)
update_job(job_id, {Job.status: JobStatus.RUNNING})
(
status,
host_log_filepath,
host_result_filepath,
host_bad_cases_filepath,
) = run_judge_flow(
benchmark_name,
benchmark_config,
judge_flow_config,
judge_flow_image,
dataset,
submit_config,
job_workspace,
)
if status == "succeeded":
update_job(job_id, {Job.status: JobStatus.SUCCEEDED})
f = open(host_result_filepath, "r")
content = f.read()
f.close()
result_json = json.loads(content)
update_job(
job_id,
{
Job.result: result_json,
Job.log_uri: host_log_filepath,
Job.badcase_uri: host_bad_cases_filepath,
},
)
elif status == "failed":
update_job(job_id, {Job.status: JobStatus.FAILED})
elif status == "killed":
update_job(job_id, {Job.status: JobStatus.KILLED})
# todo 当前就直接在36上跑了,将来会和其他资源共同管理
#
def run_judge_flow(
benchmark_name,
benchmark_config,
judge_flow_config,
judge_flow_image,
dataset,
submit_config,
job_workspace,
):
host_dataset_filepath = os.path.join(job_workspace, "datafile")
host_st_config_filepath = os.path.join(job_workspace, "st_config")
host_log_filepath = os.path.join(job_workspace, "test.log")
host_result_filepath = os.path.join(job_workspace, "result.json")
host_bad_cases_filepath = os.path.join(job_workspace, "bad_cases.json")
dataset_name = dataset.name
# todo 下载dataset文件等
host_dataset_filepath = dataset.local_file
write_to_file(submit_config, host_st_config_filepath)
os.system("touch {}".format(host_log_filepath))
os.system("touch {}".format(host_result_filepath))
os.system("touch {}".format(host_bad_cases_filepath))
container_dataset_filepath = "/root/test/datafile"
container_st_config_filepath = "/root/test/st_config_file"
container_log_filepath = "/root/test/test.log"
container_result_filepath = "/root/test/result.json"
container_bad_cases_filepath = "/root/test/bad_cases.json"
client = docker.DockerClient(base_url="tcp://172.27.231.36:2375")
docker_image_repo = judge_flow_image[: judge_flow_image.rindex(":")]
docker_image_tag = judge_flow_image[judge_flow_image.rindex(":") + 1 :]
client.images.pull(docker_image_repo, docker_image_tag)
volumes = [
"{}:{}".format(host_dataset_filepath, container_dataset_filepath),
"{}:{}".format(host_st_config_filepath, container_st_config_filepath),
"{}:{}".format(host_log_filepath, container_log_filepath),
"{}:{}".format(host_result_filepath, container_result_filepath),
"{}:{}".format(host_bad_cases_filepath, container_bad_cases_filepath),
]
judge_flow_volumes = (
judge_flow_config["volumes"]
if judge_flow_config is not None and "volumes" in judge_flow_config
else {}
)
for v in judge_flow_volumes:
volumes.append(v)
environment = {
"BENCHMARK_NAME": benchmark_name,
"BENCHMARK_CONFIG": json.dumps(benchmark_config, ensure_ascii=False),
"DATASET_NAME": dataset_name,
"DATASET_FILEPATH": container_dataset_filepath,
"SUBMIT_CONFIG_FILEPATH": container_st_config_filepath,
"LOG_FILEPATH": container_log_filepath,
"RESULT_FILEPATH": container_result_filepath,
"BAD_CASES_FILEPATH": container_bad_cases_filepath,
}
judge_flow_environment = (
judge_flow_config["environment"]
if judge_flow_config is not None and "environment" in judge_flow_config
else {}
)
environment.update(judge_flow_environment)
# 格式:{'2222/tcp': 3333}
# {'2222/tcp': None}
ports = (
judge_flow_config["ports"]
if judge_flow_config is not None and "ports" in judge_flow_config
else {}
)
command = (
judge_flow_config["command"]
if judge_flow_config is not None and "command" in judge_flow_config
else ""
)
container = client.containers.run(
image=judge_flow_image,
ports=ports,
command=command,
volumes=volumes,
detach=True,
environment=environment,
)
retries = 1440
while retries > 0:
container.reload()
status = container.attrs["State"]["Status"]
if status == "exited":
if container.attrs["State"]["ExitCode"] == 0:
return (
"succeeded",
host_log_filepath,
host_result_filepath,
host_bad_cases_filepath,
)
else:
return (
"failed",
host_log_filepath,
host_result_filepath,
host_bad_cases_filepath,
)
retries -= 1
time.sleep(60)
container.stop()
return (
"killed",
host_log_filepath,
host_result_filepath,
host_bad_cases_filepath,
)
handler_job(26)关于数据集,上传数据集会加密转存到 ftp 中。不对称加密,解密时间太长了,最后选择了对称加密。
测试环境与正式环境的区分
是通过 config 文件的方式,config_prod.py & config_dev.py 里面存放了关于 DB、Kafka 等不同环境的配置。
最开始框架
最开始的时候,就先打算先整一个框架,将之前同步的逻辑转变为异步逻辑。
例如,最开始提交一个策略后,就开始启动评估程序,现在改为生成 id 后,就直接返回,由异步程序读取 mysql binlog,获得数据,写入到 kafka 中。这样避免了生成 id 后服务崩溃导致没有写入到 kafka 中。
同时会有定时任务去消费不同 kafka 主题的数据,比如 submit kafka 主题,会获得对应的竞赛 ID,获得对应的数据集 ID、评估程序 ID 等,针对每个数据集生成一个运行程序 Job 数据条目,插入到数据库中。
job kafka 主题,则会读取这个 job 对应的数据集 ID、评估程序 ID,启动对应的评估程序,将数据集、竞赛的信息文件挂载和环境变量传入。
最开始为了避免服务器不够用,会监控当前运行的程序数目。后面引入 K8S 后,避免过多的 pod 都在一直 pending,也算稍微先来先运行,就保留了这个机制。类似于限流机制。
关于安全性、意外情况有做哪些考虑
- 提交的时候,包含着提交者、辅助贡献者。贡献者会判断 id 是否存在,后续由于流动性较大,又增加了判断 id 是否还在公司内部。
load-sut 逻辑
before:
@resource_blue.route("/load_sut", methods=["POST"])
def load_sut():
job_id = request.form.get("job_id", type=int)
resource_name = request.form.get("resource_name")
docker_service = request.form.get("docker_service", "")
container_info = request.form.get("container_info")
compose_info = request.form.get("compose_info")
container_info_file = request.files.get("container_info_file")
compose_info_file = request.files.get("compose_info_file")
db-daemon 任务 + cal-submit-result executor + job executor
9.22 sampled-bad-case 接口,选择 1/10 或者 1/20 份 bad-case,给打榜人员进行调试。
同期差不多这个时候,前端(陈 yq)开始接入,
MR 44
K8S 交互 + helm + ceph
11-24 develop MR181
- service/helm:
- 将 sut 名字更改为 helm 可用名字
- views/resource.py:
- 增加 unload-sut 接口
- list-pods(labels)
- 打包保存 pod 日志
- helm uninstall pod
11-24 develop MR183
- models/__init__ 修改 listen_do_orm_excute(orm_execute_state: ORMExecuteState)
11-28 develop MR189 + MR190
- models/__init__ 修改 listen_do_orm_excute(orm_execute_state: ORMExecuteState)
11-29 develop MR192
- db-daemon/k8s 逻辑更新
11-29 develop MR197
- 增加 reverse-resource 保留资源配置,clean job 的时候,判断是否保留。
11-30 feat/get-sut-info MR199
- 增加 with-detail 参数,detail 中为 pod.to_dict() 结果
12-01 develop MR201
- db-daemon/gc.py 清理 workspace
- service/k8s 修改 get-pod-log 函数,有可能一个 pod 有多个 containers,需要获得各自 containers 日志,并在中间配置隔断符。
12-13 fix/soft_delete MR224
- models/__init__ 修改 listen_do_orm_excute(orm_execute_state: ORMExecuteState) fix: 软删除对 join 的表添加条件
12-15 feat/monitor —— feat/monitorxxx MR228 - MR233
- db-daemon/monitor 增加 prometheus 普罗米修斯打 label,记录过去 n 天的 job/submit 信息
12-18 develop
- service/helm、k8s:创建 Helm、K8S 工具类,将之前的工具函数放到该类下
- db-daemon/job.py 修改
12-20 feat/k8s-pod-describe
- get_bundle_job_info 函数:
- Before: pod.to_dict() pod info
- After:add 通过 k8s api 获得 pod events
12-20 develop
- db-daemon/k8s:增加 deployment + job + service 的清理。逻辑是,根据 label 获得 job-id,从数据库中获取到 job 状态。如果已结束 + 非保留 + 一定时间外,则清理资源。
12-21 leaderboard chart
- helm/chart 修改:大更改
12-22 feat/fix-helm-list
- helm list 增加 —max 20000
12-22 fix-delete-job
- k8s 的 delete_namespaced_job 增加 propagation_policy=‘Background’属性
"Background"值表示删除请求将在后台处理,不影响子资源。删除请求将尽快返回,而实际的资源清理将在后台进行。"Foreground"值表示删除请求将等待直到所有子资源被删除才会返回,这可能会导致删除操作的延迟,尤其是对于有大量子资源的情况。
12-25 fix/chart 又是一堆更新
01-02 feat/snapshot-log-simple-event
- 压缩日志快照内容中 event 内容,只记录 events 中的 type、reason、first_timestamp、last_timestamp、message。
03-15 fix/helm-pull
- “helm”, “pull”, chart_name, “—destination”, destination 指令,使用 tempfile 的临时文件夹。
03-19 feat/gpuvarcheck
- 不懂是做什么用的
03-21 feat/support-sub-chart
- views/resouces 中 load-sut 时候,将 pod-label、affinity 等注入到 global 中。
03-25 feat/system-setting
- 增加获得系统 setting 接口,含 forms/models/views/controllers
- 但是不知道是什么作用
该看 MR405 04-09
一些 bug:
-
合并 submit result 时,处理了不在 dataset-ids 范围内的 job
-
merge method 改进 03.01
-
fix k8s db daemon 寻找 pod 超时,03-11,MR366,有意思的 bug
-
white-list、black-list
-
network-white-list、network-black-list
-
其他 api 接口,为了适配其他运营平台
- submit-filter 接口:
- MR346 03-05 feat/interface 中的 views/submit + forms/submit
- 获取用户权限接口
- MR352 03-07 feat/auth
- submit-filter 接口:
-
resubmit 时,如果榜单不允许访问外部网络,且 submit 要求访问外部网络,且提交人不在允许外部访问的白名单里,直接 allow_network=False 提交
- MR378 feat/resubmit-network-limit
TODO
ceph 文件结构:datafile、workspace、submit、internal 文件夹。bad-case、detail-case 等放在什么位置。
如何解决连续两次访问数据库,数据库不一致的情况?
不存在的问题、外部服务异常的问题。