diff --git a/modules/CACC/CACC-Module-Test.py b/modules/CACC/CACC-Module-Test.py index ba1aa78f5957b3441be9308b42c8205c924385b3..3de0a30dddc96a4f02f00ff4ec8276c9cee4d2b4 100644 --- a/modules/CACC/CACC-Module-Test.py +++ b/modules/CACC/CACC-Module-Test.py @@ -1,156 +1,12 @@ -import numpy as np -import random -import sys import matplotlib matplotlib.use('TkAgg') from matplotlib import pyplot as plt -from kalmanfilter import KalmanFilter +from car import Car ### is thrown if a Car is too close to its previous platoon member class UnderCIPDerror(Exception): pass -class Car(object): - - def __init__(self, prev, speed, position, IPD, PS, tps): - self.platoonPrev = prev - self.cPos = position - self.IPD = IPD - self.PS = PS - self.tps = tps - - self.bcIPD = False - - # initialise calman filter - if not prev == None: - dt = 1/tps - x = np.array([[0,0]]).T - P = np.array([[10,0],[0,10]]) - - F = np.array([[1,dt],[0,1]]) - Q = np.array([[1,0],[0,1]]) - H = np.array([[1,0]]) - R = np.array([[5]]) - - self.KF = KalmanFilter(x, P, F, Q, H, R, tps) - - self.KF.predict() - - self.dIdx = 0 - self.distHistLength = 15 - self.ds = np.empty(self.distHistLength) - - RAND = 0.5 - self.SonarSystemUnc = random.uniform(-RAND,RAND) # in cm - self.SonarStatisticalUnc = lambda : random.gauss(0,RAND) # in cm - - self.SpeedSystemUnc = random.uniform(-RAND,RAND) # in meter per seconds - self.SpeedStatisticalUnc = lambda : random.gauss(0,RAND) # in meter per seconds - - self.EngineSystemUnc = random.uniform(-RAND,RAND) # in meter per seconds - - self.setSpeed(speed) - self.getDistance() - self.v_v = self.cSpeed - return None - - def getCIPD(self): - return 0.1*self.IPD - - def getSpeed(self): - if self.cSpeed <= 1e-8: - return 0 - self.dIdx = (self.dIdx + 1) % self.distHistLength - self.ds[self.dIdx] = self.SpeedSystemUnc + self.cSpeed + self.SpeedStatisticalUnc() - return np.median(self.ds) - - def setSpeed(self,value): - if value > 1e-8: - self.cSpeed = value - self.speed = self.EngineSystemUnc + value - else: - self.cSpeed = 0 - self.speed = 0 - - def getDistance(self): - setLater = False - try: - self.oldDist = self.dist - except AttributeError: - setLater = True - - if not self.platoonPrev == None: - self.dist = self.SonarSystemUnc + (self.platoonPrev.cPos - self.cPos) + self.SonarStatisticalUnc() - else: - self.dist = sys.maxsize - - if setLater: - self.oldDist = self.dist - return self.dist - - def updatePos(self): - self.cPos += self.cSpeed/float(tps) - - def updateSpeed(self): - if not self.bcIPD: - d = self.getDistance() # sensor data - v = self.getSpeed() # sesnor data - IPD = self.IPD - PS = self.PS - v_new = None - - if not self.platoonPrev == None: - self.KF.update(np.array([[self.getDistance()]])) - self.KF.predict() - x = self.KF.getValue() - d = x[0,0] - self.v_v = v_v = max(v + x[1,0], 0) - - v_new = self.computeNewSpeed_2(d, v, v_v, IPD, PS) - - inertia = 0.8 # in [0,1] und gibt die Traegheit des Fahrzeuges an - self.setSpeed(inertia * v + (1- inertia) * v_new) - - if self.getCIPD() > d: - print("raise") - raise UnderCIPDerror("under CIPD!") - - else: # this is for the LV - c = 0.5 - self.v_v = 0 - self.setSpeed(c * v + (1-c) * PS) - - - def computeNewSpeed_1(self, d, v, v_v, IPD, PS): - if checkInbound(d, IPD, 0.01*IPD): - if checkInbound (v, v_v, 0.01*v_v): - if checkInbound(v, PS, 0.01*PS): - v_new = PS - else: - if v > PS: - v_new = v_v - abs(PS-v_v)/4 - else: - v_new = v_v + abs(PS-v_v)/4 - else: - if v > v_v: - v_new = v_v - abs(PS-v_v)/4 - else: - v_new = v_v + abs(PS-v_v)/4 - else: - v_new = (v_v * (d/IPD)**2) #Der Exponent gibt an, wie schnell die Aenderung umgesetzt werden soll - return v_new - - def computeNewSpeed_2(self, d, v, v_v, IPD, PS): - # Der Exponent gibt an, wie schnell die Aenderung umgesetzt werden soll - DIST_POW = 2 - SPEED_POW = 0.5 - if abs(v_v) > 1e-8: - v_new = (v_v * (d/(IPD))**DIST_POW * (PS/v_v)**SPEED_POW) - else: - v_new = (d - IPD)/IPD - return v_new - - def checkInbound(argument, target, delta): #Ueberall ist ein delta von 0.01 hinterlegt if argument > target + delta: return False @@ -171,22 +27,22 @@ def update(carList): item.updateSpeed() if __name__ == "__main__": - maxT = [20, 40, 80, 110, 130,150,170] # times for events - setIPD = [40, 40, 40, 60, 45, 30, 15] # event of IPD (activ until time event) - setPS = [20, 30, 10, 10, 10, 10, 10] # event of PS (activ until time event) + maxT = [20, 40, 80, 110, 130,150,170] # times for events + setIPD = [40, 40, 40, 60, 45, 30, 15] # event of IPD (activ until time event) + setPS = [20, 30, 10, 10, 10, 10, 10] # event of PS (activ until time event) - v = [52.0, 50.0, 10.0] # startvektor (Startgeschwindikeit) - pos = [90, 30, 0] # startvektor (Startposition) + v = [52.0, 50.0, 10.0] # startvektor (Startgeschwindikeit) + pos = [90, 30, 0] # startvektor (Startposition) tps = 60 cars = [] - cars.append(Car(None, v[0], pos[0], setIPD[0], setPS[0],tps)) + cars.append(Car(None, v[0], pos[0], setIPD[0], setPS[0],tps)) cars.append(Car(cars[0], v[1], pos[1], setIPD[0], setPS[0],tps)) cars.append(Car(cars[1], v[2], pos[2], setIPD[0], setPS[0],tps)) t = [] - mesIPD= [] - mesPS = [] + IPD= [] + PS = [] y_pos = [[],[],[]] y_speed = [[],[],[]] @@ -198,14 +54,14 @@ if __name__ == "__main__": i = 0.0; end = False for j in range(0, len(maxT)): - IPD = setIPD[j] - PS = setPS[j] - broadcastPlattonSettings(cars, IPD, PS) + newIPD = setIPD[j] + newPS = setPS[j] + broadcastPlattonSettings(cars, newIPD, newPS) while i < maxT[j]: t.append(i) - mesIPD.append(IPD) - mesPS.append(PS) + IPD.append(newIPD) + PS.append(newPS) for k in range(0, len(cars)): y_pos[k].append(cars[k].cPos) @@ -236,7 +92,7 @@ if __name__ == "__main__": plt.legend() plt.subplot(222) - plt.plot(t, mesIPD, 'r--', label='IPD') + plt.plot(t, IPD, 'r--', label='IPD') plt.plot(t, y_dist[1], 'b-', label='assumed dist FV1') plt.plot(t, y_cDist[1], 'b--', label='real dist FV1') plt.plot(t, y_cIPD[1], 'b-.', label='cIPD') @@ -247,7 +103,7 @@ if __name__ == "__main__": plt.legend() plt.subplot(223) - plt.plot(t, mesPS, 'm--', label='PS') + plt.plot(t, PS, 'm--', label='PS') # plt.plot(t, y_speed[0], 'r-', label='assumed speed LV') plt.plot(t, y_cSpeed[0], 'r--', label='real speed LV') plt.plot(t, y_v_v[1], 'b:', label='speed estiamtion FV1 for LV') @@ -257,7 +113,7 @@ if __name__ == "__main__": plt.legend() plt.subplot(224) - plt.plot(t, mesPS, 'm--', label='PS') + plt.plot(t, PS, 'm--', label='PS') # plt.plot(t, y_speed[1], 'b-', label='assumed speed FV1') plt.plot(t, y_cSpeed[1], 'b--', label='real speed FV1') plt.plot(t, y_v_v[2], 'g:', label='speed estiamtion FV2 for FV1') diff --git a/modules/CACC/car.py b/modules/CACC/car.py new file mode 100644 index 0000000000000000000000000000000000000000..38ed22126e4c429352b037aba8a8229b8caf3a07 --- /dev/null +++ b/modules/CACC/car.py @@ -0,0 +1,154 @@ +import sys +import random +import numpy as np +from kalmanfilter import KalmanFilter + +class Car(object): + + def __init__(self, prev, speed, position, IPD, PS, tps): + self.platoonPrev = prev + self.cPos = position + self.IPD = IPD + self.PS = PS + self.tps = tps + + self.bcIPD = False + + # initialise calman filter + if not prev == None: + dt = 1/tps + x = np.array([[0,0]]).T + P = np.array([[10,0],[0,10]]) + + F = np.array([[1,dt],[0,1]]) + Q = np.array([[1,0],[0,1]]) + H = np.array([[1,0]]) + R = np.array([[5]]) + + self.KF = KalmanFilter(x, P, F, Q, H, R, tps) + + self.KF.predict() + + self.dIdx = 0 + self.distHistLength = 15 + self.ds = np.empty(self.distHistLength) + + RAND = 0.5 + self.SonarSystemUnc = random.uniform(-RAND,RAND) # in cm + self.SonarStatisticalUnc = lambda : random.gauss(0,RAND) # in cm + + self.SpeedSystemUnc = random.uniform(-RAND,RAND) # in meter per seconds + self.SpeedStatisticalUnc = lambda : random.gauss(0,RAND) # in meter per seconds + + self.EngineSystemUnc = random.uniform(-RAND,RAND) # in meter per seconds + + self.setSpeed(speed) + self.getDistance() + self.v_v = self.cSpeed + return None + + def getCIPD(self): + return max(0.1*self.IPD,1) + + def getSpeed(self): + if self.cSpeed <= 1e-8: + return 0 + self.dIdx = (self.dIdx + 1) % self.distHistLength + self.ds[self.dIdx] = self.SpeedSystemUnc + self.cSpeed + self.SpeedStatisticalUnc() + return np.median(self.ds) + + def setSpeed(self,value): + if value > 1e-8: + self.cSpeed = value + self.speed = self.EngineSystemUnc + value + else: + self.cSpeed = 0 + self.speed = 0 + + def getDistance(self): + setLater = False + try: + self.oldDist = self.dist + except AttributeError: + setLater = True + + if not self.platoonPrev == None: + self.dist = self.SonarSystemUnc + (self.platoonPrev.cPos - self.cPos) + self.SonarStatisticalUnc() + else: + self.dist = sys.maxsize + + if setLater: + self.oldDist = self.dist + return self.dist + + def updatePos(self): + self.cPos += self.cSpeed/float(self.tps) + + def updateSpeed(self): + if not self.bcIPD: + d = self.getDistance() # sensor data + v = self.getSpeed() # sesnor data + IPD = self.IPD + PS = self.PS + v_new = None + + if not self.platoonPrev == None: + self.KF.update(np.array([[self.getDistance()]])) + self.KF.predict() + x = self.KF.getValue() + d = x[0,0] + self.v_v = v_v = max(v + x[1,0], 0) + + v_new = self.computeNewSpeed_2(d, v, v_v, IPD, PS) + + inertia = 0.8 # in [0,1] und gibt die Traegheit des Fahrzeuges an + self.setSpeed(inertia * v + (1- inertia) * v_new) + + if self.getCIPD() > d: + print("raise") + raise UnderCIPDerror("under CIPD!") + + else: # this is for the LV + c = 0.5 + self.v_v = 0 + self.setSpeed(c * v + (1-c) * PS) + + + def computeNewSpeed_1(self, d, v, v_v, IPD, PS): + if checkInbound(d, IPD, 0.01*IPD): + if checkInbound (v, v_v, 0.01*v_v): + if checkInbound(v, PS, 0.01*PS): + v_new = PS + else: + if v > PS: + v_new = v_v - abs(PS-v_v)/4 + else: + v_new = v_v + abs(PS-v_v)/4 + else: + if v > v_v: + v_new = v_v - abs(PS-v_v)/4 + else: + v_new = v_v + abs(PS-v_v)/4 + else: + v_new = (v_v * (d/IPD)**2) #Der Exponent gibt an, wie schnell die Aenderung umgesetzt werden soll + return v_new + + def computeNewSpeed_2(self, d, v, v_v, IPD, PS): + # Der Exponent gibt an, wie schnell die Aenderung umgesetzt werden soll + DIST_POW = 2 + SPEED_POW = 0.5 + if abs(v_v) > 1e-8: + v_new = (v_v * (d/(IPD))**DIST_POW * (PS/v_v)**SPEED_POW) + else: + v_new = (d - IPD)/IPD + return v_new + + def computeNewSpeed_3(self, d, v, v_v, IPD, PS): + # Der Exponent gibt an, wie schnell die Aenderung umgesetzt werden soll + DIST_POW = 1.5; DIST_FAC = 1 + SPEED_POW = 0.5; SPEED_FAC = 1 + if abs(v_v) > 1e-8: + v_new = (v_v * DIST_FAC*(d/(IPD))**DIST_POW * SPEED_FAC*(PS/v_v)**SPEED_POW) + else: + v_new = (d - IPD)/IPD + return v_new diff --git a/modules/CACC/simlifiedSpeedVisualization.py b/modules/CACC/simlifiedSpeedVisualization.py index 95c5a704d71e3b2a6f2285d1d238907f306b9721..73a0b7044bf9f60ce2671ff676e4df263bcce122 100644 --- a/modules/CACC/simlifiedSpeedVisualization.py +++ b/modules/CACC/simlifiedSpeedVisualization.py @@ -4,14 +4,14 @@ matplotlib.use('TkAgg') from matplotlib import pyplot as plt -v = 20 -v_v = 0.5 -PS = 20 +v = 12 +v_v = 10 +PS = 10 IPD = 20 IPD_TOL = 0.05*IPD -DIST_POW = 2 +DIST_POW = 1 SPEED_POW = 0.5 ds = np.linspace(0.*IPD, 1.5*IPD, num = 1000) @@ -63,16 +63,20 @@ def variante_4(d): v_new = (v_v * (d/(IPD))**DIST_POW * (PS/v_v)**SPEED_POW) #Der Exponent gibt an, wie schnell die Aenderung umgesetzt werden soll return v_new +def variante_5(d): + v_new = (v_v * (d/(IPD))**DIST_POW * (PS/v)**SPEED_POW) #Der Exponent gibt an, wie schnell die Aenderung umgesetzt werden soll + return v_new + for d in ds: - v_new = variante_4(d) + v_new = variante_5(d) v_news.append(v_new) plt.plot(ds, v_news,'b', label='v_new') plt.plot(ds, [v_v for d in ds],'tab:orange', label='v_v') plt.plot(ds, [PS for d in ds], 'g', label='PS') plt.plot(ds, [v for d in ds], 'b--', label='v') -plt.plot([IPD, IPD], [0, max(v_news)], 'r', label='IPD') -plt.plot([IPD - IPD_TOL, IPD - IPD_TOL], [0, max(v_news)], 'r--', label='IPD-') -plt.plot([IPD + IPD_TOL, IPD + IPD_TOL], [0, max(v_news)], 'r--', label='IPD+') +plt.plot([IPD, IPD], [0, 1.1*max(max(v_news),PS,v_v,v)], 'r', label='IPD') +plt.plot([IPD - IPD_TOL, IPD - IPD_TOL], [0, 1.1*max(max(v_news),PS,v_v,v)], 'r--', label='IPD-') +plt.plot([IPD + IPD_TOL, IPD + IPD_TOL], [0, 1.1*max(max(v_news),PS,v_v,v)], 'r--', label='IPD+') plt.legend() plt.show()