Module ilpyt.envs.dummy_vec_env

DummyVecEnv is a vectorized OpenAI Gym environment object, implemented in a serial fashion. This is good for debugging purposes. Adapted from: https://github.com/openai/baselines/

Expand source code
"""
`DummyVecEnv` is a vectorized OpenAI Gym environment object, implemented in a 
serial fashion. This is good for debugging purposes. Adapted from: 
https://github.com/openai/baselines/
"""

from collections import OrderedDict

import gym
import numpy as np

from ilpyt.envs.vec_env import VecEnv


def copy_obs_dict(obs):
    """
    Build a new observation dict whose values are independent copies.

    Parameters
    ----------
    obs: dict
        mapping from observation key to array-like value

    Returns
    -------
    dict
        a dict with the same keys as `obs`, where each value is the
        result of `np.copy` applied to the corresponding original value
    """
    duplicated = {}
    for key, value in obs.items():
        # np.copy allocates a fresh array, so later in-place writes to the
        # source buffers do not leak into the returned observation.
        duplicated[key] = np.copy(value)
    return duplicated


def dict_to_obs(obs_dict):
    """
    Unwrap an observation dict that only exists to hold a single raw value.

    Parameters
    ----------
    obs_dict: dict
        observation dict; a dict whose one and only key is `None` marks an
        observation that did not come from a Dict-style space

    Returns
    -------
    object
        `obs_dict[None]` when `None` is the sole key, otherwise `obs_dict`
        itself (unchanged, same object)
    """
    holds_single_raw_obs = set(obs_dict.keys()) == {None}
    return obs_dict[None] if holds_single_raw_obs else obs_dict


def obs_space_info(obs_space):
    """
    Describe the sub-spaces of a gym.Space as keys, shapes and dtypes.

    A `gym.spaces.Dict` is described under its own keys, a
    `gym.spaces.Tuple` under the positional index of each sub-space, and
    any other space under the single key `None`.

    Parameters
    ----------
    obs_space: gym.Space
        the observation space to describe

    Returns
    -------
      A tuple (keys, shapes, dtypes):
        keys: a list of dict keys
        shapes: a dict mapping keys to shapes
        dtypes: a dict mapping keys to dtypes
    """
    if isinstance(obs_space, gym.spaces.Dict):
        assert isinstance(obs_space.spaces, OrderedDict)
        subspaces = obs_space.spaces
    elif isinstance(obs_space, gym.spaces.Tuple):
        assert isinstance(obs_space.spaces, tuple)
        # Key each member of the tuple space by its position.
        subspaces = dict(enumerate(obs_space.spaces))
    else:
        subspaces = {None: obs_space}

    keys = list(subspaces.keys())
    shapes = {key: space.shape for key, space in subspaces.items()}
    dtypes = {key: space.dtype for key, space in subspaces.items()}
    return keys, shapes, dtypes


def obs_to_dict(obs):
    """
    Normalise an observation so that it is always a dict.

    Parameters
    ----------
    obs: object
        an observation; may already be a dict

    Returns
    -------
    dict
        `obs` itself when it is already a dict, otherwise a one-entry dict
        holding `obs` under the key `None`
    """
    already_dict = isinstance(obs, dict)
    return obs if already_dict else {None: obs}


