"""Contains classes that provide the ability to run multiple environments in subprocesses."""
import concurrent.futures
import multiprocessing
import multiprocessing.managers
import gym
class MultiEnv(object):
    r"""An environment that maintains multiple :obj:`SubprocessEnv`\s and executes them in parallel.

    The environments will be reset automatically when a terminal state is reached. That means that :meth:`reset`
    actually only has to be called once in the beginning.
    """

    def __init__(self, envs):
        """
        Args:
            envs (:obj:`list` of :obj:`SubprocessEnv`):
                The environments. The observation and action spaces must be equal across the environments.
        """
        super().__init__()
        self._envs = [_AutoResetWrapper(env) for env in envs]
        # one worker thread per environment, so every environment can be driven at the same time
        self._executor = concurrent.futures.ThreadPoolExecutor(len(self._envs))

    @property
    def envs(self):
        """:obj:`list` of :obj:`gym.Env`:
            The environments, each wrapped so that it resets itself on the first step after a terminal state.
        """
        return self._envs

    @property
    def observation_space(self):
        """:obj:`gym.spaces.Space`:
            The observation space used by all environments (taken from the first one, since they must be equal).
        """
        return self._envs[0].observation_space

    @property
    def action_space(self):
        """:obj:`gym.spaces.Space`:
            The action space used by all environments (taken from the first one, since they must be equal).
        """
        return self._envs[0].action_space

    def reset(self):
        """Resets all environments concurrently.

        Returns:
            :obj:`list`:
                A list of observations received from each environment, in the order of :attr:`envs`.
        """
        observations = list(self._executor.map(lambda env: env.reset(), self._envs))
        return observations

    def step(self, actions):
        """Proceeds one step in all environments concurrently.

        Args:
            actions (:obj:`list`):
                A list of actions to be executed in the environments, one per environment in the order of
                :attr:`envs`. An action of `None` skips that environment; its entries in the result are `None`.

        Returns:
            :obj:`tuple`:
                A tuple of (`observations`, `rewards`, `terminals`, `infos`). Each element is a list containing the
                values received from the environments.
        """
        def call_step(env_action):
            env, action = env_action
            if action is None:
                # environment is skipped for this step
                return None, None, None, None
            return env.step(action)

        observations, rewards, terminals, infos = zip(*list(
            self._executor.map(call_step, zip(self._envs, actions))))
        return list(observations), list(rewards), list(terminals), list(infos)

    def close(self):
        """Closes all environments concurrently and shuts down the worker threads.

        Every environment gets a close attempt. If closing one of them raises, the first such exception is
        re-raised once all close attempts have finished and the executor has been shut down.
        """
        # submit everything up front; unlike `Executor.map`, a failure here does not cancel the remaining closes
        futures = [self._executor.submit(env.close) for env in self._envs]
        try:
            for future in futures:
                # surface errors instead of silently discarding them inside unchecked futures
                future.result()
        finally:
            # waits for any close that is still running
            self._executor.shutdown()
def create_subprocess_envs(env_fns):
    r"""Utility function that creates environments by calling the functions in `env_fns` and wrapping the returned
    environments in :obj:`SubprocessEnv`\s. They will be started and initialized in parallel.

    Args:
        env_fns (:obj:`list` of :obj:`callable`):
            A list of functions that return a :obj:`gym.Env`. They should not be instances of :obj:`SubprocessEnv`.

    Returns:
        :obj:`list` of :obj:`SubprocessEnv`:
            A list of the created environments, all started and initialized. Empty if `env_fns` is empty.

    Raises:
        Exception: Whatever starting or initializing one of the environments raised. Every environment created by
            this call is closed (best effort) before the exception propagates, so no subprocess is left behind.
    """
    envs = []
    try:
        # spawn all processes first; `start()` does not block
        for env_fn in env_fns:
            env = SubprocessEnv(env_fn)
            envs.append(env)  # track before starting so it is cleaned up even if `start()` fails
            env.start()
        if envs:  # ThreadPoolExecutor rejects max_workers=0
            # `initialize()` blocks until the child process has called its `env_fn` and replied, so running it on
            # one thread per environment creates the underlying environments concurrently
            with concurrent.futures.ThreadPoolExecutor(len(envs)) as executor:
                futures = [executor.submit(env.initialize) for env in envs]
            # leaving the `with` block waited for every initialization; re-raise failures instead of dropping them
            for future in futures:
                future.result()
    except Exception:
        for env in envs:
            try:
                env.close()
            except Exception:
                pass  # best-effort cleanup; the original error is the one the caller needs to see
        raise
    return envs
