Module ilpyt.algos.gcl

An implementation of the Guided Cost Learning (GCL) algorithm. This algorithm was described in the paper "Guided Cost Learning: Deep Inverse Optimal Control via Policy Optimization" by PChelsea Finn, Sergey Levine, and Pieter Abbeel, and presented at ICML 2016.

For more details, please refer to the paper:

Expand source code
An implementation of the Guided Cost Learning (GCL) algorithm. This algorithm 
was described in the paper "Guided Cost Learning: Deep Inverse Optimal Control 
via Policy Optimization" by PChelsea Finn, Sergey Levine, and Pieter Abbeel, and 
presented at ICML 2016.

For more details, please refer to the paper:

import copy
import logging
import pickle

import numpy as np
import torch

from ilpyt.agents.a2c_agent import A2CAgent
from ilpyt.agents.base_agent import BaseAgent
from ilpyt.agents.dqn_agent import DQNAgent
from ilpyt.agents.ppo_agent import PPOAgent
from ilpyt.algos.base_algo import BaseAlgorithm
from ilpyt.envs.vec_env import VecEnv
from ilpyt.runners.runner import Runner
from ilpyt.utils.agent_utils import flatten_batch

class GCL(BaseAlgorithm):
    def initialize(
        env: VecEnv,
        agent: BaseAgent,
        save_path: str = 'logs',
        load_path: str = '',
        use_gpu: bool = True,
        Initialization function for the GCL algorithm.

        env: VecEnv
            vectorized OpenAI Gym environment
        agent: BaseAgent
            agent for train and/or test
        save_path: str, default='logs'
            path to directory to save network weights
        load_path: str, default=''
            path to directory to load network weights. If not specified, network 
            weights will be randomly initialized
        use_gpu: bool, default=True
            flag indicating whether or not to run operations on the GPU

            if `agent` is not an instance of `A2CAgent`, `DQNAgent`, or `PPOAgent`
        self.env = env
        self.agent = agent
        self.use_gpu = use_gpu

        # Set up agent
        if (
            not isinstance(, A2CAgent)
            and not isinstance(, DQNAgent)
            and not isinstance(, PPOAgent)
            raise ValueError(
                'GCL is only compatible with A2C, DQN, and PPO actors.'
        if use_gpu:
        if load_path:

        # Set up runner
        self.runner = Runner(env, self.agent, use_gpu)

    def train(
        num_episodes: int = int(1e4),
        num_reward_updates: int = 10,
        batch_size: int = 128,
        expert_demos: str = 'demos.pkl',
    ) -> None:
        Train the agent within the specified environment.

        num_episodes: int
            Number of training episodes
        num_reward_updates: int
            Number of times we update loss per rollout
        batch_size: int
            size of batches used to calculate loss and update network
        expert_demos: str
            path to expert demos pickle file
        # Set train
        self.best_loss = np.float('inf')
        self.best_reward = np.float('-inf')
        self.reward_tracker = self.best_reward * np.ones(self.env.num_envs)

        # Expert demonstrations
        with open(expert_demos, 'rb') as f:
            demos = pickle.load(f)  # runner.Experiences
            if self.use_gpu:

        for i in range(num_episodes):
            # Generate samples
            batch = self.runner.generate_batch(64)
            flat_batch = flatten_batch(copy.deepcopy(batch))
            agent_batch_size = len(flat_batch['states'])
            expert_batch_size = len(demos.states)

            # Update cost function
            for j in range(num_reward_updates):
                selected_idxs = torch.randperm(expert_batch_size)[:batch_size]
                expert_states = demos.states[selected_idxs]
                expert_actions = demos.actions[selected_idxs]

                selected_idxs = torch.randperm(agent_batch_size)[:batch_size]
                states = flat_batch['states'][selected_idxs]
                actions = flat_batch['actions'][selected_idxs]

                states =[states, expert_states], dim=0)
                actions =[actions, expert_actions], dim=0)
                loss_cost_dict = self.agent.update_cost(
                    states, actions, expert_states, expert_actions

            # Update policy
            loss_reward_dict = self.agent.update(batch)
            # Log
            self.log(i, loss_cost_dict)
            self.log(i, loss_reward_dict)

            # Save agent
            loss = loss_cost_dict['loss/ioc'] + loss_reward_dict['loss/total']

            # Logging
            for ep_count, info_dict in batch['infos']:
                self.log(ep_count, info_dict)
                for (k, v) in info_dict.items():
                    if 'reward' in k:
                        agent_num = int(k.split('/')[1])
                        self.reward_tracker[agent_num] = v

            mean_reward = np.mean(self.reward_tracker)

            self.log(i, {'values/mean_reward': mean_reward})

            # added in a check to make sure we aren't counting initial low loss
            if (loss < self.best_loss and i > 1000) or i % 500 == 0:
      , i)
                self.best_loss = loss
                    "Save new best model at epoch %i with loss %0.4f."
                    % (i, loss)



class GCL (**kwargs: Any)


**kwargs: arbitrary keyword arguments. Will be passed to the initialize and setup_experiment functions

Expand source code
class GCL(BaseAlgorithm):
    def initialize(
        env: VecEnv,
        agent: BaseAgent,
        save_path: str = 'logs',
        load_path: str = '',
        use_gpu: bool = True,
        Initialization function for the GCL algorithm.

        env: VecEnv
            vectorized OpenAI Gym environment
        agent: BaseAgent
            agent for train and/or test
        save_path: str, default='logs'
            path to directory to save network weights
        load_path: str, default=''
            path to directory to load network weights. If not specified, network 
            weights will be randomly initialized
        use_gpu: bool, default=True
            flag indicating whether or not to run operations on the GPU

            if `agent` is not an instance of `A2CAgent`, `DQNAgent`, or `PPOAgent`
        self.env = env
        self.agent = agent
        self.use_gpu = use_gpu

        # Set up agent
        if (
            not isinstance(, A2CAgent)
            and not isinstance(, DQNAgent)
            and not isinstance(, PPOAgent)
            raise ValueError(
                'GCL is only compatible with A2C, DQN, and PPO actors.'
        if use_gpu:
        if load_path:

        # Set up runner
        self.runner = Runner(env, self.agent, use_gpu)

    def train(
        num_episodes: int = int(1e4),
        num_reward_updates: int = 10,
        batch_size: int = 128,
        expert_demos: str = 'demos.pkl',
    ) -> None:
        Train the agent within the specified environment.

        num_episodes: int
            Number of training episodes
        num_reward_updates: int
            Number of times we update loss per rollout
        batch_size: int
            size of batches used to calculate loss and update network
        expert_demos: str
            path to expert demos pickle file
        # Set train
        self.best_loss = np.float('inf')
        self.best_reward = np.float('-inf')
        self.reward_tracker = self.best_reward * np.ones(self.env.num_envs)

        # Expert demonstrations
        with open(expert_demos, 'rb') as f:
            demos = pickle.load(f)  # runner.Experiences
            if self.use_gpu:

        for i in range(num_episodes):
            # Generate samples
            batch = self.runner.generate_batch(64)
            flat_batch = flatten_batch(copy.deepcopy(batch))
            agent_batch_size = len(flat_batch['states'])
            expert_batch_size = len(demos.states)

            # Update cost function
            for j in range(num_reward_updates):
                selected_idxs = torch.randperm(expert_batch_size)[:batch_size]
                expert_states = demos.states[selected_idxs]
                expert_actions = demos.actions[selected_idxs]

                selected_idxs = torch.randperm(agent_batch_size)[:batch_size]
                states = flat_batch['states'][selected_idxs]
                actions = flat_batch['actions'][selected_idxs]

                states =[states, expert_states], dim=0)
                actions =[actions, expert_actions], dim=0)
                loss_cost_dict = self.agent.update_cost(
                    states, actions, expert_states, expert_actions

            # Update policy
            loss_reward_dict = self.agent.update(batch)
            # Log
            self.log(i, loss_cost_dict)
            self.log(i, loss_reward_dict)

            # Save agent
            loss = loss_cost_dict['loss/ioc'] + loss_reward_dict['loss/total']

            # Logging
            for ep_count, info_dict in batch['infos']:
                self.log(ep_count, info_dict)
                for (k, v) in info_dict.items():
                    if 'reward' in k:
                        agent_num = int(k.split('/')[1])
                        self.reward_tracker[agent_num] = v

            mean_reward = np.mean(self.reward_tracker)

            self.log(i, {'values/mean_reward': mean_reward})

            # added in a check to make sure we aren't counting initial low loss
            if (loss < self.best_loss and i > 1000) or i % 500 == 0:
      , i)
                self.best_loss = loss
                    "Save new best model at epoch %i with loss %0.4f."
                    % (i, loss)




def initialize(self, env: VecEnv, agent: BaseAgent, save_path: str = 'logs', load_path: str = '', use_gpu: bool = True)

Initialization function for the GCL algorithm.


env : VecEnv
vectorized OpenAI Gym environment
agent : BaseAgent
agent for train and/or test
save_path : str, default='logs'
path to directory to save network weights
load_path : str, default=''
path to directory to load network weights. If not specified, network weights will be randomly initialized
use_gpu : bool, default=True
flag indicating whether or not to run operations on the GPU



if agent is not an instance of A2CAgent, DQNAgent, or PPOAgent

Expand source code
def initialize(
    env: VecEnv,
    agent: BaseAgent,
    save_path: str = 'logs',
    load_path: str = '',
    use_gpu: bool = True,
    Initialization function for the GCL algorithm.

    env: VecEnv
        vectorized OpenAI Gym environment
    agent: BaseAgent
        agent for train and/or test
    save_path: str, default='logs'
        path to directory to save network weights
    load_path: str, default=''
        path to directory to load network weights. If not specified, network 
        weights will be randomly initialized
    use_gpu: bool, default=True
        flag indicating whether or not to run operations on the GPU

        if `agent` is not an instance of `A2CAgent`, `DQNAgent`, or `PPOAgent`
    self.env = env
    self.agent = agent
    self.use_gpu = use_gpu

    # Set up agent
    if (
        not isinstance(, A2CAgent)
        and not isinstance(, DQNAgent)
        and not isinstance(, PPOAgent)
        raise ValueError(
            'GCL is only compatible with A2C, DQN, and PPO actors.'
    if use_gpu:
    if load_path:

    # Set up runner
    self.runner = Runner(env, self.agent, use_gpu)
def train(self, num_episodes: int = 10000, num_reward_updates: int = 10, batch_size: int = 128, expert_demos: str = 'demos.pkl') ‑> NoneType

Train the agent within the specified environment.


num_episodes : int
Number of training episodes
num_reward_updates : int
Number of times we update loss per rollout
batch_size : int
size of batches used to calculate loss and update network
expert_demos : str
path to expert demos pickle file
Expand source code
def train(
    num_episodes: int = int(1e4),
    num_reward_updates: int = 10,
    batch_size: int = 128,
    expert_demos: str = 'demos.pkl',
) -> None:
    Train the agent within the specified environment.

    num_episodes: int
        Number of training episodes
    num_reward_updates: int
        Number of times we update loss per rollout
    batch_size: int
        size of batches used to calculate loss and update network
    expert_demos: str
        path to expert demos pickle file
    # Set train
    self.best_loss = np.float('inf')
    self.best_reward = np.float('-inf')
    self.reward_tracker = self.best_reward * np.ones(self.env.num_envs)

    # Expert demonstrations
    with open(expert_demos, 'rb') as f:
        demos = pickle.load(f)  # runner.Experiences
        if self.use_gpu:

    for i in range(num_episodes):
        # Generate samples
        batch = self.runner.generate_batch(64)
        flat_batch = flatten_batch(copy.deepcopy(batch))
        agent_batch_size = len(flat_batch['states'])
        expert_batch_size = len(demos.states)

        # Update cost function
        for j in range(num_reward_updates):
            selected_idxs = torch.randperm(expert_batch_size)[:batch_size]
            expert_states = demos.states[selected_idxs]
            expert_actions = demos.actions[selected_idxs]

            selected_idxs = torch.randperm(agent_batch_size)[:batch_size]
            states = flat_batch['states'][selected_idxs]
            actions = flat_batch['actions'][selected_idxs]

            states =[states, expert_states], dim=0)
            actions =[actions, expert_actions], dim=0)
            loss_cost_dict = self.agent.update_cost(
                states, actions, expert_states, expert_actions

        # Update policy
        loss_reward_dict = self.agent.update(batch)
        # Log
        self.log(i, loss_cost_dict)
        self.log(i, loss_reward_dict)

        # Save agent
        loss = loss_cost_dict['loss/ioc'] + loss_reward_dict['loss/total']

        # Logging
        for ep_count, info_dict in batch['infos']:
            self.log(ep_count, info_dict)
            for (k, v) in info_dict.items():
                if 'reward' in k:
                    agent_num = int(k.split('/')[1])
                    self.reward_tracker[agent_num] = v

        mean_reward = np.mean(self.reward_tracker)

        self.log(i, {'values/mean_reward': mean_reward})

        # added in a check to make sure we aren't counting initial low loss
        if (loss < self.best_loss and i > 1000) or i % 500 == 0:
  , i)
            self.best_loss = loss
                "Save new best model at epoch %i with loss %0.4f."
                % (i, loss)


Inherited members