reagent.data package

Submodules

reagent.data.data_fetcher module

class reagent.data.data_fetcher.DataFetcher

Bases: object

query_data(input_table_spec: reagent.workflow.types.TableSpec, discrete_action: bool, actions: Optional[List[str]] = None, include_possible_actions=True, custom_reward_expression: Optional[str] = None, sample_range: Optional[Tuple[float, float]] = None, multi_steps: Optional[int] = None, gamma: Optional[float] = None) reagent.workflow.types.Dataset
query_data_synthetic_reward(input_table_spec: reagent.workflow.types.TableSpec, discrete_action_names: Optional[List[str]] = None, sample_range: Optional[Tuple[float, float]] = None, max_seq_len: Optional[int] = None) reagent.workflow.types.Dataset

reagent.data.manual_data_module module

class reagent.data.manual_data_module.ManualDataModule(*args: Any, **kwargs: Any)

Bases: reagent.data.reagent_data_module.ReAgentDataModule

abstract build_batch_preprocessor() reagent.preprocessing.batch_preprocessor.BatchPreprocessor
get_dataloader(dataset: reagent.workflow.types.Dataset, identity: str = 'Default')
get_normalization_data_map(keys: Optional[List[str]] = None) Dict[str, reagent.core.parameters.NormalizationData]
property model_manager
prepare_data(*args, **kwargs)

Use this to download and prepare data.

Warning

Do NOT assign state to the model here (use setup() instead), since this is NOT called on every GPU in DDP/TPU

Example:

def prepare_data(self):
    # good
    download_data()
    tokenize()
    etc()

    # bad
    self.split = data_split
    self.some_state = some_other_state()

In DDP prepare_data can be called in two ways (using Trainer(prepare_data_per_node)):

  1. Once per node. This is the default and is only called on LOCAL_RANK=0.

  2. Once in total. Only called on GLOBAL_RANK=0.

Example:

# DEFAULT
# called once per node on LOCAL_RANK=0 of that node
Trainer(prepare_data_per_node=True)

# call on GLOBAL_RANK=0 (great for shared file systems)
Trainer(prepare_data_per_node=False)

Note

Setting prepare_data_per_node with the trainer flag is deprecated and will be removed in v1.7.0. Please set prepare_data_per_node in LightningDataModule or LightningModule directly instead.

This is called before requesting the dataloaders:

model.prepare_data()
initialize_distributed()
model.setup(stage)
model.train_dataloader()
model.val_dataloader()
model.test_dataloader()
abstract query_data(input_table_spec: reagent.workflow.types.TableSpec, sample_range: Optional[Tuple[float, float]], reward_options: reagent.workflow.types.RewardOptions, data_fetcher: reagent.data.data_fetcher.DataFetcher) reagent.workflow.types.Dataset

Massage input table into the format expected by the trainer

abstract run_feature_identification(input_table_spec: reagent.workflow.types.TableSpec) Dict[str, reagent.core.parameters.NormalizationData]

Derive preprocessing parameters from data.

setup(stage=None)

Called at the beginning of fit (train + validate), validate, test, and predict. This is a good hook when you need to build models dynamically or adjust something about them. This hook is called on every process when using DDP.

Parameters

stage – either 'fit', 'validate', 'test', or 'predict'

Example:

class LitModel(...):
    def __init__(self):
        self.l1 = None

    def prepare_data(self):
        download_data()
        tokenize()

        # don't do this
        self.something = some_other_state()

    def setup(self, stage):
        data = load_data(...)
        self.l1 = nn.Linear(28, data.num_classes)
abstract property should_generate_eval_dataset: bool
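
Example (a minimal sketch of a concrete subclass; identify_my_features and build_my_batch_preprocessor are hypothetical placeholders you would supply, and the action names are illustrative):

from typing import Dict, Optional, Tuple

from reagent.core.parameters import NormalizationData
from reagent.data.data_fetcher import DataFetcher
from reagent.data.manual_data_module import ManualDataModule
from reagent.preprocessing.batch_preprocessor import BatchPreprocessor
from reagent.workflow.types import Dataset, RewardOptions, TableSpec


class MyDataModule(ManualDataModule):
    @property
    def should_generate_eval_dataset(self) -> bool:
        # Also produce a held-out eval dataset next to the training data.
        return True

    def run_feature_identification(
        self, input_table_spec: TableSpec
    ) -> Dict[str, NormalizationData]:
        # Derive normalization parameters from the raw table.
        # identify_my_features is a hypothetical helper, not a ReAgent API.
        return identify_my_features(input_table_spec)

    def query_data(
        self,
        input_table_spec: TableSpec,
        sample_range: Optional[Tuple[float, float]],
        reward_options: RewardOptions,
        data_fetcher: DataFetcher,
    ) -> Dataset:
        # Massage the input table into a Dataset via the data fetcher.
        return data_fetcher.query_data(
            input_table_spec=input_table_spec,
            discrete_action=True,
            actions=["action_a", "action_b"],  # placeholder action names
            sample_range=sample_range,
        )

    def build_batch_preprocessor(self) -> BatchPreprocessor:
        # build_my_batch_preprocessor is a hypothetical helper that picks the
        # right BatchPreprocessor for the model being trained.
        return build_my_batch_preprocessor(self.get_normalization_data_map())
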
test_dataloader()

Implement one or multiple PyTorch DataLoaders for testing.

The dataloader you return will not be reloaded unless you set :paramref:`~pytorch_lightning.trainer.Trainer.reload_dataloaders_every_n_epochs` to a positive integer.

For data processing use the following pattern: download in prepare_data(), then process and split in setup().

However, the above are only necessary for distributed processing.

Warning

do not assign state in prepare_data

Note

Lightning adds the correct sampler for distributed and arbitrary hardware. There is no need to set it yourself.

Returns

A torch.utils.data.DataLoader or a sequence of them specifying testing samples.

Example:

def test_dataloader(self):
    transform = transforms.Compose([transforms.ToTensor(),
                                    transforms.Normalize((0.5,), (1.0,))])
    dataset = MNIST(root='/path/to/mnist/', train=False, transform=transform,
                    download=True)
    loader = torch.utils.data.DataLoader(
        dataset=dataset,
        batch_size=self.batch_size,
        shuffle=False
    )

    return loader

# can also return multiple dataloaders
def test_dataloader(self):
    return [loader_a, loader_b, ..., loader_n]

Note

If you don’t need a test dataset and a test_step(), you don’t need to implement this method.

Note

In the case where you return multiple test dataloaders, the test_step() will have an argument dataloader_idx which matches the order here.

train_dataloader()

Implement one or more PyTorch DataLoaders for training.

Returns

A collection of torch.utils.data.DataLoader specifying training samples. In the case of multiple dataloaders, see the multiple-dataloader examples below.

The dataloader you return will not be reloaded unless you set :paramref:`~pytorch_lightning.trainer.Trainer.reload_dataloaders_every_n_epochs` to a positive integer.

For data processing use the following pattern: download in prepare_data(), then process and split in setup().

However, the above are only necessary for distributed processing.

Warning

do not assign state in prepare_data

Note

Lightning adds the correct sampler for distributed and arbitrary hardware. There is no need to set it yourself.

Example:

# single dataloader
def train_dataloader(self):
    transform = transforms.Compose([transforms.ToTensor(),
                                    transforms.Normalize((0.5,), (1.0,))])
    dataset = MNIST(root='/path/to/mnist/', train=True, transform=transform,
                    download=True)
    loader = torch.utils.data.DataLoader(
        dataset=dataset,
        batch_size=self.batch_size,
        shuffle=True
    )
    return loader

# multiple dataloaders, return as list
def train_dataloader(self):
    mnist = MNIST(...)
    cifar = CIFAR(...)
    mnist_loader = torch.utils.data.DataLoader(
        dataset=mnist, batch_size=self.batch_size, shuffle=True
    )
    cifar_loader = torch.utils.data.DataLoader(
        dataset=cifar, batch_size=self.batch_size, shuffle=True
    )
    # each batch will be a list of tensors: [batch_mnist, batch_cifar]
    return [mnist_loader, cifar_loader]

# multiple dataloader, return as dict
def train_dataloader(self):
    mnist = MNIST(...)
    cifar = CIFAR(...)
    mnist_loader = torch.utils.data.DataLoader(
        dataset=mnist, batch_size=self.batch_size, shuffle=True
    )
    cifar_loader = torch.utils.data.DataLoader(
        dataset=cifar, batch_size=self.batch_size, shuffle=True
    )
    # each batch will be a dict of tensors: {'mnist': batch_mnist, 'cifar': batch_cifar}
    return {'mnist': mnist_loader, 'cifar': cifar_loader}
val_dataloader()

Implement one or multiple PyTorch DataLoaders for validation.

The dataloader you return will not be reloaded unless you set :paramref:`~pytorch_lightning.trainer.Trainer.reload_dataloaders_every_n_epochs` to a positive integer.

It’s recommended that all data downloads and preparation happen in prepare_data().

Note

Lightning adds the correct sampler for distributed and arbitrary hardware. There is no need to set it yourself.

Returns

A torch.utils.data.DataLoader or a sequence of them specifying validation samples.

Examples:

def val_dataloader(self):
    transform = transforms.Compose([transforms.ToTensor(),
                                    transforms.Normalize((0.5,), (1.0,))])
    dataset = MNIST(root='/path/to/mnist/', train=False,
                    transform=transform, download=True)
    loader = torch.utils.data.DataLoader(
        dataset=dataset,
        batch_size=self.batch_size,
        shuffle=False
    )

    return loader

# can also return multiple dataloaders
def val_dataloader(self):
    return [loader_a, loader_b, ..., loader_n]

Note

If you don’t need a validation dataset and a validation_step(), you don’t need to implement this method.

Note

In the case where you return multiple validation dataloaders, the validation_step() will have an argument dataloader_idx which matches the order here.

class reagent.data.manual_data_module.TrainEvalSampleRanges(train_sample_range, eval_sample_range)

Bases: NamedTuple

eval_sample_range: Tuple[float, float]

Alias for field number 1

train_sample_range: Tuple[float, float]

Alias for field number 0

reagent.data.manual_data_module.collate_and_preprocess(batch_preprocessor: reagent.preprocessing.batch_preprocessor.BatchPreprocessor, use_gpu: bool)

Helper for Petastorm’s DataLoader to preprocess batches. TODO(kaiwenw): parallelize preprocessing by using the transform of the Petastorm reader; it should pin memory and preprocess in the reader, and convert to GPU in collate_fn.
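
The returned collate function follows the pattern sketched below (a simplified, hedged illustration in plain PyTorch, not the ReAgent implementation; it assumes the preprocessor returns a dict of tensors):

import torch


def make_collate_fn(batch_preprocessor, use_gpu: bool):
    device = torch.device("cuda") if use_gpu else torch.device("cpu")

    def collate_fn(batch):
        # Preprocess the raw batch first, then move the resulting tensors
        # onto the target device.
        preprocessed = batch_preprocessor(batch)
        return {name: tensor.to(device) for name, tensor in preprocessed.items()}

    return collate_fn


# Hypothetical usage with a torch DataLoader:
# loader = torch.utils.data.DataLoader(
#     dataset, batch_size=32, collate_fn=make_collate_fn(preproc, use_gpu=False)
# )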

reagent.data.manual_data_module.get_sample_range(input_table_spec: reagent.workflow.types.TableSpec, calc_cpe_in_training: bool) reagent.data.manual_data_module.TrainEvalSampleRanges
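
get_sample_range pairs a train and an eval percentage range in a TrainEvalSampleRanges. An illustration of the returned structure (the actual split depends on calc_cpe_in_training and the table spec):

from reagent.data.manual_data_module import TrainEvalSampleRanges

# e.g. train on rows whose hashed mdp_id falls in the first 80%,
# evaluate on the remaining 20%
ranges = TrainEvalSampleRanges(
    train_sample_range=(0.0, 80.0),
    eval_sample_range=(80.0, 100.0),
)
print(ranges.train_sample_range)  # (0.0, 80.0)
print(ranges.eval_sample_range)   # (80.0, 100.0)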

reagent.data.oss_data_fetcher module

class reagent.data.oss_data_fetcher.OssDataFetcher

Bases: reagent.data.data_fetcher.DataFetcher

query_data(input_table_spec: reagent.workflow.types.TableSpec, discrete_action: bool, actions: Optional[List[str]] = None, include_possible_actions=True, custom_reward_expression: Optional[str] = None, sample_range: Optional[Tuple[float, float]] = None, multi_steps: Optional[int] = None, gamma: Optional[float] = None) reagent.workflow.types.Dataset

Perform reward calculation, mdp_id hashing and subsampling, and other preprocessing such as sparse-to-dense conversion.
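
A hedged usage sketch (it assumes TableSpec can be built from a table name and that the table has already been registered with Spark; check reagent.workflow.types for the exact TableSpec fields):

from reagent.data.oss_data_fetcher import OssDataFetcher
from reagent.workflow.types import TableSpec

fetcher = OssDataFetcher()
dataset = fetcher.query_data(
    input_table_spec=TableSpec(table_name="my_rl_table"),  # assumed field name
    discrete_action=True,
    actions=["action_a", "action_b"],
    sample_range=(0.0, 80.0),  # keep rows whose mdp_id hash falls in [0%, 80%)
)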

reagent.data.oss_data_fetcher.calc_custom_reward(df, custom_reward_expression: str)
reagent.data.oss_data_fetcher.calc_reward_multi_steps(df, multi_steps: int, gamma: float)
reagent.data.oss_data_fetcher.discrete_action_preprocessing(df, actions: List[str], multi_steps: Optional[int] = None)
Input actions and possible_actions are strings, which aren’t supported by PyTorch Tensors, so we represent them with LongType:

  (a) action and next_action are strings, so simply return their position in the action space (as given by the actions argument).

  (b) possible_actions and possible_next_actions are lists of strings, so return an existence bitvector of length len(actions), where the ith entry is true iff actions[i] was in the list.

By-product: output not_terminal from preprocessing actions.
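
A pure-Python illustration of the two mappings above (the real code uses Spark UDFs; the same ideas underlie make_where_udf and make_existence_bitvector_udf below):

actions = ["up", "down", "left", "right"]


def action_to_index(action: str) -> int:
    # Position in the action space; len(actions) if the action is unknown.
    return actions.index(action) if action in actions else len(actions)


def existence_bitvector(possible_actions) -> list:
    # ith entry is 1 iff actions[i] appears in possible_actions.
    present = set(possible_actions)
    return [1 if a in present else 0 for a in actions]


assert action_to_index("left") == 2
assert existence_bitvector(["up", "right"]) == [1, 0, 0, 1]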

reagent.data.oss_data_fetcher.get_distinct_keys(df, col_name, is_col_arr_map=False)

Return the list of distinct keys. Set is_col_arr_map to true if the column is an array of Maps; otherwise, the column is assumed to be a Map.

reagent.data.oss_data_fetcher.hash_mdp_id_and_subsample(df, sample_range: Optional[Tuple[float, float]] = None)

Since mdp_id is a string but PyTorch Tensors do not store strings, we hash them with crc32, treated here as a well-distributed hash with range [0, MAX_UINT32-1]. We also perform an optional subsampling based on this hash value. NOTE: we’re assuming no collisions in this hash! Otherwise, two mdp_ids can become indistinguishable after hashing. TODO: change this to a deterministic subsample.
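
A standalone sketch of the crc32 hashing and range-based subsampling described above (pure Python; the real function operates on a Spark dataframe):

import zlib

MAX_UINT32 = 2**32 - 1


def keep_row(mdp_id: str, sample_range=None) -> bool:
    if sample_range is None:
        return True
    lo, hi = sample_range  # percentages in [0, 100]
    h = zlib.crc32(mdp_id.encode("utf-8"))  # deterministic value in [0, MAX_UINT32]
    percentile = 100.0 * h / MAX_UINT32
    return lo <= percentile < hi


# Roughly 80% of mdp_ids should land in [0, 80).
kept = sum(keep_row(f"mdp_{i}", (0.0, 80.0)) for i in range(10000))
print(kept)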

reagent.data.oss_data_fetcher.infer_action_names(df, multi_steps: Optional[int])
reagent.data.oss_data_fetcher.infer_metrics_names(df, multi_steps: Optional[int])

Infer possible metrics names. In the multi-step case, metrics is assumed to be an array of maps.

reagent.data.oss_data_fetcher.infer_states_names(df, multi_steps: Optional[int])

Infer possible state names from states and next state features.

reagent.data.oss_data_fetcher.make_existence_bitvector_udf(arr: List[str])

One-hot encode elements of the target depending on their existence in arr.

reagent.data.oss_data_fetcher.make_get_step_udf(multi_steps: Optional[int])

Get the step count by taking the length of the next_states_features array.

reagent.data.oss_data_fetcher.make_next_udf(multi_steps: Optional[int], return_type)

Generic UDF to get the next item (after multi_steps), for the provided return_type.

reagent.data.oss_data_fetcher.make_sparse2dense(df, col_name: str, possible_keys: List)

Given a list of possible keys, convert a sparse map into a dense array. The map’s value type is assumed to be float.
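
A pure-Python illustration of the conversion (the real code builds Spark UDFs over Map columns); the same dense/presence split is what state_and_metrics_sparse2dense below produces as columns X and X_presence:

possible_keys = [1, 2, 3]
sparse_row = {1: 0.5, 3: 2.0}  # key 2 is missing

# Dense values in the order of possible_keys, with 0.0 for missing keys,
# plus a presence vector marking which keys were actually in the map.
dense = [float(sparse_row.get(k, 0.0)) for k in possible_keys]
presence = [1 if k in sparse_row else 0 for k in possible_keys]

assert dense == [0.5, 0.0, 2.0]
assert presence == [1, 0, 1]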

reagent.data.oss_data_fetcher.make_where_udf(arr: List[str])

Return index of item in arr, and len(arr) if not found.

reagent.data.oss_data_fetcher.misc_column_preprocessing(df, multi_steps: Optional[int])

Miscellaneous columns are step, time_diff, sequence_number, not_terminal.

reagent.data.oss_data_fetcher.parametric_action_preprocessing(df, actions: List[str], multi_steps: Optional[int] = None, include_possible_actions: bool = True)
reagent.data.oss_data_fetcher.rand_string(length)
reagent.data.oss_data_fetcher.select_relevant_columns(df, discrete_action: bool = True, include_possible_actions: bool = True)

Select all the relevant columns and perform type conversions.

reagent.data.oss_data_fetcher.set_reward_col_as_reward(df, custom_reward_expression: Optional[str] = None, multi_steps: Optional[int] = None, gamma: Optional[float] = None)
reagent.data.oss_data_fetcher.state_and_metrics_sparse2dense(df, states: List[int], metrics: List[str], multi_steps: Optional[int])

Sparse-to-dense preprocessing of Map columns, namely states and metrics. Map values are assumed to be scalar. For each column X of type Map, where X is one of {"state_features", "next_state_features", "metrics"}, output two columns:

  (a) Replace column X with a dense representation of the input (sparse) map, i.e. concatenate the map values into a list.

  (b) Create a new column X_presence, a list of the same length as (a), whose ith entry is 1 iff the key was present in the original map.

reagent.data.oss_data_fetcher.upload_as_parquet(df) reagent.workflow.types.Dataset

Upload the dataframe as a Parquet file under a randomly generated name; fails if a non-existent name cannot be generated.
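
An illustrative local analogue of the “write under a random, not-yet-used name” behaviour, using pandas instead of Spark (hypothetical helper, not the ReAgent implementation):

import os
import uuid

import pandas as pd


def upload_as_parquet_local(df: pd.DataFrame, out_dir: str, max_tries: int = 10) -> str:
    for _ in range(max_tries):
        path = os.path.join(out_dir, f"table_{uuid.uuid4().hex[:8]}.parquet")
        if not os.path.exists(path):
            df.to_parquet(path)  # requires pyarrow or fastparquet
            return path
    raise RuntimeError("could not generate a non-existent name")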

reagent.data.reagent_data_module module

class reagent.data.reagent_data_module.ReAgentDataModule(*args: Any, **kwargs: Any)

Bases: pytorch_lightning.core.datamodule.LightningDataModule

abstract get_normalization_data_map(keys: Optional[List[str]] = None) Dict[str, reagent.core.parameters.NormalizationData]

reagent.data.spark_utils module

reagent.data.spark_utils.SPARK_JAR_FROM_ROOT_DIR = 'preprocessing/target/rl-preprocessing-1.1.jar'

SPARK_JAR is the absolute path to the above jar file.

Assumes the file structure:

ReAgent/
    preprocessing/…
    reagent/…
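
A sketch of how the absolute path is derived from the repository root (the module computes this at import time; the root path below is a placeholder):

import os

from reagent.data.spark_utils import SPARK_JAR_FROM_ROOT_DIR

reagent_root = "/path/to/ReAgent"  # wherever the repository is checked out
spark_jar = os.path.join(reagent_root, SPARK_JAR_FROM_ROOT_DIR)
print(spark_jar)  # .../preprocessing/target/rl-preprocessing-1.1.jar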

reagent.data.spark_utils.call_spark_class(spark, class_name: str, args: str)
reagent.data.spark_utils.get_spark_session(config: Optional[Dict[str, str]] = {'spark.app.name': 'ReAgent', 'spark.driver.extraClassPath': '/Users/czxttkl/github/ReAgent/preprocessing/target/rl-preprocessing-1.1.jar', 'spark.driver.host': '127.0.0.1', 'spark.master': 'local[*]', 'spark.sql.catalogImplementation': 'hive', 'spark.sql.execution.arrow.enabled': 'true', 'spark.sql.session.timeZone': 'UTC', 'spark.sql.shuffle.partitions': '12', 'spark.sql.warehouse.dir': '/Users/czxttkl/github/ReAgent/docs/spark-warehouse'})
reagent.data.spark_utils.get_table_url(table_name: str) str
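
A hedged usage sketch (assumes a local Spark setup with the preprocessing JAR built, and that the table already exists in the warehouse):

from reagent.data.spark_utils import get_spark_session, get_table_url

spark = get_spark_session()  # uses the default local config shown above
url = get_table_url("my_rl_table")
print(url)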

Module contents

class reagent.data.DataFetcher

Bases: object

query_data(input_table_spec: reagent.workflow.types.TableSpec, discrete_action: bool, actions: Optional[List[str]] = None, include_possible_actions=True, custom_reward_expression: Optional[str] = None, sample_range: Optional[Tuple[float, float]] = None, multi_steps: Optional[int] = None, gamma: Optional[float] = None) reagent.workflow.types.Dataset
query_data_synthetic_reward(input_table_spec: reagent.workflow.types.TableSpec, discrete_action_names: Optional[List[str]] = None, sample_range: Optional[Tuple[float, float]] = None, max_seq_len: Optional[int] = None) reagent.workflow.types.Dataset
class reagent.data.ManualDataModule(*args: Any, **kwargs: Any)

Bases: reagent.data.reagent_data_module.ReAgentDataModule

abstract build_batch_preprocessor() reagent.preprocessing.batch_preprocessor.BatchPreprocessor
get_dataloader(dataset: reagent.workflow.types.Dataset, identity: str = 'Default')
get_normalization_data_map(keys: Optional[List[str]] = None) Dict[str, reagent.core.parameters.NormalizationData]
property model_manager
prepare_data(*args, **kwargs)
abstract query_data(input_table_spec: reagent.workflow.types.TableSpec, sample_range: Optional[Tuple[float, float]], reward_options: reagent.workflow.types.RewardOptions, data_fetcher: reagent.data.data_fetcher.DataFetcher) reagent.workflow.types.Dataset
abstract run_feature_identification(input_table_spec: reagent.workflow.types.TableSpec) Dict[str, reagent.core.parameters.NormalizationData]
setup(stage=None)
abstract property should_generate_eval_dataset: bool
test_dataloader()
train_dataloader()
val_dataloader()

Alias of reagent.data.manual_data_module.ManualDataModule; see that class above for the full documentation of these methods.

class reagent.data.ReAgentDataModule(*args: Any, **kwargs: Any)

Bases: pytorch_lightning.core.datamodule.LightningDataModule

abstract get_normalization_data_map(keys: Optional[List[str]] = None) Dict[str, reagent.core.parameters.NormalizationData]