ml.rl.preprocessing package

Submodules

ml.rl.preprocessing.batch_preprocessor module

class ml.rl.preprocessing.batch_preprocessor.BatchPreprocessor

Bases: object

class ml.rl.preprocessing.batch_preprocessor.DiscreteDqnBatchPreprocessor(state_preprocessor: ml.rl.preprocessing.preprocessor.Preprocessor)

Bases: ml.rl.preprocessing.batch_preprocessor.BatchPreprocessor

class ml.rl.preprocessing.batch_preprocessor.ParametricDqnBatchPreprocessor(state_preprocessor: ml.rl.preprocessing.preprocessor.Preprocessor, action_preprocessor: ml.rl.preprocessing.preprocessor.Preprocessor)

Bases: ml.rl.preprocessing.batch_preprocessor.BatchPreprocessor

class ml.rl.preprocessing.batch_preprocessor.PolicyNetworkBatchPreprocessor(state_preprocessor: ml.rl.preprocessing.preprocessor.Preprocessor, action_preprocessor: ml.rl.preprocessing.preprocessor.Preprocessor)

Bases: ml.rl.preprocessing.batch_preprocessor.BatchPreprocessor

class ml.rl.preprocessing.batch_preprocessor.SequentialDiscreteDqnBatchPreprocessor(state_preprocessor: ml.rl.preprocessing.preprocessor.Preprocessor, action_dim: int, seq_len: int)

Bases: ml.rl.preprocessing.batch_preprocessor.DiscreteDqnBatchPreprocessor

class ml.rl.preprocessing.batch_preprocessor.SequentialParametricDqnBatchPreprocessor(state_preprocessor: ml.rl.preprocessing.preprocessor.Preprocessor, action_preprocessor: ml.rl.preprocessing.preprocessor.Preprocessor, seq_len: int)

Bases: ml.rl.preprocessing.batch_preprocessor.ParametricDqnBatchPreprocessor

ml.rl.preprocessing.feature_extractor module

class ml.rl.preprocessing.feature_extractor.FeatureExtractorBase(model_feature_config: Optional[ml.rl.types.ModelFeatureConfig] = None)

Bases: object

This is not implemented as a transformer because Caffe2's API does not let us call functions directly; it is easier to handle feature extraction separately.

create_const(init_net, name, value, dtype=caffe2.python.core.DataType.FLOAT) → caffe2.python.core.BlobReference
create_empty_range(init_net: caffe2.python.core.Net) → caffe2.python.core.BlobReference
create_id_mapping(init_net: caffe2.python.core.Net, name: str, mapping: List[int]) → caffe2.python.core.BlobReference

Given the list of IDs in the mapping, create an index from each ID to its (1-based) position.
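In plain Python, the ID-to-index lookup that this blob implements can be sketched as follows (a minimal sketch; `build_id_mapping` is a hypothetical helper, and the real method builds a Caffe2 index blob):

```python
def build_id_mapping(ids):
    """Map each raw ID to its 1-based position in the list.

    Index 0 is left free so it can serve as the "missing ID" slot,
    mirroring the 1-based indexing described above.
    """
    return {raw_id: idx + 1 for idx, raw_id in enumerate(ids)}

mapping = build_id_mapping([1001, 1002, 1005])
# mapping[1005] == 3; unseen IDs fall back to 0
index = mapping.get(9999, 0)
```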

create_id_mappings(init_net: caffe2.python.core.Net, id_mapping_configs: Dict[str, ml.rl.types.IdMapping]) → Dict[str, caffe2.python.core.BlobReference]
abstract create_net() → ml.rl.preprocessing.feature_extractor.FeatureExtractorNet

Returns FeatureExtractorNet to perform feature extraction.

The returned net must have its input and output records set so that it can be bound to actual inputs and outputs.

extract(ws, extract_record)

If the extractor is to be run, e.g., by the reader, then the subclass should implement this method.

Parameters

extract_record (schema.Field) – the output record of the net

extract_float_features(net, name, field, keys_to_extract, missing_scalar) → Tuple[str, str]

Helper function to extract matrix from stacked sparse tensors

extract_id_list_features_ranges(net: caffe2.python.core.Net, name: str, field: caffe2.python.schema.List, feature_names: List[str], feature_ids: List[int], empty_range: caffe2.python.core.BlobReference) → Dict[str, Dict[str, caffe2.python.core.BlobReference]]

Convert the CSR-like format of ID-list to ranges and values. See https://caffe2.ai/docs/operators-catalogue#gatherranges

The return value is keyed by ID-list name
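The CSR-to-ranges conversion can be illustrated in plain Python (a sketch with a hypothetical `lengths_to_ranges` helper; the real method emits blobs in the range format consumed by Caffe2's GatherRanges operator):

```python
def lengths_to_ranges(lengths):
    """Convert per-example lengths into (offset, length) pairs, the
    range format consumed by Caffe2's GatherRanges operator."""
    ranges, offset = [], 0
    for n in lengths:
        ranges.append((offset, n))
        offset += n
    return ranges

# Three examples with 2, 0, and 3 IDs respectively:
values = [10, 11, 20, 21, 22]
ranges = lengths_to_ranges([2, 0, 3])
# ranges == [(0, 2), (2, 0), (2, 3)]
```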

extract_id_score_list_features_ranges(net: caffe2.python.core.Net, name: str, field: caffe2.python.schema.List, feature_names: List[str], feature_ids: List[int], empty_range: caffe2.python.core.BlobReference) → Dict[str, Dict[str, caffe2.python.core.BlobReference]]

Convert the CSR-like format of ID-score-list to ranges and values. See https://caffe2.ai/docs/operators-catalogue#gatherranges

The return value is keyed by ID-score-list name

extract_sequence_float_features(net: caffe2.python.core.Net, name: str, sequence_feature_types: Dict[str, Type[ml.rl.types.SequenceFeatureBase]], field: caffe2.python.schema.List, empty_range: caffe2.python.schema.BlobReference, zero_float: caffe2.python.schema.BlobReference) → Dict[str, caffe2.python.core.BlobReference]

Convert the CSR-like format of MAP<BIGINT, MAP<BIGINT, FLOAT>> to a dictionary from sequence name to the blob containing the fixed-length sequence of float-feature vectors for each element. Each blob will be a 3-D tensor. The first dimension is the batch size. The second dimension is each element in the list. The third dimension is ordered by the order given by SequenceFeatureBase.get_float_feature_infos(). These float features are not normalized.

extract_sequence_id_features(net: caffe2.python.core.Net, name: str, sequence_feature_types: Dict[str, Type[ml.rl.types.SequenceFeatureBase]], sequence_id_features: Dict[str, Dict[str, ml.rl.types.IdFeatureConfig]], field: caffe2.python.schema.List, empty_range: caffe2.python.schema.BlobReference, zero_int64: caffe2.python.schema.BlobReference) → Dict[str, Dict[str, caffe2.python.core.BlobReference]]

Convert the CSR-like format of MAP<BIGINT, LIST<BIGINT>> to a dictionary from sequence name to a dictionary from ID-list name to the blob containing the fixed-length sequence of IDs. Each blob will be a 2-D tensor. The first dimension is the batch size. The second dimension is each element in the list.

static fetch(ws, b, to_torch=True)
fetch_state_sequence_features(record: caffe2.python.schema.Struct, fetch_func) → ml.rl.types.SequenceFeatures

Pull the values from Caffe2’s blobs into PyTorch’s tensors.

get_state_sequence_features_schema(sequence_id_features: Dict[str, Dict[str, caffe2.python.core.BlobReference]], sequence_float_features: Dict[str, caffe2.python.core.BlobReference]) → caffe2.python.schema.Struct

Lay out the record to match the SequenceFeatures type. This is necessary to make ONNX exporting work.

map_ids(net: caffe2.python.core.Net, name: str, map_handler: caffe2.python.core.BlobReference, raw_ids: caffe2.python.core.BlobReference) → caffe2.python.core.BlobReference

Map a raw ID to an index (usually into an embedding lookup table)

map_sequence_id_features(net: caffe2.python.core.Net, name: str, map_handlers: Dict[str, caffe2.python.core.BlobReference], raw_sequence_id_features: Dict[str, Dict[str, caffe2.python.core.BlobReference]], sequence_id_feature_configs: Dict[str, Dict[str, ml.rl.types.IdFeatureConfig]]) → Dict[str, Dict[str, caffe2.python.core.BlobReference]]

Map the raw IDs of all sequences' ID features to indices (into an embedding lookup table)

range_to_dense(net: caffe2.python.core.Net, name: str, ranges: caffe2.python.core.BlobReference, values: caffe2.python.core.BlobReference, max_length: int, zero_val: caffe2.python.core.BlobReference) → caffe2.python.core.BlobReference

Convert batch of variable-length lists (in range format) to fixed-length lists.
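The padding/truncation to fixed-length lists can be sketched in plain Python (a hypothetical helper; the real method operates on Caffe2 blobs in range format):

```python
def range_to_dense(ranges, values, max_length, zero_val=0):
    """Pad or truncate each (offset, length) range of `values` to
    exactly `max_length` entries, mirroring the conversion above."""
    out = []
    for offset, length in ranges:
        row = list(values[offset:offset + length])[:max_length]
        row += [zero_val] * (max_length - len(row))  # right-pad with zero_val
        out.append(row)
    return out

dense = range_to_dense([(0, 2), (2, 3)], [10, 11, 20, 21, 22], max_length=4)
# dense == [[10, 11, 0, 0], [20, 21, 22, 0]]
```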

read_actions_to_mask(net, name, num_actions, action, action_size_plus_one)
class ml.rl.preprocessing.feature_extractor.FeatureExtractorNet

Bases: tuple

init_net will only be run once. The external outputs of init_net are assumed to be parameters of net and will be saved in the predictor file. net should not use any parameters not initialized by init_net.

property init_net

Alias for field number 1

property net

Alias for field number 0
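Since FeatureExtractorNet is a tuple subclass with `net` at field 0 and `init_net` at field 1, its shape can be mimicked with a plain namedtuple (the string values below are placeholders for illustration; the real fields hold Caffe2 nets):

```python
from collections import namedtuple

# Field 0 is `net`, field 1 is `init_net`, matching the aliases above.
FeatureExtractorNet = namedtuple("FeatureExtractorNet", ["net", "init_net"])

fen = FeatureExtractorNet(net="extraction_net", init_net="param_init_net")
assert fen[0] == fen.net
assert fen[1] == fen.init_net
```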

class ml.rl.preprocessing.feature_extractor.InputColumn

Bases: object

ACTION = 'action'
ACTION_PRESENCE = 'action_presence'
ACTION_PROBABILITY = 'action_probability'
MDP_ID = 'mdp_id'
METRICS = 'metrics'
NEXT_ACTION = 'next_action'
NEXT_ACTION_PRESENCE = 'next_action_presence'
NEXT_STATE_FEATURES = 'next_state_features'
NEXT_STATE_FEATURES_PRESENCE = 'next_state_features_presence'
NEXT_STATE_ID_LIST_FEATURES = 'next_state_id_list_features'
NEXT_STATE_ID_SCORE_LIST_FEATURES = 'next_state_id_score_list_features'
NOT_TERMINAL = 'not_terminal'
POSSIBLE_ACTIONS = 'possible_actions'
POSSIBLE_ACTIONS_MASK = 'possible_actions_mask'
POSSIBLE_ACTIONS_PRESENCE = 'possible_actions_presence'
POSSIBLE_NEXT_ACTIONS = 'possible_next_actions'
POSSIBLE_NEXT_ACTIONS_MASK = 'possible_next_actions_mask'
POSSIBLE_NEXT_ACTIONS_PRESENCE = 'possible_next_actions_presence'
REWARD = 'reward'
SEQUENCE_NUMBER = 'sequence_number'
STATE_FEATURES = 'state_features'
STATE_FEATURES_PRESENCE = 'state_features_presence'
STATE_ID_LIST_FEATURES = 'state_id_list_features'
STATE_ID_SCORE_LIST_FEATURES = 'state_id_score_list_features'
STEP = 'step'
TIME_DIFF = 'time_diff'
TIME_SINCE_FIRST = 'time_since_first'
class ml.rl.preprocessing.feature_extractor.PredictorFeatureExtractor(state_normalization_parameters: Dict[int, ml.rl.parameters.NormalizationParameters], action_normalization_parameters: Optional[Dict[int, ml.rl.parameters.NormalizationParameters]] = None, normalize: bool = True, set_missing_value_to_zero: bool = False, model_feature_config: Optional[ml.rl.types.ModelFeatureConfig] = None, use_time_since_first: Optional[bool] = None, time_since_first_normalization_parameters: Optional[ml.rl.parameters.NormalizationParameters] = None)

Bases: ml.rl.preprocessing.feature_extractor.FeatureExtractorBase

This class assumes that the action is not in the input unless it is a parametric action.

The features (of both states and actions, if any) are expected to come in the following blobs:

  • input/float_features.keys

  • input/float_features.values

  • input/float_features.lengths

TODO: Support int features

create_net()

Returns FeatureExtractorNet to perform feature extraction.

The returned net must have its input and output records set so that it can be bound to actual inputs and outputs.

extract(ws, extract_record)

If the extractor is to be run, e.g., by the reader, then the subclass should implement this method.

Parameters

extract_record (schema.Field) – the output record of the net

class ml.rl.preprocessing.feature_extractor.TrainingFeatureExtractor(state_normalization_parameters: Dict[int, ml.rl.parameters.NormalizationParameters], action_normalization_parameters: Optional[Dict[int, ml.rl.parameters.NormalizationParameters]] = None, include_possible_actions: bool = True, normalize: bool = True, max_num_actions: int = None, set_missing_value_to_zero: bool = None, multi_steps: Optional[int] = None, metrics_to_score: Optional[List[str]] = None, model_feature_config: Optional[ml.rl.types.ModelFeatureConfig] = None, use_time_since_first: Optional[bool] = None, time_since_first_normalization_parameters: Optional[ml.rl.parameters.NormalizationParameters] = None)

Bases: ml.rl.preprocessing.feature_extractor.FeatureExtractorBase

Extract:

  • State

  • Action

  • Next state

  • Possible next actions / Next actions

create_net()

Returns FeatureExtractorNet to perform feature extraction.

The returned net must have its input and output records set so that it can be bound to actual inputs and outputs.

extract(ws, extract_record)

If the extractor is to be run, e.g., by the reader, then the subclass should implement this method.

Parameters

extract_record (schema.Field) – the output record of the net

class ml.rl.preprocessing.feature_extractor.WorldModelFeatureExtractor(seq_len, state_normalization_parameters: Dict[int, ml.rl.parameters.NormalizationParameters], action_normalization_parameters: Optional[Dict[int, ml.rl.parameters.NormalizationParameters]] = None, discrete_action_names: Optional[List[str]] = None, normalize: Optional[bool] = True)

Bases: ml.rl.preprocessing.feature_extractor.FeatureExtractorBase

Extract:

  • State

  • Action

  • Next state

  • Reward

  • Not terminal

create_net()

Returns FeatureExtractorNet to perform feature extraction.

The returned net must have its input and output records set so that it can be bound to actual inputs and outputs.

extract(ws, extract_record)

If the extractor is to be run, e.g., by the reader, then the subclass should implement this method.

Parameters

extract_record (schema.Field) – the output record of the net

ml.rl.preprocessing.feature_extractor.id_list_schema()
ml.rl.preprocessing.feature_extractor.id_score_list_schema()
ml.rl.preprocessing.feature_extractor.map_schema()

ml.rl.preprocessing.identify_types module

ml.rl.preprocessing.identify_types.identify_type(values, enum_threshold=100)

ml.rl.preprocessing.normalization module

class ml.rl.preprocessing.normalization.NumpyEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)

Bases: json.encoder.JSONEncoder

default(obj)

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
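The iterator-handling recipe above can be exercised with the standard library alone (`IterEncoder` is a hypothetical name for illustration):

```python
import json

class IterEncoder(json.JSONEncoder):
    """Encoder following the docstring's recipe: try iterating the
    object; on TypeError defer to the base implementation."""
    def default(self, o):
        try:
            iterable = iter(o)
        except TypeError:
            pass
        else:
            return list(iterable)
        # Let the base class default method raise the TypeError
        return super().default(o)

print(json.dumps({"gen": (i * i for i in range(3))}, cls=IterEncoder))
# → {"gen": [0, 1, 4]}
```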
ml.rl.preprocessing.normalization.construct_action_scale_tensor(action_norm_params, action_scale_overrides)

Construct tensors that will rescale each action value on each dimension i from [min_serving_value[i], max_serving_value[i]] to [-1, 1] for training.
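The underlying affine map from [min_serving_value[i], max_serving_value[i]] to [-1, 1] can be sketched per dimension (`to_unit_range` is a hypothetical helper; the real function builds torch tensors from the normalization parameters):

```python
def to_unit_range(x, lo, hi):
    """Affine map sending [lo, hi] onto [-1, 1]:
    lo -> -1, hi -> 1, midpoint -> 0."""
    return 2.0 * (x - lo) / (hi - lo) - 1.0

# Endpoints and midpoint of a [0, 10] serving range:
print(to_unit_range(0.0, 0.0, 10.0))   # -1.0
print(to_unit_range(10.0, 0.0, 10.0))  # 1.0
print(to_unit_range(5.0, 0.0, 10.0))   # 0.0
```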

ml.rl.preprocessing.normalization.deserialize(parameters_json) → Dict[int, ml.rl.parameters.NormalizationParameters]
ml.rl.preprocessing.normalization.get_action_output_parameters(action_normalization_parameters)
ml.rl.preprocessing.normalization.get_feature_norm_metadata(feature_name, feature_value_list, norm_params)
ml.rl.preprocessing.normalization.get_feature_start_indices(sorted_features, normalization_parameters)

Returns the starting index for each feature in the output feature vector

ml.rl.preprocessing.normalization.get_num_output_features(normalization_parameters)
ml.rl.preprocessing.normalization.identify_parameter(feature_name, values, max_unique_enum_values=100, quantile_size=20, quantile_k2_threshold=1000.0, skip_box_cox=False, skip_quantiles=False, feature_type=None)
ml.rl.preprocessing.normalization.no_op_feature()
ml.rl.preprocessing.normalization.serialize(parameters)
ml.rl.preprocessing.normalization.serialize_one(feature_parameters)
ml.rl.preprocessing.normalization.sort_features_by_normalization(normalization_parameters)

Helper function to return a sorted list from a normalization map. Also returns the starting index for each feature type
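How starting indices arise can be sketched in plain Python, assuming each feature contributes a known number of output columns (e.g., 1 for a continuous feature, the enum cardinality for a one-hot feature); `feature_start_indices` is a hypothetical helper:

```python
def feature_start_indices(sorted_features, output_width):
    """Compute the starting column of each feature in the dense
    output, given the number of output columns per feature."""
    starts, offset = {}, 0
    for f in sorted_features:
        starts[f] = offset
        offset += output_width[f]
    return starts

# Feature 7 is a 4-way one-hot enum, features 3 and 9 are scalars:
starts = feature_start_indices([3, 7, 9], {3: 1, 7: 4, 9: 1})
# starts == {3: 0, 7: 1, 9: 5}
```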

ml.rl.preprocessing.postprocessor module

class ml.rl.preprocessing.postprocessor.Postprocessor(normalization_parameters: Dict[int, ml.rl.parameters.NormalizationParameters], use_gpu: bool)

Bases: torch.nn.Module

Inverts the action normalization.

forward(input: torch.Tensor) → torch.Tensor
input_prototype() → Tuple[torch.Tensor]

ml.rl.preprocessing.preprocessor module

class ml.rl.preprocessing.preprocessor.Preprocessor(normalization_parameters: Dict[int, ml.rl.parameters.NormalizationParameters], use_gpu: bool)

Bases: torch.nn.Module

forward(input: torch.Tensor, input_presence_byte: torch.Tensor) → torch.Tensor

Preprocess the input matrix

Parameters

input – the input tensor

input_prototype() → Tuple[torch.Tensor, torch.Tensor]

ml.rl.preprocessing.preprocessor_net module

class ml.rl.preprocessing.preprocessor_net.PreprocessorNet

Bases: object

normalize_dense_matrix(input_matrix: str, features: List[int], normalization_parameters: Dict[int, ml.rl.parameters.NormalizationParameters], blobname_prefix: str, split_expensive_feature_groups: bool) → Tuple[str, List[str]]

Normalizes inputs according to parameters. Expects a dense matrix whose ith column corresponds to feature i.

Note that the Caffe2 BatchBoxCox operator isn’t implemented on CUDA GPU so we need to use a CPU context.

Parameters
  • input_matrix – Input matrix to normalize.

  • features – Array that maps feature ids to column indices.

  • normalization_parameters – Mapping from feature names to NormalizationParameters.

  • blobname_prefix – Prefix for input blobs to norm_net.

  • num_output_features – The number of features in an output processed datapoint. If set to None, this function will compute it.
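For the simplest case, a z-score (mean/stddev) normalization of such a dense matrix can be sketched in plain Python (`normalize_columns` is a hypothetical helper; the real net also handles Box-Cox, quantile, and enum feature types via Caffe2 operators):

```python
def normalize_columns(matrix, features, params):
    """Column-wise z-score sketch: column i holds feature features[i]
    and is shifted/scaled by that feature's (mean, stddev) pair."""
    out = []
    for row in matrix:
        out.append([(v - params[f][0]) / params[f][1]
                    for v, f in zip(row, features)])
    return out

# Feature 101 has mean 10, stddev 2; feature 102 has mean 0, stddev 4:
normed = normalize_columns([[10.0, 4.0]], [101, 102],
                           {101: (10.0, 2.0), 102: (0.0, 4.0)})
# normed == [[0.0, 1.0]]
```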

preprocess_blob(blob, normalization_parameters)

Takes in a blob and its normalization parameters. Outputs a tuple whose first element is a blob containing the normalized input blob and whose second element contains all the parameter blobs used to create it.

Call this from a CPU context and ensure the input blob exists in it.

ml.rl.preprocessing.sparse_to_dense module

class ml.rl.preprocessing.sparse_to_dense.Caffe2SparseToDenseProcessor(sorted_features: List[int], set_missing_value_to_zero: bool = False)

Bases: ml.rl.preprocessing.sparse_to_dense.SparseToDenseProcessor

process(sparse_data: ml.rl.caffe_utils.StackedAssociativeArray) → Tuple[str, str, List[str]]
class ml.rl.preprocessing.sparse_to_dense.PandasSparseToDenseProcessor(sorted_features: List[int], set_missing_value_to_zero: bool = False)

Bases: ml.rl.preprocessing.sparse_to_dense.SparseToDenseProcessor

process(sparse_data) → Tuple[torch.Tensor, torch.Tensor]
class ml.rl.preprocessing.sparse_to_dense.PythonSparseToDenseProcessor(sorted_features: List[int], set_missing_value_to_zero: bool = False)

Bases: ml.rl.preprocessing.sparse_to_dense.SparseToDenseProcessor

process(sparse_data: List[Dict[int, float]]) → Tuple[torch.Tensor, torch.Tensor]
class ml.rl.preprocessing.sparse_to_dense.SparseToDenseProcessor(sorted_features: List[int], set_missing_value_to_zero: bool = False)

Bases: object
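The sparse-to-dense conversion performed by these processors can be sketched without torch, producing a dense matrix plus a presence mask (`sparse_to_dense` is a hypothetical helper; the real processors return torch tensors or Caffe2 blob names):

```python
def sparse_to_dense(sparse_data, sorted_features, missing=0.0):
    """Turn a list of {feature_id: value} dicts into a dense matrix
    plus a presence mask, one column per entry of sorted_features."""
    dense, presence = [], []
    for row in sparse_data:
        dense.append([row.get(f, missing) for f in sorted_features])
        presence.append([f in row for f in sorted_features])
    return dense, presence

dense, presence = sparse_to_dense([{1: 0.5}, {1: 0.1, 2: 0.9}], [1, 2])
# dense == [[0.5, 0.0], [0.1, 0.9]]
# presence == [[True, False], [True, True]]
```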

Module contents