PX4 ROS 2 Interface Library
Library to interface with PX4 from a companion computer using ROS 2
Loading...
Searching...
No Matches
shared_subscription.hpp
1/****************************************************************************
2 * Copyright (c) 2025 PX4 Development Team.
3 * SPDX-License-Identifier: BSD-3-Clause
4 ****************************************************************************/
5
6#pragma once
7
8#include <functional>
9#include <memory>
10#include <mutex>
11#include <rclcpp/rclcpp.hpp>
12
24using SharedSubscriptionCallbackInstance =
25 std::unique_ptr<const unsigned, std::function<void(const unsigned*)>>;
26
38template <typename RosMessageType>
39class SharedSubscription : public std::enable_shared_from_this<SharedSubscription<RosMessageType>> {
40 public:
41 using OnUpdateCallback = std::function<void(const typename RosMessageType::UniquePtr&)>;
42
43 static SharedSubscription& instance(rclcpp::Node& node, const std::string& topic_name);
44
45 [[nodiscard]] static SharedSubscriptionCallbackInstance create(rclcpp::Node& node,
46 const std::string& topic_name,
47 OnUpdateCallback callback)
48 {
49 return instance(node, topic_name).registerOnUpdateCallback(callback);
50 }
51
52 [[nodiscard]] SharedSubscriptionCallbackInstance registerOnUpdateCallback(
53 OnUpdateCallback callback);
54
55 const typename rclcpp::Subscription<RosMessageType>::SharedPtr& getSubscription() const
56 {
57 return _subscription;
58 }
59
60 private:
61 explicit SharedSubscription(rclcpp::Node& node, const std::string& topic_name);
62 void unregisterOnUpdateCallback(unsigned token);
63 static void cleanupInstanceIfUnused(const std::shared_ptr<SharedSubscription>& instance);
64
65 void onUpdate(const typename RosMessageType::UniquePtr& msg) const;
66
67 typename rclcpp::Subscription<RosMessageType>::SharedPtr _subscription;
68 rclcpp::Node& _node;
69 unsigned _next_token{1};
70 std::map<unsigned, OnUpdateCallback>
71 _callbacks; // Needs guaranteed ordering, for consistent callback ordering
72
73 struct Key {
74 rclcpp::Node* node;
75 std::string topic_name;
76 bool operator<(const Key& other) const
77 {
78 return node < other.node || topic_name < other.topic_name;
79 }
80 };
81
82 static std::map<Key, std::shared_ptr<SharedSubscription>> instances;
83 static std::mutex mutex;
84};
85
86template <typename RosMessageType>
87std::map<typename SharedSubscription<RosMessageType>::Key,
88 std::shared_ptr<SharedSubscription<RosMessageType>>>
90template <typename RosMessageType>
92
93template <typename RosMessageType>
95 rclcpp::Node& node, const std::string& topic_name)
96{
97 // Maintain a single instance per node and topic name.
98 // Even though the ROS MultiThreadedExecutor is not supported by the library, a user might still
99 // use multiple Nodes running in different threads. Therefore, we use a mutex here.
100 const std::lock_guard lg{mutex};
101 const Key key{&node, topic_name};
102 auto it = instances.find(key);
103 if (it != instances.end()) {
104 return *it->second;
105 }
106 const auto instance =
107 std::shared_ptr<SharedSubscription>(new SharedSubscription<RosMessageType>(node, topic_name));
108 instances.emplace(key, instance);
109 return *instance;
110}
111
112template <typename RosMessageType>
113SharedSubscriptionCallbackInstance SharedSubscription<RosMessageType>::registerOnUpdateCallback(
114 OnUpdateCallback callback)
115{
116 const unsigned token = _next_token++;
117 const auto insert_ret = _callbacks.emplace(token, std::move(callback));
118 assert(insert_ret.second);
119 (void)insert_ret;
120 std::weak_ptr instance = this->shared_from_this();
121 return SharedSubscriptionCallbackInstance(new unsigned(token), [instance](const unsigned* token) {
122 const auto instance_shared = instance.lock();
123 if (token && instance_shared) {
124 instance_shared->unregisterOnUpdateCallback(*token);
125 SharedSubscription::cleanupInstanceIfUnused(instance_shared);
126 }
127 });
128}
129
130template <typename RosMessageType>
132 const std::string& topic_name)
133 : _node(node)
134{
135 _subscription = _node.create_subscription<RosMessageType>(
136 topic_name, rclcpp::QoS(1).best_effort(),
137 [this](typename RosMessageType::UniquePtr msg) { onUpdate(msg); });
138}
139
140template <typename RosMessageType>
142{
143 const auto num_erased = _callbacks.erase(token);
144 assert(num_erased == 1);
145 (void)num_erased;
146}
147
148template <typename RosMessageType>
150 const std::shared_ptr<SharedSubscription>& instance)
151{
152 // Remove the instance from the map if it is not used anymore.
153 // This would not strictly be required, as the static object gets destroyed at program exit.
154 // However, this causes segfaults, see https://github.com/ros2/rmw_fastrtps/pull/770 (fixed in ROS
155 // Jazzy)
156 const std::lock_guard lg{mutex};
157 if (instance->_callbacks.empty()) {
158 auto result = std::find_if(instances.begin(), instances.end(),
159 [&instance](const auto& obj) { return obj.second == instance; });
160 if (result != instances.end()) {
161 instances.erase(result);
162 } else {
163 RCLCPP_FATAL(instance->_node.get_logger(), "Instance not found");
164 }
165 }
166}
167
168template <typename RosMessageType>
170 const typename RosMessageType::UniquePtr& msg) const
171{
172 for (const auto& cb : _callbacks) {
173 cb.second(msg);
174 }
175}
Shared class for a ROS subscription.
Definition shared_subscription.hpp:39