Reputation: 1
I'm trying to replicate the following paper https://groups.csail.mit.edu/robotics-center/public_papers/Platt10.pdf regarding planning in the belief space.
To do so, I'm using pydrake.solvers to solve a quadratic problem with non linear constraints.
Here is the code where I use pydrake:
import numpy as np
import sympy as sp
from pydrake.all import (
MathematicalProgram,
Solve,
Variables,
Jacobian,
Expression,
Variable,
)
from pydrake import forwarddiff
from pydrake.math import inv
class BSP:
def __init__(
self,
systemDynamicFunction, # function describing system dynamics
observationFunction, # function describing observation model
jacSysD, # Jacobian for system dynamics
jacSysO, # Jacobian for observation model
jacSysDU, # Jacobian for system dynamics w.r.t. inputs
noiseObs, # function for observation noise
noiseSysD, # function for system noise
QLarge, # cost matrix for final belief mean deviation
L, # cost matrix for covariance
Q, # cost matrix for mean deviation
R, # cost matrix for input deviation
planningHorizon=60,
delta=5,
):
self._systemDynamicFunction = systemDynamicFunction
self._observationFunction = observationFunction
self._jacSysD = jacSysD
self._jacSysO = jacSysO
self._jacSysDU = jacSysDU
self._noiseObs = noiseObs
self._noiseSysD = noiseSysD
self._QLarge = QLarge
self._L = L
self._Q = Q
self._R = R
self._planningHorizon = planningHorizon
self._delta = delta
self._numSegments = int(planningHorizon / delta)
self._stateDim = Q.shape[0]
self._inputDim = R.shape[0]
def _unpackSimmMatrix(self,x,n):
"""given a vector x containing the stacked matrix and the dimension n of the matrix,
it returns the matrix"""
matrix = np.empty((n, n), dtype=object)
# Fill the upper triangular part (including diagonal)
index = 0
for i in range(n):
for j in range(i, n): # j starts from i to fill only the upper triangle
matrix[i, j] = x[index]
matrix[j, i] = x[index] # Symmetry
index += 1
return matrix
def _packSimmMatrix(self, matrix):
"""given a matrix, it returns the stacked vector"""
n = matrix.shape[0]
x = np.empty(int(n * (n + 1) / 2),dtype=object)
index = 0
for i in range(n):
for j in range(i, n):
x[index] = matrix[i, j]
index += 1
return x
def simpEKF(self, mt, Sigmat, ut):
At = self._jacSysD(mt, ut)
Ct = self._jacSysO(mt)
if Sigmat.shape != (3,3):
Sigmat = self._unpackSimmMatrix(Sigmat, self._stateDim)
GammaT = At @ Sigmat @ At.T
Wt = self._noiseObs(mt)
Kt =np.dot(GammaT,Ct.T @ inv(Ct @ GammaT @ Ct.T + Wt))
mt1 = self._systemDynamicFunction(mt, ut) + self._noiseSysD(mt).reshape(-1)
Sigmat1 = GammaT - Kt @ Ct @ GammaT
return mt1, Sigmat1
def _beliefPropagation(self, mt, Sigmat, ut):
"""Symbolic belief propagation."""
bSeg = []
bSeg.append(self.simpEKF(mt, Sigmat, ut))
mSum = bSeg[0][0]
sigmaSum = bSeg[0][1]
for t in range(1,self._delta):
bSeg.append(self.simpEKF(bSeg[t-1][0],bSeg[t-1][1],ut))
for t in range(len(bSeg)):
if t + 1 < len(bSeg) -1:
mSum +=bSeg[t+1][0] - bSeg[t][0]
sigmaSum +=bSeg[t+1][1] - bSeg[t][1]
return mSum,sigmaSum
def _constructOptimizationProblem(self, bInit, bDes, uInit, controlBounds):
prog = MathematicalProgram()
state_vars = []
sigma_vars = []
input_vars = []
symmetrixMatrixDim = int(self._stateDim * (self._stateDim + 1)/2)
# Define variables for each segment
for i in range(self._numSegments):
mi = prog.NewContinuousVariables(self._stateDim, f"m_{i}")
si = prog.NewContinuousVariables(symmetrixMatrixDim, f"Sigma_{i}")
ui = prog.NewContinuousVariables(self._inputDim, f"u_{i}")
state_vars.append(mi)
sigma_vars.append(si)
input_vars.append(ui)
# Constraints for belief dynamics
for i in range(self._numSegments - 1):
mi, si, ui = state_vars[i], sigma_vars[i], input_vars[i]
mi1, si1, _ = state_vars[i + 1], sigma_vars[i + 1], input_vars[i + 1]
# Propagate belief and add constraints
m_next, Sigma_next = self._beliefPropagation(mi, si, ui)
for n in range(self._stateDim):
prog.AddConstraint(m_next[n] - mi1[n] == 0)
Sigma_next = self._packSimmMatrix(Sigma_next)
for m in range(symmetrixMatrixDim):
prog.AddConstraint(Sigma_next[m] == si1[m])
# Constraints for initial beliefs
for n in range(self._stateDim):
prog.AddConstraint(state_vars[0][n] == bInit[0][n])
# Input bounds
for ui in input_vars:
for i in range(self._inputDim):
prog.AddConstraint(ui[i] >= -5)
prog.AddConstraint(ui[i] <= 5)
# Define the cost function
cost_expr = Expression(0)
for i in range(self._numSegments):
mi, si, ui = state_vars[i], sigma_vars[i], input_vars[i]
cost_expr += (mi - bDes[0]).T @ self._Q @ (mi - bDes[0])
cost_expr += ui.T @ self._R @ ui
cost_expr += si.T @ self._L @ si
prog.AddCost(cost_expr)
# Save the program to a .tex file
with open("optimization_problem.tex", "w") as f:
f.write(prog.ToLatex())
print("prog ready")
return prog, state_vars, sigma_vars, input_vars
def create_plan(self, bInit, bDes, uInit, controlBounds=[-5, 5], optOptions={}):
prog, state_vars, sigma_vars, input_vars = self._constructOptimizationProblem(
bInit, bDes, uInit, controlBounds
)
result = Solve(prog)
if not result.is_success():
raise RuntimeError("Optimization failed!")
return {
"states": [result.GetSolution(v) for v in state_vars],
"covariances": [self._unpackSimmMatrix(result.GetSolution(v),self._stateDim) for v in sigma_vars],
"inputs": [result.GetSolution(v) for v in input_vars],
}
When I run the following code I get no error but the program gets stuck after the return of the method _beliefPropagation
and after a while, the process gets killed with no error output.
In that method 1 of the optimization variables is integrated (better, the integral is approximated with a summation) for a few timestamps and that seems to get the program stuck. I'm new to drake, could someone point to me in the right direction please?
EDIT
To run the full simulation I use the following code to call the class defined earlier:
import json
import numpy as np
from pydrake.all import MathematicalProgram, Solve
from pydrake.symbolic import sqrt as sym_sqrt
import math
from BSPDrake import BSP
from realTimePlotBSP import RealTimePlot
from autograd import jacobian
class RobotControl(BSP):
def __init__(self,settingsFile):
self.noiseJ = 1000
settings = json.load(open(settingsFile))
self.meanThreshold = settings['meanThreshold']
self.stdThreshold = settings['stdThreshold']
self.planningThreshold = settings['planningThreshold']
self.tau = settings['tau']
self.bigTau = settings['bigTau']
pathFileName = settings['pathFile']
self.mgoal =np.array([120, 75, 0])
self.sigmagoal = np.array([[.4,.02,.02],[.02,.4,.02],[.02,.02,.4]])
m0 = settings['m0']
sigma0 = np.array([[1,.2,.2],[.2,1,.2],[.2,.2,1]])
self.logFilePath = settings['logFilePath']
self.Q = 0.05*np.eye(3)
self.QQ = 5000*np.eye(3)
self.S=0.5*np.eye(3,3)
self.R =0.05* np.eye(3)
lambd = settings['lambda']
# self.LAMBDA = lambd * np.eye(9)
self.LAMBDA = lambd * np.eye(6)
self.m =np.array(m0)
self.sigma = sigma0
self.A =np.eye(3)
self.B = np.eye(3)
self.C = np.eye(3)
self.timeStep = 0
self.delta = 5
_,self.W = self.computeNoise(self.m)
self.initLogFile()
self.u= np.array([0,0,0])
self.logData(np.array([0,0,0]))
super().__init__(systemDynamicFunction=self.sysDynamics,
observationFunction=self.obsFunction,
jacSysD=self.jacSys,
jacSysO=self.jacObs,
jacSysDU=self.jacObs,
noiseObs=self.noiseObs,
noiseSysD=self.noiseSys,
QLarge=self.QQ,
R=self.R,
Q=self.Q,
L=self.LAMBDA,
delta=self.delta,
)
def sysDynamics(self,m,u):
return m+u
def obsFunction(self,m):
return m
def jacSys(self,m,u):
return np.eye(3)
def jacObs(self,m):
return np.eye(3)
def noiseObs(self,m):
_,W = self.computeNoise(m)
return W
def noiseSys(self,m):
v,_ = self.computeNoise(m)
return v
def EndOptimizationCallback(self,optRes,**kwa):
if optRes is not None:
# plan = self.unpackOptResult(optRes)
# mbar,sigmabar,ubar = [],[],[]
# for i in range(len(plan)):
# mbari,sigmabari,ubari = plan[i]
# mbar.append(mbari)
# sigmabar.append(sigmabari)
# ubar.append(ubari)
mbar = optRes['states']
sigmabar = optRes['covariances']
ubar = optRes['inputs']
else:
mbar,sigmabar,ubar = kwa['mbar'],kwa['sigmabar'],kwa['ubar']
kwargs = {
"m": self.m.reshape(3,),
"sigma": self.sigma,
'mbar': mbar,
'sigmabar': sigmabar,
'minNoisePos': np.array([-50,150]),#self.Points[self.noiseJ,:],
'ubar': ubar,
'W': self.W,
}
self.plotFunction(**kwargs)
def initLogFile(self):
with open(self.logFilePath, 'w') as file:
# Create an empty structure to hold the logs
json.dump({"logs": []}, file, indent=4)
print(f"Initialized log file at: {self.logFilePath}")
def planning(self):
while np.linalg.norm(self.mgoal.reshape(3,1) - self.m.reshape(3,1)) > self.meanThreshold or np.linalg.norm(self.sigmagoal - self.sigma) > self.stdThreshold:
mbar,ubar,sigmabar = self.obtain_plan()
for t in range(self.bigTau - self.tau):
# self.sigma = np.array([[sigmabar[0], sigmabar[1], sigmabar[2]], [sigmabar[1], sigmabar[3], sigmabar[4]], [sigmabar[2],sigmabar[4],sigmabar[5]]])
u = self.control(mbar[t],ubar[t])
self.m,self.sigma = self.EKF(ustar=u)
self.u = u
self.EndOptimizationCallback(None,mbar=mbar,ubar=ubar,sigmabar=sigmabar)
self.logData(u)
if np.linalg.norm(np.array(mbar[t][0:2]).reshape(2,1) - self.m[0:2].reshape(2,1)) > self.planningThreshold :
print("mbar",mbar[t][0:2])
print("m",self.m[0:2])
print("Replanning")
break
def obtain_plan(self):
numSegments = self._numSegments
bInit = (self.m.reshape(3,),self.sigma)
bDes = (self.mgoal.reshape(3,),self.sigmagoal)
uInit = []
for i in range(numSegments):
uInit.append(self.u.reshape(3,))
optRes = self.create_plan(bInit,bDes,uInit)
self.EndOptimizationCallback(optRes)
# plan = self.unpackOptResult(optRes)
# mbar,sigmabar,ubar = [],[],[]
# for i in range(len(plan)):
# mbari,sigmabari,ubari = plan[i]
# mbar.append(mbari)
# sigmabar.append(sigmabari)
# ubar.append(ubari)
mbar = optRes['states']
sigmabar = optRes['covariances']
ubar = optRes['inputs']
return mbar,ubar,sigmabar
def control(self,mbari,ubari):
mbari = mbari.reshape(3,1)
ubari = ubari.reshape(3,1)
s = self.S
TmpMtxInv = np.linalg.inv(self.B.transpose()*s*self.B+self.R)
s=self.Q+self.A.transpose()*s*self.A-self.A.transpose()*s*self.B*TmpMtxInv*self.B.transpose()*s*self.A
self.S = s
ustar = ubari - np.dot(TmpMtxInv*self.B.transpose()*self.S*self.A,(self.m.reshape(3,1)-mbari)).reshape(3,1)
return ustar
def EKF(self,ustar):
# self.C = self.Ccompute(self.mgoal,self.m,1)
# self.C = self.Ccompute(self.mgoal,self.m,1)
gamma = self.A @ self.sigma @ self.A.T
self.noise,self.W = self.computeNoise(self.m)
self.z = self.C @ self.m.reshape(3,1) + self.noise.reshape(3,1)
tmpMtx2Inv = np.linalg.inv(self.C*gamma*self.C.transpose()+ self.W)
sigma = gamma-gamma*self.C.transpose()*tmpMtx2Inv*self.C*gamma
m = self.m.reshape(3,1) + ustar.reshape(3,1) + gamma @ self.C.transpose() @ tmpMtx2Inv @ (self.z.reshape(3,1)-self.m.reshape(3,1) - ustar.reshape(3,1))
return m,sigma
def computeNoiseSymoblic(self,m):
vx = 0.5 * (self.mgoal[0] - m[0])**2
vy = 0.7 * (self.mgoal[1] - m[1])**2
vz = 0.5
v = np.array([sym_sqrt(vx),sym_sqrt(vy),sym_sqrt(vz)]).reshape(3,1)
vv = np.random.normal(0,v)
w = v.dot(v.transpose())
return vv,w
def computeNoise(self,m):
# noiseInf = self.Points[self.noiseJ,:].reshape(3,1)
# noiseNorm = np.linalg.norm(noiseInf - m.reshape(3,1))
# mean = 0
# variance = noiseNorm
# variance = max(0, min(variance, 10))
# v = np.random.normal(mean, math.sqrt(variance), 3).reshape(3,1)
# w = np.zeros((3,3))
# if noiseNorm < 10:
# if self.noiseJ < 200:
# self.noiseJ += 1
noiseInf = np.array([-50,150],).reshape(2,1)
mresh = self.m.reshape(3,1)
mNoise = np.linalg.norm(mresh[0:2] - noiseInf)
noiseInfx = noiseInf[0,0].reshape(1,1)
variance = 0.05 * mNoise + 0.7
w = np.zeros((3,3))
v = np.random.normal(0,math.sqrt(variance),3).reshape(3,1)
np.fill_diagonal(w,variance)
return v,w
@staticmethod
def Ccompute(vg, v, radius):
tmp = -1/(1+math.exp(-5*(np.linalg.norm(v-vg)-radius)))+1
mat = np.zeros((3, 3))
np.fill_diagonal(mat, tmp)
return mat
def logData(self,ustar):
log_entry = {
"timeStep": self.timeStep,
"m": self.m.tolist(),
"input": ustar.tolist(),
"sigma": self.sigma.tolist(),
"mgoal": self.mgoal.tolist(),
"sigmagoal": self.sigmagoal.tolist(),
"deltaM": np.linalg.norm(self.mgoal.reshape(3,1) - self.m.reshape(3,1)),
"deltaS": np.linalg.norm(self.sigmagoal - self.sigma),
"Wt": self.W.tolist(),
"Wt_norm": np.linalg.norm(self.W),
}
# Read the current file, update the logs, and write back
with open(self.logFilePath, 'r+') as file:
data = json.load(file) # Load existing data
data["logs"].append(log_entry) # Add the new entry
file.seek(0) # Move to the beginning of the file
json.dump(data, file, indent=4) # Write updated data
print(f"Logged data at {log_entry['timeStep']}")
self.timeStep += 1
if __name__ == "__main__":
robot = RobotControl("settings.json")
robot.planning()
I also initialize everything with the settings.JSON file:
{
"meanThreshold": 1,
"stdThreshold":0.1,
"planningThreshold": 4,
"tau": 0,
"bigTau":2,
"m0":[-130, 70, 0],
"logFilePath":"log.JSON",
"lambda": 2000000000
}
Upvotes: 0
Views: 34