做了什么?
- 完成推荐架构中短视频场景离线数据流的重构和迁移⼯作,共迁移了 400 多属性;
- AB 实验验证,图文场景,Latency 降低 15 ms;短视频场景,Staytime 正向显著;miss 数目从 3000 条 miss 500,降低到 miss 200 条,miss 率从 16% 降低到 6%。(但实际上,不记得这些场景的数据是哪些了)
- 优化新数据流架构的代码,完善⼯具类;了解整个推荐架构的运作流程;两周报警值班;完成算法同学对于推荐内容新增属性的需求等。
- 由于新架构是那一年正在逐渐配置的,所以很多设施不完善,为此增加了很多指标显示:
- 某次查看离线数据流的日志时,通过筛选 exception 发现的问题。在某个算子在处理数据的某个属性时,尝试将一个非数值型的字符串转换为 int 型数据,从而触发异常。在查看代码后了解到,那个属性本来应该是数值型,但是存在一些异常数据的该属性并非是数值型,导致的问题。
- 在观察到这个问题后,去询问 mentor 这类数据是应该直接忽略掉,还是应该联系相关部门进行数据修正。再确认直接忽略后,写了一个工具类,捕获类似的转换异常,并针对该属性进行修正。
- 由于这类处理失败的场景没有被显示的看出,因此增加了一个失败监控。
- 类似的,在线推荐场景下,有个 miss 率的情况,也没有被统计到。miss 率高有很多种情况,比如用户新发布的视频、图文等在在线服务中无法获取到对应属性,获取属性的服务不可用等情况。如果 miss 率过高,会影响到推荐效果。所以增加了 miss 率的监控以及报警。
- 两周报警值班的时候,没事就看看日志,以防出现一些不必要的问题。
- 有遇到一些下游服务响应慢进而影响在线服务响应速度的情况。
一些 DAG 或者推荐系统的 Java 项目:
实习经历
实习期间做了什么:
离线数据任务的迁移,主要是算子的迁移
迁移的原因
- 多方使用 —— 单业务出错,其他出错 —— 推出不合时宜的视频 or 视频被假限流
- 推荐是逐步推荐的过程 —— 候选、粗排、精排、混排 —— 候选&候选、其他&正排 —— 时间差 —— miss 比例
- 旧算子串行 —— 新算子并行 —— DAG
- 多业务方共用一个数据流处理,头条、懂车帝、问答等均在这里。那么一个业务方的代码出现了差错,会导致其他业务方处理错误。例如,某段代码没有进行异常检查,导致出现了异常,当前算子的后续逻辑就无法执行,那么可能就会造成一些不该被推荐的视频被推荐了出来,或者某些视频一直无法推荐,作者以为被限流了。
- 推荐架构是一个逐步推荐的过程,候选-粗排-精排-混排。候选用的候选属性,其他使用正排属性。在历史的数据流中,候选属性的处理和正排属性的处理是分开的。这看起来是没问题的,但对于新发布的视频会存在时间差。如果某个新视频在 1 点计算出了候选属性,在 1 点 10 分计算出了正排属性。而候选模型在 1 点 5 分时选择将这个新视频进行推荐,那么在后面的流程中,是无法及时获取这个视频的正排属性的,也就存在一些 miss 情况。根据在线服务的日志中,这个 miss 率大概处于 3000 条候选数据有 500 条正排属性缺失的 miss 比例。日志中显示了 tag 属于哪个类别的,从而看到。
- 在旧的数据处理框架中,算子是串行执行的。新的数据处理框架中,采用 DAG 图,让没有依赖关系的算子并行执行。
离线数据流介绍 —— 更清晰
我所处的部门负责的方向是协助算法同学将头条、抖音、西瓜等的内容推荐到 VIVO、OPPO、小米等厂商浏览器或视频 APP 上。其中这分为在线服务和离线数据处理两个部分,我负责的就是离线部分。以短视频为例,短视频可能包含视频的创建时间、过期时间、可推荐度、作者、关键词、展现数、点击数、抖音点击数、西瓜点击数等数据。离线数据处理部分就是将根据到来的短视频 ID 从多个服务上获得其相关数据,将其写入到存储中,供模型构建时读取。
有时候还会从 Redis 等地方读取数据,这里存着一些敏感词汇或者必要的 tag,可能用于 filter 或者其他情况。
当前框架的最底层是基于 Flink 流式处理,经过基础架构团队进行了封装,我们只需要实现输入数据流的处理和短视频属性的处理两个部分内容。
在短视频属性处理部分,我们将从不同的属性处理逻辑放到不同的处理类中,这些处理类被称为算子。例如从计数服务中获得点击量等属性,从内容中台获得作者信息等属性,还有根据已获得属性进行后处理的算子。为了加快单个视频的执行速度,采用了 DAG 有向无环图的方式来构建算子之间的依赖关系,并使用线程池来执行任务。将无前置依赖的算子加入线程池中,并增加回调函数。执行结束后,通过 AtomicInteger 更新后置算子的入度,若入度为 0,则将后置算子加入到线程池中执行。
为什么要单独处理?受众不同,VIVO、OPPO、小米这些平台用户不同,为了让用户存留时间更久,模型需要分开训练;推荐的限制条件不同,所以也有不同的数据处理。
其中离线数据的数据源有三种,一种是新产生的短视频,一种是全量回扫的短视频,一种是属性更新需重新计算的短视频。三种数据源对应三个 Kafka Topic,均是由其他部门写入,我们读取数据进行处理。离线数据属性的存储从数据的使用方向又可以被分为候选数据、正排数据、倒排数据。候选数据用于推荐的候选阶段,这个阶段将所有的视频都作为输入,获得一个初步的推荐结果,如 3000 个。正排数据用于推荐的正排阶段,这个阶段在初步的推荐结果的基础上,进行进一步筛选,获得一个相对精确的推荐结果,如 300 个。同样也用于精排阶段,进一步筛选。倒排主要用于推荐某个特定类别的情况,当前没有用到。
latency 降低原因、staytime 提升原因,怎么量化正向显著,量化指标是怎么生成的,
首先,这些指标是线上服务所使用的指标。
latency 降低的原因在于,之前存储是在两个服务上,现在将其存储到了一个服务上,调用次数降低了。
staytime 降低的原因在于,短视频采用了新的数据回扫流,这个回扫流覆盖的视频量更多,优质内容更多,用户观看时间增加。
指标正向显著:通过假设检验计算出 p 值,如果 p 值小于 0.05,那么认为是正向显著。
指标生成有多种,有的是推荐服务中进行打点,如 latency;有的是上游后端业务方中进行打点。
数据流过程中是如何存储属性的
使用一个 Context 类,类内通过 ConcurentHashMap 存储。
其他注意事项
- 增加图文视频场景数据时,提前查看了当时的数据量消耗情况,以及当前的消费负载情况。具体数据没有记录,有些不清晰了。小说数据大概 20-30 万。千万条图片视频数据,大概。
- 小说数据大概 2 小时处理完成,之后算法侧将数据每天上午 10 点和下午 4 点分别导入 kafka 中。kafka 偏移量先递增再递减。
- 从多个数据源读取数据时,顺序是如何的呢?Kafka Consumer Group 配置多个 Topic?还是每个任务都会创建一个程序去执行?
- 数据如果处理失败,会放到 Kafka 的一个 topic 中后续再处理。
- 数据处理完存放在哪里呢?字节的一个存储服务(其实我也不知道)
- 时间信息中,有一个图,是先增加,然后持平,然后降低,然后为 0. 这样可以保证 kafka 数据都可以被消费,不存在一直没更新数据的消息。