Cyber RT构建新组件

Apollo Cyber RT处于底层的实时操作系统(RTOS)算法模块之间,能够在保证高吞吐的情况下,又能低延迟的实时响应上层任务,并保证整个系统确定性的运转。

Apollo Cyber RT 框架核心理念是基于组件,组件有预先设定的输入输出。实际上,每个组件就代表一个专用的算法模块。框架可以根据所有预定义的组件生成有向无环图 (DAG)

在运行时刻,框架把融合好的传感器数据预定义的组件打包在一起形成用户级轻量任务,之后,框架的调度器可以根据资源可用性和任务优先级来派发这些任务。

1、功能模块的迁移

将基于ROS的功能模块迁移到基于Cyber RT的功能组件,不需要修改任何算法逻辑,仅需要修改消息接收、发布代码、以及组件调度的配置文件。

功能模块迁移的基本步骤:CyberRT_Quick_Start_cn.md

`Apollo Cyber RT`框架基于组件的概念构建、加载各功能模块。`Localization`、 `Perception`等功能模块均作为`Apollo Cyber RT`框架的一个组件而存在,基于`Cyber RT`提供的调度程序`mainboard`加载运行。 基于`Apollo Cyber RT`框架创建和发布新的功能模块组件,需执行以下五个基本步骤:
  • 设置组件文件结构
  • 实现组件类
  • 提供构建文件(BUILD)
  • 提供配置文件(DAG)
  • 启动组件(LAUNCH)

Planning模块为例说明。

1.1、设置组件文件结构

基于路径${APOLLO_HOME}/modules/planning${APOLLO_HOME}表示Apollo项目的根目录,Docker内部的路径全部是/apollo,设置如下组件的文件结构:

  • 头文件: planning_component.h
  • 实现文件: planning_component.cc
  • 构建文件: BUILD
  • DAG配置文件: dag/planning.dag
  • Launch配置文件: launch/planning.launch

1.2、实现组件类

执行以下步骤以实现组件类:

  1. 基于模板类Component派生出规划模块的组件类PlanningComponent
  2. 在派生类PlanningComponent中覆盖纯虚函数Init() and Proc()函数;
  3. 使用宏CYBER_REGISTER_COMPONENT(PlanningComponent)注册组件类PlanningComponent,以便Cyber RT能正确创建并加载该类对象。

1.2.1、组件类PlanningComponent的声明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
namespace apollo {
namespace planning {

class PlanningComponent final // 此类不能被继承
: public cyber::Component<prediction::PredictionObstacles, canbus::Chassis,
localization::LocalizationEstimate> {
public:
PlanningComponent() = default;

~PlanningComponent() = default;

public:
bool Init() override;

bool Proc(const std::shared_ptr<prediction::PredictionObstacles>&
prediction_obstacles,
const std::shared_ptr<canbus::Chassis>& chassis,
const std::shared_ptr<localization::LocalizationEstimate>&
localization_estimate) override;

private:
void CheckRerouting();
bool CheckInput();

private:
std::shared_ptr<cyber::Reader<perception::TrafficLightDetection>> traffic_light_reader_;
std::shared_ptr<cyber::Reader<routing::RoutingResponse>> routing_reader_;
std::shared_ptr<cyber::Reader<planning::PadMessage>> pad_msg_reader_;
std::shared_ptr<cyber::Reader<relative_map::MapMsg>> relative_map_reader_;

std::shared_ptr<cyber::Writer<ADCTrajectory>> planning_writer_;
std::shared_ptr<cyber::Writer<routing::RoutingRequest>> rerouting_writer_;

std::mutex mutex_;
perception::TrafficLightDetection traffic_light_;
routing::RoutingResponse routing_;
planning::PadMessage pad_msg_;
relative_map::MapMsg relative_map_;

LocalView local_view_; // 处理fused_data用???

// unique_ptr不能复制
std::unique_ptr<PlanningBase> planning_base_;

PlanningConfig config_;
};

CYBER_REGISTER_COMPONENT(PlanningComponent)

} // namespace planning
} // namespace apollo

基类Component的定义是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
template <typename M0 = NullType, typename M1 = NullType,
typename M2 = NullType, typename M3 = NullType>
class Component : public ComponentBase {
public:
Component() {}
~Component() override {}

// 会调用基类的纯虚函数Init();这里先使用LoadConfigFiles(config)从dag文件读取配置信息
bool Initialize(const ComponentConfig& config) override;
// 会调用纯虚函数Proc()
bool Process(const std::shared_ptr<M0>& msg0, const std::shared_ptr<M1>& msg1,
const std::shared_ptr<M2>& msg2,
const std::shared_ptr<M3>& msg3);

private:
virtual bool Proc(const std::shared_ptr<M0>& msg0, // 表示4个channel message
const std::shared_ptr<M1>& msg1,
const std::shared_ptr<M2>& msg2,
const std::shared_ptr<M3>& msg3) = 0;
};

可见,Component类最多接受4个模板参数,每个模板参数均表示一种输入的消息类型,这些消息在Proc函数中被周期性地接收并处理;而PlanningComponent继承的是该模板类接受3个参数的一个特化版本:

1
2
3
4
template <typename M0, typename M1, typename M2>
class Component<M0, M1, M2, NullType> : public ComponentBase {
// ...
};

PlanningComponent继承自cyber::Component<prediction::PredictionObstacles, canbus::Chassis, localization::LocalizationEstimate>,3个消息参数分别为:prediction::PredictionObstaclescanbus::Chassislocalization::LocalizationEstimate,这些消息在Proc函数中被周期性地接收并处理。bool Initialize(const ComponentConfig& config)函数的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
template <typename M0, typename M1, typename M2>
bool Component<M0, M1, M2, NullType>::Initialize(const ComponentConfig& config) {
node_.reset(new Node(config.name()));
LoadConfigFiles(config);

if (config.readers_size() < 3) {
AERROR << "Invalid config file: too few readers.";
return false;
}

if (!Init()) {
AERROR << "Component Init() failed.";
return false;
}

ReaderConfig reader_cfg; //设置reader的一些参数
reader_cfg.channel_name = config.readers(1).channel();
reader_cfg..CopyFrom(config.readers(1).());
reader_cfg.pending_queue_size = config.readers(1).pending_queue_size();
// ...
if (ptr) {
ptr->Process(msg0, msg1, msg2);
} else {
AERROR << "Component object has been destroyed.";
}
// ...
}
`

mainboard函数中的ModuleController类实现了component的实例化,并且执行了base->Initialize()函数,由上述函数可见,Initialize()函数调用了Init,与Process()函数,最终启动了组件的功能。Initialize()函数首先新建一个node,然后读入dag配置文件中的components部分。LoadConfigFiles()是抽象基类ComponentBase的一个成员函数,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void LoadConfigFiles(const ComponentConfig& config) {
if (!config.config_file_path().empty()) { // dag文件中的config_file_path项
if (config.config_file_path()[0] != '/') {
config_file_path_ = common::GetAbsolutePath(common::WorkRoot(),
config.config_file_path());
} else {
config_file_path_ = config.config_file_path();
}
}

if (!config.flag_file_path().empty()) {
std::string flag_file_path = config.flag_file_path(); // dag文件中的flag_file_path项
if (flag_file_path[0] != '/') {
flag_file_path =
common::GetAbsolutePath(common::WorkRoot(), flag_file_path);
}
// gflags是google的一个开源的处理命令行参数的库
// 把 flag 参数和对应的值写在文件中,然后运行时使用 -flagfile 来指定对应的 flag 文件
google::SetCommandLineOption("flagfile", flag_file_path.c_str()); //FLAGS__的来源
}
}

上述函数主要读取dag文件中组件components部分的配置文件config_file_path和标志文件flag_file_path的路径。其中,标志文件使用google gflags 库读入进程序中,定义的驾驶模式标志文件"modules/common/configs/config_gflags.h",消息话题标志文件"modules/common/adapters/adapter_gflags.h"

1.2.2、组件类PlanningComponent的实现

PlanningComponent的实现主要包括两个覆盖的虚函数Init() and Proc()函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
bool PlanningComponent::Init() {
if (FLAGS_use_navigation_mode) { // "modules/common/adapters/adapter_gflags.h"
planning_base_ = std::make_unique<NaviPlanning>();
} else {
planning_base_ = std::make_unique<OnLanePlanning>();
}

CHECK(apollo::cyber::common::GetProtoFromFile(FLAGS_planning_config_file,&config_))
<< "failed to load planning config file " << FLAGS_planning_config_file;
planning_base_->Init(config_);
// 消息处理回调函数
routing_reader_ = node_->CreateReader<RoutingResponse>(
FLAGS_routing_response_topic, // "modules/common/adapters/adapter_gflags.h"
[this](const std::shared_ptr<RoutingResponse>& routing) {
AINFO << "Received routing data: run routing callback."
<< routing->header().DebugString();
std::lock_guard<std::mutex> lock(mutex_);
routing_.CopyFrom(*routing);
});
traffic_light_reader_ = node_->CreateReader<TrafficLightDetection>(
FLAGS_traffic_light_detection_topic,
[this](const std::shared_ptr<TrafficLightDetection>& traffic_light) {
ADEBUG << "Received traffic light data: run traffic light callback.";
std::lock_guard<std::mutex> lock(mutex_);
traffic_light_.CopyFrom(*traffic_light);
});

pad_msg_reader_ = node_->CreateReader<PadMessage>(
FLAGS_planning_pad_topic,
[this](const std::shared_ptr<PadMessage>& pad_msg) {
ADEBUG << "Received pad data: run pad callback.";
std::lock_guard<std::mutex> lock(mutex_);
pad_msg_.CopyFrom(*pad_msg);
});

if (FLAGS_use_navigation_mode) {
relative_map_reader_ = node_->CreateReader<MapMsg>(
FLAGS_relative_map_topic,
[this](const std::shared_ptr<MapMsg>& map_message) {
ADEBUG << "Received relative map data: run relative map callback.";
std::lock_guard<std::mutex> lock(mutex_);
relative_map_.CopyFrom(*map_message);
});
}
// 模块输出器
planning_writer_ =
node_->CreateWriter<ADCTrajectory>(FLAGS_planning_trajectory_topic);
rerouting_writer_ =
node_->CreateWriter<RoutingRequest>(FLAGS_routing_request_topic);

return true;
}

其中planning_base_->Init()函数用于创建实际规划类对象,接着创建除prediction::PredictionObstaclescanbus::Chassislocalization::LocalizationEstimate三类消息以外的其他消息处理回调函数,创建Planning模块的输出器:轨迹输出器planning_writer_和重新生成路由输出器rerouting_writer_Planning模块主要功能实现函数bool PlanningComponent::Proc如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
bool PlanningComponent::Proc(
const std::shared_ptr<prediction::PredictionObstacles>& prediction_obstacles,
const std::shared_ptr<canbus::Chassis>& chassis,
const std::shared_ptr<localization::LocalizationEstimate>& localization_estimate) {
CHECK(prediction_obstacles != nullptr);

// check and process possible rerouting request
CheckRerouting();

// process fused input data
local_view_.prediction_obstacles = prediction_obstacles;
local_view_.chassis = chassis;
local_view_.localization_estimate = localization_estimate;
{
std::lock_guard<std::mutex> lock(mutex_);
if (!local_view_.routing ||
hdmap::PncMap::IsNewRouting(*local_view_.routing, routing_)) {
local_view_.routing = std::make_shared<routing::RoutingResponse>(routing_);
}
}
{
std::lock_guard<std::mutex> lock(mutex_);
local_view_.traffic_light = std::make_shared<TrafficLightDetection>(traffic_light_);
local_view_.relative_map = std::make_shared<MapMsg>(relative_map_);
}
{
std::lock_guard<std::mutex> lock(mutex_);
local_view_.pad_msg = std::make_shared<PadMessage>(pad_msg_);
}

if (!CheckInput()) {
AERROR << "Input check failed";
return false;
}

ADCTrajectory adc_trajectory_pb;
planning_base_->RunOnce(local_view_, &adc_trajectory_pb);
auto start_time = adc_trajectory_pb.header().timestamp_sec();
common::util::FillHeader(node_->Name(), &adc_trajectory_pb);

// modify trajectory relative time due to the timestamp change in header
const double dt = start_time - adc_trajectory_pb.header().timestamp_sec();
for (auto& p : *adc_trajectory_pb.mutable_trajectory_point()) {
p.set_relative_time(p.relative_time() + dt);
}
planning_writer_->Write(adc_trajectory_pb);

// record in history
auto* history = History::Instance();
history->Add(adc_trajectory_pb);

return true;
}

Proc()函数周期性地接收prediction::PredictionObstaclescanbus::Chassislocalization::LocalizationEstimate三类消息,调用planning_base_->RunOnce()函数执行实际的路径与速度规划,并将规划结果adc_trajectory_pb借助函数planning_writer_->Write()将生成的规划轨迹输出给控制模块执行。

1.3、提供构建文件/apollo/modules/planning/BUILD

下面列出/apollo/modules/planning/BUILD文件中与Cyber RT相关的内容,可见基于planning_component_lib库最终生成了一个共享库文件libplanning_component.so,而该共享库通过Cyber RT调度程序mainboard动态加载运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
cc_library(
name = "planning_component_lib",
srcs = ["planning_component.cc"],
hdrs = ["planning_component.h"],
copts = [
"-DMODULE_NAME=\\\"planning\\\"",
],
deps = [
":navi_planning",
":on_lane_planning",
"//cyber",
"//modules/common/adapters:adapter_gflags",
"//modules/common/util:message_util",
"//modules/localization/proto:localization_proto",
"//modules/map/relative_map/proto:navigation_proto",
"//modules/perception/proto:perception_proto",
"//modules/planning/common:history",
"//modules/planning/proto:planning_proto",
"//modules/prediction/proto:prediction_proto",
],
)

cc_binary(
name = "libplanning_component.so",
linkshared = True,
linkstatic = False,
deps = [":planning_component_lib"],
)

1.4、提供DAG配置文件/apollo/dag/planning.dag

DAG配置文件是Cyber RT调度程序mainboard动态加载Planning模块的最终配置文件,加载命令一般为:

1
/apollo/bazel-bin/cyber/mainboard -d /apollo/modules/planning/dag/planning.dag

dag文件编写规则参考cyber/proto/dag_conf.proto以及common::GetProtoFromFile(path, &dag_config),标准模式的DAG配置文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# Define all coms in DAG streaming.
module_config {
# 共享库文件路径
module_library : "/apollo/bazel-bin/modules/planning/libplanning_component.so"
components {
# 组件类名称,一定不能写错,否则mainboard无法动态创建PlanningComponent组件对象
class_name : "PlanningComponent"
config {
# 模块名
name: "planning"
# GFlag配置文件路径,注意路径一定写成绝对路径,否则可能无法找到配置文件,导致模块加载失败
flag_file_path: "/apollo/modules/planning/conf/planning.conf"
# PlanningComponent组件**Proc()函数中使用的三个消息接收器**
readers: [
{
channel: "/apollo/prediction"
},
{
channel: "/apollo/canbus/chassis"
qos_profile: {
depth : 15
}
pending_queue_size: 50
},
{
channel: "/apollo/localization/pose"
qos_profile: {
depth : 15
}
pending_queue_size: 50
}
]
}
}
}

1.5、提供Launch配置文件/apollo/launch/planning.launch

Launch配置文件是Cyber RT提供的一个Python工具程序cyber_launch加载Planning模块所需的配置文件,启动命令如下所示(最终仍归结于mainboard加载):

1
cyber_launch start /apollo/launch/planning.launch

标准模式的Launch配置文件如下:规则见Apollo的启动过程1–启动命令解析

1
2
3
4
5
6
7
<cyber>
<module>
<name>planning</name>
<dag_conf>/apollo/modules/planning/dag/planning.dag</dag_conf>
<process_name>planning</process_name>
</module>
</cyber>
听说打赏我的人,最后都找到了真爱。