from abc import ABC, abstractmethod
from copy import deepcopy
from functools import partial
from pathlib import Path
import casadi as ca
import numpy as np
from acados_template import (
AcadosOcp,
AcadosSim,
AcadosSimOptions,
AcadosSimSolver,
)
from leap_c.mpc import Mpc, MpcParameter
from leap_c.utils import AcadosFileManager
[docs]
def create_dynamics_from_mpc(
mpc: Mpc, export_dir: None | Path = None, sens_forw=False, dt=None
) -> "Dynamics":
"""Create a dynamics object from an MPC object."""
ocp: AcadosOcp = mpc.ocp
# if there is a discrete dynamics function, use it
if ocp.model.disc_dyn_expr is not None:
return CasadiDynamics(mpc)
# otherwise we create a AcadosSim object
assert ocp.model.f_expl_expr is not None
# create sim opts
sim_opts = AcadosSimOptions()
# assert sim_method_num_stages all equal
# does this makes sense?
sim_opts.num_steps = ocp.solver_options.sim_method_num_steps
sim_opts.newton_iter = ocp.solver_options.sim_method_newton_iter
sim_opts.integrator_type = ocp.solver_options.integrator_type
# create sim
sim = AcadosSim()
sim.solver_options = sim_opts
sim.solver_options.sens_forw = sens_forw
sim.model = deepcopy(ocp.model)
sim.parameter_values = ocp.parameter_values
if dt is None:
dt = ocp.solver_options.tf / ocp.solver_options.N_horizon # type: ignore
sim.solver_options.T = dt
return SimDynamics(sim, export_dir=export_dir)
[docs]
class Dynamics(ABC):
@abstractmethod
def __call__(
self,
x: np.ndarray,
u: np.ndarray,
p: np.ndarray | None,
with_sens: bool = False,
) -> np.ndarray | tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Step the dynamics.
Args:
x: The current state.
u: The current input.
p: The current parameter.
with_sens: Whether to return the sensitivity.
Returns:
The next state.
"""
...
[docs]
class SimDynamics(Dynamics):
[docs]
def __init__(
self, sim: AcadosSim, export_dir: Path | None = None, cleanup: bool = True
):
self.sim = sim
afm = AcadosFileManager(export_dir, cleanup=cleanup)
self._sim_solver_fn = partial(afm.setup_acados_sim_solver, sim)
self._sim_solver = None
@property
def sim_solver(self) -> AcadosSimSolver:
if self._sim_solver is None:
self._sim_solver = self._sim_solver_fn()
return self._sim_solver
def __call__(
self,
x: np.ndarray,
u: np.ndarray,
p: np.ndarray | None = None,
with_sens: bool = False,
) -> np.ndarray | tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Step the dynamics.
Args:
x: The current state.
u: The current input.
p: The current parameter.
with_sens: Whether to return the sensitivity.
Returns:
The next state and the sensitivity if with_sens is True.
"""
self.sim_solver.set("x", x)
self.sim_solver.set("u", u)
if p is not None:
self.sim_solver.set("p", p)
self.sim_solver.solve()
if not with_sens:
return self.sim_solver.get("x") # type: ignore
Sx = self.sim_solver.get("Sx")
Su = self.sim_solver.get("Su")
return self.sim_solver.get("x"), Sx, Su # type: ignore
[docs]
class CasadiDynamics(Dynamics):
[docs]
def __init__(self, mpc: Mpc):
self.mpc = mpc
ocp = mpc.ocp
inputs = [ocp.model.x, ocp.model.u]
names = ["x", "u"]
if self.mpc.default_p_global is not None:
inputs.append(ocp.model.p_global)
names.append("p_global")
if self.mpc.default_p_stagewise is not None:
inputs.append(ocp.model.p)
names.append("p")
expr = ocp.model.disc_dyn_expr
self.dyn_fn = ca.Function("dyn", inputs, [expr], names, ["x_next"])
# generate the jacobian
inputs_cat = ca.vertcat(*inputs)
jac_expr = ca.jacobian(expr, inputs_cat)
self.jac_fn = ca.Function("jtimes", [inputs_cat], [jac_expr])
def __call__(
self,
x: np.ndarray,
u: np.ndarray,
p: MpcParameter | None = None,
with_sens: bool = False,
) -> np.ndarray | tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Step the dynamics.
If the parameter is not provided, the current parameter of the MPC object is used.
Args:
x: The current state.
u: The current input.
p: The current parameter.
with_sens: Whether to return the sensitivity.
Returns:
The next state.
"""
x_shape = x.shape
batched = True if len(x_shape) > 1 else False
inputs = [x, u]
p_global, p_stage = self.mpc.fetch_param(p, 0)
if p_global is not None:
if batched:
p_global = np.tile(p_global, (x_shape[0], 1))
inputs.append(p_global)
if p_stage is not None:
if batched:
p_stage = np.tile(p_stage, (x_shape[0], 1))
inputs.append(p_stage)
# transpose inputs to match casadi batch format
inputs = [i.T for i in inputs]
output = self.dyn_fn(*inputs).full().T.reshape(x_shape) # type: ignore
if not with_sens:
return output
# compute the jacobian
sizes = [i.shape[0] for i in inputs]
splits = np.cumsum(sizes)[:-1]
inputs_cat = np.concatenate(inputs, axis=0)
jac = self.jac_fn(inputs_cat).full().T # type: ignore
if len(x_shape) > 1:
jac = jac.reshape(x_shape[0], -1, x_shape[1])
Sx, Su = np.split(jac, splits, axis=1)[:2]
else:
Sx, Su = np.split(jac, splits, axis=0)[:2]
return output, Sx, Su