class DummyVecEnv(VecEnv):
    """
    VecEnv that runs multiple environments sequentially, that is,
    the step and reset commands are sent to one environment at a time.
    Useful when debugging and when num_env == 1 (in the latter case,
    avoids communication overhead)
    """

    def __init__(self, env_fns):
        """
        Parameters
        ----------
        env_fns: iterable of callables
            functions that build environments
        """
        self.envs = [fn() for fn in env_fns]
        env = self.envs[0]
        # Count the environments that were actually built instead of calling
        # len(env_fns), so any iterable (e.g. a generator) is accepted, as
        # the parameter documentation promises.
        VecEnv.__init__(
            self, len(self.envs), env.observation_space, env.action_space
        )
        obs_space = env.observation_space
        self.keys, shapes, dtypes = obs_space_info(obs_space)

        # One pre-allocated buffer per observation key, with a leading axis
        # of size num_envs. NOTE(review): self.num_envs is presumably set by
        # VecEnv.__init__ above - confirm against the base class.
        self.buf_obs = {
            k: np.zeros((self.num_envs,) + tuple(shapes[k]), dtype=dtypes[k])
            for k in self.keys
        }
        # np.bool_ rather than the np.bool alias: the alias was deprecated in
        # NumPy 1.20 and removed in 1.24, where it raises AttributeError.
        self.buf_dones = np.zeros((self.num_envs,), dtype=np.bool_)
        self.buf_rews = np.zeros((self.num_envs,), dtype=np.float32)
        self.buf_infos = [{} for _ in range(self.num_envs)]
        self.actions = None
        self.spec = self.envs[0].spec

    def step_async(self, actions):
        """
        Store the actions to be applied by the next `step_wait` call.

        Parameters
        ----------
        actions: sequence or single action
            if `len(actions)` equals the number of environments it is stored
            as-is; otherwise (wrong length, or no `len` at all) it is treated
            as a single action, which is only valid when exactly one
            environment is wrapped
        """
        listify = True
        try:
            if len(actions) == self.num_envs:
                listify = False
        except TypeError:
            # `actions` has no length (e.g. a bare scalar); treat it as one
            # action for a single environment.
            pass

        if not listify:
            self.actions = actions
        else:
            assert (
                self.num_envs == 1
            ), "actions {} is either not a list or has a wrong size - cannot match to {} environments".format(
                actions, self.num_envs
            )
            self.actions = [actions]

    def step_wait(self):
        """
        Step every environment, one after another, with its stored action.

        An environment that reports done is reset immediately, and the
        observation from that reset is stored in place of the one returned
        by the finishing step.

        Returns
        -------
        tuple (obs, rews, dones, infos)
            copies of the observation, reward and done buffers, plus a
            shallow copy of the list of per-environment info objects
        """
        for e in range(self.num_envs):
            action = self.actions[e]

            (
                obs,
                self.buf_rews[e],
                self.buf_dones[e],
                self.buf_infos[e],
            ) = self.envs[e].step(action)
            if self.buf_dones[e]:
                obs = self.envs[e].reset()
            self._save_obs(e, obs)
        return (
            self._obs_from_buf(),
            np.copy(self.buf_rews),
            np.copy(self.buf_dones),
            self.buf_infos.copy(),
        )

    def reset(self):
        """
        Reset every environment and return a copy of the observations.
        """
        for e in range(self.num_envs):
            obs = self.envs[e].reset()
            self._save_obs(e, obs)
        return self._obs_from_buf()

    def _save_obs(self, e, obs):
        """
        Write the observation of environment index `e` into the buffers.
        """
        for k in self.keys:
            if k is None:
                # Single un-keyed space: the observation is the raw value.
                self.buf_obs[k][e] = obs
            else:
                self.buf_obs[k][e] = obs[k]

    def _obs_from_buf(self):
        """
        Return a copy of the observation buffers, unwrapped to a raw array
        when the only buffer key is `None`.
        """
        return dict_to_obs(copy_obs_dict(self.buf_obs))

    def get_images(self):
        """
        Return the result of `render(mode='rgb_array')` for each environment.
        """
        return [env.render(mode='rgb_array') for env in self.envs]

    def render(self, mode='human'):
        """
        Render the wrapped environment(s).

        With exactly one environment the call is forwarded to that
        environment; otherwise it is delegated to the parent class.
        """
        if self.num_envs == 1:
            return self.envs[0].render(mode=mode)
        else:
            return super().render(mode=mode)

Functions

def copy_obs_dict(obs)

Deep-copy an observation dict.

Expand source code
def copy_obs_dict(obs):
    """
    Build a new observation dict whose values are independent copies.

    Parameters
    ----------
    obs: dict
        mapping from observation key to array-like value

    Returns
    -------
    dict
        a dict with the same keys as `obs`, where each value is the
        result of `np.copy` applied to the corresponding original value
    """
    duplicated = {}
    for key, value in obs.items():
        # np.copy allocates a fresh array, so later in-place writes to the
        # source buffers do not leak into the returned observation.
        duplicated[key] = np.copy(value)
    return duplicated
def dict_to_obs(obs_dict)

Convert an observation dict into a raw array if the original observation space was not a Dict space.

Expand source code
def dict_to_obs(obs_dict):
    """
    Unwrap an observation dict that only exists to hold a single raw value.

    Parameters
    ----------
    obs_dict: dict
        observation dict; a dict whose one and only key is `None` marks an
        observation that did not come from a Dict-style space

    Returns
    -------
    object
        `obs_dict[None]` when `None` is the sole key, otherwise `obs_dict`
        itself (unchanged, same object)
    """
    holds_single_raw_obs = set(obs_dict.keys()) == {None}
    return obs_dict[None] if holds_single_raw_obs else obs_dict
def obs_space_info(obs_space)

Get dict-structured information about a gym.Space.

Returns

A tuple (keys, shapes, dtypes), where keys is a list of dict keys, shapes is a dict mapping keys to shapes, and dtypes is a dict mapping keys to dtypes.

Expand source code
def obs_space_info(obs_space):
    """
    Describe the sub-spaces of a gym.Space as keys, shapes and dtypes.

    A `gym.spaces.Dict` is described under its own keys, a
    `gym.spaces.Tuple` under the positional index of each sub-space, and
    any other space under the single key `None`.

    Parameters
    ----------
    obs_space: gym.Space
        the observation space to describe

    Returns
    -------
      A tuple (keys, shapes, dtypes):
        keys: a list of dict keys
        shapes: a dict mapping keys to shapes
        dtypes: a dict mapping keys to dtypes
    """
    if isinstance(obs_space, gym.spaces.Dict):
        assert isinstance(obs_space.spaces, OrderedDict)
        subspaces = obs_space.spaces
    elif isinstance(obs_space, gym.spaces.Tuple):
        assert isinstance(obs_space.spaces, tuple)
        # Key each member of the tuple space by its position.
        subspaces = dict(enumerate(obs_space.spaces))
    else:
        subspaces = {None: obs_space}

    keys = list(subspaces.keys())
    shapes = {key: space.shape for key, space in subspaces.items()}
    dtypes = {key: space.dtype for key, space in subspaces.items()}
    return keys, shapes, dtypes
def obs_to_dict(obs)

Convert an observation into a dict.

Expand source code
def obs_to_dict(obs):
    """
    Normalise an observation so that it is always a dict.

    Parameters
    ----------
    obs: object
        an observation; may already be a dict

    Returns
    -------
    dict
        `obs` itself when it is already a dict, otherwise a one-entry dict
        holding `obs` under the key `None`
    """
    already_dict = isinstance(obs, dict)
    return obs if already_dict else {None: obs}

Classes

class DummyVecEnv (env_fns)

VecEnv that runs multiple environments sequentially, that is, the step and reset commands are sent to one environment at a time. Useful when debugging and when num_env == 1 (in the latter case, avoids communication overhead)

Parameters

env_fns : iterable of callables
functions that build environments
Expand source code
class DummyVecEnv(VecEnv):
    """
    VecEnv that runs multiple environments sequentially, that is,
    the step and reset commands are sent to one environment at a time.
    Useful when debugging and when num_env == 1 (in the latter case,
    avoids communication overhead)
    """

    def __init__(self, env_fns):
        """
        Parameters
        ----------
        env_fns: iterable of callables
            functions that build environments
        """
        self.envs = [fn() for fn in env_fns]
        env = self.envs[0]
        # Count the environments that were actually built instead of calling
        # len(env_fns), so any iterable (e.g. a generator) is accepted, as
        # the parameter documentation promises.
        VecEnv.__init__(
            self, len(self.envs), env.observation_space, env.action_space
        )
        obs_space = env.observation_space
        self.keys, shapes, dtypes = obs_space_info(obs_space)

        # One pre-allocated buffer per observation key, with a leading axis
        # of size num_envs. NOTE(review): self.num_envs is presumably set by
        # VecEnv.__init__ above - confirm against the base class.
        self.buf_obs = {
            k: np.zeros((self.num_envs,) + tuple(shapes[k]), dtype=dtypes[k])
            for k in self.keys
        }
        # np.bool_ rather than the np.bool alias: the alias was deprecated in
        # NumPy 1.20 and removed in 1.24, where it raises AttributeError.
        self.buf_dones = np.zeros((self.num_envs,), dtype=np.bool_)
        self.buf_rews = np.zeros((self.num_envs,), dtype=np.float32)
        self.buf_infos = [{} for _ in range(self.num_envs)]
        self.actions = None
        self.spec = self.envs[0].spec

    def step_async(self, actions):
        """
        Store the actions to be applied by the next `step_wait` call.

        Parameters
        ----------
        actions: sequence or single action
            if `len(actions)` equals the number of environments it is stored
            as-is; otherwise (wrong length, or no `len` at all) it is treated
            as a single action, which is only valid when exactly one
            environment is wrapped
        """
        listify = True
        try:
            if len(actions) == self.num_envs:
                listify = False
        except TypeError:
            # `actions` has no length (e.g. a bare scalar); treat it as one
            # action for a single environment.
            pass

        if not listify:
            self.actions = actions
        else:
            assert (
                self.num_envs == 1
            ), "actions {} is either not a list or has a wrong size - cannot match to {} environments".format(
                actions, self.num_envs
            )
            self.actions = [actions]

    def step_wait(self):
        """
        Step every environment, one after another, with its stored action.

        An environment that reports done is reset immediately, and the
        observation from that reset is stored in place of the one returned
        by the finishing step.

        Returns
        -------
        tuple (obs, rews, dones, infos)
            copies of the observation, reward and done buffers, plus a
            shallow copy of the list of per-environment info objects
        """
        for e in range(self.num_envs):
            action = self.actions[e]

            (
                obs,
                self.buf_rews[e],
                self.buf_dones[e],
                self.buf_infos[e],
            ) = self.envs[e].step(action)
            if self.buf_dones[e]:
                obs = self.envs[e].reset()
            self._save_obs(e, obs)
        return (
            self._obs_from_buf(),
            np.copy(self.buf_rews),
            np.copy(self.buf_dones),
            self.buf_infos.copy(),
        )

    def reset(self):
        """
        Reset every environment and return a copy of the observations.
        """
        for e in range(self.num_envs):
            obs = self.envs[e].reset()
            self._save_obs(e, obs)
        return self._obs_from_buf()

    def _save_obs(self, e, obs):
        """
        Write the observation of environment index `e` into the buffers.
        """
        for k in self.keys:
            if k is None:
                # Single un-keyed space: the observation is the raw value.
                self.buf_obs[k][e] = obs
            else:
                self.buf_obs[k][e] = obs[k]

    def _obs_from_buf(self):
        """
        Return a copy of the observation buffers, unwrapped to a raw array
        when the only buffer key is `None`.
        """
        return dict_to_obs(copy_obs_dict(self.buf_obs))

    def get_images(self):
        """
        Return the result of `render(mode='rgb_array')` for each environment.
        """
        return [env.render(mode='rgb_array') for env in self.envs]

    def render(self, mode='human'):
        """
        Render the wrapped environment(s).

        With exactly one environment the call is forwarded to that
        environment; otherwise it is delegated to the parent class.
        """
        if self.num_envs == 1:
            return self.envs[0].render(mode=mode)
        else:
            return super().render(mode=mode)

Ancestors

Methods

def render(self, mode='human')
Expand source code
def render(self, mode='human'):
    """
    Render the wrapped environment(s).

    With exactly one environment the call is forwarded to that
    environment; otherwise it is delegated to the parent class.
    """
    single_env = self.num_envs == 1
    if not single_env:
        return super().render(mode=mode)
    return self.envs[0].render(mode=mode)

Inherited members