Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
CACC-Module-Test.py 7.64 KiB
import numpy as np
import random
import sys
import matplotlib
matplotlib.use('TkAgg')
from matplotlib import pyplot as plt
from kalmanfilter import KalmanFilter 

class UnderCIPDerror(Exception):
  pass

class Car(object):

    def __init__(self, prev, speed, position, IPD, PS, tStep):
        self.plattonPrev = prev
        if not prev == None:
            dt = 1/tStep
            x = np.array([[0,0]]).T
            P = np.array([[10,0],[0,10]])

            # f(x,v,a) =
            # x = f1(x,v,a) = x + v*dt + a * dt**2
            # v = f2(x,v,a) = v + a*dt
            # a = f3(x,v,a) = a

            F = np.array([[1,dt],[0,1]])
            Q = np.array([[1,0],[0,1]])
            H = np.array([[1,0]])
            R = np.array([[5]])

            self.KF = KalmanFilter(x, P, F, Q, H, R, tStep)

            self.KF.predict()
        self.cPos = position
        self.IPD = IPD
        self.PS = PS
        self.tStep = tStep

        self.bcIPD = False
        
        RAND = 0.5
        self.SonarSystemUnc      = random.uniform(-RAND,RAND)       # in cm
        self.SonarStatisticalUnc = lambda : random.gauss(0,RAND) # in cm
        
        self.SpeedSystemUnc      = random.uniform(-RAND,RAND)        # in meter per seconds
        self.SpeedStatisticalUnc = lambda : random.gauss(0,RAND) # in meter per seconds
        

        self.EngineSystemUnc     = random.uniform(-RAND,RAND) # in meter per seconds
        
        self.setSpeed(speed)
        self.getDistance()
        self.v_v = self.cSpeed
        return None

    def getCIPD(self):
        return 0.1*self.IPD
    
    def getSpeed(self):
        if self.cSpeed <= 1e-8:
            return 0
        return self.SpeedSystemUnc + self.cSpeed + self.SpeedStatisticalUnc()
      
    def setSpeed(self,value):
        if value > 1e-8:
            self.cSpeed = value
            self.speed  = self.EngineSystemUnc + value
        else:
            self.cSpeed = 0
            self.speed = 0
      
    def getDistance(self):
        setLater = False
        try: 
            self.oldDist = self.dist
        except AttributeError:
            setLater = True
        
        if not self.plattonPrev == None:
            self.dist = self.SonarSystemUnc + (self.plattonPrev.cPos - self.cPos) + self.SonarStatisticalUnc()
        else:
            self.dist = sys.maxsize
            
        if setLater:
            self.oldDist = self.dist
        return self.dist

    def updatePos(self):
        self.cPos += self.cSpeed/float(tStep)

    def estimateVeloVA(self):
        if not self.plattonPrev == None:
            return self.getSpeed() + (self.dist - self.oldDist)*self.tStep    

    def updateSpeed(self):
        if not self.bcIPD:
            d = self.getDistance() # sensor data
            v = self.getSpeed()    # sesnor data
            IPD = self.IPD
            PS  = self.PS
            v_new = None

            if not self.plattonPrev ==  None:
                #self.v_v = self.estimateVeloVA()
                #v_v = self.v_v

                self.KF.update(np.array([[self.getDistance()]]))
                self.KF.predict()
                x = self.KF.getValue()
                d = x[0,0]
                self.v_v = v_v = v + x[1,0]
                if checkInbound(d, IPD, 0.01*IPD):
                    if checkInbound (v, v_v, 0.01*v_v):
                        if checkInbound(v, PS, 0.01*PS):
                            v_new = PS
                        else:
                            if v > PS:
                                v_new = v_v - abs(PS-v_v)/4
                            else:
                                v_new = v_v + abs(PS-v_v)/4
                    else:
                        if v > v_v:
                            v_new = v_v - abs(PS-v_v)/4
                        else:
                            v_new = v_v + abs(PS-v_v)/4
                else:
                    v_new = (v_v * (d/IPD)**2)      #Der Exponent gibt an, wie schnell die Aenderung umgesetzt werden soll

                inertia = 0.8                                   # in [0,1] und gibt die Traegheit des Fahrzeuges an
                self.setSpeed(inertia * v + (1- inertia) * v_new)
                
                if self.getCIPD() > d:
                    print("raise")
                    raise UnderCIPDerror("under CIPD!")
                
            else: # this is for the LV
                c = 0.5
                self.v_v = 0
                self.setSpeed(c * v + (1-c) * PS)


