11#include <rclcpp/rclcpp.hpp>
24using SharedSubscriptionCallbackInstance =
25 std::unique_ptr<
const unsigned, std::function<void(
const unsigned*)>>;
38template <
typename RosMessageType>
39class SharedSubscription :
public std::enable_shared_from_this<SharedSubscription<RosMessageType>> {
41 using OnUpdateCallback = std::function<void(
const typename RosMessageType::UniquePtr&)>;
43 static SharedSubscription& instance(rclcpp::Node& node,
const std::string& topic_name);
45 [[nodiscard]]
static SharedSubscriptionCallbackInstance create(rclcpp::Node& node,
46 const std::string& topic_name,
47 OnUpdateCallback callback)
49 return instance(node, topic_name).registerOnUpdateCallback(callback);
52 [[nodiscard]] SharedSubscriptionCallbackInstance registerOnUpdateCallback(
53 OnUpdateCallback callback);
55 const typename rclcpp::Subscription<RosMessageType>::SharedPtr& getSubscription()
const
62 void unregisterOnUpdateCallback(
unsigned token);
63 static void cleanupInstanceIfUnused(
const std::shared_ptr<SharedSubscription>& instance);
65 void onUpdate(
const typename RosMessageType::UniquePtr& msg)
const;
67 typename rclcpp::Subscription<RosMessageType>::SharedPtr _subscription;
69 unsigned _next_token{1};
70 std::map<unsigned, OnUpdateCallback>
75 std::string topic_name;
76 bool operator<(
const Key& other)
const
78 return node < other.node || topic_name < other.topic_name;
82 static std::map<Key, std::shared_ptr<SharedSubscription>> instances;
83 static std::mutex mutex;
86template <
typename RosMessageType>
87std::map<typename SharedSubscription<RosMessageType>::Key,
88 std::shared_ptr<SharedSubscription<RosMessageType>>>
90template <
typename RosMessageType>
93template <
typename RosMessageType>
95 rclcpp::Node& node,
const std::string& topic_name)
100 const std::lock_guard lg{mutex};
101 const Key key{&node, topic_name};
102 auto it = instances.find(key);
103 if (it != instances.end()) {
106 const auto instance =
108 instances.emplace(key, instance);
112template <
typename RosMessageType>
114 OnUpdateCallback callback)
116 const unsigned token = _next_token++;
117 const auto insert_ret = _callbacks.emplace(token, std::move(callback));
118 assert(insert_ret.second);
120 std::weak_ptr instance = this->shared_from_this();
121 return SharedSubscriptionCallbackInstance(
new unsigned(token), [instance](
const unsigned* token) {
122 const auto instance_shared = instance.lock();
123 if (token && instance_shared) {
124 instance_shared->unregisterOnUpdateCallback(*token);
125 SharedSubscription::cleanupInstanceIfUnused(instance_shared);
130template <
typename RosMessageType>
132 const std::string& topic_name)
135 _subscription = _node.create_subscription<RosMessageType>(
136 topic_name, rclcpp::QoS(1).best_effort(),
137 [
this](
typename RosMessageType::UniquePtr msg) { onUpdate(msg); });
140template <
typename RosMessageType>
143 const auto num_erased = _callbacks.erase(token);
144 assert(num_erased == 1);
148template <
typename RosMessageType>
150 const std::shared_ptr<SharedSubscription>& instance)
156 const std::lock_guard lg{mutex};
157 if (instance->_callbacks.empty()) {
158 auto result = std::find_if(instances.begin(), instances.end(),
159 [&instance](
const auto& obj) { return obj.second == instance; });
160 if (result != instances.end()) {
161 instances.erase(result);
163 RCLCPP_FATAL(instance->_node.get_logger(),
"Instance not found");
168template <
typename RosMessageType>
170 const typename RosMessageType::UniquePtr& msg)
const
172 for (
const auto& cb : _callbacks) {
Shared class for a ROS subscription.
Definition shared_subscription.hpp:39