reagent.workflow package

Submodules

reagent.workflow.cli module

reagent.workflow.env module

reagent.workflow.env.get_new_named_entity_ids(module_names: List[str]) → Dict[str, int]
reagent.workflow.env.get_workflow_id() → int
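
A minimal sketch of how these helpers might be used together; the module names passed in are hypothetical:

from reagent.workflow.env import get_new_named_entity_ids, get_workflow_id

# Identifier of the workflow run this process belongs to.
workflow_id = get_workflow_id()

# Allocate fresh entity ids for a few (hypothetical) module names.
entity_ids = get_new_named_entity_ids(["dqn_model", "reward_model"])
print(workflow_id, entity_ids)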

reagent.workflow.gym_batch_rl module

reagent.workflow.gym_batch_rl.evaluate_gym(env_name: str, model: reagent.model_managers.union.ModelManager__Union, publisher: reagent.publishers.union.ModelPublisher__Union, num_eval_episodes: int, passing_score_bar: float, module_name: str = 'default_model', max_steps: Optional[int] = None)
reagent.workflow.gym_batch_rl.initialize_seed(seed: int, env)
reagent.workflow.gym_batch_rl.make_agent_from_model(env: reagent.gym.envs.gym.Gym, model: reagent.model_managers.union.ModelManager__Union, publisher: reagent.publishers.union.ModelPublisher__Union, module_name: str)
reagent.workflow.gym_batch_rl.offline_gym_predictor(env_name: str, model: reagent.model_managers.union.ModelManager__Union, publisher: reagent.publishers.union.ModelPublisher__Union, pkl_path: str, num_train_transitions: int, max_steps: Optional[int], module_name: str = 'default_model', seed: int = 1)

Generates samples from a trained policy on the Gym environment and saves the results as a pandas DataFrame in Parquet format.

reagent.workflow.gym_batch_rl.offline_gym_random(env_name: str, pkl_path: str, num_train_transitions: int, max_steps: Optional[int], seed: int = 1)

Generates samples from a random policy on the Gym environment and saves the results as a pandas DataFrame in Parquet format.

reagent.workflow.gym_batch_rl.timeline_operator(pkl_path: str, input_table_spec: reagent.workflow.types.TableSpec)

Loads a pandas DataFrame from Parquet, converts it to a PySpark DataFrame, and uploads it to Hive, then calls the timeline operator.
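
As a rough sketch of how these functions chain together, the following collects random transitions from CartPole, then uploads them to Hive and runs the timeline operator. The local path and table name are placeholders, and a Spark/Hive environment is assumed for the upload step:

from reagent.workflow.gym_batch_rl import offline_gym_random, timeline_operator
from reagent.workflow.types import TableSpec

PKL_PATH = "/tmp/cartpole_random_transitions.pkl"  # hypothetical local path

# Step 1: roll out a random policy and save the transitions as a pandas DataFrame at pkl_path.
offline_gym_random(
    env_name="CartPole-v0",
    pkl_path=PKL_PATH,
    num_train_transitions=10000,
    max_steps=200,
    seed=1,
)

# Step 2: upload the DataFrame to Hive and run the timeline operator on it.
timeline_operator(
    pkl_path=PKL_PATH,
    input_table_spec=TableSpec(table_name="cartpole_random"),  # hypothetical table name
)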

reagent.workflow.identify_types_flow module

reagent.workflow.identify_types_flow.create_normalization_spec_spark(df, column, num_samples: int, seed: Optional[int] = None)

Returns approximately num_samples random rows from the given column of df.

reagent.workflow.identify_types_flow.identify_normalization_parameters(table_spec: reagent.workflow.types.TableSpec, column_name: str, preprocessing_options: reagent.workflow.types.PreprocessingOptions, seed: Optional[int] = None) → Dict[int, reagent.core.parameters.NormalizationParameters]

Gets normalization parameters for the given column.
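
A hedged sketch of calling this function; the table name and column name are placeholders:

from reagent.workflow.identify_types_flow import identify_normalization_parameters
from reagent.workflow.types import PreprocessingOptions, TableSpec

table_spec = TableSpec(table_name="my_training_table")  # hypothetical table
preprocessing_options = PreprocessingOptions(num_samples=100000, quantile_size=20)

# Maps feature id -> NormalizationParameters for the sampled column values.
normalization_params = identify_normalization_parameters(
    table_spec=table_spec,
    column_name="state_features",
    preprocessing_options=preprocessing_options,
    seed=42,
)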

reagent.workflow.identify_types_flow.identify_sparse_normalization_parameters(feature_config: reagent.core.types.ModelFeatureConfig, table_spec: reagent.workflow.types.TableSpec, id_list_column: str, id_score_list_column: str, preprocessing_options: reagent.workflow.types.PreprocessingOptions)
reagent.workflow.identify_types_flow.normalization_helper(max_unique_enum_values: int, quantile_size: int, quantile_k2_threshold: float, skip_box_cox: bool = False, skip_quantiles: bool = False, feature_overrides: Optional[Dict[int, str]] = None, allowedlist_features: Optional[List[int]] = None, assert_allowedlist_feature_coverage: bool = True)

Constructs a preprocessing closure that obtains normalization parameters from rows of feature_name and a sample of feature_values.

reagent.workflow.training module

reagent.workflow.training.identify_and_train_network(input_table_spec: reagent.workflow.types.TableSpec, model: reagent.model_managers.union.ModelManager__Union, num_epochs: int, use_gpu: Optional[bool] = None, reward_options: Optional[reagent.workflow.types.RewardOptions] = None, reader_options: Optional[reagent.workflow.types.ReaderOptions] = None, resource_options: Optional[reagent.workflow.types.ResourceOptions] = None, warmstart_path: Optional[str] = None, validator: Optional[reagent.validators.union.ModelValidator__Union] = None, publisher: Optional[reagent.publishers.union.ModelPublisher__Union] = None) → reagent.workflow.types.RLTrainingOutput
reagent.workflow.training.query_and_train(input_table_spec: reagent.workflow.types.TableSpec, model: reagent.model_managers.union.ModelManager__Union, num_epochs: int, use_gpu: bool, *, setup_data: Optional[Dict[str, bytes]] = None, saved_setup_data: Optional[Dict[str, bytes]] = None, normalization_data_map: Optional[Dict[str, reagent.core.parameters.NormalizationData]] = None, reward_options: Optional[reagent.workflow.types.RewardOptions] = None, reader_options: Optional[reagent.workflow.types.ReaderOptions] = None, resource_options: Optional[reagent.workflow.types.ResourceOptions] = None, warmstart_path: Optional[str] = None, validator: Optional[reagent.validators.union.ModelValidator__Union] = None, publisher: Optional[reagent.publishers.union.ModelPublisher__Union] = None, named_model_ids: Optional[Dict[str, int]] = None, recurring_period: Optional[datetime.datetime] = None) → reagent.workflow.types.RLTrainingOutput
reagent.workflow.training.run_publisher(publisher: reagent.publishers.union.ModelPublisher__Union, model_chooser: reagent.model_managers.union.ModelManager__Union, training_output: reagent.workflow.types.RLTrainingOutput, setup_data: Optional[Dict[str, bytes]], recurring_workflow_ids: Dict[str, int], child_workflow_id: int, recurring_period: Optional[datetime.datetime]) → reagent.workflow.types.RLTrainingOutput
reagent.workflow.training.run_validator(validator: reagent.validators.union.ModelValidator__Union, training_output: reagent.workflow.types.RLTrainingOutput) → reagent.workflow.types.RLTrainingOutput
reagent.workflow.training.train_workflow(model_manager: reagent.model_managers.model_manager.ModelManager, train_dataset: Optional[reagent.workflow.types.Dataset], eval_dataset: Optional[reagent.workflow.types.Dataset], *, num_epochs: int, use_gpu: bool, named_model_ids: Dict[str, int], child_workflow_id: int, setup_data: Optional[Dict[str, bytes]] = None, normalization_data_map: Optional[Dict[str, reagent.core.parameters.NormalizationData]] = None, reward_options: Optional[reagent.workflow.types.RewardOptions] = None, reader_options: Optional[reagent.workflow.types.ReaderOptions] = None, resource_options: Optional[reagent.workflow.types.ResourceOptions] = None, warmstart_path: Optional[str] = None) → reagent.workflow.types.RLTrainingOutput
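
As a hedged end-to-end sketch, identify_and_train_network can be driven from a table spec and a ModelManager__Union; the table name and sampling value below are placeholders, and the model manager is assumed to be built elsewhere (for example from a YAML config):

from reagent.model_managers.union import ModelManager__Union
from reagent.workflow.training import identify_and_train_network
from reagent.workflow.types import RLTrainingOutput, TableSpec


def run_training(model: ModelManager__Union) -> RLTrainingOutput:
    # Hypothetical Hive table holding pre-collected training data.
    input_table_spec = TableSpec(table_name="my_training_table", eval_table_sample=20.0)
    return identify_and_train_network(
        input_table_spec=input_table_spec,
        model=model,
        num_epochs=10,
        use_gpu=True,
    )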

reagent.workflow.training_reports module

class reagent.workflow.training_reports.ActorCriticTrainingReport

Bases: reagent.core.result_registries.TrainingReport

class reagent.workflow.training_reports.DQNTrainingReport(td_loss: Optional[float] = None, reward_ips: Optional[reagent.evaluation.cpe.CpeEstimate] = None, reward_dm: Optional[reagent.evaluation.cpe.CpeEstimate] = None, reward_dr: Optional[reagent.evaluation.cpe.CpeEstimate] = None, value_sequential_dr: Optional[reagent.evaluation.cpe.CpeEstimate] = None, value_weighted_dr: Optional[reagent.evaluation.cpe.CpeEstimate] = None, value_magic_dr: Optional[reagent.evaluation.cpe.CpeEstimate] = None)

Bases: reagent.core.result_registries.TrainingReport

reward_dm: Optional[reagent.evaluation.cpe.CpeEstimate] = None
reward_dr: Optional[reagent.evaluation.cpe.CpeEstimate] = None
reward_ips: Optional[reagent.evaluation.cpe.CpeEstimate] = None
td_loss: Optional[float] = None
value_magic_dr: Optional[reagent.evaluation.cpe.CpeEstimate] = None
value_sequential_dr: Optional[reagent.evaluation.cpe.CpeEstimate] = None
value_weighted_dr: Optional[reagent.evaluation.cpe.CpeEstimate] = None
class reagent.workflow.training_reports.ParametricDQNTrainingReport

Bases: reagent.core.result_registries.TrainingReport

class reagent.workflow.training_reports.Seq2RewardTrainingReport

Bases: reagent.core.result_registries.TrainingReport

class reagent.workflow.training_reports.SlateQTrainingReport

Bases: reagent.core.result_registries.TrainingReport

class reagent.workflow.training_reports.WorldModelTrainingReport

Bases: reagent.core.result_registries.TrainingReport

reagent.workflow.types module

class reagent.workflow.types.Dataset(parquet_url: str)

Bases: object

parquet_url: str
class reagent.workflow.types.ModelFeatureConfigProvider__Union(raw: Optional[reagent.models.model_feature_config_provider.RawModelFeatureConfigProvider] = None)

Bases: reagent.core.tagged_union.TaggedUnion

make_union_instance(instance_class=None)
raw: Optional[reagent.models.model_feature_config_provider.RawModelFeatureConfigProvider] = None
class reagent.workflow.types.PreprocessingOptions(num_samples: int = 100000, max_unique_enum_values: int = 10, quantile_size: int = 20, quantile_k2_threshold: float = 1000.0, skip_box_cox: bool = False, skip_quantiles: bool = True, feature_overrides: Optional[Dict[int, str]] = None, tablesample: Optional[float] = None, set_missing_value_to_zero: Optional[bool] = False, allowedlist_features: Optional[List[int]] = None, assert_allowedlist_feature_coverage: bool = True)

Bases: object

allowedlist_features: Optional[List[int]] = None
assert_allowedlist_feature_coverage: bool = True
feature_overrides: Optional[Dict[int, str]] = None
max_unique_enum_values: int = 10
num_samples: int = 100000
quantile_k2_threshold: float = 1000.0
quantile_size: int = 20
set_missing_value_to_zero: Optional[bool] = False
skip_box_cox: bool = False
skip_quantiles: bool = True
tablesample: Optional[float] = None
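
For illustration, a PreprocessingOptions with a few fields overridden from their defaults (the feature ids are placeholders):

from reagent.workflow.types import PreprocessingOptions

preprocessing_options = PreprocessingOptions(
    num_samples=50000,
    max_unique_enum_values=100,
    tablesample=10.0,                      # optional table sampling rate
    allowedlist_features=[1, 2, 3],        # hypothetical feature ids
    assert_allowedlist_feature_coverage=False,
)
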
class reagent.workflow.types.PublishingResult__Union(no_publishing_results: Optional[reagent.core.result_types.NoPublishingResults] = None)

Bases: reagent.core.tagged_union.TaggedUnion

make_union_instance(instance_class=None)
no_publishing_results: Optional[reagent.core.result_types.NoPublishingResults] = None
class reagent.workflow.types.RLTrainingOutput(output_paths: Dict[str, str] = <factory>, validation_result: Optional[reagent.workflow.types.ValidationResult__Union] = None, publishing_result: Optional[reagent.workflow.types.PublishingResult__Union] = None, training_report: Optional[reagent.workflow.types.RLTrainingReport] = None, logger_data: Dict[str, Dict[str, List[Tuple[float, float]]]] = <factory>)

Bases: object

logger_data: Dict[str, Dict[str, List[Tuple[float, float]]]]
output_paths: Dict[str, str]
publishing_result: Optional[reagent.workflow.types.PublishingResult__Union] = None
training_report: Optional[reagent.workflow.types.RLTrainingReport] = None
validation_result: Optional[reagent.workflow.types.ValidationResult__Union] = None
class reagent.workflow.types.RLTrainingReport(dqn_report: Optional[reagent.workflow.training_reports.DQNTrainingReport] = None, actor_critic_report: Optional[reagent.workflow.training_reports.ActorCriticTrainingReport] = None, world_model_report: Optional[reagent.workflow.training_reports.WorldModelTrainingReport] = None, parametric_dqn_report: Optional[reagent.workflow.training_reports.ParametricDQNTrainingReport] = None, slate_q_report: Optional[reagent.workflow.training_reports.SlateQTrainingReport] = None, seq2reward_report: Optional[reagent.workflow.training_reports.Seq2RewardTrainingReport] = None)

Bases: reagent.core.tagged_union.TaggedUnion

actor_critic_report: Optional[reagent.workflow.training_reports.ActorCriticTrainingReport] = None
dqn_report: Optional[reagent.workflow.training_reports.DQNTrainingReport] = None
make_union_instance(instance_class=None)
parametric_dqn_report: Optional[reagent.workflow.training_reports.ParametricDQNTrainingReport] = None
seq2reward_report: Optional[reagent.workflow.training_reports.Seq2RewardTrainingReport] = None
slate_q_report: Optional[reagent.workflow.training_reports.SlateQTrainingReport] = None
world_model_report: Optional[reagent.workflow.training_reports.WorldModelTrainingReport] = None
class reagent.workflow.types.ReaderOptions(minibatch_size: int = 1024, petastorm_reader_pool_type: str = 'thread')

Bases: object

minibatch_size: int = 1024
petastorm_reader_pool_type: str = 'thread'
class reagent.workflow.types.ResourceOptions(gpu: int = 1, cpu: Optional[int] = None, memory: Optional[str] = '40g', min_nodes: Optional[int] = 1, max_nodes: Optional[int] = 1)

Bases: object

cpu: Optional[int] = None
gpu: int = 1
max_nodes: Optional[int] = 1
memory: Optional[str] = '40g'
min_nodes: Optional[int] = 1
property use_gpu
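
A small sketch combining ReaderOptions and ResourceOptions as they would typically be passed to the training entry points above; the values are illustrative:

from reagent.workflow.types import ReaderOptions, ResourceOptions

reader_options = ReaderOptions(minibatch_size=2048, petastorm_reader_pool_type="thread")
resource_options = ResourceOptions(gpu=1, cpu=8, memory="40g")

# use_gpu is a read-only property (presumably derived from the gpu count).
print(resource_options.use_gpu)
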
class reagent.workflow.types.RewardOptions(custom_reward_expression: Optional[str] = None, metric_reward_values: Optional[Dict[str, float]] = None)

Bases: object

custom_reward_expression: Optional[str] = None
metric_reward_values: Optional[Dict[str, float]] = None
class reagent.workflow.types.TableSpec(table_name: str, table_sample: Optional[float] = None, eval_table_sample: Optional[float] = None, test_table_sample: Optional[float] = None)

Bases: object

eval_table_sample: Optional[float] = None
table_name: str
table_sample: Optional[float] = None
test_table_sample: Optional[float] = None
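
For illustration, a TableSpec with sampling fractions and a RewardOptions with a custom reward expression; all values are placeholders:

from reagent.workflow.types import RewardOptions, TableSpec

table_spec = TableSpec(
    table_name="my_training_table",  # hypothetical table
    table_sample=80.0,
    eval_table_sample=20.0,
)

reward_options = RewardOptions(
    custom_reward_expression="reward * 0.5",   # hypothetical expression
    metric_reward_values={"watch_time": 1.0},  # hypothetical metric weights
)
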
class reagent.workflow.types.TrainerConf

Bases: object

class reagent.workflow.types.ValidationResult__Union(no_validation_results: Optional[reagent.core.result_types.NoValidationResults] = None)

Bases: reagent.core.tagged_union.TaggedUnion

make_union_instance(instance_class=None)
no_validation_results: Optional[reagent.core.result_types.NoValidationResults] = None

reagent.workflow.utils module

class reagent.workflow.utils.PetastormLightningDataModule(*args: Any, **kwargs: Any)

Bases: pytorch_lightning.core.datamodule.LightningDataModule

test_dataloader()

Implement one or multiple PyTorch DataLoaders for testing.

The dataloader you return will not be reloaded unless you set :paramref:`~pytorch_lightning.trainer.Trainer.reload_dataloaders_every_n_epochs` to a positive integer.

For data processing use the following pattern:

  • download in prepare_data()

  • process and split in setup()

However, the above are only necessary for distributed processing.

Warning

Do not assign state in prepare_data.

Note

Lightning adds the correct sampler for distributed and arbitrary hardware. There is no need to set it yourself.

Returns

A torch.utils.data.DataLoader or a sequence of them specifying testing samples.

Example:

def test_dataloader(self):
    transform = transforms.Compose([transforms.ToTensor(),
                                    transforms.Normalize((0.5,), (1.0,))])
    dataset = MNIST(root='/path/to/mnist/', train=False, transform=transform,
                    download=True)
    loader = torch.utils.data.DataLoader(
        dataset=dataset,
        batch_size=self.batch_size,
        shuffle=False
    )

    return loader

# can also return multiple dataloaders
def test_dataloader(self):
    return [loader_a, loader_b, ..., loader_n]

Note

If you don’t need a test dataset and a test_step(), you don’t need to implement this method.

Note

In the case where you return multiple test dataloaders, the test_step() will have an argument dataloader_idx which matches the order here.

train_dataloader()

Implement one or more PyTorch DataLoaders for training.

Returns

A collection of torch.utils.data.DataLoader specifying training samples. In the case of multiple dataloaders, see the PyTorch Lightning documentation on using multiple dataloaders.

The dataloader you return will not be reloaded unless you set :paramref:`~pytorch_lightning.trainer.Trainer.reload_dataloaders_every_n_epochs` to a positive integer.

For data processing use the following pattern:

  • download in prepare_data()

  • process and split in setup()

However, the above are only necessary for distributed processing.

Warning

Do not assign state in prepare_data.

Note

Lightning adds the correct sampler for distributed and arbitrary hardware. There is no need to set it yourself.

Example:

# single dataloader
def train_dataloader(self):
    transform = transforms.Compose([transforms.ToTensor(),
                                    transforms.Normalize((0.5,), (1.0,))])
    dataset = MNIST(root='/path/to/mnist/', train=True, transform=transform,
                    download=True)
    loader = torch.utils.data.DataLoader(
        dataset=dataset,
        batch_size=self.batch_size,
        shuffle=True
    )
    return loader

# multiple dataloaders, return as list
def train_dataloader(self):
    mnist = MNIST(...)
    cifar = CIFAR(...)
    mnist_loader = torch.utils.data.DataLoader(
        dataset=mnist, batch_size=self.batch_size, shuffle=True
    )
    cifar_loader = torch.utils.data.DataLoader(
        dataset=cifar, batch_size=self.batch_size, shuffle=True
    )
    # each batch will be a list of tensors: [batch_mnist, batch_cifar]
    return [mnist_loader, cifar_loader]

# multiple dataloader, return as dict
def train_dataloader(self):
    mnist = MNIST(...)
    cifar = CIFAR(...)
    mnist_loader = torch.utils.data.DataLoader(
        dataset=mnist, batch_size=self.batch_size, shuffle=True
    )
    cifar_loader = torch.utils.data.DataLoader(
        dataset=cifar, batch_size=self.batch_size, shuffle=True
    )
    # each batch will be a dict of tensors: {'mnist': batch_mnist, 'cifar': batch_cifar}
    return {'mnist': mnist_loader, 'cifar': cifar_loader}
reagent.workflow.utils.collate_and_preprocess(batch_preprocessor: reagent.preprocessing.batch_preprocessor.BatchPreprocessor, use_gpu: bool)

Helper for Petastorm’s DataLoader to preprocess batches. TODO(kaiwenw): parallelize preprocessing by using the transform of the Petastorm reader; should pin memory and preprocess in the reader, and convert to GPU in collate_fn.

reagent.workflow.utils.get_petastorm_dataloader(dataset: reagent.workflow.types.Dataset, batch_size: int, batch_preprocessor: reagent.preprocessing.batch_preprocessor.BatchPreprocessor, use_gpu: bool, reader_options: reagent.workflow.types.ReaderOptions)

Gets a Petastorm DataLoader for the dataset, with the batch preprocessor applied.
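
A hedged sketch of building a dataloader over a Parquet dataset; the URL is a placeholder and the BatchPreprocessor is assumed to be constructed elsewhere for the model:

from reagent.preprocessing.batch_preprocessor import BatchPreprocessor
from reagent.workflow.types import Dataset, ReaderOptions
from reagent.workflow.utils import get_petastorm_dataloader


def make_dataloader(batch_preprocessor: BatchPreprocessor):
    dataset = Dataset(parquet_url="file:///tmp/training_data.parquet")  # hypothetical URL
    return get_petastorm_dataloader(
        dataset=dataset,
        batch_size=1024,
        batch_preprocessor=batch_preprocessor,
        use_gpu=False,
        reader_options=ReaderOptions(),
    )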

reagent.workflow.utils.get_rank() → int

Returns the torch.distributed rank of the process. 0 represents the main process and is the default if torch.distributed isn’t set up.
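
A common use is to restrict side effects such as logging to the main process:

from reagent.workflow.utils import get_rank

if get_rank() == 0:
    # Only the main process (or a process without torch.distributed set up) logs.
    print("logging from the main process")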

reagent.workflow.utils.get_table_row_count(parquet_url: str)
reagent.workflow.utils.train_eval_lightning(train_dataset, eval_dataset, test_dataset, trainer_module, data_module, num_epochs, logger_name: str, batch_preprocessor=None, reader_options: Optional[reagent.workflow.types.ReaderOptions] = None, checkpoint_path: Optional[str] = None, resource_options: Optional[reagent.workflow.types.ResourceOptions] = None) → pytorch_lightning.trainer.trainer.Trainer
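
A hedged call sketch, assuming the datasets and the Lightning trainer/data modules have already been prepared by a model manager; leaving test_dataset as None is an assumption:

from reagent.workflow.types import Dataset, ReaderOptions
from reagent.workflow.utils import train_eval_lightning


def run_lightning_training(train_dataset: Dataset, eval_dataset: Dataset, trainer_module, data_module):
    # Returns the pytorch_lightning Trainer after fitting on the given datasets.
    return train_eval_lightning(
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        test_dataset=None,
        trainer_module=trainer_module,
        data_module=data_module,
        num_epochs=5,
        logger_name="my_experiment",  # hypothetical logger name
        reader_options=ReaderOptions(),
    )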

Module contents