class _AutoResetWrapper(gym.Wrapper):
    """Wrapper that resets the wrapped environment on the first :meth:`step` after a terminal state.

    The observation produced by that implicit reset is discarded; the caller only receives the result of the step
    executed right after it.
    """

    def __init__(self, env):
        super().__init__(env)
        # True once the wrapped environment reported a terminal state and has not been reset since
        self._terminated = False

    def step(self, action):
        needs_reset = self._terminated
        if needs_reset:
            # begin a new episode before applying the action
            self.env.reset()
        obs, rew, done, extra = self.env.step(action)
        self._terminated = done
        return obs, rew, done, extra

    def reset(self, **kwargs):
        first_observation = self.env.reset(**kwargs)
        self._terminated = False
        return first_observation
class SubprocessEnv(gym.Env):
    r"""Maintains a :obj:`gym.Env` inside a subprocess, so it can run concurrently. If the subprocess ends unexpectedly,
    it will be recreated automatically without interrupting the execution.

    To use the subprocess :meth:`start` has to be called first. After that :meth:`initialize` has to be called to
    retrieve the observation space and the action space from the underlying environment. :meth:`start` only spawns
    the process and does not block, so multiple :obj:`SubprocessEnv`\s can be started quickly one after another.
    :meth:`initialize` makes the subprocess create the underlying :obj:`gym.Env` and blocks until that is done, so
    it should be called in parallel (e.g. from several threads) to create the environments concurrently. See
    :func:`create_subprocess_envs` which implements this idea.
    """

    class _Command:
        # message tags sent from the parent to `_child_worker` as the first element of a `(command, arg)` pair
        INIT, STEP, RESET, RENDER, CLOSE = range(5)

    def __init__(self, env_fn):
        """
        Args:
            env_fn (:obj:`callable`):
                A function that returns a :obj:`gym.Env`. It will be called inside the subprocess, so watch out for
                referencing variables on the main process or the like. It possibly will be called multiple times, since
                the subprocess will be recreated when it unexpectedly ends.
        """
        self._env_fn = env_fn
        self._parent_connection, self._child_connection, self._process = None, None, None
        self._started = False
        self._initialized = False
        # create the pipe and the (not yet started) process object
        self._reset()
        self._action_space = None
        self._observation_space = None

    def _check_closed(self):
        # `close()` sets `_process` to None, which marks this instance as no longer usable
        if self._process is None:
            raise ValueError('The subprocess was closed already.')

    def _check_initialized(self, method):
        self._check_closed()
        if not self._started:
            raise ValueError('The subprocess is not started yet. '
                             'Call \'start()\' and \'initialize()\' before calling \'{}\'.'.format(method))
        if not self._initialized:
            raise ValueError('The subprocess is not initialized yet. '
                             'Call \'initialize()\' before calling \'{}\'.'.format(method))

    def _reset(self):
        """Creates a fresh pipe and a fresh, not yet started process object. The previous ones are not cleaned up
        here; callers must do that first if needed."""
        self._started = False
        self._initialized = False
        self._parent_connection, self._child_connection = multiprocessing.Pipe(duplex=True)
        self._process = multiprocessing.Process(target=SubprocessEnv._child_worker,
                                                args=(self._child_connection, self._env_fn), name='SubprocessEnv')
        # daemonic children are terminated when the main process exits
        self._process.daemon = True

    def start(self):
        """Starts the subprocess. Does not block. You should call :meth:`initialize` afterwards.
        """
        self._check_closed()
        if not self._started:
            self._process.start()
            # the child holds its own copy of this end; the parent only talks through `_parent_connection`
            self._child_connection.close()
            self._started = True

    def initialize(self):
        """Retrieves the observation space and the action space from the environment in the subprocess. This method
        blocks until the execution is finished. :meth:`start` must have been called.
        """
        self._check_closed()
        if not self._started:
            raise ValueError('The subprocess is not started yet. Call \'start()\' before \'initialize()\'.')
        if not self._initialized:
            self._action_space, self._observation_space = self._communicate(SubprocessEnv._Command.INIT)
            self._initialized = True

    def _communicate(self, command, arg=None):
        """Sends `command` to the subprocess and returns its response.

        If the connection is lost, the subprocess is restarted, the environment is re-created (INIT) and, unless
        `command` is INIT or RESET itself, reset before `command` is sent again. Errors during that retry propagate.
        """
        try:
            return self._unsafe_communicate(command, arg)
        except (BrokenPipeError, ConnectionResetError, EOFError):
            # FIXME small bug: also restarts when exiting program with KeyboardInterrupt
            # restart process when connection is lost
            self._parent_connection.close()
            self._child_connection.close()
            self._process.terminate()
            self._process.join()
            self._reset()
            self.start()
            # initialize
            spaces = self._unsafe_communicate(SubprocessEnv._Command.INIT)
            self._action_space, self._observation_space = spaces
            self._initialized = True
            if command == SubprocessEnv._Command.INIT:
                # the restart already created the environment; sending INIT again would make the child call
                # `env_fn` a second time and drop the first environment without closing it
                return spaces
            if command != SubprocessEnv._Command.RESET:
                self._unsafe_communicate(SubprocessEnv._Command.RESET)
            return self._unsafe_communicate(command, arg)

    def _unsafe_communicate(self, command, arg=None):
        # one request, one response; connection errors are handled by `_communicate`
        self._parent_connection.send((command, arg))
        return self._parent_connection.recv()

    @property
    def action_space(self):
        """:obj:`gym.spaces.Space`:
            The action space of the underlying environment. Does not block the execution. :meth:`start` and
            :meth:`initialize` must have been called.
        """
        self._check_initialized('action_space')
        return self._action_space

    @property
    def observation_space(self):
        """:obj:`gym.spaces.Space`:
            The observation space of the underlying environment. Does not block the execution. :meth:`start` and
            :meth:`initialize` must have been called.
        """
        self._check_initialized('observation_space')
        return self._observation_space

    def step(self, action):
        """Remotely calls :meth:`gym.Env.step` in the underlying environment. This method blocks until execution is
        finished. :meth:`start` and :meth:`initialize` must have been called.

        Args:
            action:
                The `action` argument passed to :meth:`gym.Env.step`.

        Returns:
            :obj:`tuple`:
                A tuple of (`observation`, `reward`, `terminal`, `info`). The values returned by
                :meth:`gym.Env.step`.
        """
        self._check_initialized('step()')
        return self._communicate(SubprocessEnv._Command.STEP, action)

    def reset(self, **kwargs):
        """Remotely calls :meth:`gym.Env.reset` in the underlying environment. This method blocks until execution is
        finished. :meth:`start` and :meth:`initialize` must have been called.

        Args:
            kwargs (:obj:`dict`):
                Keyword arguments passed to :meth:`gym.Env.reset`.

        Returns:
            The value returned by :meth:`gym.Env.reset`.
        """
        self._check_initialized('reset()')
        return self._communicate(SubprocessEnv._Command.RESET, kwargs)

    def render(self, mode='human'):
        """Remotely calls :meth:`gym.Env.render` in the subprocess. This methods blocks until execution is finished.
        :meth:`start` and :meth:`initialize` must have been called.

        Args:
            mode (:obj:`str`):
                The `mode` argument passed to :meth:`gym.Env.render`.

        Returns:
            The value returned by :meth:`gym.Env.render`.
        """
        self._check_initialized('render()')
        if mode == 'human':
            mode = None  # treat 'human' as default mode, so don't need to send
        result = self._communicate(SubprocessEnv._Command.RENDER, mode)
        return result

    def close(self):
        """Closes the subprocess. Afterwards this instance can no longer be used.

        If the subprocess is running, it is asked to close its environment and exit, and this method waits for it to
        terminate. Also works if :meth:`start` was never called.

        Raises:
            ValueError: If the subprocess was closed already.
        """
        self._check_closed()
        try:
            if self._started and self._process.is_alive():
                self._parent_connection.send((SubprocessEnv._Command.CLOSE, None))
        except (BrokenPipeError, ConnectionResetError, EOFError):
            pass  # the child is already gone, so there is nobody left to notify
        finally:
            # release both pipe ends even if sending failed or the process was never started
            self._parent_connection.close()
            self._child_connection.close()
        if self._started:
            # `Process.join` may only be called on a started process
            self._process.join()
        self._started = False
        self._initialized = False
        self._process = None

    @staticmethod
    def _child_worker(child_connection, env_fn):
        """Command loop executed inside the subprocess.

        Receives `(command, arg)` pairs and sends back one response per command, except for CLOSE, which ends the
        loop without a response. The environment is created by calling `env_fn` on INIT. Other exceptions than
        :obj:`KeyboardInterrupt` end the process, which the parent notices as a lost connection.
        """
        env = None
        try:
            while True:
                command, arg = child_connection.recv()
                response = None
                if command == SubprocessEnv._Command.INIT:
                    env = env_fn()
                    response = (env.action_space, env.observation_space)
                elif command == SubprocessEnv._Command.STEP:
                    action = arg
                    response = env.step(action)
                elif command == SubprocessEnv._Command.RESET:
                    kwargs = arg
                    if kwargs is None:
                        kwargs = dict()
                    response = env.reset(**kwargs)
                elif command == SubprocessEnv._Command.RENDER:
                    mode = arg
                    if mode is None:
                        mode = 'human'
                    response = env.render(mode)
                elif command == SubprocessEnv._Command.CLOSE:
                    break
                child_connection.send(response)
        except KeyboardInterrupt:
            pass
        if env is not None:
            env.close()
        child_connection.close()