1. Background
Multi-task learning (MTL), which trains a single model on several different objectives at once (e.g., CTR and CVR), has recently seen growing adoption in production recommender systems. When the tasks are closely related, MTL can improve learning efficiency by sharing information across tasks. In practice, however, tasks are often only weakly related, and sometimes even conflicting; applying MTL then risks negative transfer, meaning that information sharing between weakly related tasks degrades the network's performance.
PLE is Tencent's paper published at RecSys '20, where it won the Best Paper Award. The paper claims to greatly alleviate the two chronic ailments of multi-task learning, negative transfer and the seesaw phenomenon, and thereby to deliver a sizable performance gain over other MTL models. The reported experiments do bear this out, but judging from the model structure, the gain looks more like brute force paying off: the improvement comes from a larger parameter count. That it won Best Paper is due partly to the large performance gains shown in the experiments, and partly to the excellent data analysis and the thoroughness of the experiments; the work comes across as very solid, and that rigor is well worth learning from.
Compared with training a separate single-task model per task, multi-task learning often improves some tasks at the expense of others. Even with approaches like MMoE that mitigate negative transfer, the seesaw phenomenon remains widespread. The paper proposes Progressive Layered Extraction (PLE) to address this seesaw phenomenon in multi-task learning. For details, see the original paper.
Let's first look at the two major problems in multi-task learning:
Negative transfer: MTL (multi-task learning) was proposed so that different tasks, especially those with little data, can benefit from transfer learning (by sharing embeddings, and often also the lower fully connected layers above them, all very common practice). But things frequently go the other way: when two tasks are only weakly related (say, one task judges whether an image shows a dog and the other whether it shows an airplane) or the relationship is very complex, negative transfer often occurs, i.e., performance with sharing is actually worse than without it.

Seesaw phenomenon: again, when two tasks are weakly related or their relationship is complex, what often happens is that one task's performance improves only by damaging the other's. This phenomenon has existed for a long time; the PLE paper gives it the very apt name "seesaw": when one side goes up, the other goes down. Some earlier work has tried to mitigate negative transfer, such as Google's MMoE model.
Let's first look at the model as a whole; you may want to refresh your memory of MMoE first.
You may now see why I said earlier that PLE's gains look like they come from added model complexity: at a glance, PLE simply goes deep (stacking another layer of experts on top of the experts), which naturally outperforms a shallow counterpart.
2. Multi-Task Learning
2.1 Single-Level MTL Models
Single-level MTL models mainly come in the following forms:
1) Hard Parameter Sharing: the most common MTL architecture. The bottom modules are shared across tasks, and the shared output feeds into each task's private module to produce per-task outputs. When two tasks are highly related, this structure often works well; when they are not, negative transfer appears and results suffer.
2) Asymmetric Sharing: in this MTL structure, each task's bottom module produces its own output, but some tasks' outputs are also consumed by other tasks, while the rest use only their own. Which tasks consume whose outputs has to be specified manually.
3) Customized Sharing: in this MTL structure, each task's bottom module produces its own private output, plus a shared output. Structures 2) and 3) are likewise introduced in the paper, but we will not dwell on them.
4) MMoE: covered in earlier posts, so most readers will be familiar with it. The bottom holds multiple experts, and a per-task gating mechanism weights and filters each expert's output differently for each task (a minimal gating sketch follows this list).
5) CGC: the structure proposed in this paper, described in detail later, so we skip it here.
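To make the gating idea concrete, here is a minimal, self-contained sketch of MMoE-style gating in plain TensorFlow/Keras. The expert count, layer sizes, and names are illustrative assumptions of mine, not the paper's configuration:

```python
import tensorflow as tf

# Illustrative sizes; not the paper's configuration.
num_experts, num_tasks, dim_in = 3, 2, 32

inputs = tf.keras.Input(shape=(dim_in,))

# Shared experts: every task sees all of them.
experts = [tf.keras.layers.Dense(16, activation='relu')(inputs)
           for _ in range(num_experts)]
expert_stack = tf.keras.layers.Lambda(
    lambda xs: tf.stack(xs, axis=1))(experts)  # (batch, num_experts, 16)

task_outputs = []
for t in range(num_tasks):
    # Per-task gate: softmax weights over the shared experts.
    gate = tf.keras.layers.Dense(num_experts, activation='softmax')(inputs)
    gate = tf.keras.layers.Lambda(lambda g: tf.expand_dims(g, -1))(gate)
    # Weighted sum of expert outputs, then a task-specific tower.
    fused = tf.keras.layers.Lambda(
        lambda z: tf.reduce_sum(z[0] * z[1], axis=1))([expert_stack, gate])
    tower = tf.keras.layers.Dense(8, activation='relu')(fused)
    task_outputs.append(
        tf.keras.layers.Dense(1, activation='sigmoid', name=f'task_{t}')(tower))

model = tf.keras.Model(inputs, task_outputs)
```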
2.2 Multi-Level MTL Models
Multi-level MTL models mainly come in the following forms:
1) Cross-Stitch Network: from the paper "Cross-stitch Networks for Multi-task Learning". The overview figure above may not show it clearly; the cross-stitch unit itself is easier to read as a formula:
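Reconstructed from the Cross-Stitch paper, with $x_A$ and $x_B$ denoting the two tasks' activations at a given layer, the cross-stitch unit computes:

$$\begin{bmatrix} \tilde{x}_A \\ \tilde{x}_B \end{bmatrix} = \begin{bmatrix} \alpha_{AA} & \alpha_{AB} \\ \alpha_{BA} & \alpha_{BB} \end{bmatrix} \begin{bmatrix} x_A \\ x_B \end{bmatrix}$$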
As the formula above shows, when $\alpha_{BA}$ or $\alpha_{AB}$ is 0, the two tasks share no features; conversely, the larger these values, the larger the shared portion.
2) Sluice Network: another fun name. This structure comes from the paper "Sluice networks: Learning what to share between loosely related tasks". The model is fairly complex and we will not detail it here; interested readers can consult the original paper.
3) ML-MMoE: the multi-level version of MMoE; no further comment needed.
4) PLE: the advanced version of CGC, also proposed in this paper and described in detail later, so we skip it here.
3. The PLE Model (Progressive Layered Extraction)
3.1 The Seesaw Phenomenon
Let's first look at the seesaw phenomenon in MTL. The paper uses multi-task learning in Tencent Video's recommender system as its running example; the video recommendation architecture is shown below:
The focus here is on two tasks, VCR and VTR. VCR (view completion ratio) measures how much of a video was watched: for a 10-minute video watched for 5 minutes, VCR = 0.5. This is a regression task evaluated with MSE. VTR (view-through rate) indicates whether a play counts as a valid view, i.e., whether watch time exceeds a given threshold; this is a binary classification task (label 0 if the video was not watched) evaluated with AUC.
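As a concrete illustration (my own toy sketch, not code from the paper; the 30-second threshold is an assumption), the two labels could be derived from raw logs like this:

```python
def make_labels(duration_s: float, watch_s: float, played: bool,
                vtr_threshold_s: float = 30.0):
    """Toy VCR/VTR label construction; the 30s threshold is an assumption."""
    vcr = watch_s / duration_s if played else 0.0             # regression target
    vtr = 1 if played and watch_s >= vtr_threshold_s else 0   # binary target
    return vcr, vtr

print(make_labels(duration_s=600, watch_s=300, played=True))  # (0.5, 1)
```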
The relationship between the two tasks is complicated. First, VTR's label is a coupling of the play action and VCR, since only plays whose watch time exceeds the threshold count as valid views. Second, the distribution of the play action itself is complex: some scenarios auto-play when on WiFi, giving those samples a high average play probability, while scenarios without auto-play, where the user must explicitly click, have a much lower average play probability.
The paper compares all the MTL structures above on Tencent Video's VCR and VTR tasks, reporting offline training results relative to single-task models:
Almost every structure beats the single-task model on one task while falling behind on the other, which is exactly the seesaw phenomenon. MMoE improves matters somewhat, achieving a decent gain on VTR, but its gain on VCR is close to zero. MMoE has two shortcomings. First, all of its experts are shared by all tasks, which may fail to capture more intricate inter-task relationships and can inject noise into some tasks. Second, the experts do not interact with one another, which limits the benefit of joint optimization.
To address these two points, the paper proposes the PLE structure, which gains over single-task models on both tasks and effectively resolves the seesaw phenomenon.
3.2 CGC (Customized Gate Control)
CGC can be seen as a simplified version of PLE, so we introduce it first; its structure is shown below:
CGC can be viewed as a combination of Customized Sharing and MMoE. Each task has shared experts and its own private experts. For task A, the outputs of the experts in Experts A and those in Experts Shared are fused through an MMoE-style gate and fed into task A's upper network. The computation is as follows:
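Following the notation of the PLE paper, the fused representation fed to task $k$'s tower is:

$$g^{k}(x) = w^{k}(x)\, S^{k}(x), \qquad w^{k}(x) = \mathrm{Softmax}\!\left(W_{g}^{k}\, x\right)$$

where $W_{g}^{k}$ is the gate's linear transformation, $S^{k}(x)$ stacks the outputs of the selected experts, and the final prediction is $y^{k}(x) = t^{k}\!\big(g^{k}(x)\big)$ with $t^{k}$ the tower network of task $k$.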
Here $g^{k}(x)$ is the output of the lower module, $w^{k}(x)$ is task $k$'s weight vector over the different expert outputs, and $S^{k}(x)$ is the matrix of expert outputs used by task $k$; for task A, for example, it contains the outputs of the experts in Experts A and Experts Shared.
3.3 Progressive Layered Extraction
Building on CGC, Progressive Layered Extraction (hereafter PLE) lets different experts interact; it can be seen as a combination of Customized Sharing and ML-MMoE. Its structure is shown below:
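Concretely (again in the paper's notation), extraction layers are stacked, and the gate of task $k$ at layer $j$ takes the fused output of layer $j-1$ as its selector input, so information is extracted progressively:

$$g^{k,j}(x) = w^{k,j}\!\big(g^{k,j-1}(x)\big)\, S^{k,j}(x), \qquad g^{k,0}(x) = x$$

After $N$ extraction layers, the final prediction of task $k$ is $y^{k}(x) = t^{k}\!\big(g^{k,N}(x)\big)$.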
3.4 MTL Training Optimization
The traditional MTL loss is a weighted sum of the per-task losses:
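With shared parameters $\theta_s$, task-specific parameters $\theta_k$, and task weights $\omega_k$ (as in the paper):

$$L(\theta_1, \dots, \theta_K, \theta_s) = \sum_{k=1}^{K} \omega_k\, L_k(\theta_k, \theta_s)$$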
In the Tencent Video scenario, however, different tasks have different sample spaces; computing video completion, for example, requires a click on the video. The sample spaces of the different tasks are shown below:
One way to deal with inconsistent sample spaces is ESMM, which we covered before. This paper instead optimizes the loss so that each task is still trained only on samples from its own sample space:
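Each task's loss is computed and averaged only over that task's own samples:

$$L_k(\theta_k, \theta_s) = \frac{1}{\sum_{i} \sigma_k^{i}} \sum_{i} \sigma_k^{i}\, \mathrm{loss}_k\big(\hat{y}_k^{i}(\theta_k, \theta_s),\ y_k^{i}\big)$$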
Here $\sigma_{k}^{i}$ takes the value 0 or 1, indicating whether sample $i$ belongs to the sample space of task $k$.
The second optimization concerns the weights across tasks. The most common way to set MTL weights is manual tuning, which requires repeated trials to find a good combination; another is Alibaba's approach of computing the task weights via Pareto optimality. This paper also sets the weights manually, but lets them change across training epochs. At each epoch $t$, the weight is computed as follows:
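$$\omega_k^{(t)} = \omega_{k,0} \cdot \gamma_k^{\,t}$$

where $\omega_{k,0}$ is the initial weight of task $k$ and $\gamma_k$ its per-epoch decay ratio.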
All the quantities in this formula ($\omega_{k,0}$ and $\gamma_k$) are manually set hyperparameters.
4. Experimental Results
Finally, a quick look at the experiments. First the offline training results; all gains in the table are relative to single-task models:
Next, the online A/B test results:
PLE achieves the best results both offline and in the online A/B test.
The paper then compares Hard Parameter Sharing, MMoE, and PLE under varying degrees of inter-task correlation:
PLE performs best regardless of how correlated the tasks are.
Finally, the paper compares the mean expert outputs of MMoE and PLE to measure expert utilization. For a fair comparison, MMoE is given 3 experts, while in CGC/PLE each task has 1 private expert and there is 1 shared expert, so every model has three experts in total. The results:
For both MMoE and ML-MMoE, each task's weights over the three experts are nearly equal, which in effect degenerates into Hard Parameter Sharing. For CGC/PLE, in contrast, different tasks put markedly different weights on the shared experts; each task draws effectively on both the shared and its private experts' information, which explains why it trains to better results than MMoE.
5. Code in Practice
Life is short, so I'll just use an API!
5.1 DeepCTR
```python
import tensorflow as tf

from ...feature_column import build_input_features, input_from_feature_columns
from ...layers.core import PredictionLayer, DNN
from ...layers.utils import combined_dnn_input, reduce_sum


def PLE(dnn_feature_columns, shared_expert_num=1, specific_expert_num=1, num_levels=2,
        expert_dnn_hidden_units=(256,), tower_dnn_hidden_units=(64,), gate_dnn_hidden_units=(),
        l2_reg_embedding=0.00001, l2_reg_dnn=0, seed=1024, dnn_dropout=0, dnn_activation='relu',
        dnn_use_bn=False, task_types=('binary', 'binary'), task_names=('ctr', 'ctcvr')):
    """Instantiates the multi level of Customized Gate Control of Progressive Layered Extraction architecture.

    :param dnn_feature_columns: An iterable containing all the features used by deep part of the model.
    :param shared_expert_num: integer, number of task-shared experts.
    :param specific_expert_num: integer, number of task-specific experts.
    :param num_levels: integer, number of CGC levels.
    :param expert_dnn_hidden_units: list, list of positive integer or empty list, the layer number and units in each layer of expert DNN.
    :param tower_dnn_hidden_units: list, list of positive integer or empty list, the layer number and units in each layer of task-specific DNN.
    :param gate_dnn_hidden_units: list, list of positive integer or empty list, the layer number and units in each layer of gate DNN.
    :param l2_reg_embedding: float. L2 regularizer strength applied to embedding vector.
    :param l2_reg_dnn: float. L2 regularizer strength applied to DNN.
    :param seed: integer, to use as random seed.
    :param dnn_dropout: float in [0,1), the probability we will drop out a given DNN coordinate.
    :param dnn_activation: Activation function to use in DNN.
    :param dnn_use_bn: bool. Whether use BatchNormalization before activation or not in DNN.
    :param task_types: list of str, indicating the loss of each tasks, ``"binary"`` for binary logloss, ``"regression"`` for regression loss. e.g. ['binary', 'regression']
    :param task_names: list of str, indicating the predict target of each tasks
    :return: a Keras model instance.
    """
    num_tasks = len(task_names)
    if num_tasks <= 1:
        raise ValueError("num_tasks must be greater than 1")
    if len(task_types) != num_tasks:
        raise ValueError("num_tasks must be equal to the length of task_types")

    for task_type in task_types:
        if task_type not in ['binary', 'regression']:
            raise ValueError("task must be binary or regression, {} is illegal".format(task_type))

    features = build_input_features(dnn_feature_columns)
    inputs_list = list(features.values())

    sparse_embedding_list, dense_value_list = input_from_feature_columns(features, dnn_feature_columns,
                                                                         l2_reg_embedding, seed)
    dnn_input = combined_dnn_input(sparse_embedding_list, dense_value_list)

    # single Extraction Layer
    def cgc_net(inputs, level_name, is_last=False):
        # inputs: [task1, task2, ... taskn, shared task]
        specific_expert_outputs = []
        # build task-specific expert layer
        for i in range(num_tasks):
            for j in range(specific_expert_num):
                expert_network = DNN(expert_dnn_hidden_units, dnn_activation, l2_reg_dnn, dnn_dropout, dnn_use_bn,
                                     seed=seed,
                                     name=level_name + 'task_' + task_names[i] + '_expert_specific_' + str(j))(
                    inputs[i])
                specific_expert_outputs.append(expert_network)

        # build task-shared expert layer
        shared_expert_outputs = []
        for k in range(shared_expert_num):
            expert_network = DNN(expert_dnn_hidden_units, dnn_activation, l2_reg_dnn, dnn_dropout, dnn_use_bn,
                                 seed=seed,
                                 name=level_name + 'expert_shared_' + str(k))(inputs[-1])
            shared_expert_outputs.append(expert_network)

        # task-specific gates (count = num_tasks)
        cgc_outs = []
        for i in range(num_tasks):
            # concat task-specific experts and task-shared experts
            cur_expert_num = specific_expert_num + shared_expert_num
            cur_experts = specific_expert_outputs[
                          i * specific_expert_num:(i + 1) * specific_expert_num] + shared_expert_outputs

            expert_concat = tf.keras.layers.Lambda(lambda x: tf.stack(x, axis=1))(cur_experts)

            # build gate layers
            gate_input = DNN(gate_dnn_hidden_units, dnn_activation, l2_reg_dnn, dnn_dropout, dnn_use_bn,
                             seed=seed,
                             name=level_name + 'gate_specific_' + task_names[i])(
                inputs[i])  # gate[i] for task input[i]
            gate_out = tf.keras.layers.Dense(cur_expert_num, use_bias=False, activation='softmax',
                                             name=level_name + 'gate_softmax_specific_' + task_names[i])(gate_input)
            gate_out = tf.keras.layers.Lambda(lambda x: tf.expand_dims(x, axis=-1))(gate_out)

            # gate multiplies the experts
            gate_mul_expert = tf.keras.layers.Lambda(lambda x: reduce_sum(x[0] * x[1], axis=1, keep_dims=False),
                                                     name=level_name + 'gate_mul_expert_specific_' + task_names[i])(
                [expert_concat, gate_out])
            cgc_outs.append(gate_mul_expert)

        # task-shared gate: every level except the last adds one shared gate
        if not is_last:
            cur_expert_num = num_tasks * specific_expert_num + shared_expert_num
            # all experts: task-specific experts plus task-shared experts
            cur_experts = specific_expert_outputs + shared_expert_outputs

            expert_concat = tf.keras.layers.Lambda(lambda x: tf.stack(x, axis=1))(cur_experts)

            # build gate layers
            gate_input = DNN(gate_dnn_hidden_units, dnn_activation, l2_reg_dnn, dnn_dropout, dnn_use_bn,
                             seed=seed,
                             name=level_name + 'gate_shared')(inputs[-1])  # gate for the shared task input
            gate_out = tf.keras.layers.Dense(cur_expert_num, use_bias=False, activation='softmax',
                                             name=level_name + 'gate_softmax_shared')(gate_input)
            gate_out = tf.keras.layers.Lambda(lambda x: tf.expand_dims(x, axis=-1))(gate_out)

            # gate multiplies the experts
            gate_mul_expert = tf.keras.layers.Lambda(lambda x: reduce_sum(x[0] * x[1], axis=1, keep_dims=False),
                                                     name=level_name + 'gate_mul_expert_shared')(
                [expert_concat, gate_out])
            cgc_outs.append(gate_mul_expert)
        return cgc_outs

    # build Progressive Layered Extraction
    ple_inputs = [dnn_input] * (num_tasks + 1)  # [task1, task2, ... taskn, shared task]
    ple_outputs = []
    for i in range(num_levels):
        if i == num_levels - 1:  # the last level
            ple_outputs = cgc_net(inputs=ple_inputs, level_name='level_' + str(i) + '_', is_last=True)
        else:
            ple_outputs = cgc_net(inputs=ple_inputs, level_name='level_' + str(i) + '_', is_last=False)
            ple_inputs = ple_outputs

    task_outs = []
    for task_type, task_name, ple_out in zip(task_types, task_names, ple_outputs):
        # build tower layer
        tower_output = DNN(tower_dnn_hidden_units, dnn_activation, l2_reg_dnn, dnn_dropout, dnn_use_bn, seed=seed,
                           name='tower_' + task_name)(ple_out)

        logit = tf.keras.layers.Dense(1, use_bias=False, activation=None)(tower_output)
        output = PredictionLayer(task_type, name=task_name)(logit)
        task_outs.append(output)

    model = tf.keras.models.Model(inputs=inputs_list, outputs=task_outs)
    return model
```
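A minimal usage sketch, assuming a recent DeepCTR release (≥ 0.9) that exports PLE from deepctr.models; the feature names, vocabulary sizes, and data here are made up for illustration:

```python
import numpy as np
from deepctr.feature_column import SparseFeat, DenseFeat
from deepctr.models import PLE

# Two toy feature columns; vocabulary size and embedding dim are made up.
feature_columns = [SparseFeat('user_id', vocabulary_size=100, embedding_dim=8),
                   DenseFeat('price', 1)]

model = PLE(feature_columns, num_levels=2,
            task_types=('binary', 'binary'), task_names=('ctr', 'ctcvr'))
model.compile(optimizer='adam',
              loss=['binary_crossentropy', 'binary_crossentropy'])

# Random training data keyed by feature name, as DeepCTR expects.
x = {'user_id': np.random.randint(0, 100, 256), 'price': np.random.rand(256)}
y = [np.random.randint(0, 2, 256), np.random.randint(0, 2, 256)]
model.fit(x, y, batch_size=64, epochs=1)
```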
5.2 PaddlePaddle

```python
import paddle
import paddle.nn as nn
import paddle.nn.functional as F


class PLELayer(nn.Layer):
    def __init__(self, feature_size, task_num, exp_per_task, shared_num,
                 expert_size, tower_size, level_number):
        super(PLELayer, self).__init__()

        self.task_num = task_num
        self.exp_per_task = exp_per_task
        self.shared_num = shared_num
        self.expert_size = expert_size
        self.tower_size = tower_size
        self.level_number = level_number

        # ple layers: the last level omits the shared gate
        self.ple_layers = []
        for i in range(0, self.level_number):
            if i == self.level_number - 1:
                ple_layer = self.add_sublayer(
                    name='lev_' + str(i),
                    sublayer=SinglePLELayer(feature_size, task_num, exp_per_task,
                                            shared_num, expert_size,
                                            'lev_' + str(i), True))
                self.ple_layers.append(ple_layer)
                break
            else:
                ple_layer = self.add_sublayer(
                    name='lev_' + str(i),
                    sublayer=SinglePLELayer(feature_size, task_num, exp_per_task,
                                            shared_num, expert_size,
                                            'lev_' + str(i), False))
                self.ple_layers.append(ple_layer)
                feature_size = expert_size

        # task towers
        self._param_tower = []
        self._param_tower_out = []
        for i in range(0, self.task_num):
            linear = self.add_sublayer(
                name='tower_' + str(i),
                sublayer=nn.Linear(
                    expert_size, tower_size,
                    weight_attr=nn.initializer.Constant(value=0.1),
                    bias_attr=nn.initializer.Constant(value=0.1),
                    name='tower_' + str(i)))
            self._param_tower.append(linear)

            linear = self.add_sublayer(
                name='tower_out_' + str(i),
                sublayer=nn.Linear(
                    tower_size, 2,
                    weight_attr=nn.initializer.Constant(value=0.1),
                    bias_attr=nn.initializer.Constant(value=0.1),
                    name='tower_out_' + str(i)))
            self._param_tower_out.append(linear)

    def forward(self, input_data):
        # task_num copies for the task-specific inputs, plus one shared input
        inputs_ple = []
        for i in range(0, self.task_num + 1):
            inputs_ple.append(input_data)
        # stacked ple layers
        ple_out = []
        for i in range(0, self.level_number):
            ple_out = self.ple_layers[i](inputs_ple)
            inputs_ple = ple_out

        output_layers = []
        for i in range(0, self.task_num):
            cur_tower = self._param_tower[i](ple_out[i])
            cur_tower = F.relu(cur_tower)
            out = self._param_tower_out[i](cur_tower)
            out = F.softmax(out)
            out = paddle.clip(out, min=1e-15, max=1.0 - 1e-15)
            output_layers.append(out)
        return output_layers


class SinglePLELayer(nn.Layer):
    def __init__(self, input_feature_size, task_num, exp_per_task, shared_num,
                 expert_size, level_name, if_last):
        super(SinglePLELayer, self).__init__()

        self.task_num = task_num
        self.exp_per_task = exp_per_task
        self.shared_num = shared_num
        self.expert_size = expert_size
        self.if_last = if_last

        self._param_expert = []
        # task-specific experts
        for i in range(0, self.task_num):
            for j in range(0, self.exp_per_task):
                linear = self.add_sublayer(
                    name=level_name + "_exp_" + str(i) + "_" + str(j),
                    sublayer=nn.Linear(
                        input_feature_size, expert_size,
                        weight_attr=nn.initializer.Constant(value=0.1),
                        bias_attr=nn.initializer.Constant(value=0.1),
                        name=level_name + "_exp_" + str(i) + "_" + str(j)))
                self._param_expert.append(linear)
        # shared experts
        for i in range(0, self.shared_num):
            linear = self.add_sublayer(
                name=level_name + "_exp_shared_" + str(i),
                sublayer=nn.Linear(
                    input_feature_size, expert_size,
                    weight_attr=nn.initializer.Constant(value=0.1),
                    bias_attr=nn.initializer.Constant(value=0.1),
                    name=level_name + "_exp_shared_" + str(i)))
            self._param_expert.append(linear)

        # task gates
        self._param_gate = []
        cur_expert_num = self.exp_per_task + self.shared_num
        for i in range(0, self.task_num):
            linear = self.add_sublayer(
                name=level_name + "_gate_" + str(i),
                sublayer=nn.Linear(
                    input_feature_size, cur_expert_num,
                    weight_attr=nn.initializer.Constant(value=0.1),
                    bias_attr=nn.initializer.Constant(value=0.1),
                    name=level_name + "_gate_" + str(i)))
            self._param_gate.append(linear)
        # shared gate (only for non-last levels)
        if not if_last:
            cur_expert_num = self.task_num * self.exp_per_task + self.shared_num
            linear = self.add_sublayer(
                name=level_name + "_gate_shared_",
                sublayer=nn.Linear(
                    input_feature_size, cur_expert_num,
                    weight_attr=nn.initializer.Constant(value=0.1),
                    bias_attr=nn.initializer.Constant(value=0.1),
                    name=level_name + "_gate_shared_"))
            self._param_gate_shared = linear

    def forward(self, input_data):
        expert_outputs = []
        # task-specific experts; stored task-major, exp_per_task entries per task
        for i in range(0, self.task_num):
            for j in range(0, self.exp_per_task):
                linear_out = self._param_expert[i * self.exp_per_task + j](
                    input_data[i])
                expert_outputs.append(F.relu(linear_out))
        # shared experts, stored after all task-specific experts
        for i in range(0, self.shared_num):
            linear_out = self._param_expert[self.task_num * self.exp_per_task
                                            + i](input_data[-1])
            expert_outputs.append(F.relu(linear_out))

        # task gates: each task fuses its own experts plus the shared experts
        outputs = []
        for i in range(0, self.task_num):
            cur_expert_num = self.exp_per_task + self.shared_num
            cur_gate = F.softmax(self._param_gate[i](input_data[i]))
            cur_gate = paddle.reshape(cur_gate, [-1, cur_expert_num, 1])
            # g^{k}(x) = sum_i w^{k}(x)_i * expert_i(x)
            cur_experts = (expert_outputs[i * self.exp_per_task:
                                          (i + 1) * self.exp_per_task]
                           + expert_outputs[-self.shared_num:])
            expert_concat = paddle.concat(x=cur_experts, axis=1)
            expert_concat = paddle.reshape(
                expert_concat, [-1, cur_expert_num, self.expert_size])
            cur_gate_expert = paddle.multiply(x=expert_concat, y=cur_gate)
            cur_gate_expert = paddle.sum(x=cur_gate_expert, axis=1)
            outputs.append(cur_gate_expert)

        # shared gate over ALL experts, feeding the next level's shared input
        if not self.if_last:
            cur_expert_num = self.task_num * self.exp_per_task + self.shared_num
            cur_gate = F.softmax(self._param_gate_shared(input_data[-1]))
            cur_gate = paddle.reshape(cur_gate, [-1, cur_expert_num, 1])
            expert_concat = paddle.concat(x=expert_outputs, axis=1)
            expert_concat = paddle.reshape(
                expert_concat, [-1, cur_expert_num, self.expert_size])
            cur_gate_expert = paddle.multiply(x=expert_concat, y=cur_gate)
            cur_gate_expert = paddle.sum(x=cur_gate_expert, axis=1)
            outputs.append(cur_gate_expert)
        return outputs
```

The loss function code:
```python
def net(self, inputs, is_infer=False):
    input_data = inputs[0]
    label_income = inputs[1]
    label_marital = inputs[2]

    PLE = PLELayer(self.feature_size, self.task_num, self.exp_per_task,
                   self.shared_num, self.expert_size, self.tower_size,
                   self.level_number)
    pred_income, pred_marital = PLE.forward(input_data)

    # take the probability of the positive class for the log loss
    pred_income_1 = paddle.slice(pred_income, axes=[1], starts=[1], ends=[2])
    pred_marital_1 = paddle.slice(pred_marital, axes=[1], starts=[1], ends=[2])

    auc_income, batch_auc_1, auc_states_1 = paddle.static.auc(
        input=pred_income,
        label=paddle.cast(x=label_income, dtype='int64'))
    auc_marital, batch_auc_2, auc_states_2 = paddle.static.auc(
        input=pred_marital,
        label=paddle.cast(x=label_marital, dtype='int64'))

    if is_infer:
        fetch_dict = {'auc_income': auc_income, 'auc_marital': auc_marital}
        return fetch_dict

    cost_income = paddle.nn.functional.log_loss(
        input=pred_income_1,
        label=paddle.cast(label_income, dtype="float32"))
    cost_marital = paddle.nn.functional.log_loss(
        input=pred_marital_1,
        label=paddle.cast(label_marital, dtype="float32"))

    avg_cost_income = paddle.mean(x=cost_income)
    avg_cost_marital = paddle.mean(x=cost_marital)

    # total loss: unweighted sum of the two task losses
    cost = avg_cost_income + avg_cost_marital
    self._cost = cost

    fetch_dict = {
        'cost': cost,
        'auc_income': auc_income,
        'auc_marital': auc_marital
    }
    return fetch_dict
```
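A quick dynamic-graph smoke test of the layer itself (my own sketch with made-up sizes; the AUC and loss plumbing above belongs to PaddleRec's static-graph runner and is omitted here):

```python
import paddle

# Made-up sizes for a smoke test.
model = PLELayer(feature_size=64, task_num=2, exp_per_task=2, shared_num=1,
                 expert_size=32, tower_size=16, level_number=2)

x = paddle.rand([8, 64])        # batch of 8 random feature vectors
outs = model(x)                 # one (batch, 2) softmax output per task
print([o.shape for o in outs])  # [[8, 2], [8, 2]]
```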