reagent.replay_memory package¶
Submodules¶
reagent.replay_memory.circular_replay_buffer module¶
The standard DQN replay memory. This implementation is an out-of-graph replay memory + in-graph wrapper. It supports vanilla n-step updates of the form typically found in the literature, i.e. where rewards are accumulated for n steps and the intermediate trajectory is not exposed to the agent. This does not allow, for example, performing off-policy corrections.
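The n-step accumulation described above can be sketched as a discounted sum. This is an illustrative snippet, not ReAgent's implementation; `gamma` and the reward list mirror the `gamma` and `update_horizon` constructor arguments documented below.

```python
def n_step_return(rewards, gamma):
    """Accumulate rewards over n steps: r_0 + gamma*r_1 + ... + gamma^(n-1)*r_(n-1).

    The intermediate trajectory is collapsed into this single scalar,
    which is why off-policy corrections cannot be applied afterwards.
    """
    total = 0.0
    for k, r in enumerate(rewards):
        total += (gamma ** k) * r
    return total

# With gamma = 0.5 and rewards [1, 1, 1]: 1 + 0.5 + 0.25 = 1.75
```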

class
reagent.replay_memory.circular_replay_buffer.
ReplayBuffer
(observation_shape: Tuple[int, ...], stack_size: int, replay_capacity: int, batch_size: int, return_everything_as_stack: bool = False, return_as_timeline_format: bool = False, update_horizon: int = 1, gamma: float = 0.99, max_sample_attempts: int = 1000, extra_storage_types: Optional[List[reagent.replay_memory.circular_replay_buffer.shape_type]] = None, observation_dtype=numpy.uint8, action_shape: Tuple[int, ...] = (), action_dtype=numpy.int32, reward_shape: Tuple[int, ...] = (), reward_dtype=numpy.float32)¶ Bases:
object
A simple Replay Buffer. Stores transitions (state, action, reward, next_state, terminal, and any extra contents specified) in a circular buffer and provides a uniform transition sampling function. When the states consist of stacks of observations, storing the full states is inefficient; this class writes observations once and constructs the stacked states at sample time.
add_count – int, counter of how many transitions have been added (including the blank ones at the beginning of an episode).
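The circular indexing can be illustrated with a minimal sketch (the class `TinyCircularBuffer` is hypothetical, not ReAgent's code): the write position is simply `add_count` modulo the capacity, so the oldest transition is overwritten once the buffer is full.

```python
class TinyCircularBuffer:
    """Minimal circular-buffer sketch: cursor() == add_count % capacity."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.storage = [None] * capacity
        self.add_count = 0  # total transitions ever added

    def cursor(self):
        # Index to the location where the next transition will be written.
        return self.add_count % self.capacity

    def add(self, transition):
        # Overwrites the oldest entry once the buffer is full.
        self.storage[self.cursor()] = transition
        self.add_count += 1

    def is_full(self):
        return self.add_count >= self.capacity

buf = TinyCircularBuffer(capacity=3)
for t in ["a", "b", "c", "d"]:
    buf.add(t)
# "d" has overwritten "a": storage is now ["d", "b", "c"]
```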

add
(observation, action, reward, terminal, *args, **kwargs)¶ Adds a transition to the replay memory. This function checks the types and handles the padding at the beginning of an episode. Then it calls the _add function. Since the next_observation in the transition will be the observation added next, there is no need to pass it. If the replay memory is at capacity, the oldest transition will be discarded. :param observation: np.array with shape observation_shape. :param action: int, the action in the transition. :param reward: float, the reward received in the transition. :param terminal: np.dtype, acts as a boolean indicating whether the transition
was terminal (1) or not (0).
 Parameters
*args – extra contents with shapes and dtypes according to extra_storage_types.

classmethod
create_from_env
(env: gym.core.Env, *, replay_memory_size: int, batch_size: int, stack_size: int = 1, store_log_prob: bool = True, **kwargs)¶

cursor
() → int¶ Index to the location where the next transition will be written.

get_add_args_signature
() → List[reagent.replay_memory.circular_replay_buffer.shape_type]¶ The signature of the add function. Note: Derived classes may return a different signature. :returns:
 list of ReplayElements defining the type of the argument signature needed
by the add function.

get_storage_signature
() → List[reagent.replay_memory.circular_replay_buffer.shape_type]¶ Returns a default list of elements to be stored in this replay memory. Note: Derived classes may return a different signature. :returns: list of ReplayElements defining the type of the contents stored.

get_transition_elements
(batch_size=None)¶ Returns a ‘type signature’ for sample_transition_batch. :param batch_size: int, number of transitions returned. If None, the default
batch_size will be used.
 Returns
A namedtuple describing the method’s return type signature.
 Return type
signature

is_empty
() → bool¶ Is the Replay Buffer empty?

is_full
() → bool¶ Is the Replay Buffer full?

is_valid_transition
(index)¶

load
(checkpoint_dir, suffix)¶ Restores the object from bundle_dictionary and numpy checkpoints. :param checkpoint_dir: str, the directory from which to read the numpy checkpoint files.
 Parameters
suffix – str, the suffix to use in numpy checkpoint files.
 Raises
NotFoundError – If not all expected files are found in directory.

sample_all_valid_transitions
()¶

sample_index_batch
(batch_size: int) → torch.Tensor¶ Returns a batch of valid indices sampled uniformly. :param batch_size: int, number of indices returned.
 Returns
1D tensor of ints, a batch of valid indices sampled uniformly.
 Raises
RuntimeError – If there are no valid indices to sample.

sample_transition_batch
(batch_size=None, indices=None)¶ Returns a batch of transitions (including any extra contents). If get_transition_elements has been overridden and defines elements not stored in self._store, an empty array will be returned and it will be left to the child class to fill it. For example, for the child class PrioritizedReplayBuffer, the contents of sampling_probabilities are stored separately in a sum tree. When the transition is terminal, next_state_batch has undefined contents. NOTE: This transition contains the indices of the sampled elements. These are only valid during the call to sample_transition_batch, i.e. they may be used by subclasses of this replay buffer but may point to different data as soon as sampling is done. NOTE: Tensors are reshaped, i.e. state is 2D unless stack_size > 1, and scalar values are returned as (batch_size, 1) instead of (batch_size,). :param batch_size: int, number of transitions returned. If None, the default
batch_size will be used.
 Parameters
indices – None or Tensor, the indices of every transition in the batch. If None, sample the indices uniformly.
 Returns
 tuple of Tensors with the shape and type as in
get_transition_elements().
 Return type
transition_batch
 Raises
ValueError – If an element to be sampled is missing from the replay buffer.
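The memory saving from constructing stacked states at sample time, rather than storing each stack, can be sketched as follows. This is an illustrative reimplementation, not the library's code; `stacked_state` is a hypothetical helper.

```python
import numpy as np

def stacked_state(observations, index, stack_size):
    """Build the state ending at `index` from the flat observation store.

    Each observation is stored once; the stack of the last `stack_size`
    frames is assembled only when a transition is sampled.
    """
    frames = [observations[i] for i in range(index - stack_size + 1, index + 1)]
    return np.stack(frames, axis=0)

obs = np.arange(5, dtype=np.uint8).reshape(5, 1)  # five 1-pixel observations
state = stacked_state(obs, index=3, stack_size=2)
# state stacks observations 2 and 3
```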

save
(checkpoint_dir, iteration_number)¶ Save the ReplayBuffer attributes into a file. This method will save all the replay buffer’s state in a single file. :param checkpoint_dir: str, the directory where numpy checkpoint files should be
saved.
 Parameters
iteration_number – int, iteration_number to use as a suffix in naming numpy checkpoint files.

set_index_valid_status
(idx: int, is_valid: bool)¶

property
size
¶


reagent.replay_memory.circular_replay_buffer.
ReplayElement
¶ alias of
reagent.replay_memory.circular_replay_buffer.shape_type
reagent.replay_memory.prioritized_replay_buffer module¶
An implementation of Prioritized Experience Replay (PER). This implementation is based on the paper “Prioritized Experience Replay” by Tom Schaul et al. (2015). Many thanks to Tom Schaul, John Quan, and Matteo Hessel for providing useful pointers on the algorithm and its implementation.
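Under PER, each transition i is sampled with probability proportional to its priority: P(i) = p_i^α / Σ_j p_j^α (Schaul et al., 2015). A small sketch of that probability computation follows; it is illustrative only, and the prioritization exponent `alpha` is not among the constructor arguments shown here.

```python
def per_probabilities(priorities, alpha=1.0):
    """Sampling probability for each transition: p_i^alpha / sum_j p_j^alpha."""
    scaled = [p ** alpha for p in priorities]
    total = sum(scaled)
    return [s / total for s in scaled]

# With priorities [1.0, 3.0] and alpha = 1, the second transition
# is sampled three times as often as the first.
```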

class
reagent.replay_memory.prioritized_replay_buffer.
PrioritizedReplayBuffer
(observation_shape, stack_size, replay_capacity, batch_size, update_horizon=1, gamma=0.99, max_sample_attempts=1000, extra_storage_types=None, observation_dtype=numpy.uint8, action_shape=(), action_dtype=numpy.int32, reward_shape=(), reward_dtype=numpy.float32)¶ Bases:
reagent.replay_memory.circular_replay_buffer.ReplayBuffer
An out-of-graph Replay Buffer for Prioritized Experience Replay. See circular_replay_buffer.py for details.

get_add_args_signature
()¶ The signature of the add function. The signature is the same as the one for ReplayBuffer, with an added priority. :returns:
 list of ReplayElements defining the type of the argument signature needed
by the add function.

get_priority
(indices)¶ Fetches the priorities corresponding to a batch of memory indices. For any memory location not yet used, the corresponding priority is 0. :param indices: np.array with dtype int32, of indices in range
[0, replay_capacity).
 Returns
float, the corresponding priorities.
 Return type
priorities

get_transition_elements
(batch_size=None)¶ Returns a ‘type signature’ for sample_transition_batch. :param batch_size: int, number of transitions returned. If None, the default
batch_size will be used.
 Returns
A namedtuple describing the method’s return type signature.
 Return type
signature

sample_index_batch
(batch_size: int) → torch.Tensor¶ Returns a batch of valid indices sampled as in Schaul et al. (2015). :param batch_size: int, number of indices returned.
 Returns
1D tensor of ints, a batch of valid indices sampled proportionally to priority, as in Schaul et al. (2015).
 Raises
Exception – If the batch was not constructed after maximum number of tries.

sample_transition_batch
(batch_size=None, indices=None)¶ Returns a batch of transitions with extra storage and the priorities. The extra storage are defined through the extra_storage_types constructor argument. When the transition is terminal next_state_batch has undefined contents. :param batch_size: int, number of transitions returned. If None, the default
batch_size will be used.
 Parameters
indices – None or 1D tensor of ints, the indices of every transition in the batch. If None, sample the indices uniformly.
 Returns
 tuple of np.arrays with the shape and type as in
get_transition_elements().
 Return type
transition_batch

set_priority
(indices, priorities)¶ Sets the priority of the given elements according to Schaul et al. :param indices: np.array with dtype int32, of indices in range
[0, replay_capacity).
 Parameters
priorities – float, the corresponding priorities.

reagent.replay_memory.sum_tree module¶
A sum tree data structure. Used for prioritized experience replay. See prioritized_replay_buffer.py and Schaul et al. (2015).

class
reagent.replay_memory.sum_tree.
SumTree
(capacity: int)¶ Bases:
object
A sum tree data structure for storing replay priorities. A sum tree is a complete binary tree whose leaves contain values called priorities. Internal nodes maintain the sum of the priorities of all leaf nodes in their subtree. For capacity = 4 with leaf priorities 0.5, 1.0, 0.5, 0.5, the tree looks like this:

        [2.5]
     [1.5, 1.0]
[0.5, 1.0, 0.5, 0.5]

This is stored in a list of numpy arrays: self.nodes = [[2.5], [1.5, 1.0], [0.5, 1.0, 0.5, 0.5]]. For conciseness, we allocate arrays as powers of two and pad the excess elements with zero values. This is similar to the usual array-based representation of a complete binary tree, but is a little more user-friendly.
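The set-and-propagate update and the prefix-sum descent used for sampling can be sketched as follows. `TinySumTree` is a hypothetical illustration of the data structure, not ReAgent's implementation, and assumes a power-of-two capacity.

```python
class TinySumTree:
    """Minimal sum tree sketch: nodes[0] is the root level,
    nodes[-1] holds the leaf priorities."""

    def __init__(self, capacity):
        assert capacity > 0 and capacity & (capacity - 1) == 0, "power of two"
        self.depth = capacity.bit_length()  # level sizes: 1, 2, ..., capacity
        self.nodes = [[0.0] * (1 << d) for d in range(self.depth)]

    def set(self, index, value):
        # Update the leaf, then propagate the change up in O(log capacity).
        delta = value - self.nodes[-1][index]
        for level in range(self.depth - 1, -1, -1):
            self.nodes[level][index] += delta
            index //= 2

    def sample(self, query_value):
        # Descend from the root: go left if the query mass fits in the
        # left subtree, otherwise subtract it and go right.
        target = query_value * self.nodes[0][0]
        index = 0
        for level in range(1, self.depth):
            left = self.nodes[level][2 * index]
            if target < left:
                index = 2 * index
            else:
                target -= left
                index = 2 * index + 1
        return index

tree = TinySumTree(4)
for i, p in enumerate([0.5, 1.0, 0.5, 0.5]):
    tree.set(i, p)
# The root now holds the total priority 2.5; a query value of 0.0
# descends to leaf 0, while values near 1.0 land on later leaves.
```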

get
(node_index: int) → float¶ Returns the value of the leaf node corresponding to the index. :param node_index: The index of the leaf node.
 Returns
The value of the leaf node.

sample
(query_value: Optional[float] = None) → int¶ Samples an element from the sum tree. Each element has probability p_i / sum_j p_j of being picked, where p_i is the (positive) value associated with node i (possibly unnormalized). :param query_value: float in [0, 1], used as the random value to select a sample. If None, one will be selected uniformly at random in [0, 1).
 Returns
int, a random element from the sum tree.
 Raises
Exception – If the sum tree is empty (i.e. its node values sum to 0), or if the supplied query_value is larger than the total sum.

set
(node_index: int, value: float) → None¶ Sets the value of a leaf node and updates internal nodes accordingly. This operation takes O(log(capacity)). :param node_index: int, the index of the leaf node to be updated. :param value: float, the value which we assign to the node. This value must be
non-negative. Setting value = 0 will cause the element to never be sampled.
 Raises
ValueError – If the given value is negative.

stratified_sample
(batch_size: int) → List[int]¶ Performs stratified sampling using the sum tree. Let R be the value at the root (total value of sum tree). This method will divide [0, R) into batch_size segments, pick a random number from each of those segments, and use that random number to sample from the sum_tree. This is as specified in Schaul et al. (2015). :param batch_size: int, the number of strata to use.
 Returns
list of batch_size elements sampled from the sum tree.
 Raises
Exception – If the sum tree is empty (i.e. its node values sum to 0).
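The stratification step can be sketched as below (an illustrative helper, not the library's code): [0, 1) is split into batch_size equal segments and one uniform query is drawn from each; the real method then maps each query through the sum tree's sample.

```python
import random

def stratified_queries(batch_size, seed=None):
    """Pick one uniform query value from each of batch_size equal
    segments of [0, 1), as in Schaul et al. (2015)."""
    rng = random.Random(seed)
    width = 1.0 / batch_size
    return [i * width + rng.random() * width for i in range(batch_size)]

queries = stratified_queries(4, seed=0)
# Each query falls in its own segment:
# [0, .25), [.25, .5), [.5, .75), [.75, 1)
```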

reagent.replay_memory.utils module¶

reagent.replay_memory.utils.
replay_buffer_to_pre_timeline_df
(is_discrete_action: bool, replay_buffer: reagent.replay_memory.circular_replay_buffer.ReplayBuffer) → pandas.DataFrame¶ Converts a replay buffer into the format needed for uploading the dataset to Hive and then running timeline.
Module contents¶

ReplayBuffer and PrioritizedReplayBuffer are re-exported at the package level as reagent.replay_memory.ReplayBuffer and reagent.replay_memory.PrioritizedReplayBuffer; their documentation is identical to the entries in the reagent.replay_memory.circular_replay_buffer and reagent.replay_memory.prioritized_replay_buffer modules above.