def checkInbound(argument, target, delta):    #Ueberall ist ein delta von 0.01 hinterlegt
    if argument > target + delta:
        return False
    elif argument < target - delta:
        return False
    else:
        return True

def broadcastPlattonSettings(carList, newIPD, newPS):
    for item in carList:
        item.PS = newPS
        item.IPD = newIPD

def update(carList):
    for item in carList:
        #if not item.plattonPrev == None:
            #item.KF.resetCov()
        item.updatePos()
    for item in reversed(carList):
        item.updateSpeed()

if __name__ == "__main__":
    maxT = [20, 40, 80, 110, 130,150,170]           # times for events
    setIPD = [40, 40, 40, 60, 45, 30, 15]           # event of IPD (activ until time event)
    setPS  = [20, 30, 10, 10, 10, 10, 10]           # event of PS (activ until time event)
    # setIPD = [40, 40, 40, 40, 40, 40, 40]           # event of IPD (activ until time event)
    # setPS  = [20, 20, 20, 20, 20, 20, 20]           # event of PS (activ until time event)

    v   = [52.0, 50.0, 10.0]                        # startvektor (Startgeschwindikeit)
    pos = [90, 30, 0]                               # startvektor (Startposition)
    tStep = 50
    
    cars = []
    cars.append(Car(None, v[0], pos[0], setIPD[0], setPS[0],tStep))
    cars.append(Car(cars[0], v[1], pos[1], setIPD[0], setPS[0],tStep))
    #cars.append(Car(cars[1], v[2], pos[2], setIPD[0], setPS[0],tStep))

    t = []
    mesIPD= []
    mesPS = []
   
    y_pos = [[],[],[]]
    y_speed = [[],[],[]]
    y_cSpeed = [[],[],[]]
    y_dist  = [[],[],[]]
    y_cDist = [[],[],[]]
    y_cdist = [[],[],[]]
    y_v_v = [[],[],[]]
    
    i = 0.0; end = False
    for j in range(0, len(maxT)):
        IPD = setIPD[j]
        PS = setPS[j]
        broadcastPlattonSettings(cars, IPD, PS)
        
        while i < maxT[j]:
            t.append(i)
            mesIPD.append(IPD)
            mesPS.append(PS)

            for k in range(0, len(cars)):
                y_pos[k].append(cars[k].cPos)
                y_speed[k].append(cars[k].getSpeed())
                y_cSpeed[k].append(cars[k].cSpeed)
                y_v_v[k].append(cars[k].v_v)

            for k in range(1, len(cars)):
                y_dist[k].append(cars[k].getDistance())
                y_cDist[k].append(cars[k-1].cPos - cars[k].cPos)
                y_cdist[k].append(cars[k].getCIPD())

            i = i+1/tStep
            try:
                update(cars)
            except UnderCIPDerror:
                print("catch")
                end = True  
                break
        if end: break


    plt.subplot(131)
    plt.plot(t, y_pos[0], 'r')
    plt.plot(t, y_pos[1], 'b')
    # plt.plot(t, y_pos[2], 'g')
    plt.title("Position")

    plt.subplot(132)
    plt.plot(t, mesPS, 'm--')
    plt.plot(t, y_speed[0], 'r--')
    plt.plot(t, y_cSpeed[0], 'r-')
    plt.plot(t, y_speed[1], 'b--')
    plt.plot(t, y_cSpeed[1], 'b')
    plt.plot(t, y_v_v[1], 'g')
    # plt.plot(t, y_speed[2], 'g')
    plt.title("Speed")
    
    plt.subplot(133)
    plt.plot(t, mesIPD, 'r--')
    plt.plot(t, y_dist[1], 'b-')
    plt.plot(t, y_cdist[1], 'b--')
    plt.plot(t, y_cDist[1], 'b--')
    # plt.plot(t, y_dist[2], 'g-')
    # plt.plot(t, y_cdist[2], 'g--')
    plt.title("Distance")

    plt.show()