Algorithm Customization

Thanks to OpenCDA's highly modular design, you can conveniently replace any default module with your own algorithm. The recommended approach is to put your customized module under opencda/customize/... and use inheritance to override the default algorithm, then import the customized module in the VehicleManager class. If you only want to change a single module, the one thing to watch is that its input and output formats stay the same as those of the original module. The sections below give detailed customization instructions for each module.

Localization Customization

The LocalizationManager class takes charge of the localization task. During initialization, a GPS sensor and an IMU sensor are spawned to collect localization-related information.

Its core function is localize(), which takes no input and fills in the correct values for self._ego_pos and self._speed. The default algorithm for fusing the GPS and IMU data is a Kalman filter, which takes the current GPS and IMU data as input and returns the corrected x, y, z coordinates.

from opencda.core.sensing.localization.kalman_filter import KalmanFilter

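class LocalizationManager(object):
    def __init__(self, vehicle, config_yaml, carla_map):
        # abridged: sensor setup and attributes such as self.dt are omitted
        self.kf = KalmanFilter(self.dt)

    def localize(self):
        # abridged: x, y, z, speed, yaw and imu_yaw_rate come from the gps and imu readings
        corrected_cords = self.kf(x, y, z, speed, yaw, imu_yaw_rate)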

If you want to keep the overall structure of the localization module and only replace the filter (e.g. with an Extended Kalman Filter), create a localization_manager.py under the opencda/customize/core/sensing/localization folder and initialize CustomizedLocalizationManager with the Extended Kalman Filter:

from opencda.core.sensing.localization.localization_manager import LocalizationManager
from opencda.customize.core.sensing.localization.extented_kalman_filter import ExtentedKalmanFilter

class CustomizedLocalizationManager(LocalizationManager):
    def __init__(self, vehicle, config_yaml, carla_map):
        super(CustomizedLocalizationManager, self).__init__(vehicle, config_yaml, carla_map)
        self.kf = ExtentedKalmanFilter(self.dt)

Then go to the VehicleManager class, import the customized module, and set it as the localizer:

from opencda.core.sensing.localization.localization_manager import LocalizationManager
from opencda.customize.core.sensing.localization.localization_manager import CustomizedLocalizationManager

class VehicleManager(object):
    def __init__(self, vehicle, config_yaml, application, carla_map, cav_world):
        # self.localizer = LocalizationManager(vehicle, sensing_config['localization'], carla_map)
        self.localizer = CustomizedLocalizationManager(vehicle, sensing_config['localization'], carla_map)

If you want to modify more (e.g. change the data pre-processing), the downstream modules will be unaffected as long as localize() in CustomizedLocalizationManager still fills self._ego_pos and self._speed.
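
As a rough illustration of that contract, the sketch below overrides localize() with a simple smoothing step. It is self-contained, so LocalizationManager here is a stand-in stub rather than the OpenCDA class, and read_sensors(), the alpha parameter, and the tuple-based position are hypothetical placeholders chosen only for the example.

```python
class LocalizationManager:
    """Stand-in stub for OpenCDA's LocalizationManager (illustration only)."""

    def __init__(self):
        self._ego_pos = None
        self._speed = 0.0


class CustomizedLocalizationManager(LocalizationManager):
    """Hypothetical customization that exponentially smooths raw readings."""

    def __init__(self, alpha=0.5):
        super().__init__()
        self.alpha = alpha  # weight given to the newest reading

    def read_sensors(self):
        # Hypothetical placeholder for reading the gps and imu sensors.
        return (10.0, 2.0, 0.0), 5.0

    def localize(self):
        raw_pos, raw_speed = self.read_sensors()
        if self._ego_pos is None:
            # First reading: nothing to smooth against yet.
            smoothed = raw_pos
        else:
            smoothed = tuple(
                self.alpha * new + (1 - self.alpha) * old
                for new, old in zip(raw_pos, self._ego_pos))
        # Whatever happens above, these two attributes must be filled.
        self._ego_pos = smoothed
        self._speed = self.alpha * raw_speed + (1 - self.alpha) * self._speed


manager = CustomizedLocalizationManager(alpha=0.5)
manager.localize()
```

However the pre-processing is changed, the last two assignments in localize() are the part that downstream modules depend on.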

Perception Customization

The PerceptionManager class is responsible for perception-related tasks. It currently supports vehicle detection and traffic light detection. The core function detect(ego_pos) takes the ego position from the localization module as input and returns a dictionary, objects, whose keys are the object categories and whose values hold the objects in each category along with their attributes (e.g. 3D poses, static or dynamic) in the world coordinate system.

To customize your own object detection algorithm, create a new Python file under the opencda/customize/core/sensing/perception/ folder:

import cv2
import numpy as np
from opencda.core.sensing.perception.perception_manager import PerceptionManager
from opencda.core.sensing.perception.obstacle_vehicle import ObstacleVehicle
from opencda.core.sensing.perception.static_obstacle import TrafficLight

class CustomizedPerceptionManager(PerceptionManager):
    def __init__(self, vehicle, config_yaml, cav_world, data_dump=False):
        super().__init__(vehicle, config_yaml, cav_world, data_dump)

    def detect(self, ego_pos):
        # expected layout of the returned dictionary
        objects = {'vehicles': [],
                   'traffic_lights': [],
                   'other_objects_you_wanna_add': []}

        # retrieve current rgb images from all cameras
        rgb_images = []
        for rgb_camera in self.rgb_camera:
            # wait until the camera has delivered its first frame
            while rgb_camera.image is None:
                continue
            # assumption: frames arrive as BGR and your algorithm expects RGB
            rgb_images.append(
                cv2.cvtColor(np.array(rgb_camera.image), cv2.COLOR_BGR2RGB))
        # retrieve lidar data from the sensor
        # (self.lidar.data is an assumed attribute name)
        lidar_data = self.lidar.data
        # this is where you put your algorithm #
        objects = your_algorithm(rgb_images, lidar_data)
        # each list entry must be an instance of the expected class
        assert all(isinstance(v, ObstacleVehicle) for v in objects['vehicles'])
        assert all(isinstance(t, TrafficLight) for t in objects['traffic_lights'])
        return objects
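
To make the expected output format easier to check during development, here is a minimal sketch of a validation helper. It assumes that 'vehicles' and 'traffic_lights' are the categories downstream code relies on; validate_objects() and EXPECTED_TYPES are hypothetical names, and the two classes are empty stand-ins so the example runs without OpenCDA installed.

```python
class ObstacleVehicle:
    """Stand-in for OpenCDA's ObstacleVehicle (illustration only)."""


class TrafficLight:
    """Stand-in for OpenCDA's TrafficLight (illustration only)."""


# Assumed mapping from category name to the class its entries should have.
EXPECTED_TYPES = {'vehicles': ObstacleVehicle, 'traffic_lights': TrafficLight}


def validate_objects(objects):
    """Return a list of problems found in a detect()-style dictionary."""
    problems = []
    for key, expected_type in EXPECTED_TYPES.items():
        if key not in objects:
            problems.append("missing key: %s" % key)
            continue
        if not isinstance(objects[key], list):
            problems.append("%s is not a list" % key)
            continue
        for i, item in enumerate(objects[key]):
            if not isinstance(item, expected_type):
                problems.append("%s[%d] is %s, expected %s" % (
                    key, i, type(item).__name__, expected_type.__name__))
    return problems


good = {'vehicles': [ObstacleVehicle()], 'traffic_lights': []}
bad = {'vehicles': [TrafficLight()]}
print(validate_objects(good))
print(validate_objects(bad))
```

Returning a list of problems rather than asserting makes it possible to log every mismatch in one pass while a custom detector is still being debugged.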

Behavior Planning Customization

During simulation runtime, BehaviorAgent first saves the ego vehicle's position and speed and the surrounding objects' information, provided by LocalizationManager and PerceptionManager, through update_information(). It then calls run_step() to execute a single planning step and return the target_speed and target_location.

To customize your own behavior planning algorithm, create a new Python file under the opencda/customize/core/plan/ folder:

import carla
from opencda.core.plan.behavior_agent import BehaviorAgent

class CustomizedBehaviorAgent(BehaviorAgent):
    def __init__(self, vehicle, carla_map, config_yaml):
        # reuse the default initialization before adding your own state
        super().__init__(vehicle, carla_map, config_yaml)

    def update_information(self, ego_pos, ego_speed, objects):
        # this is where you put your algorithm #
        do_some_preprocessing(ego_pos, ego_speed, objects)

    def run_step(self):
        # this is where you put your algorithm #
        target_speed, target_loc = your_plan_algorithm()
        assert isinstance(target_speed, float)
        assert isinstance(target_loc, carla.Location)
        return target_speed, target_loc
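
The two-call sequence described in this section (update_information() followed by run_step()) can be sketched as follows. This is only an illustration under stated assumptions: StubBehaviorAgent, plan_one_tick(), the cruise_speed and lookahead parameters, and the tuple-based locations are all hypothetical, and the planning rule is deliberately trivial.

```python
class StubBehaviorAgent:
    """Hypothetical agent exposing the same two calls as BehaviorAgent."""

    def __init__(self, cruise_speed=20.0, lookahead=5.0):
        self.cruise_speed = cruise_speed
        self.lookahead = lookahead
        self._ego_pos = None
        self._ego_speed = 0.0
        self._objects = {}

    def update_information(self, ego_pos, ego_speed, objects):
        # Cache the latest localization and perception results.
        self._ego_pos = ego_pos
        self._ego_speed = ego_speed
        self._objects = objects

    def run_step(self):
        # Toy rule: stop if any vehicle was detected, otherwise cruise.
        target_speed = 0.0 if self._objects.get('vehicles') else self.cruise_speed
        x, y, z = self._ego_pos
        target_loc = (x + self.lookahead, y, z)
        return target_speed, target_loc


def plan_one_tick(agent, ego_pos, ego_speed, objects):
    """Run one planning tick: refresh the agent's inputs, then plan."""
    agent.update_information(ego_pos, ego_speed, objects)
    return agent.run_step()


agent = StubBehaviorAgent()
free_road = plan_one_tick(agent, (0.0, 0.0, 0.0), 3.0, {'vehicles': []})
blocked = plan_one_tick(agent, (1.0, 2.0, 0.0), 3.0, {'vehicles': ['car']})
```

Because run_step() takes no arguments, everything it needs has to be stored by update_information() first, which is why the order of the two calls matters.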

Control Customization

Similar to BehaviorAgent, ControlManager first saves the ego vehicle's position and speed through update_info(), and then takes the target_speed and target_loc generated by the behavior agent to produce the final control command through run_step().

Unlike the other modules, ControlManager works more like an abstract class: it provides an interface that calls the corresponding controller (the PID controller by default).

import importlib

class ControlManager(object):
    def __init__(self, control_config):
        controller_type = control_config['type']
        # load the class named 'Controller' from opencda.core.actuation.<type>
        controller = getattr(
            importlib.import_module(
                "opencda.core.actuation.%s" %
                controller_type), 'Controller')
        self.controller = controller(control_config['args'])

    def update_info(self, ego_pos, ego_speed):
        """Update ego vehicle information for controller."""
        self.controller.update_info(ego_pos, ego_speed)

    def run_step(self, target_speed, target_loc):
        """Execute current controller step."""
        control_command = self.controller.run_step(target_speed, target_loc)
        return control_command

Therefore, if you want to use a controller other than the PID controller, you can create your own customize_controller.py under the opencda/core/actuation/ folder and follow the same input and output data formats:

import carla

# ControlManager looks up a class named 'Controller' in this file
class Controller:
    def __init__(self, args):
        # args is the 'args' entry of the control config
        self.args = args

    def update_info(self, ego_pos, ego_spd):
        # this is where you put your algorithm #
        do_some_process(ego_pos, ego_spd)

    def run_step(self, target_speed, target_loc):
        # this is where you put your algorithm #
        control_command = control(target_speed, target_loc)
        assert isinstance(control_command, carla.VehicleControl)
        return control_command

Then put your controller’s name in your yaml file:

        type: customize_controller # this has to be exactly the same name as the controller py file
        args: ......
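
To show how the type entry ends up selecting a controller, here is a minimal, self-contained sketch of the same lookup pattern (importlib.import_module plus getattr on a class named Controller). It does not import OpenCDA: the module name prefix demo_actuation_, the load_controller() helper, the gain argument, and the proportional throttle rule are all hypothetical, and the module is registered in memory only so the example runs without creating files.

```python
import importlib
import sys
import types


class Controller:
    """Hypothetical custom controller; the loader looks up this exact name."""

    def __init__(self, args):
        self.gain = args['gain']
        self.ego_speed = 0.0

    def update_info(self, ego_pos, ego_speed):
        self.ego_speed = ego_speed

    def run_step(self, target_speed, target_loc):
        # Proportional throttle clipped to [0, 1]; a placeholder rule only.
        return max(0.0, min(1.0, self.gain * (target_speed - self.ego_speed)))


# Register an in-memory module standing in for a controller file on disk.
module = types.ModuleType("demo_actuation_customize_controller")
module.Controller = Controller
sys.modules[module.__name__] = module


def load_controller(control_config, package_prefix="demo_actuation_"):
    """Resolve and instantiate the controller named by control_config['type']."""
    module_name = package_prefix + control_config['type']
    controller_cls = getattr(importlib.import_module(module_name), 'Controller')
    return controller_cls(control_config['args'])


# Dictionary equivalent of the yaml entry; the 'gain' value is made up.
control_config = {'type': 'customize_controller', 'args': {'gain': 0.25}}
controller = load_controller(control_config)
controller.update_info(ego_pos=(0.0, 0.0, 0.0), ego_speed=5.0)
throttle = controller.run_step(target_speed=7.0, target_loc=(5.0, 0.0, 0.0))
```

Since the module is found purely by name, a typo in the type entry surfaces as a ModuleNotFoundError at start-up, and a file that lacks a class named Controller surfaces as an AttributeError.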