PX4 ROS 2 Interface Library
Library to interface with PX4 from a companion computer using ROS 2
shared_subscription.hpp
1 /****************************************************************************
2  * Copyright (c) 2025 PX4 Development Team.
3  * SPDX-License-Identifier: BSD-3-Clause
4  ****************************************************************************/
5 
6 #pragma once
7 
8 #include <rclcpp/rclcpp.hpp>
9 #include <functional>
10 #include <mutex>
11 #include <memory>
12 
13 
25 using SharedSubscriptionCallbackInstance = std::unique_ptr<const unsigned,
26  std::function<void (const unsigned *)>>;
27 
38 template<typename RosMessageType>
39 class SharedSubscription : public std::enable_shared_from_this<SharedSubscription<RosMessageType>>
40 {
41 public:
42  using OnUpdateCallback =
43  std::function<void (const typename RosMessageType::UniquePtr &)>;
44 
45  static SharedSubscription & instance(
46  rclcpp::Node & node,
47  const std::string & topic_name);
48 
49  [[nodiscard]] static SharedSubscriptionCallbackInstance create(
50  rclcpp::Node & node, const std::string & topic_name, OnUpdateCallback callback)
51  {
52  return instance(node, topic_name).registerOnUpdateCallback(callback);
53  }
54 
55  [[nodiscard]] SharedSubscriptionCallbackInstance registerOnUpdateCallback(
56  OnUpdateCallback callback);
57 
58  const typename rclcpp::Subscription<RosMessageType>::SharedPtr & getSubscription() const
59  {
60  return _subscription;
61  }
62 
63 private:
64  explicit SharedSubscription(
65  rclcpp::Node & node,
66  const std::string & topic_name);
67  void unregisterOnUpdateCallback(unsigned token);
68  static void cleanupInstanceIfUnused(const std::shared_ptr<SharedSubscription> & instance);
69 
70  void onUpdate(const typename RosMessageType::UniquePtr & msg) const;
71 
72  typename rclcpp::Subscription<RosMessageType>::SharedPtr _subscription;
73  rclcpp::Node & _node;
74  unsigned _next_token{1};
75  std::map<unsigned, OnUpdateCallback> _callbacks; // Needs guaranteed ordering, for consistent callback ordering
76 
77  struct Key
78  {
79  rclcpp::Node * node;
80  std::string topic_name;
81  bool operator<(const Key & other) const
82  {
83  return node < other.node || topic_name < other.topic_name;
84  }
85  };
86 
87  static std::map<Key, std::shared_ptr<SharedSubscription>> instances;
88  static std::mutex mutex;
89 };
90 
91 template<typename RosMessageType>
92 std::map<typename SharedSubscription<RosMessageType>::Key,
93  std::shared_ptr<SharedSubscription<RosMessageType>>> SharedSubscription<RosMessageType>::instances;
94 template<typename RosMessageType>
96 
97 template<typename RosMessageType>
99  rclcpp::Node & node,
100  const std::string & topic_name)
101 {
102  // Maintain a single instance per node and topic name.
103  // Even though the ROS MultiThreadedExecutor is not supported by the library, a user might still use multiple Nodes
104  // running in different threads. Therefore, we use a mutex here.
105  const std::lock_guard lg{mutex};
106  const Key key{&node, topic_name};
107  auto it = instances.find(key);
108  if (it != instances.end()) {
109  return *it->second;
110  }
111  const auto instance =
112  std::shared_ptr<SharedSubscription>(new SharedSubscription<RosMessageType>(node, topic_name));
113  instances.emplace(key, instance);
114  return *instance;
115 }
116 
117 template<typename RosMessageType>
118 SharedSubscriptionCallbackInstance SharedSubscription<RosMessageType>::registerOnUpdateCallback(
119  OnUpdateCallback callback)
120 {
121  const unsigned token = _next_token++;
122  const auto insert_ret = _callbacks.emplace(token, std::move(callback));
123  assert(insert_ret.second);
124  (void)insert_ret;
125  std::weak_ptr instance = this->shared_from_this();
126  return SharedSubscriptionCallbackInstance(
127  new unsigned(token), [instance](const unsigned * token)
128  {
129  const auto instance_shared = instance.lock();
130  if (token && instance_shared) {
131  instance_shared->unregisterOnUpdateCallback(*token);
132  SharedSubscription::cleanupInstanceIfUnused(instance_shared);
133  }
134  });
135 }
136 
137 template<typename RosMessageType>
139  rclcpp::Node & node,
140  const std::string & topic_name)
141 : _node(node)
142 {
143  _subscription = _node.create_subscription<RosMessageType>(
144  topic_name, rclcpp::QoS(
145  1).best_effort(),
146  [this](typename RosMessageType::UniquePtr msg) {
147  onUpdate(msg);
148  });
149 }
150 
151 template<typename RosMessageType>
153 {
154  const auto num_erased = _callbacks.erase(token);
155  assert(num_erased == 1);
156  (void)num_erased;
157 }
158 
159 template<typename RosMessageType>
161  const std::shared_ptr<SharedSubscription> & instance)
162 {
163  // Remove the instance from the map if it is not used anymore.
164  // This would not strictly be required, as the static object gets destroyed at program exit. However, this causes
165  // segfaults, see https://github.com/ros2/rmw_fastrtps/pull/770 (fixed in ROS Jazzy)
166  const std::lock_guard lg{mutex};
167  if (instance->_callbacks.empty()) {
168  auto result = std::find_if(
169  instances.begin(), instances.end(),
170  [&instance](const auto & obj) {return obj.second == instance;});
171  if (result != instances.end()) {
172  instances.erase(result);
173  } else {
174  RCLCPP_FATAL(instance->_node.get_logger(), "Instance not found");
175  }
176  }
177 }
178 
179 template<typename RosMessageType>
180 void SharedSubscription<RosMessageType>::onUpdate(const typename RosMessageType::UniquePtr & msg)
181 const
182 {
183  for (const auto & cb : _callbacks) {
184  cb.second(msg);
185  }
186 }
Shared class for a ROS subscription.
Definition: shared_subscription.hpp:40