[feat]: RTPInputV2 support weighted tag feature & sequence feature & negative sampler #297

Open · wants to merge 10 commits into base: master
4 changes: 4 additions & 0 deletions easy_rec/python/compat/feature_column/feature_column_v2.py
@@ -3595,6 +3595,10 @@ def _old_get_dense_tensor_internal(self, sparse_tensors, weight_collections,
layer_utils.append_tensor_to_collection(
compat_ops.GraphKeys.RANK_SERVICE_EMBEDDING, embedding_attrs['name'],
'input', sparse_tensors.id_tensor)
if sparse_tensors.weight_tensor is not None:
layer_utils.append_tensor_to_collection(
compat_ops.GraphKeys.RANK_SERVICE_EMBEDDING, embedding_attrs['name'],
'weighted_input', sparse_tensors.weight_tensor)

return predictions

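For context, the new 'weighted_input' entry lets a serving runtime reconstruct the weighted embedding lookup from the RANK_SERVICE_EMBEDDING collection. A minimal sketch of how an aligned id/weight SparseTensor pair feeds such a lookup (TF 1.x APIs; the table name and tensor values here are hypothetical):

```python
import tensorflow as tf  # TF 1.x, as used by EasyRec

# A weighted tag feature is carried as two aligned SparseTensors:
# ids select embedding rows, weights scale them (hypothetical values).
sp_ids = tf.SparseTensor(
    indices=[[0, 0], [0, 1]],
    values=tf.constant([3, 7], dtype=tf.int64),
    dense_shape=[1, 2])
sp_weights = tf.SparseTensor(
    indices=[[0, 0], [0, 1]],
    values=tf.constant([0.2, 0.8], dtype=tf.float32),
    dense_shape=[1, 2])

embedding_table = tf.get_variable('tag_emb', shape=[16, 8])
# Weighted combination of the selected rows; a serving runtime needs
# both tensors ('input' and 'weighted_input') to compute this.
embedded = tf.nn.embedding_lookup_sparse(
    embedding_table, sp_ids, sp_weights, combiner='mean')
```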
4 changes: 2 additions & 2 deletions easy_rec/python/core/sampler.py
@@ -282,7 +282,7 @@ def get(self, ids):
sampled_values = tf.py_func(self._get_impl, [ids], self._attr_tf_types)
result_dict = {}
for k, t, v in zip(self._attr_names, self._attr_tf_types, sampled_values):
v.set_shape([self._num_sample])
v.set_shape([None])
result_dict[k] = v
return result_dict

@@ -508,7 +508,7 @@ def get(self, src_ids, dst_ids):
self._attr_tf_types)
result_dict = {}
for k, t, v in zip(self._attr_names, self._attr_tf_types, sampled_values):
v.set_shape([self._num_sample])
v.set_shape([None])
result_dict[k] = v
return result_dict

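The relaxed shape matters because tf.py_func outputs carry no static shape, and the sampler can return a different number of negatives in evaluation (via set_eval_num_sample) than in training. A minimal sketch of the pattern, with a hypothetical sampler stub standing in for the graph-learn backed implementation:

```python
import numpy as np
import tensorflow as tf  # TF 1.x

def _sample_impl(ids):
  # Stand-in for the graph-learn negative sampler; the number of
  # returned ids can differ between train and eval (hypothetical stub).
  num_sample = 4
  return np.arange(num_sample, dtype=np.int64)

ids = tf.constant([1, 2, 3], dtype=tf.int64)
sampled = tf.py_func(_sample_impl, [ids], tf.int64)
# Pin only the rank, not the size, so one graph serves both modes:
sampled.set_shape([None])
```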
219 changes: 153 additions & 66 deletions easy_rec/python/input/input.py
@@ -262,100 +262,100 @@ def _get_labels(self, fields):
])

def _preprocess(self, field_dict):
"""Preprocess the feature columns.

preprocesses some feature columns, such as TagFeature or LookupFeature;
it is expected to handle both batch inputs and single inputs,
and it can be customized in subclasses
"""Preprocess the feature columns with negative sampling."""
parsed_dict = {}
neg_samples = self._maybe_negative_sample(field_dict)
if neg_samples:
for k, v in neg_samples.items():
if k in field_dict:
field_dict[k] = tf.concat([field_dict[k], v], axis=0)
else:
print('appended fields: %s' % k)
parsed_dict[k] = v
self._appended_fields.append(k)
for k, v in self._preprocess_without_negative_sample(field_dict).items():
parsed_dict[k] = v
return parsed_dict

Args:
field_dict: string to tensor, tensors are dense,
could be of shape [batch_size], [batch_size, None], or of shape []
def _maybe_negative_sample(self, field_dict):
"""Negative sampling.

Returns:
output_dict: some of the tensors are transformed into sparse tensors,
such as input tensors of tag features and lookup features
output_dict: if negative sampling is enabled, the sampled fields dict is
returned; otherwise None is returned.
"""
parsed_dict = {}

if self._sampler is not None and self._mode != tf.estimator.ModeKeys.PREDICT:
if self._mode != tf.estimator.ModeKeys.TRAIN:
self._sampler.set_eval_num_sample()
sampler_type = self._data_config.WhichOneof('sampler')
sampler_config = getattr(self._data_config, sampler_type)
item_ids = field_dict[sampler_config.item_id_field]
item_ids = self._maybe_squeeze_input(
field_dict[sampler_config.item_id_field], name='item_id')
if sampler_type in ['negative_sampler', 'negative_sampler_in_memory']:
sampled = self._sampler.get(item_ids)
elif sampler_type == 'negative_sampler_v2':
user_ids = field_dict[sampler_config.user_id_field]
user_ids = self._maybe_squeeze_input(
field_dict[sampler_config.user_id_field], name='user_id')
sampled = self._sampler.get(user_ids, item_ids)
elif sampler_type.startswith('hard_negative_sampler'):
user_ids = field_dict[sampler_config.user_id_field]
user_ids = self._maybe_squeeze_input(
field_dict[sampler_config.user_id_field], name='user_id')
sampled = self._sampler.get(user_ids, item_ids)
else:
raise ValueError('Unknown sampler %s' % sampler_type)
for k, v in sampled.items():
if k in field_dict:
field_dict[k] = tf.concat([field_dict[k], v], axis=0)
else:
print('appended fields: %s' % k)
parsed_dict[k] = v
self._appended_fields.append(k)
return sampled
else:
return None

def _preprocess_without_negative_sample(self,
field_dict,
ignore_absent_fields=False):
"""Preprocess the feature columns.

preprocesses some feature columns, such as TagFeature or LookupFeature;
it is expected to handle both batch inputs and single inputs,
and it can be customized in subclasses

Args:
field_dict: string to tensor, tensors are dense,
could be of shape [batch_size], [batch_size, None], or of shape []

Returns:
output_dict: some of the tensors are transformed into sparse tensors,
such as input tensors of tag features and lookup features
"""
parsed_dict = {}

for fc in self._feature_configs:
feature_name = fc.feature_name
feature_type = fc.feature_type
absent_input_names = []
for input_name in fc.input_names:
if input_name not in field_dict:
absent_input_names.append(input_name)
if absent_input_names:
if ignore_absent_fields:
continue
else:
raise KeyError('feature [{}] lacks input [{}]'.format(
feature_name, ', '.join(absent_input_names)))
input_0 = fc.input_names[0]
if feature_type == fc.TagFeature:
input_0 = fc.input_names[0]
field = field_dict[input_0]
# Construct the output of TagFeature according to the dimension of field_dict.
# When the input field exceeds 2 dimensions, convert TagFeature to 2D output.

if fc.HasField('kv_separator') and len(fc.input_names) > 1:
assert False, 'Tag Feature Error, ' \
'Cannot set kv_separator and multiple input_names in one feature config. Feature: %s.' % input_0

if len(field.get_shape()) < 2 or field.get_shape()[-1] == 1:
# Construct the output of TagFeature according to the dimension of field_dict.
# When the input field exceeds 2 dimensions, convert TagFeature to 2D output.
if len(field.get_shape()) == 0:
field = tf.expand_dims(field, axis=0)
elif len(field.get_shape()) == 2:
field = tf.squeeze(field, axis=-1)
if fc.HasField('kv_separator') and len(fc.input_names) > 1:
assert False, 'Tag Feature Error, ' \
'Cannot set kv_separator and multiple input_names in one feature config. Feature: %s.' % input_0
parsed_dict[input_0] = tf.string_split(field, fc.separator)
if fc.HasField('kv_separator'):
indices = parsed_dict[input_0].indices
tmp_kvs = parsed_dict[input_0].values
tmp_kvs = tf.string_split(
tmp_kvs, fc.kv_separator, skip_empty=False)
tmp_kvs = tf.reshape(tmp_kvs.values, [-1, 2])
tmp_ks, tmp_vs = tmp_kvs[:, 0], tmp_kvs[:, 1]

check_list = [
tf.py_func(
check_string_to_number, [tmp_vs, input_0], Tout=tf.bool)
] if self._check_mode else []
with tf.control_dependencies(check_list):
tmp_vs = tf.string_to_number(
tmp_vs, tf.float32, name='kv_tag_wgt_str_2_flt_%s' % input_0)
parsed_dict[input_0] = tf.sparse.SparseTensor(
indices, tmp_ks, parsed_dict[input_0].dense_shape)
input_wgt = input_0 + '_WEIGHT'
parsed_dict[input_wgt] = tf.sparse.SparseTensor(
indices, tmp_vs, parsed_dict[input_0].dense_shape)
self._appended_fields.append(input_wgt)
if not fc.HasField('hash_bucket_size'):
check_list = [
tf.py_func(
check_string_to_number,
[parsed_dict[input_0].values, input_0],
Tout=tf.bool)
] if self._check_mode else []
with tf.control_dependencies(check_list):
vals = tf.string_to_number(
parsed_dict[input_0].values,
tf.int32,
name='tag_fea_%s' % input_0)
parsed_dict[input_0] = tf.sparse.SparseTensor(
parsed_dict[input_0].indices, vals,
parsed_dict[input_0].dense_shape)
if len(fc.input_names) > 1:
input_1 = fc.input_names[1]
field = field_dict[input_1]
@@ -382,11 +382,62 @@ def _preprocess(self, field_dict):
tf.identity(field_vals),
field.dense_shape)
parsed_dict[input_1] = field
elif isinstance(field, tf.SparseTensor):
# filter out empty values
nonempty_selection = tf.where(tf.not_equal(field.values, ''))[:, 0]
parsed_dict[input_0] = tf.sparse.SparseTensor(
indices=tf.gather(field.indices, nonempty_selection),
values=tf.gather(field.values, nonempty_selection),
dense_shape=field.dense_shape)
if len(fc.input_names) > 1:
input_1 = fc.input_names[1]
parsed_dict[input_1] = tf.sparse.SparseTensor(
indices=tf.gather(field_dict[input_1].indices,
nonempty_selection),
values=tf.gather(field_dict[input_1].values,
nonempty_selection),
dense_shape=field_dict[input_1].dense_shape)
else:
parsed_dict[input_0] = field_dict[input_0]
parsed_dict[input_0] = field
if len(fc.input_names) > 1:
input_1 = fc.input_names[1]
parsed_dict[input_1] = field_dict[input_1]

if fc.HasField('kv_separator'):
indices = parsed_dict[input_0].indices
tmp_kvs = parsed_dict[input_0].values
# split into keys and values
tmp_kvs = tf.string_split(tmp_kvs, fc.kv_separator, skip_empty=False)
tmp_kvs = tf.reshape(tmp_kvs.values, [-1, 2])
tmp_ks, tmp_vs = tmp_kvs[:, 0], tmp_kvs[:, 1]
check_list = [
tf.py_func(
check_string_to_number, [tmp_vs, input_0], Tout=tf.bool)
] if self._check_mode else []
with tf.control_dependencies(check_list):
tmp_vs = tf.string_to_number(
tmp_vs, tf.float32, name='kv_tag_wgt_str_2_flt_%s' % input_0)
parsed_dict[input_0] = tf.sparse.SparseTensor(
indices, tmp_ks, parsed_dict[input_0].dense_shape)
input_wgt = input_0 + '_WEIGHT'
parsed_dict[input_wgt] = tf.sparse.SparseTensor(
indices, tmp_vs, parsed_dict[input_0].dense_shape)
self._appended_fields.append(input_wgt)
if not fc.HasField('hash_bucket_size'):
check_list = [
tf.py_func(
check_string_to_number,
[parsed_dict[input_0].values, input_0],
Tout=tf.bool)
] if self._check_mode else []
with tf.control_dependencies(check_list):
vals = tf.string_to_number(
parsed_dict[input_0].values,
tf.int32,
name='tag_fea_%s' % input_0)
parsed_dict[input_0] = tf.sparse.SparseTensor(
parsed_dict[input_0].indices, vals,
parsed_dict[input_0].dense_shape)
elif feature_type == fc.LookupFeature:
assert feature_name is not None and feature_name != ''
assert len(fc.input_names) == 2
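For reference, the kv_separator branch added above reduces to the following standalone sketch (hypothetical input strings; the check_string_to_number validation is omitted). The two resulting SparseTensors share the same indices, which is what keeps tag ids and tag weights aligned for the weighted embedding lookup downstream:

```python
import tensorflow as tf  # TF 1.x

# Two rows of weighted tags, e.g. 'key:weight' pairs joined by '|':
field = tf.constant(['a:0.5|b:1.2', 'c:2.0'])
tags = tf.string_split(field, '|')  # SparseTensor of 'key:weight' strings
kvs = tf.string_split(tags.values, ':', skip_empty=False)
kvs = tf.reshape(kvs.values, [-1, 2])
keys = kvs[:, 0]
weights = tf.string_to_number(kvs[:, 1], tf.float32)
# Re-wrap keys and weights with the original indices so the id and
# weight SparseTensors stay aligned row by row:
tag_ids = tf.SparseTensor(tags.indices, keys, tags.dense_shape)
tag_wgts = tf.SparseTensor(tags.indices, weights, tags.dense_shape)
```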
@@ -708,8 +759,12 @@ def _preprocess(self, field_dict):

if self._data_config.HasField('sample_weight'):
if self._mode != tf.estimator.ModeKeys.PREDICT:
parsed_dict[constant.SAMPLE_WEIGHT] = field_dict[
self._data_config.sample_weight]
if self._data_config.sample_weight in field_dict:
parsed_dict[constant.SAMPLE_WEIGHT] = field_dict[
self._data_config.sample_weight]
elif not ignore_absent_fields:
raise KeyError('sample weight field [{}] is absent'.format(
self._data_config.sample_weight))
return parsed_dict

def _lookup_preprocess(self, fc, field_dict):
@@ -829,3 +884,35 @@ def _input_fn(mode=None, params=None, config=None):

_input_fn.input_creator = self
return _input_fn

def _maybe_squeeze_input(self, tensor, name=None):
default_value = None
if isinstance(tensor, tf.SparseTensor):
if tensor.dtype == tf.string:
default_value = ''
elif tensor.dtype.is_integer:
default_value = -1
else:
default_value = tensor.dtype.as_numpy_dtype()
with tf.name_scope('squeeze_input/{}'.format(name)):
rank = len(tensor.get_shape())
if rank != 1:
tensor_shape = tf.shape(tensor, out_type=tf.int64)
check_list = [
tf.assert_equal(
tf.reduce_prod(tensor_shape[1:]),
tf.constant(1, dtype=tensor_shape.dtype),
message='{} must not have multiple values'.format(name))
]
with tf.control_dependencies(check_list):
if isinstance(tensor, tf.SparseTensor):
return tf.sparse_to_dense(
tensor.indices[:, :1], [tensor_shape[0]],
tensor.values,
default_value=default_value)
else:
return tf.reshape(tensor, [tensor_shape[0]])
elif isinstance(tensor, tf.SparseTensor):
return tf.sparse.to_dense(tensor, default_value=default_value)
else:
return tensor
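A simplified sketch of what the new _maybe_squeeze_input helper does for the sampler inputs (hypothetical values; the real helper additionally asserts that each row carries exactly one value and handles dense inputs):

```python
import tensorflow as tf  # TF 1.x

# item_id as parsed from the input: rank 2, one value per row (hypothetical).
item_id = tf.SparseTensor(
    indices=[[0, 0], [1, 0]],
    values=tf.constant(['item_1', 'item_2']),
    dense_shape=[2, 1])

dense = tf.sparse.to_dense(item_id, default_value='')  # shape [2, 1]
flat_ids = tf.reshape(dense, [-1])  # shape [2], a flat id vector for the sampler
```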