Module ilpyt.agents.gcl_agent

The agent from the Guided Cost Learning (GCL) algorithm. This algorithm was described in the paper "Guided Cost Learning: Deep Inverse Optimal Control via Policy Optimization" by PChelsea Finn, Sergey Levine, and Pieter Abbeel, and presented at ICML 2016.

For more details, please refer to the paper:

Expand source code
The agent from the Guided Cost Learning (GCL) algorithm. This algorithm was 
described in the paper "Guided Cost Learning: Deep Inverse Optimal Control via 
Policy Optimization" by PChelsea Finn, Sergey Levine, and Pieter Abbeel, and 
presented at ICML 2016.

For more details, please refer to the paper:

from typing import Dict, Union

import numpy as np
import torch
from torch.optim import Adam

from ilpyt.agents.base_agent import BaseAgent
from ilpyt.nets.base_net import BaseNetwork

class GCLAgent(BaseAgent):
    def initialize(
        actor: Union[BaseNetwork, None] = None,
        cost: Union[BaseNetwork, None] = None,
        lr: float = 0.001,
        gamma: float = 0.99,
        clip_ratio: float = 0.1,
        entropy_coeff: float = 0.01,
        lcr_reg_cost: bool = False,
        mono_reg_cost: bool = False,
    ) -> None:
        Initialization function for the GCL Agent.

        actor: BaseNetwork, default=None
            actor network
        critic: BaseNetwork, default=None
            critic network
        lr: float, default=0.001
            learning rate
        gamma: float, default=0.99
            discount factor for calculating returns
        clip_ratio: float, default=0.1
            clipping parameter used in PPO loss function
        entropy_coeff: float, default=0.01
            entropy loss coefficient
        lcr_reg_cost: bool, default=False
            flag to add regularization term to demo and sample cost trajectories
        mono_reg_cost: bool, default=False
            flag to add mono regularization term to demo cost trajectory

            If `actor` or `critic` are not specified.
        self.gamma = gamma
        self.clip_ratio = clip_ratio
        self.entropy_coeff = entropy_coeff

        # Networks
        if actor is None:
            raise ValueError(
                'Please provide input value for actor. Currently set to None.'
        if cost is None:
            raise ValueError(
                'Please provide input value for critic. Currently set to None.'
            ) = actor
        self.cost = cost
        self.nets = {'cost': self.cost, **}

        self.opt_cost = Adam(self.cost.parameters(), lr)
        self.mono_reg_cost = mono_reg_cost
        self.lcr_reg_cost = lcr_reg_cost

    def step(self, state: torch.Tensor) -> np.ndarray:
        Find best action for the given state according to the current policy.

        state: torch.Tensor
            state tensor, of size (batch_size, state_shape)

        np.ndarray: selected actions

    def update(self, batch: Dict[str, torch.Tensor]) -> Dict[str, float]:
        Update actor weights based on batch of experiences.

        batch: Dict[str, torch.Tensor]
            batch of experiences, with values of size 
            (num_steps, num_env, item_shape)

        Dict[str, float]: loss dictionary, with keys as tensorboard tags and 
                values as loss values to chart
        # Rewards
        rollout_steps = batch['states'].shape[0]
        with torch.no_grad():
            rewards = []
            for i in range(rollout_steps):
                reward = -self.cost(batch['states'][i], batch['actions'][i])
            rewards = torch.stack(rewards)
        batch['rewards'] = rewards


    def update_cost(
        states: torch.Tensor,
        actions: torch.Tensor,
        expert_states: torch.Tensor,
        expert_actions: torch.Tensor,
    ) -> Dict[str, float]:
        Update cost function weights based on batch of experiences.

        states: torch.Tensor
            agent states, of size (batch_size, state_shape)
        actions: torch.Tensor
            agent actions, of size (batch_size, action_shape)
        expert_states: torch.Tensor
            expert states, of size (batch_size, state_shape)
        expert_actions: torch.Tensor
            expert actions, of size (batch_size, action_shape)

        Dict[str, float]: loss dictionary, with keys as tensorboard tags and 
            values as loss values to log
        sample_cost = self.cost(states, actions).squeeze()
        demo_cost = self.cost(expert_states, expert_actions).squeeze()

        with torch.no_grad():
            dist, _ =
            log_probs = dist.log_prob(actions)
            if len(log_probs.shape) > 1:  # continuous action space
                log_probs = log_probs.sum(axis=-1)
            probs = torch.exp(log_probs)

        loss_ioc = torch.mean(demo_cost) + torch.log(
            torch.mean(torch.exp(-sample_cost) / (probs + 1e-7))

        return_log_dict = dict()
        # apply regularizers if you so wish
        # warning: computation time dramatically slower
        if self.lcr_reg_cost:
            demo_reg_lcr = self.apply_lcr_reg(demo_cost)
            sample_reg_lcr = self.apply_lcr_reg(sample_cost)
            loss_ioc += demo_reg_lcr + sample_reg_lcr
            return_log_dict["reg/demo_lcr"] = demo_reg_lcr.item()
            return_log_dict["reg/sample_lcr"] = sample_reg_lcr.item()

        if self.mono_reg_cost:
            demo_reg_mono = self.apply_mono_reg(demo_cost)
            loss_ioc += demo_reg_mono
            return_log_dict["reg/demo_mono"] = demo_reg_mono.item()

        torch.nn.utils.clip_grad_norm_(self.cost.parameters(), self.clip_ratio)

        return_log_dict['loss/sample_cost'] = torch.mean(sample_cost)
        return_log_dict['loss/demo_cost'] = torch.mean(demo_cost)
        return_log_dict['loss/ioc'] = loss_ioc.item()
        return return_log_dict

    def apply_lcr_reg(self, cost_traj_tensor: torch.Tensor):
        Update cost function with local constant rate regularization term.

        cost_traj_tensor: torch.Tensor
            cost tensor for the trajectory

        torch.tensor: constant rate regularization tensor
        cost_traj = cost_traj_tensor
        regularization_sum = torch.tensor(0.0)
        if self.device == 'gpu':
            regularization_sum = regularization_sum.cuda()

        for i in range(1, len(cost_traj) - 2):
            local_sum = (
                (cost_traj[i + 1] - cost_traj[i])
                - (cost_traj[i] - cost_traj[i - 1])
            ) ** 2
            regularization_sum += local_sum
        return regularization_sum

    def apply_mono_reg(self, cost_traj_tensor: torch.Tensor) -> torch.Tensor:
        Update cost function with monotonic regularization term.

        cost_traj_tensor: torch.Tensor
            cost tensor for the trajectory

        torch.Tensor: monotonic regularization cost
        cost_traj = cost_traj_tensor
        reg_sum = torch.tensor(0.0)
        zero_tensor = torch.tensor(0.0)
        if self.device == 'gpu':
            reg_sum = reg_sum.cuda()
            zero_tensor = zero_tensor.cuda()

        for i in range(1, len(cost_traj) - 1):
            local_max = (
                max(zero_tensor, cost_traj[i] - cost_traj[i - 1] - 1) ** 2
            reg_sum += local_max
        return reg_sum

    def to_gpu(self) -> None:
        Place agent nets on the GPU.
        super(GCLAgent, self).to_gpu()

    def to_cpu(self) -> None:
        Place agent nets on the CPU.
        super(GCLAgent, self).to_cpu()

    def set_train(self) -> None:
        Set agent nets to training mode.
        super(GCLAgent, self).set_train()

    def set_test(self) -> None:
        Set agent nets to evaluation mode.
        super(GCLAgent, self).set_test()


class GCLAgent (**kwargs)

By default, the agent will be in train mode and be configured to use the cpu for step and update calls.


**kwargs: arbitrary keyword arguments that will be passed to the initialize function

Expand source code
class GCLAgent(BaseAgent):
    def initialize(
        actor: Union[BaseNetwork, None] = None,
        cost: Union[BaseNetwork, None] = None,
        lr: float = 0.001,
        gamma: float = 0.99,
        clip_ratio: float = 0.1,
        entropy_coeff: float = 0.01,
        lcr_reg_cost: bool = False,
        mono_reg_cost: bool = False,
    ) -> None:
        Initialization function for the GCL Agent.

        actor: BaseNetwork, default=None
            actor network
        critic: BaseNetwork, default=None
            critic network
        lr: float, default=0.001
            learning rate
        gamma: float, default=0.99
            discount factor for calculating returns
        clip_ratio: float, default=0.1
            clipping parameter used in PPO loss function
        entropy_coeff: float, default=0.01
            entropy loss coefficient
        lcr_reg_cost: bool, default=False
            flag to add regularization term to demo and sample cost trajectories
        mono_reg_cost: bool, default=False
            flag to add mono regularization term to demo cost trajectory

            If `actor` or `critic` are not specified.
        self.gamma = gamma
        self.clip_ratio = clip_ratio
        self.entropy_coeff = entropy_coeff

        # Networks
        if actor is None:
            raise ValueError(
                'Please provide input value for actor. Currently set to None.'
        if cost is None:
            raise ValueError(
                'Please provide input value for critic. Currently set to None.'
            ) = actor
        self.cost = cost
        self.nets = {'cost': self.cost, **}

        self.opt_cost = Adam(self.cost.parameters(), lr)
        self.mono_reg_cost = mono_reg_cost
        self.lcr_reg_cost = lcr_reg_cost

    def step(self, state: torch.Tensor) -> np.ndarray:
        Find best action for the given state according to the current policy.

        state: torch.Tensor
            state tensor, of size (batch_size, state_shape)

        np.ndarray: selected actions

    def update(self, batch: Dict[str, torch.Tensor]) -> Dict[str, float]:
        Update actor weights based on batch of experiences.

        batch: Dict[str, torch.Tensor]
            batch of experiences, with values of size 
            (num_steps, num_env, item_shape)

        Dict[str, float]: loss dictionary, with keys as tensorboard tags and 
                values as loss values to chart
        # Rewards
        rollout_steps = batch['states'].shape[0]
        with torch.no_grad():
            rewards = []
            for i in range(rollout_steps):
                reward = -self.cost(batch['states'][i], batch['actions'][i])
            rewards = torch.stack(rewards)
        batch['rewards'] = rewards


    def update_cost(
        states: torch.Tensor,
        actions: torch.Tensor,
        expert_states: torch.Tensor,
        expert_actions: torch.Tensor,
    ) -> Dict[str, float]:
        Update cost function weights based on batch of experiences.

        states: torch.Tensor
            agent states, of size (batch_size, state_shape)
        actions: torch.Tensor
            agent actions, of size (batch_size, action_shape)
        expert_states: torch.Tensor
            expert states, of size (batch_size, state_shape)
        expert_actions: torch.Tensor
            expert actions, of size (batch_size, action_shape)

        Dict[str, float]: loss dictionary, with keys as tensorboard tags and 
            values as loss values to log
        sample_cost = self.cost(states, actions).squeeze()
        demo_cost = self.cost(expert_states, expert_actions).squeeze()

        with torch.no_grad():
            dist, _ =
            log_probs = dist.log_prob(actions)
            if len(log_probs.shape) > 1:  # continuous action space
                log_probs = log_probs.sum(axis=-1)
            probs = torch.exp(log_probs)

        loss_ioc = torch.mean(demo_cost) + torch.log(
            torch.mean(torch.exp(-sample_cost) / (probs + 1e-7))

        return_log_dict = dict()
        # apply regularizers if you so wish
        # warning: computation time dramatically slower
        if self.lcr_reg_cost:
            demo_reg_lcr = self.apply_lcr_reg(demo_cost)
            sample_reg_lcr = self.apply_lcr_reg(sample_cost)
            loss_ioc += demo_reg_lcr + sample_reg_lcr
            return_log_dict["reg/demo_lcr"] = demo_reg_lcr.item()
            return_log_dict["reg/sample_lcr"] = sample_reg_lcr.item()

        if self.mono_reg_cost:
            demo_reg_mono = self.apply_mono_reg(demo_cost)
            loss_ioc += demo_reg_mono
            return_log_dict["reg/demo_mono"] = demo_reg_mono.item()

        torch.nn.utils.clip_grad_norm_(self.cost.parameters(), self.clip_ratio)

        return_log_dict['loss/sample_cost'] = torch.mean(sample_cost)
        return_log_dict['loss/demo_cost'] = torch.mean(demo_cost)
        return_log_dict['loss/ioc'] = loss_ioc.item()
        return return_log_dict

    def apply_lcr_reg(self, cost_traj_tensor: torch.Tensor):
        Update cost function with local constant rate regularization term.

        cost_traj_tensor: torch.Tensor
            cost tensor for the trajectory

        torch.tensor: constant rate regularization tensor
        cost_traj = cost_traj_tensor
        regularization_sum = torch.tensor(0.0)
        if self.device == 'gpu':
            regularization_sum = regularization_sum.cuda()

        for i in range(1, len(cost_traj) - 2):
            local_sum = (
                (cost_traj[i + 1] - cost_traj[i])
                - (cost_traj[i] - cost_traj[i - 1])
            ) ** 2
            regularization_sum += local_sum
        return regularization_sum

    def apply_mono_reg(self, cost_traj_tensor: torch.Tensor) -> torch.Tensor:
        Update cost function with monotonic regularization term.

        cost_traj_tensor: torch.Tensor
            cost tensor for the trajectory

        torch.Tensor: monotonic regularization cost
        cost_traj = cost_traj_tensor
        reg_sum = torch.tensor(0.0)
        zero_tensor = torch.tensor(0.0)
        if self.device == 'gpu':
            reg_sum = reg_sum.cuda()
            zero_tensor = zero_tensor.cuda()

        for i in range(1, len(cost_traj) - 1):
            local_max = (
                max(zero_tensor, cost_traj[i] - cost_traj[i - 1] - 1) ** 2
            reg_sum += local_max
        return reg_sum

    def to_gpu(self) -> None:
        Place agent nets on the GPU.
        super(GCLAgent, self).to_gpu()

    def to_cpu(self) -> None:
        Place agent nets on the CPU.
        super(GCLAgent, self).to_cpu()

    def set_train(self) -> None:
        Set agent nets to training mode.
        super(GCLAgent, self).set_train()

    def set_test(self) -> None:
        Set agent nets to evaluation mode.
        super(GCLAgent, self).set_test()



def apply_lcr_reg(self, cost_traj_tensor: torch.Tensor)

Update cost function with local constant rate regularization term.


cost_traj_tensor : torch.Tensor
cost tensor for the trajectory


torch.tensor: constant rate regularization tensor
Expand source code
def apply_lcr_reg(self, cost_traj_tensor: torch.Tensor):
    Update cost function with local constant rate regularization term.

    cost_traj_tensor: torch.Tensor
        cost tensor for the trajectory

    torch.tensor: constant rate regularization tensor
    cost_traj = cost_traj_tensor
    regularization_sum = torch.tensor(0.0)
    if self.device == 'gpu':
        regularization_sum = regularization_sum.cuda()

    for i in range(1, len(cost_traj) - 2):
        local_sum = (
            (cost_traj[i + 1] - cost_traj[i])
            - (cost_traj[i] - cost_traj[i - 1])
        ) ** 2
        regularization_sum += local_sum
    return regularization_sum
def apply_mono_reg(self, cost_traj_tensor: torch.Tensor) ‑> torch.Tensor

Update cost function with monotonic regularization term.


cost_traj_tensor : torch.Tensor
cost tensor for the trajectory


torch.Tensor: monotonic regularization cost
Expand source code
def apply_mono_reg(self, cost_traj_tensor: torch.Tensor) -> torch.Tensor:
    Update cost function with monotonic regularization term.

    cost_traj_tensor: torch.Tensor
        cost tensor for the trajectory

    torch.Tensor: monotonic regularization cost
    cost_traj = cost_traj_tensor
    reg_sum = torch.tensor(0.0)
    zero_tensor = torch.tensor(0.0)
    if self.device == 'gpu':
        reg_sum = reg_sum.cuda()
        zero_tensor = zero_tensor.cuda()

    for i in range(1, len(cost_traj) - 1):
        local_max = (
            max(zero_tensor, cost_traj[i] - cost_traj[i - 1] - 1) ** 2
        reg_sum += local_max
    return reg_sum
def initialize(self, actor: Union[BaseNetwork, NoneType] = None, cost: Union[BaseNetwork, NoneType] = None, lr: float = 0.001, gamma: float = 0.99, clip_ratio: float = 0.1, entropy_coeff: float = 0.01, lcr_reg_cost: bool = False, mono_reg_cost: bool = False) ‑> NoneType

Initialization function for the GCL Agent.


actor : BaseNetwork, default=None
actor network
critic : BaseNetwork, default=None
critic network
lr : float, default=0.001
learning rate
gamma : float, default=0.99
discount factor for calculating returns
clip_ratio : float, default=0.1
clipping parameter used in PPO loss function
entropy_coeff : float, default=0.01
entropy loss coefficient
lcr_reg_cost : bool, default=False
flag to add regularization term to demo and sample cost trajectories
mono_reg_cost : bool, default=False
flag to add mono regularization term to demo cost trajectory



If actor or critic are not specified.

Expand source code
def initialize(
    actor: Union[BaseNetwork, None] = None,
    cost: Union[BaseNetwork, None] = None,
    lr: float = 0.001,
    gamma: float = 0.99,
    clip_ratio: float = 0.1,
    entropy_coeff: float = 0.01,
    lcr_reg_cost: bool = False,
    mono_reg_cost: bool = False,
) -> None:
    Initialization function for the GCL Agent.

    actor: BaseNetwork, default=None
        actor network
    critic: BaseNetwork, default=None
        critic network
    lr: float, default=0.001
        learning rate
    gamma: float, default=0.99
        discount factor for calculating returns
    clip_ratio: float, default=0.1
        clipping parameter used in PPO loss function
    entropy_coeff: float, default=0.01
        entropy loss coefficient
    lcr_reg_cost: bool, default=False
        flag to add regularization term to demo and sample cost trajectories
    mono_reg_cost: bool, default=False
        flag to add mono regularization term to demo cost trajectory

        If `actor` or `critic` are not specified.
    self.gamma = gamma
    self.clip_ratio = clip_ratio
    self.entropy_coeff = entropy_coeff

    # Networks
    if actor is None:
        raise ValueError(
            'Please provide input value for actor. Currently set to None.'
    if cost is None:
        raise ValueError(
            'Please provide input value for critic. Currently set to None.'
        ) = actor
    self.cost = cost
    self.nets = {'cost': self.cost, **}

    self.opt_cost = Adam(self.cost.parameters(), lr)
    self.mono_reg_cost = mono_reg_cost
    self.lcr_reg_cost = lcr_reg_cost
def set_test(self) ‑> NoneType

Set agent nets to evaluation mode.

Expand source code
def set_test(self) -> None:
    Set agent nets to evaluation mode.
    super(GCLAgent, self).set_test()
def set_train(self) ‑> NoneType

Set agent nets to training mode.

Expand source code
def set_train(self) -> None:
    Set agent nets to training mode.
    super(GCLAgent, self).set_train()
def step(self, state: torch.Tensor) ‑> numpy.ndarray

Find best action for the given state according to the current policy.


state : torch.Tensor
state tensor, of size (batch_size, state_shape)


np.ndarray: selected actions
Expand source code
def step(self, state: torch.Tensor) -> np.ndarray:
    Find best action for the given state according to the current policy.

    state: torch.Tensor
        state tensor, of size (batch_size, state_shape)

    np.ndarray: selected actions
def to_cpu(self) ‑> NoneType

Place agent nets on the CPU.

Expand source code
def to_cpu(self) -> None:
    Place agent nets on the CPU.
    super(GCLAgent, self).to_cpu()
def to_gpu(self) ‑> NoneType

Place agent nets on the GPU.

Expand source code
def to_gpu(self) -> None:
    Place agent nets on the GPU.
    super(GCLAgent, self).to_gpu()
def update(self, batch: Dict[str, torch.Tensor]) ‑> Dict[str, float]

Update actor weights based on batch of experiences.


batch : Dict[str, torch.Tensor]
batch of experiences, with values of size (num_steps, num_env, item_shape)


Dict[str, float]: loss dictionary, with keys as tensorboard tags and
values as loss values to chart
Expand source code
def update(self, batch: Dict[str, torch.Tensor]) -> Dict[str, float]:
    Update actor weights based on batch of experiences.

    batch: Dict[str, torch.Tensor]
        batch of experiences, with values of size 
        (num_steps, num_env, item_shape)

    Dict[str, float]: loss dictionary, with keys as tensorboard tags and 
            values as loss values to chart
    # Rewards
    rollout_steps = batch['states'].shape[0]
    with torch.no_grad():
        rewards = []
        for i in range(rollout_steps):
            reward = -self.cost(batch['states'][i], batch['actions'][i])
        rewards = torch.stack(rewards)
    batch['rewards'] = rewards

def update_cost(self, states: torch.Tensor, actions: torch.Tensor, expert_states: torch.Tensor, expert_actions: torch.Tensor) ‑> Dict[str, float]

Update cost function weights based on batch of experiences.


states : torch.Tensor
agent states, of size (batch_size, state_shape)
actions : torch.Tensor
agent actions, of size (batch_size, action_shape)
expert_states : torch.Tensor
expert states, of size (batch_size, state_shape)
expert_actions : torch.Tensor
expert actions, of size (batch_size, action_shape)


Dict[str, float]: loss dictionary, with keys as tensorboard tags and
values as loss values to log
Expand source code
def update_cost(
    states: torch.Tensor,
    actions: torch.Tensor,
    expert_states: torch.Tensor,
    expert_actions: torch.Tensor,
) -> Dict[str, float]:
    Update cost function weights based on batch of experiences.

    states: torch.Tensor
        agent states, of size (batch_size, state_shape)
    actions: torch.Tensor
        agent actions, of size (batch_size, action_shape)
    expert_states: torch.Tensor
        expert states, of size (batch_size, state_shape)
    expert_actions: torch.Tensor
        expert actions, of size (batch_size, action_shape)

    Dict[str, float]: loss dictionary, with keys as tensorboard tags and 
        values as loss values to log
    sample_cost = self.cost(states, actions).squeeze()
    demo_cost = self.cost(expert_states, expert_actions).squeeze()

    with torch.no_grad():
        dist, _ =
        log_probs = dist.log_prob(actions)
        if len(log_probs.shape) > 1:  # continuous action space
            log_probs = log_probs.sum(axis=-1)
        probs = torch.exp(log_probs)

    loss_ioc = torch.mean(demo_cost) + torch.log(
        torch.mean(torch.exp(-sample_cost) / (probs + 1e-7))

    return_log_dict = dict()
    # apply regularizers if you so wish
    # warning: computation time dramatically slower
    if self.lcr_reg_cost:
        demo_reg_lcr = self.apply_lcr_reg(demo_cost)
        sample_reg_lcr = self.apply_lcr_reg(sample_cost)
        loss_ioc += demo_reg_lcr + sample_reg_lcr
        return_log_dict["reg/demo_lcr"] = demo_reg_lcr.item()
        return_log_dict["reg/sample_lcr"] = sample_reg_lcr.item()

    if self.mono_reg_cost:
        demo_reg_mono = self.apply_mono_reg(demo_cost)
        loss_ioc += demo_reg_mono
        return_log_dict["reg/demo_mono"] = demo_reg_mono.item()

    torch.nn.utils.clip_grad_norm_(self.cost.parameters(), self.clip_ratio)

    return_log_dict['loss/sample_cost'] = torch.mean(sample_cost)
    return_log_dict['loss/demo_cost'] = torch.mean(demo_cost)
    return_log_dict['loss/ioc'] = loss_ioc.item()
    return return_log_dict

Inherited members