8 #include <rclcpp/rclcpp.hpp>
25 using SharedSubscriptionCallbackInstance = std::unique_ptr<
const unsigned,
26 std::function<void (
const unsigned *)>>;
38 template<
typename RosMessageType>
39 class SharedSubscription :
public std::enable_shared_from_this<SharedSubscription<RosMessageType>>
42 using OnUpdateCallback =
43 std::function<void (
const typename RosMessageType::UniquePtr &)>;
47 const std::string & topic_name);
49 [[nodiscard]]
static SharedSubscriptionCallbackInstance create(
50 rclcpp::Node & node,
const std::string & topic_name, OnUpdateCallback callback)
52 return instance(node, topic_name).registerOnUpdateCallback(callback);
55 [[nodiscard]] SharedSubscriptionCallbackInstance registerOnUpdateCallback(
56 OnUpdateCallback callback);
58 const typename rclcpp::Subscription<RosMessageType>::SharedPtr & getSubscription()
const
66 const std::string & topic_name);
67 void unregisterOnUpdateCallback(
unsigned token);
68 static void cleanupInstanceIfUnused(
const std::shared_ptr<SharedSubscription> & instance);
70 void onUpdate(
const typename RosMessageType::UniquePtr & msg)
const;
72 typename rclcpp::Subscription<RosMessageType>::SharedPtr _subscription;
74 unsigned _next_token{1};
75 std::map<unsigned, OnUpdateCallback> _callbacks;
80 std::string topic_name;
81 bool operator<(
const Key & other)
const
83 return node < other.node || topic_name < other.topic_name;
87 static std::map<Key, std::shared_ptr<SharedSubscription>> instances;
88 static std::mutex mutex;
91 template<
typename RosMessageType>
92 std::map<typename SharedSubscription<RosMessageType>::Key,
94 template<
typename RosMessageType>
97 template<
typename RosMessageType>
100 const std::string & topic_name)
105 const std::lock_guard lg{mutex};
106 const Key key{&node, topic_name};
107 auto it = instances.find(key);
108 if (it != instances.end()) {
111 const auto instance =
113 instances.emplace(key, instance);
117 template<
typename RosMessageType>
119 OnUpdateCallback callback)
121 const unsigned token = _next_token++;
122 const auto insert_ret = _callbacks.emplace(token, std::move(callback));
123 assert(insert_ret.second);
125 std::weak_ptr instance = this->shared_from_this();
126 return SharedSubscriptionCallbackInstance(
127 new unsigned(token), [instance](
const unsigned * token)
129 const auto instance_shared = instance.lock();
130 if (token && instance_shared) {
131 instance_shared->unregisterOnUpdateCallback(*token);
132 SharedSubscription::cleanupInstanceIfUnused(instance_shared);
137 template<
typename RosMessageType>
140 const std::string & topic_name)
143 _subscription = _node.create_subscription<RosMessageType>(
144 topic_name, rclcpp::QoS(
146 [
this](
typename RosMessageType::UniquePtr msg) {
151 template<
typename RosMessageType>
154 const auto num_erased = _callbacks.erase(token);
155 assert(num_erased == 1);
159 template<
typename RosMessageType>
161 const std::shared_ptr<SharedSubscription> & instance)
166 const std::lock_guard lg{mutex};
167 if (instance->_callbacks.empty()) {
168 auto result = std::find_if(
169 instances.begin(), instances.end(),
170 [&instance](
const auto & obj) {return obj.second == instance;});
171 if (result != instances.end()) {
172 instances.erase(result);
174 RCLCPP_FATAL(instance->_node.get_logger(),
"Instance not found");
179 template<
typename RosMessageType>
183 for (
const auto & cb : _callbacks) {
Shared class for a ROS subscription.
Definition: shared_subscription.hpp:40