reagent.workflow package
Submodules
reagent.workflow.cli module
reagent.workflow.env module
- reagent.workflow.env.get_new_named_entity_ids(module_names: List[str]) Dict[str, int]
- reagent.workflow.env.get_workflow_id() int
reagent.workflow.gym_batch_rl module
- reagent.workflow.gym_batch_rl.evaluate_gym(env_name: str, model: reagent.model_managers.union.ModelManager__Union, publisher: reagent.publishers.union.ModelPublisher__Union, num_eval_episodes: int, passing_score_bar: float, module_name: str = 'default_model', max_steps: Optional[int] = None)
- reagent.workflow.gym_batch_rl.initialize_seed(seed: int, env)
- reagent.workflow.gym_batch_rl.make_agent_from_model(env: reagent.gym.envs.gym.Gym, model: reagent.model_managers.union.ModelManager__Union, publisher: reagent.publishers.union.ModelPublisher__Union, module_name: str)
- reagent.workflow.gym_batch_rl.offline_gym_predictor(env_name: str, model: reagent.model_managers.union.ModelManager__Union, publisher: reagent.publishers.union.ModelPublisher__Union, pkl_path: str, num_train_transitions: int, max_steps: Optional[int], module_name: str = 'default_model', seed: int = 1)
Generates samples from a trained policy on the Gym environment and saves the results as a pandas DataFrame in Parquet format.
- reagent.workflow.gym_batch_rl.offline_gym_random(env_name: str, pkl_path: str, num_train_transitions: int, max_steps: Optional[int], seed: int = 1)
Generates samples from a random policy on the Gym environment and saves the results as a pandas DataFrame in Parquet format.
- reagent.workflow.gym_batch_rl.timeline_operator(pkl_path: str, input_table_spec: reagent.workflow.types.TableSpec)
Loads a pandas DataFrame from Parquet, converts it to a PySpark DataFrame, uploads it to Hive, and then calls the timeline operator (see the sketch below).
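A minimal sketch of the offline data-generation flow these functions support, assuming a local Gym environment and a Spark/Hive setup for the timeline operator; the pickle path and table name below are hypothetical placeholders.

    from reagent.workflow.gym_batch_rl import offline_gym_random, timeline_operator
    from reagent.workflow.types import TableSpec

    PKL_PATH = "/tmp/cartpole_transitions.pkl"  # hypothetical output path

    # Roll out a random policy on CartPole and save the transitions
    # as a pandas DataFrame on disk.
    offline_gym_random(
        env_name="CartPole-v0",
        pkl_path=PKL_PATH,
        num_train_transitions=10_000,
        max_steps=200,
        seed=1,
    )

    # Upload the saved DataFrame to Hive and run the timeline operator on it.
    timeline_operator(
        pkl_path=PKL_PATH,
        input_table_spec=TableSpec(table_name="cartpole_random"),  # hypothetical table
    )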
reagent.workflow.identify_types_flow module
- reagent.workflow.identify_types_flow.create_normalization_spec_spark(df, column, num_samples: int, seed: Optional[int] = None)
Returns approximately num_samples random rows sampled from the given column of df.
- reagent.workflow.identify_types_flow.identify_normalization_parameters(table_spec: reagent.workflow.types.TableSpec, column_name: str, preprocessing_options: reagent.workflow.types.PreprocessingOptions, seed: Optional[int] = None) Dict[int, reagent.core.parameters.NormalizationParameters]
Get normalization parameters for the given column.
- reagent.workflow.identify_types_flow.identify_sparse_normalization_parameters(feature_config: reagent.core.types.ModelFeatureConfig, table_spec: reagent.workflow.types.TableSpec, id_list_column: str, id_score_list_column: str, preprocessing_options: reagent.workflow.types.PreprocessingOptions)
- reagent.workflow.identify_types_flow.normalization_helper(max_unique_enum_values: int, quantile_size: int, quantile_k2_threshold: float, skip_box_cox: bool = False, skip_quantiles: bool = False, feature_overrides: Optional[Dict[int, str]] = None, allowedlist_features: Optional[List[int]] = None, assert_allowedlist_feature_coverage: bool = True)
Construct a preprocessing closure to obtain normalization parameters from rows of feature_name and a sample of feature_values.
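A minimal sketch of computing normalization parameters with identify_normalization_parameters, assuming the Hive table from the previous step exists; the table and column names are placeholders.

    from reagent.workflow.identify_types_flow import identify_normalization_parameters
    from reagent.workflow.types import PreprocessingOptions, TableSpec

    # Returns Dict[int, NormalizationParameters], keyed by feature id.
    norm_params = identify_normalization_parameters(
        table_spec=TableSpec(table_name="cartpole_random"),  # hypothetical table
        column_name="state_features",                        # hypothetical column
        preprocessing_options=PreprocessingOptions(num_samples=10_000),
        seed=0,
    )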
reagent.workflow.training module
- reagent.workflow.training.identify_and_train_network(input_table_spec: reagent.workflow.types.TableSpec, model: reagent.model_managers.union.ModelManager__Union, num_epochs: int, use_gpu: Optional[bool] = None, reward_options: Optional[reagent.workflow.types.RewardOptions] = None, reader_options: Optional[reagent.workflow.types.ReaderOptions] = None, resource_options: Optional[reagent.workflow.types.ResourceOptions] = None, warmstart_path: Optional[str] = None, validator: Optional[reagent.validators.union.ModelValidator__Union] = None, publisher: Optional[reagent.publishers.union.ModelPublisher__Union] = None) reagent.workflow.types.RLTrainingOutput
- reagent.workflow.training.query_and_train(input_table_spec: reagent.workflow.types.TableSpec, model: reagent.model_managers.union.ModelManager__Union, num_epochs: int, use_gpu: bool, *, setup_data: Optional[Dict[str, bytes]] = None, saved_setup_data: Optional[Dict[str, bytes]] = None, normalization_data_map: Optional[Dict[str, reagent.core.parameters.NormalizationData]] = None, reward_options: Optional[reagent.workflow.types.RewardOptions] = None, reader_options: Optional[reagent.workflow.types.ReaderOptions] = None, resource_options: Optional[reagent.workflow.types.ResourceOptions] = None, warmstart_path: Optional[str] = None, validator: Optional[reagent.validators.union.ModelValidator__Union] = None, publisher: Optional[reagent.publishers.union.ModelPublisher__Union] = None, named_model_ids: Optional[Dict[str, int]] = None, recurring_period: Optional[datetime.datetime] = None) reagent.workflow.types.RLTrainingOutput
- reagent.workflow.training.run_publisher(publisher: reagent.publishers.union.ModelPublisher__Union, model_chooser: reagent.model_managers.union.ModelManager__Union, training_output: reagent.workflow.types.RLTrainingOutput, setup_data: Optional[Dict[str, bytes]], recurring_workflow_ids: Dict[str, int], child_workflow_id: int, recurring_period: Optional[datetime.datetime]) reagent.workflow.types.RLTrainingOutput
- reagent.workflow.training.run_validator(validator: reagent.validators.union.ModelValidator__Union, training_output: reagent.workflow.types.RLTrainingOutput) reagent.workflow.types.RLTrainingOutput
- reagent.workflow.training.train_workflow(model_manager: reagent.model_managers.model_manager.ModelManager, train_dataset: Optional[reagent.workflow.types.Dataset], eval_dataset: Optional[reagent.workflow.types.Dataset], *, num_epochs: int, use_gpu: bool, named_model_ids: Dict[str, int], child_workflow_id: int, setup_data: Optional[Dict[str, bytes]] = None, normalization_data_map: Optional[Dict[str, reagent.core.parameters.NormalizationData]] = None, reward_options: Optional[reagent.workflow.types.RewardOptions] = None, reader_options: Optional[reagent.workflow.types.ReaderOptions] = None, resource_options: Optional[reagent.workflow.types.ResourceOptions] = None, warmstart_path: Optional[str] = None) reagent.workflow.types.RLTrainingOutput
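A minimal sketch of an end-to-end run with identify_and_train_network, assuming a ModelManager__Union config (typically parsed from YAML) is supplied by the caller; the table name is a placeholder and validator/publisher are left as None.

    from reagent.workflow.training import identify_and_train_network
    from reagent.workflow.types import TableSpec

    def run_training(model):  # model: reagent.model_managers.union.ModelManager__Union
        # Identifies normalization parameters, queries the data, and trains.
        return identify_and_train_network(
            input_table_spec=TableSpec(table_name="cartpole_random"),  # hypothetical
            model=model,
            num_epochs=10,
            use_gpu=False,
        )  # -> RLTrainingOutput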
reagent.workflow.training_reports module
- class reagent.workflow.training_reports.ActorCriticTrainingReport
- class reagent.workflow.training_reports.DQNTrainingReport(td_loss: Optional[float] = None, reward_ips: Optional[reagent.evaluation.cpe.CpeEstimate] = None, reward_dm: Optional[reagent.evaluation.cpe.CpeEstimate] = None, reward_dr: Optional[reagent.evaluation.cpe.CpeEstimate] = None, value_sequential_dr: Optional[reagent.evaluation.cpe.CpeEstimate] = None, value_weighted_dr: Optional[reagent.evaluation.cpe.CpeEstimate] = None, value_magic_dr: Optional[reagent.evaluation.cpe.CpeEstimate] = None)
Bases:
reagent.core.result_registries.TrainingReport
- reward_dm: Optional[reagent.evaluation.cpe.CpeEstimate] = None
- reward_dr: Optional[reagent.evaluation.cpe.CpeEstimate] = None
- reward_ips: Optional[reagent.evaluation.cpe.CpeEstimate] = None
- td_loss: Optional[float] = None
- value_magic_dr: Optional[reagent.evaluation.cpe.CpeEstimate] = None
- value_sequential_dr: Optional[reagent.evaluation.cpe.CpeEstimate] = None
- value_weighted_dr: Optional[reagent.evaluation.cpe.CpeEstimate] = None
- class reagent.workflow.training_reports.ParametricDQNTrainingReport
- class reagent.workflow.training_reports.Seq2RewardTrainingReport
- class reagent.workflow.training_reports.SlateQTrainingReport
- class reagent.workflow.training_reports.WorldModelTrainingReport
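A minimal sketch of constructing a DQNTrainingReport by hand (e.g. in a test); real reports are produced by the reporters during training, and the loss value below is made up.

    from reagent.workflow.training_reports import DQNTrainingReport

    report = DQNTrainingReport(td_loss=0.12)  # CPE estimate fields default to None
    assert report.reward_ips is None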
reagent.workflow.types module
- class reagent.workflow.types.ModelFeatureConfigProvider__Union(raw: Optional[reagent.models.model_feature_config_provider.RawModelFeatureConfigProvider] = None)
Bases:
reagent.core.tagged_union.TaggedUnion
- make_union_instance(instance_class=None)
- raw: Optional[reagent.models.model_feature_config_provider.RawModelFeatureConfigProvider] = None
- class reagent.workflow.types.PreprocessingOptions(num_samples: int = 100000, max_unique_enum_values: int = 10, quantile_size: int = 20, quantile_k2_threshold: float = 1000.0, skip_box_cox: bool = False, skip_quantiles: bool = True, feature_overrides: Optional[Dict[int, str]] = None, tablesample: Optional[float] = None, set_missing_value_to_zero: Optional[bool] = False, allowedlist_features: Optional[List[int]] = None, assert_allowedlist_feature_coverage: bool = True)
Bases:
object
- allowedlist_features: Optional[List[int]] = None
- assert_allowedlist_feature_coverage: bool = True
- feature_overrides: Optional[Dict[int, str]] = None
- max_unique_enum_values: int = 10
- num_samples: int = 100000
- quantile_k2_threshold: float = 1000.0
- quantile_size: int = 20
- set_missing_value_to_zero: Optional[bool] = False
- skip_box_cox: bool = False
- skip_quantiles: bool = True
- tablesample: Optional[float] = None
- class reagent.workflow.types.PublishingResult__Union(no_publishing_results: Optional[reagent.core.result_types.NoPublishingResults] = None)
Bases:
reagent.core.tagged_union.TaggedUnion
- make_union_instance(instance_class=None)
- no_publishing_results: Optional[reagent.core.result_types.NoPublishingResults] = None
- class reagent.workflow.types.RLTrainingOutput(output_paths: Dict[str, str] = <factory>, validation_result: Optional[reagent.workflow.types.ValidationResult__Union] = None, publishing_result: Optional[reagent.workflow.types.PublishingResult__Union] = None, training_report: Optional[reagent.workflow.types.RLTrainingReport] = None, logger_data: Dict[str, Dict[str, List[Tuple[float, float]]]] = <factory>)
Bases:
object
- logger_data: Dict[str, Dict[str, List[Tuple[float, float]]]]
- output_paths: Dict[str, str]
- publishing_result: Optional[reagent.workflow.types.PublishingResult__Union] = None
- training_report: Optional[reagent.workflow.types.RLTrainingReport] = None
- validation_result: Optional[reagent.workflow.types.ValidationResult__Union] = None
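A minimal sketch of inspecting an RLTrainingOutput as returned by query_and_train or identify_and_train_network; `output` is assumed to come from such a call.

    from reagent.workflow.types import RLTrainingOutput

    def summarize(output: RLTrainingOutput) -> None:
        # output_paths maps artifact names to storage paths.
        for name, path in output.output_paths.items():
            print(f"{name}: {path}")
        # training_report is an optional tagged union over per-model reports.
        if output.training_report is not None and output.training_report.dqn_report:
            print("TD loss:", output.training_report.dqn_report.td_loss)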
- class reagent.workflow.types.RLTrainingReport(dqn_report: Optional[reagent.workflow.training_reports.DQNTrainingReport] = None, actor_critic_report: Optional[reagent.workflow.training_reports.ActorCriticTrainingReport] = None, world_model_report: Optional[reagent.workflow.training_reports.WorldModelTrainingReport] = None, parametric_dqn_report: Optional[reagent.workflow.training_reports.ParametricDQNTrainingReport] = None, slate_q_report: Optional[reagent.workflow.training_reports.SlateQTrainingReport] = None, seq2reward_report: Optional[reagent.workflow.training_reports.Seq2RewardTrainingReport] = None)
Bases:
reagent.core.tagged_union.TaggedUnion
- actor_critic_report: Optional[reagent.workflow.training_reports.ActorCriticTrainingReport] = None
- dqn_report: Optional[reagent.workflow.training_reports.DQNTrainingReport] = None
- make_union_instance(instance_class=None)
- parametric_dqn_report: Optional[reagent.workflow.training_reports.ParametricDQNTrainingReport] = None
- seq2reward_report: Optional[reagent.workflow.training_reports.Seq2RewardTrainingReport] = None
- slate_q_report: Optional[reagent.workflow.training_reports.SlateQTrainingReport] = None
- world_model_report: Optional[reagent.workflow.training_reports.WorldModelTrainingReport] = None
- class reagent.workflow.types.ReaderOptions(minibatch_size: int = 1024, petastorm_reader_pool_type: str = 'thread')
Bases:
object
- minibatch_size: int = 1024
- petastorm_reader_pool_type: str = 'thread'
- class reagent.workflow.types.ResourceOptions(gpu: int = 1, cpu: Optional[int] = None, memory: Optional[str] = '40g', min_nodes: Optional[int] = 1, max_nodes: Optional[int] = 1)
Bases:
object
- cpu: Optional[int] = None
- gpu: int = 1
- max_nodes: Optional[int] = 1
- memory: Optional[str] = '40g'
- min_nodes: Optional[int] = 1
- property use_gpu
- class reagent.workflow.types.RewardOptions(custom_reward_expression: Optional[str] = None, metric_reward_values: Optional[Dict[str, float]] = None)
Bases:
object
- custom_reward_expression: Optional[str] = None
- metric_reward_values: Optional[Dict[str, float]] = None
- class reagent.workflow.types.TableSpec(table_name: str, table_sample: Optional[float] = None, eval_table_sample: Optional[float] = None, test_table_sample: Optional[float] = None)
Bases:
object
- eval_table_sample: Optional[float] = None
- table_name: str
- table_sample: Optional[float] = None
- test_table_sample: Optional[float] = None
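A minimal sketch of the option dataclasses typically passed into query_and_train; the table name is a placeholder and the sampling values are assumed to be percentages, as in the ReAgent example configs.

    from reagent.workflow.types import (
        ReaderOptions,
        ResourceOptions,
        RewardOptions,
        TableSpec,
    )

    input_table_spec = TableSpec(
        table_name="cartpole_random",  # hypothetical Hive table
        table_sample=90.0,
        eval_table_sample=10.0,
    )
    reader_options = ReaderOptions(minibatch_size=1024)
    resource_options = ResourceOptions(gpu=0)  # no GPUs requested
    reward_options = RewardOptions(metric_reward_values={"reward": 1.0})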
- class reagent.workflow.types.TrainerConf
Bases:
object
- class reagent.workflow.types.ValidationResult__Union(no_validation_results: Optional[reagent.core.result_types.NoValidationResults] = None)
Bases:
reagent.core.tagged_union.TaggedUnion
- make_union_instance(instance_class=None)
- no_validation_results: Optional[reagent.core.result_types.NoValidationResults] = None
reagent.workflow.utils module
- class reagent.workflow.utils.PetastormLightningDataModule(*args: Any, **kwargs: Any)
Bases:
pytorch_lightning.core.datamodule.LightningDataModule
- test_dataloader()
Implement one or multiple PyTorch DataLoaders for testing.
The dataloader you return will not be reloaded unless you set reload_dataloaders_every_n_epochs on the Trainer to a positive integer.
For data processing use the following pattern:
- download in prepare_data()
- process and split in setup()
However, the above are only necessary for distributed processing.
Warning: do not assign state in prepare_data.
These hooks are called in the following order:
- fit()
- …
- prepare_data()
- setup()
- val_dataloader()
Note: Lightning adds the correct sampler for distributed and arbitrary hardware. There is no need to set it yourself.
- Returns: A torch.utils.data.DataLoader or a sequence of them specifying testing samples.
Example:

    def test_dataloader(self):
        transform = transforms.Compose([transforms.ToTensor(),
                                        transforms.Normalize((0.5,), (1.0,))])
        dataset = MNIST(root='/path/to/mnist/', train=False,
                        transform=transform, download=True)
        loader = torch.utils.data.DataLoader(
            dataset=dataset,
            batch_size=self.batch_size,
            shuffle=False,
        )
        return loader

    # can also return multiple dataloaders
    def test_dataloader(self):
        return [loader_a, loader_b, ..., loader_n]
Note: If you don’t need a test dataset and a test_step(), you don’t need to implement this method.
Note: In the case where you return multiple test dataloaders, the test_step() will have an argument dataloader_idx which matches the order here.
- train_dataloader()
Implement one or more PyTorch DataLoaders for training.
- Returns: A collection of torch.utils.data.DataLoader specifying training samples. In the case of multiple dataloaders, please see the PyTorch Lightning documentation.
The dataloader you return will not be reloaded unless you set reload_dataloaders_every_n_epochs on the Trainer to a positive integer.
For data processing use the following pattern:
- download in prepare_data()
- process and split in setup()
However, the above are only necessary for distributed processing.
Warning: do not assign state in prepare_data.
These hooks are called in the following order:
- fit()
- …
- prepare_data()
- setup()
Note: Lightning adds the correct sampler for distributed and arbitrary hardware. There is no need to set it yourself.
Example:

    # single dataloader
    def train_dataloader(self):
        transform = transforms.Compose([transforms.ToTensor(),
                                        transforms.Normalize((0.5,), (1.0,))])
        dataset = MNIST(root='/path/to/mnist/', train=True,
                        transform=transform, download=True)
        loader = torch.utils.data.DataLoader(
            dataset=dataset,
            batch_size=self.batch_size,
            shuffle=True,
        )
        return loader

    # multiple dataloaders, return as list
    def train_dataloader(self):
        mnist = MNIST(...)
        cifar = CIFAR(...)
        mnist_loader = torch.utils.data.DataLoader(
            dataset=mnist, batch_size=self.batch_size, shuffle=True
        )
        cifar_loader = torch.utils.data.DataLoader(
            dataset=cifar, batch_size=self.batch_size, shuffle=True
        )
        # each batch will be a list of tensors: [batch_mnist, batch_cifar]
        return [mnist_loader, cifar_loader]

    # multiple dataloaders, return as dict
    def train_dataloader(self):
        mnist = MNIST(...)
        cifar = CIFAR(...)
        mnist_loader = torch.utils.data.DataLoader(
            dataset=mnist, batch_size=self.batch_size, shuffle=True
        )
        cifar_loader = torch.utils.data.DataLoader(
            dataset=cifar, batch_size=self.batch_size, shuffle=True
        )
        # each batch will be a dict of tensors: {'mnist': batch_mnist, 'cifar': batch_cifar}
        return {'mnist': mnist_loader, 'cifar': cifar_loader}
- reagent.workflow.utils.collate_and_preprocess(batch_preprocessor: reagent.preprocessing.batch_preprocessor.BatchPreprocessor, use_gpu: bool)
Helper for Petastorm’s DataLoader to preprocess batches. TODO(kaiwenw): parallelize preprocessing by using the transform of the Petastorm reader. Should pin memory and preprocess in the reader, and convert to GPU in collate_fn.
- reagent.workflow.utils.get_petastorm_dataloader(dataset: reagent.workflow.types.Dataset, batch_size: int, batch_preprocessor: reagent.preprocessing.batch_preprocessor.BatchPreprocessor, use_gpu: bool, reader_options: reagent.workflow.types.ReaderOptions)
Get a Petastorm DataLoader for the dataset (with preprocessor); see the sketch below.
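A minimal sketch of building a Petastorm DataLoader with get_petastorm_dataloader, assuming Dataset exposes a parquet_url field and that a batch preprocessor matching the model has been built; the URL is a hypothetical placeholder.

    from reagent.workflow.types import Dataset, ReaderOptions
    from reagent.workflow.utils import get_petastorm_dataloader

    def make_loader(batch_preprocessor):
        dataset = Dataset(parquet_url="file:///tmp/training_data.parquet")  # hypothetical
        return get_petastorm_dataloader(
            dataset=dataset,
            batch_size=1024,
            batch_preprocessor=batch_preprocessor,
            use_gpu=False,
            reader_options=ReaderOptions(),
        )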
- reagent.workflow.utils.get_rank() int
Returns the torch.distributed rank of the process. 0 represents the main process and is the default if torch.distributed isn’t set up.
- reagent.workflow.utils.get_table_row_count(parquet_url: str)
- reagent.workflow.utils.train_eval_lightning(train_dataset, eval_dataset, test_dataset, trainer_module, data_module, num_epochs, logger_name: str, batch_preprocessor=None, reader_options: Optional[reagent.workflow.types.ReaderOptions] = None, checkpoint_path: Optional[str] = None, resource_options: Optional[reagent.workflow.types.ResourceOptions] = None) pytorch_lightning.trainer.trainer.Trainer
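A minimal sketch of wiring train_eval_lightning together, assuming the caller supplies a ReAgent Lightning trainer module and, optionally, a data module and datasets produced by a ModelManager; the logger name is a placeholder.

    from reagent.workflow.utils import train_eval_lightning

    def run(trainer_module, data_module=None, train_dataset=None,
            eval_dataset=None, batch_preprocessor=None):
        return train_eval_lightning(
            train_dataset=train_dataset,
            eval_dataset=eval_dataset,
            test_dataset=None,
            trainer_module=trainer_module,
            data_module=data_module,
            num_epochs=10,
            logger_name="my_experiment",  # hypothetical TensorBoard run name
            batch_preprocessor=batch_preprocessor,
        )  # -> the fitted pytorch_lightning Trainer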