reagent.replay_memory package

Submodules

reagent.replay_memory.circular_replay_buffer module

The standard DQN replay memory. This implementation is an out-of-graph replay memory + in-graph wrapper. It supports vanilla n-step updates of the form typically found in the literature, i.e. where rewards are accumulated for n steps and the intermediate trajectory is not exposed to the agent. This does not allow, for example, performing off-policy corrections.
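The n-step update described above accumulates rewards over the horizon before handing a single transition to the agent. A minimal sketch of that accumulation (the function name and defaults are illustrative, not part of the reagent API):

```python
def n_step_return(rewards, gamma=0.99, n=3):
    """Accumulate up to n rewards: R = r_t + gamma*r_{t+1} + ... + gamma**(n-1)*r_{t+n-1}.

    The intermediate trajectory is collapsed into this single scalar, which is
    why the buffer cannot apply off-policy corrections over the skipped steps.
    """
    return sum(gamma ** k * r for k, r in enumerate(rewards[:n]))
```

With gamma = 0.99 and three unit rewards this yields 1 + 0.99 + 0.9801 = 2.9701, which is the single reward the agent sees for the n-step transition.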

class reagent.replay_memory.circular_replay_buffer.ReplayBuffer(observation_shape: Tuple[int, ...], stack_size: int, replay_capacity: int, batch_size: int, return_everything_as_stack: bool = False, return_as_timeline_format: bool = False, update_horizon: int = 1, gamma: float = 0.99, max_sample_attempts: int = 1000, extra_storage_types: Optional[List[reagent.replay_memory.circular_replay_buffer.shape_type]] = None, observation_dtype=numpy.uint8, action_shape: Tuple[int, ...] = (), action_dtype=numpy.int32, reward_shape: Tuple[int, ...] = (), reward_dtype=numpy.float32)

Bases: object

A simple replay buffer. Stores transitions (state, action, reward, next_state, terminal, and any extra contents specified) in a circular buffer and provides a uniform transition sampling function. When the states consist of stacks of observations, storing the full states is inefficient, so this class writes single observations and constructs the stacked states at sample time.

add_count

int, counter of how many transitions have been added (including the blank ones at the beginning of an episode).

add(observation, action, reward, terminal, *args, **kwargs)

Adds a transition to the replay memory. This function checks the types and handles the padding at the beginning of an episode, then calls the _add function. Since the next_observation in the transition will be the observation added next, there is no need to pass it. If the replay memory is at capacity, the oldest transition will be discarded.

Parameters

observation – np.array with shape observation_shape.

action – int, the action in the transition.

reward – float, the reward received in the transition.

terminal – np.dtype, acts as a boolean indicating whether the transition was terminal (1) or not (0).

*args – extra contents with shapes and dtypes according to extra_storage_types.

classmethod create_from_env(env: gym.core.Env, *, replay_memory_size: int, batch_size: int, stack_size: int = 1, store_log_prob: bool = True, **kwargs)
cursor() → int

Index to the location where the next transition will be written.
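The circular-overwrite behavior of add and cursor can be sketched with a toy stand-in. This is an illustrative simplification, not the reagent class (which also handles observation stacking, episode padding, and typed storage):

```python
class MiniCircularBuffer:
    """Toy circular buffer: once at capacity, add() overwrites the oldest entry."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.store = [None] * capacity
        self.add_count = 0  # total transitions ever added

    def cursor(self):
        # Index to the location where the next transition will be written.
        return self.add_count % self.capacity

    def add(self, transition):
        self.store[self.cursor()] = transition
        self.add_count += 1

    def is_full(self):
        return self.add_count >= self.capacity
```

After adding four transitions to a capacity-3 buffer, the first one has been overwritten and the cursor has wrapped around to index 1.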

get_add_args_signature() → List[reagent.replay_memory.circular_replay_buffer.shape_type]

The signature of the add function. Note: derived classes may return a different signature.

Returns

list of ReplayElements defining the type of the argument signature needed by the add function.

get_storage_signature() → List[reagent.replay_memory.circular_replay_buffer.shape_type]

Returns a default list of elements to be stored in this replay memory. Note: derived classes may return a different signature.

Returns

list of ReplayElements defining the type of the contents stored.

get_transition_elements(batch_size=None)

Returns a ‘type signature’ for sample_transition_batch.

Parameters

batch_size – int, number of transitions returned. If None, the default batch_size will be used.

Returns

A namedtuple describing the method’s return type signature.

Return type

signature

is_empty() → bool

Is the Replay Buffer empty?

is_full() → bool

Is the Replay Buffer full?

is_valid_transition(index)
load(checkpoint_dir, suffix)

Restores the object from bundle_dictionary and numpy checkpoints.

Parameters

checkpoint_dir – str, the directory from which to read the numpy checkpoint files.

suffix – str, the suffix to use in numpy checkpoint files.

Raises

NotFoundError – If not all expected files are found in directory.

sample_all_valid_transitions()
sample_index_batch(batch_size: int) → torch.Tensor

Returns a batch of valid indices sampled uniformly.

Parameters

batch_size – int, number of indices returned.

Returns

1D tensor of ints, a batch of valid indices sampled uniformly.

Raises

RuntimeError – If there are no valid indices to sample.

sample_transition_batch(batch_size=None, indices=None)

Returns a batch of transitions (including any extra contents). If get_transition_elements has been overridden and defines elements not stored in self._store, an empty array will be returned and it will be left to the child class to fill it. For example, for the child class PrioritizedReplayBuffer, the contents of the sampling_probabilities are stored separately in a sum tree. When the transition is terminal, next_state_batch has undefined contents.

NOTE: This transition contains the indices of the sampled elements. These are only valid during the call to sample_transition_batch, i.e. they may be used by subclasses of this replay buffer but may point to different data as soon as sampling is done.

NOTE: Tensors are reshaped, i.e. state is 2-D unless stack_size > 1. Scalar values are returned as (batch_size, 1) instead of (batch_size,).

Parameters

batch_size – int, number of transitions returned. If None, the default batch_size will be used.

indices – None or Tensor, the indices of every transition in the batch. If None, sample the indices uniformly.

Returns

tuple of Tensors with the shape and type as in get_transition_elements().

Return type

transition_batch

Raises

ValueError – If an element to be sampled is missing from the replay buffer.

save(checkpoint_dir, iteration_number)

Save the ReplayBuffer attributes into a file. This method will save all the replay buffer’s state in a single file.

Parameters

checkpoint_dir – str, the directory where numpy checkpoint files should be saved.

iteration_number – int, iteration_number to use as a suffix in naming numpy checkpoint files.

set_index_valid_status(idx: int, is_valid: bool)
property size
reagent.replay_memory.circular_replay_buffer.ReplayElement

alias of reagent.replay_memory.circular_replay_buffer.shape_type

reagent.replay_memory.prioritized_replay_buffer module

An implementation of Prioritized Experience Replay (PER). This implementation is based on the paper “Prioritized Experience Replay” by Tom Schaul et al. (2015). Many thanks to Tom Schaul, John Quan, and Matteo Hessel for providing useful pointers on the algorithm and its implementation.
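In the proportional variant from Schaul et al. (2015), each transition i is drawn with probability proportional to its priority raised to an exponent alpha. A sketch of that computation (alpha is a hyperparameter from the paper, not a constructor argument of the class below; the function name is illustrative):

```python
def sampling_probabilities(priorities, alpha=0.6):
    """P(i) = p_i**alpha / sum_j p_j**alpha (proportional prioritization).

    alpha = 0 recovers uniform sampling; alpha = 1 samples exactly in
    proportion to the raw priorities.
    """
    scaled = [p ** alpha for p in priorities]
    total = sum(scaled)
    return [s / total for s in scaled]
```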

class reagent.replay_memory.prioritized_replay_buffer.PrioritizedReplayBuffer(observation_shape, stack_size, replay_capacity, batch_size, update_horizon=1, gamma=0.99, max_sample_attempts=1000, extra_storage_types=None, observation_dtype=numpy.uint8, action_shape=(), action_dtype=numpy.int32, reward_shape=(), reward_dtype=numpy.float32)

Bases: reagent.replay_memory.circular_replay_buffer.ReplayBuffer

An out-of-graph Replay Buffer for Prioritized Experience Replay. See circular_replay_buffer.py for details.

get_add_args_signature()

The signature of the add function. The signature is the same as the one for ReplayBuffer, with an added priority.

Returns

list of ReplayElements defining the type of the argument signature needed by the add function.

get_priority(indices)

Fetches the priorities corresponding to a batch of memory indices. For any memory location not yet used, the corresponding priority is 0.

Parameters

indices – np.array with dtype int32, of indices in range [0, replay_capacity).

Returns

float, the corresponding priorities.

Return type

priorities

get_transition_elements(batch_size=None)

Returns a ‘type signature’ for sample_transition_batch.

Parameters

batch_size – int, number of transitions returned. If None, the default batch_size will be used.

Returns

A namedtuple describing the method’s return type signature.

Return type

signature

sample_index_batch(batch_size: int) → torch.Tensor

Returns a batch of valid indices sampled as in Schaul et al. (2015).

Parameters

batch_size – int, number of indices returned.

Returns

1D tensor of ints, a batch of valid indices sampled according to their priorities.

Raises

Exception – If the batch was not constructed after maximum number of tries.

sample_transition_batch(batch_size=None, indices=None)

Returns a batch of transitions with extra storage and the priorities. The extra storage is defined through the extra_storage_types constructor argument. When the transition is terminal, next_state_batch has undefined contents.

Parameters

batch_size – int, number of transitions returned. If None, the default batch_size will be used.

indices – None or 1D tensor of ints, the indices of every transition in the batch. If None, sample the indices uniformly.

Returns

tuple of np.arrays with the shape and type as in get_transition_elements().

Return type

transition_batch

set_priority(indices, priorities)

Sets the priority of the given elements according to Schaul et al.

Parameters

indices – np.array with dtype int32, of indices in range [0, replay_capacity).

priorities – float, the corresponding priorities.

reagent.replay_memory.sum_tree module

A sum tree data structure. Used for prioritized experience replay. See prioritized_replay_buffer.py and Schaul et al. (2015).

class reagent.replay_memory.sum_tree.SumTree(capacity: int)

Bases: object

A sum tree data structure for storing replay priorities. A sum tree is a complete binary tree whose leaves contain values called priorities. Internal nodes maintain the sum of the priorities of all leaf nodes in their subtree. For capacity = 4, the tree may look like this:

            2.5
           /   \
        1.5     1.0
       /   \   /   \
     0.5  1.0 0.5  0.5

This is stored in a list of numpy arrays:

self.nodes = [[2.5], [1.5, 1.0], [0.5, 1.0, 0.5, 0.5]]

For conciseness, we allocate arrays as powers of two, and pad the excess elements with zero values. This is similar to the usual array-based representation of a complete binary tree, but is a little more user-friendly.
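A minimal, self-contained version of this structure can be sketched as follows. This is an illustrative simplification using a single flat array rather than the per-level numpy arrays described above, and it assumes a power-of-two capacity (mirroring the padding convention); class and method names are chosen to echo, not replicate, the reagent API:

```python
import random

class MiniSumTree:
    """Sketch of a sum tree: leaves hold priorities, internal nodes hold subtree sums."""

    def __init__(self, capacity):
        self.capacity = capacity
        # nodes[1] is the root; nodes[capacity:] are the leaves; nodes[0] is unused.
        self.nodes = [0.0] * (2 * capacity)

    def total(self):
        return self.nodes[1]

    def get(self, node_index):
        return self.nodes[node_index + self.capacity]

    def set(self, node_index, value):
        """O(log capacity): update a leaf and propagate the change up to the root."""
        if value < 0:
            raise ValueError("priority must be nonnegative")
        i = node_index + self.capacity
        delta = value - self.nodes[i]
        while i >= 1:
            self.nodes[i] += delta
            i //= 2

    def sample(self, query_value=None):
        """Pick leaf i with probability p_i / total by descending toward the target mass."""
        if self.total() == 0:
            raise RuntimeError("cannot sample from an empty sum tree")
        target = (query_value if query_value is not None else random.random()) * self.total()
        i = 1
        while i < self.capacity:
            left = 2 * i
            if target < self.nodes[left]:
                i = left                    # target lies in the left subtree
            else:
                target -= self.nodes[left]  # skip the left subtree's mass, go right
                i = left + 1
        return i - self.capacity
```

Filling the tree with the priorities from the diagram (0.5, 1.0, 0.5, 0.5) gives a root total of 2.5, and low query values select the leftmost leaves while high ones select the rightmost.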

get(node_index: int) → float

Returns the value of the leaf node corresponding to the index.

Parameters

node_index – The index of the leaf node.

Returns

The value of the leaf node.

sample(query_value: Optional[float] = None) → int

Samples an element from the sum tree. Each element has probability p_i / sum_j p_j of being picked, where p_i is the (positive) value associated with node i (possibly unnormalized).

Parameters

query_value – float in [0, 1], used as the random value to select a sample. If None, one will be selected randomly in [0, 1).

Returns

int, a random element from the sum tree.

Raises

Exception – If the sum tree is empty (i.e. its node values sum to 0), or if the supplied query_value is larger than the total sum.

set(node_index: int, value: float) → None

Sets the value of a leaf node and updates internal nodes accordingly. This operation takes O(log(capacity)).

Parameters

node_index – int, the index of the leaf node to be updated.

value – float, the value which we assign to the node. This value must be nonnegative. Setting value = 0 will cause the element to never be sampled.

Raises

ValueError – If the given value is negative.

stratified_sample(batch_size: int) → List[int]

Performs stratified sampling using the sum tree. Let R be the value at the root (the total value of the sum tree). This method divides [0, R) into batch_size segments, picks a random number from each of those segments, and uses that random number to sample from the sum tree, as specified in Schaul et al. (2015).

Parameters

batch_size – int, the number of strata to use.

Returns

list of batch_size elements sampled from the sum tree.

Raises

Exception – If the sum tree is empty (i.e. its node values sum to 0).
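The stratification step itself is easy to sketch in isolation: one uniform draw from each of batch_size equal segments of [0, 1), which sample would then scale by the root total. The function name is illustrative, not part of the reagent API:

```python
import random

def stratified_query_values(batch_size, rng=random):
    """One uniform draw from each of batch_size equal segments of [0, 1)."""
    seg = 1.0 / batch_size
    # Segment i covers [i * seg, (i + 1) * seg); rng.random() is uniform in [0, 1).
    return [(i + rng.random()) * seg for i in range(batch_size)]
```

Compared with plain uniform sampling, this guarantees every priority stratum is represented once per batch.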

reagent.replay_memory.utils module

reagent.replay_memory.utils.replay_buffer_to_pre_timeline_df(is_discrete_action: bool, replay_buffer: reagent.replay_memory.circular_replay_buffer.ReplayBuffer) → pandas.DataFrame

Formats the replay buffer as the DataFrame needed for uploading the dataset to Hive and then running timeline.

Module contents

class reagent.replay_memory.ReplayBuffer(observation_shape: Tuple[int, ...], stack_size: int, replay_capacity: int, batch_size: int, return_everything_as_stack: bool = False, return_as_timeline_format: bool = False, update_horizon: int = 1, gamma: float = 0.99, max_sample_attempts: int = 1000, extra_storage_types: Optional[List[reagent.replay_memory.circular_replay_buffer.shape_type]] = None, observation_dtype=numpy.uint8, action_shape: Tuple[int, ...] = (), action_dtype=numpy.int32, reward_shape: Tuple[int, ...] = (), reward_dtype=numpy.float32)

Bases: object

alias of reagent.replay_memory.circular_replay_buffer.ReplayBuffer; see that class above for full documentation.
class reagent.replay_memory.PrioritizedReplayBuffer(observation_shape, stack_size, replay_capacity, batch_size, update_horizon=1, gamma=0.99, max_sample_attempts=1000, extra_storage_types=None, observation_dtype=numpy.uint8, action_shape=(), action_dtype=numpy.int32, reward_shape=(), reward_dtype=numpy.float32)

Bases: reagent.replay_memory.circular_replay_buffer.ReplayBuffer

alias of reagent.replay_memory.prioritized_replay_buffer.PrioritizedReplayBuffer; see that class above for full documentation.