Skip to content
Snippets Groups Projects
Commit d34096eb authored by Franz Bethke's avatar Franz Bethke
Browse files

Separate class car form CACC test

parent eb19df81
Branches
No related merge requests found
import numpy as np
import random
import sys
import matplotlib
matplotlib.use('TkAgg')
from matplotlib import pyplot as plt
from kalmanfilter import KalmanFilter
from car import Car
### is thrown if a Car is too close to its previous platoon member
class UnderCIPDerror(Exception):
pass
class Car(object):
def __init__(self, prev, speed, position, IPD, PS, tps):
self.platoonPrev = prev
self.cPos = position
self.IPD = IPD
self.PS = PS
self.tps = tps
self.bcIPD = False
# initialise calman filter
if not prev == None:
dt = 1/tps
x = np.array([[0,0]]).T
P = np.array([[10,0],[0,10]])
F = np.array([[1,dt],[0,1]])
Q = np.array([[1,0],[0,1]])
H = np.array([[1,0]])
R = np.array([[5]])
self.KF = KalmanFilter(x, P, F, Q, H, R, tps)
self.KF.predict()
self.dIdx = 0
self.distHistLength = 15
self.ds = np.empty(self.distHistLength)
RAND = 0.5
self.SonarSystemUnc = random.uniform(-RAND,RAND) # in cm
self.SonarStatisticalUnc = lambda : random.gauss(0,RAND) # in cm
self.SpeedSystemUnc = random.uniform(-RAND,RAND) # in meter per seconds
self.SpeedStatisticalUnc = lambda : random.gauss(0,RAND) # in meter per seconds
self.EngineSystemUnc = random.uniform(-RAND,RAND) # in meter per seconds
self.setSpeed(speed)
self.getDistance()
self.v_v = self.cSpeed
return None
def getCIPD(self):
return 0.1*self.IPD
def getSpeed(self):
if self.cSpeed <= 1e-8:
return 0
self.dIdx = (self.dIdx + 1) % self.distHistLength
self.ds[self.dIdx] = self.SpeedSystemUnc + self.cSpeed + self.SpeedStatisticalUnc()
return np.median(self.ds)
def setSpeed(self,value):
if value > 1e-8:
self.cSpeed = value
self.speed = self.EngineSystemUnc + value
else:
self.cSpeed = 0
self.speed = 0
def getDistance(self):
setLater = False
try:
self.oldDist = self.dist
except AttributeError:
setLater = True
if not self.platoonPrev == None:
self.dist = self.SonarSystemUnc + (self.platoonPrev.cPos - self.cPos) + self.SonarStatisticalUnc()
else:
self.dist = sys.maxsize
if setLater:
self.oldDist = self.dist
return self.dist
def updatePos(self):
self.cPos += self.cSpeed/float(tps)
def updateSpeed(self):
if not self.bcIPD:
d = self.getDistance() # sensor data
v = self.getSpeed() # sesnor data
IPD = self.IPD
PS = self.PS
v_new = None
if not self.platoonPrev == None:
self.KF.update(np.array([[self.getDistance()]]))
self.KF.predict()
x = self.KF.getValue()
d = x[0,0]
self.v_v = v_v = max(v + x[1,0], 0)
v_new = self.computeNewSpeed_2(d, v, v_v, IPD, PS)
inertia = 0.8 # in [0,1] und gibt die Traegheit des Fahrzeuges an
self.setSpeed(inertia * v + (1- inertia) * v_new)
if self.getCIPD() > d:
print("raise")
raise UnderCIPDerror("under CIPD!")
else: # this is for the LV
c = 0.5
self.v_v = 0
self.setSpeed(c * v + (1-c) * PS)
def computeNewSpeed_1(self, d, v, v_v, IPD, PS):
if checkInbound(d, IPD, 0.01*IPD):
if checkInbound (v, v_v, 0.01*v_v):
if checkInbound(v, PS, 0.01*PS):
v_new = PS
else:
if v > PS:
v_new = v_v - abs(PS-v_v)/4
else:
v_new = v_v + abs(PS-v_v)/4
else:
if v > v_v:
v_new = v_v - abs(PS-v_v)/4
else:
v_new = v_v + abs(PS-v_v)/4
else:
v_new = (v_v * (d/IPD)**2) #Der Exponent gibt an, wie schnell die Aenderung umgesetzt werden soll
return v_new
def computeNewSpeed_2(self, d, v, v_v, IPD, PS):
# Der Exponent gibt an, wie schnell die Aenderung umgesetzt werden soll
DIST_POW = 2
SPEED_POW = 0.5
if abs(v_v) > 1e-8:
v_new = (v_v * (d/(IPD))**DIST_POW * (PS/v_v)**SPEED_POW)
else:
v_new = (d - IPD)/IPD
return v_new
def checkInbound(argument, target, delta): #Ueberall ist ein delta von 0.01 hinterlegt
if argument > target + delta:
return False
......@@ -171,22 +27,22 @@ def update(carList):
item.updateSpeed()
if __name__ == "__main__":
maxT = [20, 40, 80, 110, 130,150,170] # times for events
setIPD = [40, 40, 40, 60, 45, 30, 15] # event of IPD (activ until time event)
setPS = [20, 30, 10, 10, 10, 10, 10] # event of PS (activ until time event)
maxT = [20, 40, 80, 110, 130,150,170] # times for events
setIPD = [40, 40, 40, 60, 45, 30, 15] # event of IPD (activ until time event)
setPS = [20, 30, 10, 10, 10, 10, 10] # event of PS (activ until time event)
v = [52.0, 50.0, 10.0] # startvektor (Startgeschwindikeit)
pos = [90, 30, 0] # startvektor (Startposition)
v = [52.0, 50.0, 10.0] # startvektor (Startgeschwindikeit)
pos = [90, 30, 0] # startvektor (Startposition)
tps = 60
cars = []
cars.append(Car(None, v[0], pos[0], setIPD[0], setPS[0],tps))
cars.append(Car(None, v[0], pos[0], setIPD[0], setPS[0],tps))
cars.append(Car(cars[0], v[1], pos[1], setIPD[0], setPS[0],tps))
cars.append(Car(cars[1], v[2], pos[2], setIPD[0], setPS[0],tps))
t = []
mesIPD= []
mesPS = []
IPD= []
PS = []
y_pos = [[],[],[]]
y_speed = [[],[],[]]
......@@ -198,14 +54,14 @@ if __name__ == "__main__":
i = 0.0; end = False
for j in range(0, len(maxT)):
IPD = setIPD[j]
PS = setPS[j]
broadcastPlattonSettings(cars, IPD, PS)
newIPD = setIPD[j]
newPS = setPS[j]
broadcastPlattonSettings(cars, newIPD, newPS)
while i < maxT[j]:
t.append(i)
mesIPD.append(IPD)
mesPS.append(PS)
IPD.append(newIPD)
PS.append(newPS)
for k in range(0, len(cars)):
y_pos[k].append(cars[k].cPos)
......@@ -236,7 +92,7 @@ if __name__ == "__main__":
plt.legend()
plt.subplot(222)
plt.plot(t, mesIPD, 'r--', label='IPD')
plt.plot(t, IPD, 'r--', label='IPD')
plt.plot(t, y_dist[1], 'b-', label='assumed dist FV1')
plt.plot(t, y_cDist[1], 'b--', label='real dist FV1')
plt.plot(t, y_cIPD[1], 'b-.', label='cIPD')
......@@ -247,7 +103,7 @@ if __name__ == "__main__":
plt.legend()
plt.subplot(223)
plt.plot(t, mesPS, 'm--', label='PS')
plt.plot(t, PS, 'm--', label='PS')
# plt.plot(t, y_speed[0], 'r-', label='assumed speed LV')
plt.plot(t, y_cSpeed[0], 'r--', label='real speed LV')
plt.plot(t, y_v_v[1], 'b:', label='speed estiamtion FV1 for LV')
......@@ -257,7 +113,7 @@ if __name__ == "__main__":
plt.legend()
plt.subplot(224)
plt.plot(t, mesPS, 'm--', label='PS')
plt.plot(t, PS, 'm--', label='PS')
# plt.plot(t, y_speed[1], 'b-', label='assumed speed FV1')
plt.plot(t, y_cSpeed[1], 'b--', label='real speed FV1')
plt.plot(t, y_v_v[2], 'g:', label='speed estiamtion FV2 for FV1')
......
import sys
import random
import numpy as np
from kalmanfilter import KalmanFilter
class Car(object):
def __init__(self, prev, speed, position, IPD, PS, tps):
self.platoonPrev = prev
self.cPos = position
self.IPD = IPD
self.PS = PS
self.tps = tps
self.bcIPD = False
# initialise calman filter
if not prev == None:
dt = 1/tps
x = np.array([[0,0]]).T
P = np.array([[10,0],[0,10]])
F = np.array([[1,dt],[0,1]])
Q = np.array([[1,0],[0,1]])
H = np.array([[1,0]])
R = np.array([[5]])
self.KF = KalmanFilter(x, P, F, Q, H, R, tps)
self.KF.predict()
self.dIdx = 0
self.distHistLength = 15
self.ds = np.empty(self.distHistLength)
RAND = 0.5
self.SonarSystemUnc = random.uniform(-RAND,RAND) # in cm
self.SonarStatisticalUnc = lambda : random.gauss(0,RAND) # in cm
self.SpeedSystemUnc = random.uniform(-RAND,RAND) # in meter per seconds
self.SpeedStatisticalUnc = lambda : random.gauss(0,RAND) # in meter per seconds
self.EngineSystemUnc = random.uniform(-RAND,RAND) # in meter per seconds
self.setSpeed(speed)
self.getDistance()
self.v_v = self.cSpeed
return None
def getCIPD(self):
return max(0.1*self.IPD,1)
def getSpeed(self):
if self.cSpeed <= 1e-8:
return 0
self.dIdx = (self.dIdx + 1) % self.distHistLength
self.ds[self.dIdx] = self.SpeedSystemUnc + self.cSpeed + self.SpeedStatisticalUnc()
return np.median(self.ds)
def setSpeed(self,value):
if value > 1e-8:
self.cSpeed = value
self.speed = self.EngineSystemUnc + value
else:
self.cSpeed = 0
self.speed = 0
def getDistance(self):
setLater = False
try:
self.oldDist = self.dist
except AttributeError:
setLater = True
if not self.platoonPrev == None:
self.dist = self.SonarSystemUnc + (self.platoonPrev.cPos - self.cPos) + self.SonarStatisticalUnc()
else:
self.dist = sys.maxsize
if setLater:
self.oldDist = self.dist
return self.dist
def updatePos(self):
self.cPos += self.cSpeed/float(self.tps)
def updateSpeed(self):
if not self.bcIPD:
d = self.getDistance() # sensor data
v = self.getSpeed() # sesnor data
IPD = self.IPD
PS = self.PS
v_new = None
if not self.platoonPrev == None:
self.KF.update(np.array([[self.getDistance()]]))
self.KF.predict()
x = self.KF.getValue()
d = x[0,0]
self.v_v = v_v = max(v + x[1,0], 0)
v_new = self.computeNewSpeed_2(d, v, v_v, IPD, PS)
inertia = 0.8 # in [0,1] und gibt die Traegheit des Fahrzeuges an
self.setSpeed(inertia * v + (1- inertia) * v_new)
if self.getCIPD() > d:
print("raise")
raise UnderCIPDerror("under CIPD!")
else: # this is for the LV
c = 0.5
self.v_v = 0
self.setSpeed(c * v + (1-c) * PS)
def computeNewSpeed_1(self, d, v, v_v, IPD, PS):
if checkInbound(d, IPD, 0.01*IPD):
if checkInbound (v, v_v, 0.01*v_v):
if checkInbound(v, PS, 0.01*PS):
v_new = PS
else:
if v > PS:
v_new = v_v - abs(PS-v_v)/4
else:
v_new = v_v + abs(PS-v_v)/4
else:
if v > v_v:
v_new = v_v - abs(PS-v_v)/4
else:
v_new = v_v + abs(PS-v_v)/4
else:
v_new = (v_v * (d/IPD)**2) #Der Exponent gibt an, wie schnell die Aenderung umgesetzt werden soll
return v_new
def computeNewSpeed_2(self, d, v, v_v, IPD, PS):
# Der Exponent gibt an, wie schnell die Aenderung umgesetzt werden soll
DIST_POW = 2
SPEED_POW = 0.5
if abs(v_v) > 1e-8:
v_new = (v_v * (d/(IPD))**DIST_POW * (PS/v_v)**SPEED_POW)
else:
v_new = (d - IPD)/IPD
return v_new
def computeNewSpeed_3(self, d, v, v_v, IPD, PS):
# Der Exponent gibt an, wie schnell die Aenderung umgesetzt werden soll
DIST_POW = 1.5; DIST_FAC = 1
SPEED_POW = 0.5; SPEED_FAC = 1
if abs(v_v) > 1e-8:
v_new = (v_v * DIST_FAC*(d/(IPD))**DIST_POW * SPEED_FAC*(PS/v_v)**SPEED_POW)
else:
v_new = (d - IPD)/IPD
return v_new
......@@ -4,14 +4,14 @@ matplotlib.use('TkAgg')
from matplotlib import pyplot as plt
v = 20
v_v = 0.5
PS = 20
v = 12
v_v = 10
PS = 10
IPD = 20
IPD_TOL = 0.05*IPD
DIST_POW = 2
DIST_POW = 1
SPEED_POW = 0.5
ds = np.linspace(0.*IPD, 1.5*IPD, num = 1000)
......@@ -63,16 +63,20 @@ def variante_4(d):
v_new = (v_v * (d/(IPD))**DIST_POW * (PS/v_v)**SPEED_POW) #Der Exponent gibt an, wie schnell die Aenderung umgesetzt werden soll
return v_new
def variante_5(d):
v_new = (v_v * (d/(IPD))**DIST_POW * (PS/v)**SPEED_POW) #Der Exponent gibt an, wie schnell die Aenderung umgesetzt werden soll
return v_new
for d in ds:
v_new = variante_4(d)
v_new = variante_5(d)
v_news.append(v_new)
plt.plot(ds, v_news,'b', label='v_new')
plt.plot(ds, [v_v for d in ds],'tab:orange', label='v_v')
plt.plot(ds, [PS for d in ds], 'g', label='PS')
plt.plot(ds, [v for d in ds], 'b--', label='v')
plt.plot([IPD, IPD], [0, max(v_news)], 'r', label='IPD')
plt.plot([IPD - IPD_TOL, IPD - IPD_TOL], [0, max(v_news)], 'r--', label='IPD-')
plt.plot([IPD + IPD_TOL, IPD + IPD_TOL], [0, max(v_news)], 'r--', label='IPD+')
plt.plot([IPD, IPD], [0, 1.1*max(max(v_news),PS,v_v,v)], 'r', label='IPD')
plt.plot([IPD - IPD_TOL, IPD - IPD_TOL], [0, 1.1*max(max(v_news),PS,v_v,v)], 'r--', label='IPD-')
plt.plot([IPD + IPD_TOL, IPD + IPD_TOL], [0, 1.1*max(max(v_news),PS,v_v,v)], 'r--', label='IPD+')
plt.legend()
plt.show()
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment