PX4 ROS 2 Interface Library
Library to interface with PX4 from a companion computer using ROS 2
shared_subscription.hpp
1 /****************************************************************************
2  * Copyright (c) 2025 PX4 Development Team.
3  * SPDX-License-Identifier: BSD-3-Clause
4  ****************************************************************************/
5 
6 #pragma once
7 
8 #include <functional>
9 #include <memory>
10 #include <mutex>
11 #include <rclcpp/rclcpp.hpp>
12 
24 using SharedSubscriptionCallbackInstance =
25  std::unique_ptr<const unsigned, std::function<void(const unsigned*)>>;
26 
38 template <typename RosMessageType>
39 class SharedSubscription : public std::enable_shared_from_this<SharedSubscription<RosMessageType>> {
40  public:
41  using OnUpdateCallback = std::function<void(const typename RosMessageType::UniquePtr&)>;
42 
43  static SharedSubscription& instance(rclcpp::Node& node, const std::string& topic_name);
44 
45  [[nodiscard]] static SharedSubscriptionCallbackInstance create(rclcpp::Node& node,
46  const std::string& topic_name,
47  OnUpdateCallback callback)
48  {
49  return instance(node, topic_name).registerOnUpdateCallback(std::move(callback));
50  }
51 
52  [[nodiscard]] SharedSubscriptionCallbackInstance registerOnUpdateCallback(
53  OnUpdateCallback callback);
54 
55  const typename rclcpp::Subscription<RosMessageType>::SharedPtr& getSubscription() const
56  {
57  return _subscription;
58  }
59 
60  private:
61  explicit SharedSubscription(rclcpp::Node& node, const std::string& topic_name);
62  void unregisterOnUpdateCallback(unsigned token);
63  static void cleanupInstanceIfUnused(const std::shared_ptr<SharedSubscription>& instance);
64 
65  void onUpdate(const typename RosMessageType::UniquePtr& msg) const;
66 
67  typename rclcpp::Subscription<RosMessageType>::SharedPtr _subscription;
68  rclcpp::Node& _node;
69  unsigned _next_token{1};
70  std::map<unsigned, OnUpdateCallback>
71  _callbacks; // Needs guaranteed ordering, for consistent callback ordering
72 
73  struct Key {
74  rclcpp::Node* node;
75  std::string topic_name;
76  bool operator<(const Key& other) const
77  {
78  return node < other.node || topic_name < other.topic_name;
79  }
80  };
81 
82  static std::map<Key, std::shared_ptr<SharedSubscription>> instances;
83  static std::mutex mutex;
84 };
85 
86 template <typename RosMessageType>
87 std::map<typename SharedSubscription<RosMessageType>::Key,
88  std::shared_ptr<SharedSubscription<RosMessageType>>>
90 template <typename RosMessageType>
92 
93 template <typename RosMessageType>
95  rclcpp::Node& node, const std::string& topic_name)
96 {
97  // Maintain a single instance per node and topic name.
98  // Even though the ROS MultiThreadedExecutor is not supported by the library, a user might still
99  // use multiple Nodes running in different threads. Therefore, we use a mutex here.
100  const std::scoped_lock lg{mutex};
101  const Key key{&node, topic_name};
102  auto it = instances.find(key);
103  if (it != instances.end()) {
104  return *it->second;
105  }
106  const auto instance =
107  std::shared_ptr<SharedSubscription>(new SharedSubscription<RosMessageType>(node, topic_name));
108  instances.emplace(key, instance);
109  return *instance;
110 }
111 
112 template <typename RosMessageType>
113 SharedSubscriptionCallbackInstance SharedSubscription<RosMessageType>::registerOnUpdateCallback(
114  OnUpdateCallback callback)
115 {
116  const unsigned token = _next_token++;
117  const auto insert_ret = _callbacks.emplace(token, std::move(callback));
118  assert(insert_ret.second);
119  (void)insert_ret;
120  const std::weak_ptr instance = this->shared_from_this();
121  return SharedSubscriptionCallbackInstance(new unsigned(token), [instance](const unsigned* token) {
122  const auto instance_shared = instance.lock();
123  if (token && instance_shared) {
124  instance_shared->unregisterOnUpdateCallback(*token);
125  SharedSubscription::cleanupInstanceIfUnused(instance_shared);
126  }
127  });
128 }
129 
130 template <typename RosMessageType>
132  const std::string& topic_name)
133  : _node(node)
134 {
135  _subscription = _node.create_subscription<RosMessageType>(
136  topic_name, rclcpp::QoS(1).best_effort(),
137  [this](typename RosMessageType::UniquePtr msg) { onUpdate(msg); });
138 }
139 
140 template <typename RosMessageType>
142 {
143  const auto num_erased = _callbacks.erase(token);
144  assert(num_erased == 1);
145  (void)num_erased;
146 }
147 
148 template <typename RosMessageType>
150  const std::shared_ptr<SharedSubscription>& instance)
151 {
152  // Remove the instance from the map if it is not used anymore.
153  // This would not strictly be required, as the static object gets destroyed at program exit.
154  // However, this causes segfaults, see https://github.com/ros2/rmw_fastrtps/pull/770 (fixed in ROS
155  // Jazzy)
156  const std::scoped_lock lg{mutex};
157  if (instance->_callbacks.empty()) {
158  auto result = std::find_if(instances.begin(), instances.end(),
159  [&instance](const auto& obj) { return obj.second == instance; });
160  if (result != instances.end()) {
161  instances.erase(result);
162  } else {
163  RCLCPP_FATAL(instance->_node.get_logger(), "Instance not found");
164  }
165  }
166 }
167 
168 template <typename RosMessageType>
170  const typename RosMessageType::UniquePtr& msg) const
171 {
172  for (const auto& cb : _callbacks) {
173  cb.second(msg);
174  }
175 }
Shared class for a ROS subscription.
Definition: shared_subscription.hpp:39