GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: src/ros_queued_subscribe.hxx Lines: 0 38 0.0 %
Date: 2022-09-08 07:44:08 Branches: 0 82 0.0 %

Line Branch Exec Source
1
//
2
// Copyright (c) 2017-2018 CNRS
3
// Authors: Joseph Mirabel
4
//
5
//
6
7
#ifndef DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HXX
8
#define DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HXX
9
#define ENABLE_RT_LOG
10
11
#include <dynamic-graph/linear-algebra.h>
12
#include <dynamic-graph/real-time-logger.h>
13
#include <dynamic-graph/signal-cast-helper.h>
14
#include <dynamic-graph/signal-caster.h>
15
#include <std_msgs/Float64.h>
16
17
#include <boost/bind.hpp>
18
#include <boost/date_time/posix_time/posix_time.hpp>
19
#include <vector>
20
21
#include "dynamic_graph_bridge_msgs/Matrix.h"
22
#include "dynamic_graph_bridge_msgs/Vector.h"
23
24
namespace dynamicgraph {
25
namespace internal {
26
static const int BUFFER_SIZE = 1 << 10;
27
28
template <typename T>
29
struct Add {
30
  void operator()(RosQueuedSubscribe& rosSubscribe, const std::string& type,
31
                  const std::string& signal, const std::string& topic) {
32
    typedef typename SotToRos<T>::sot_t sot_t;
33
    typedef typename SotToRos<T>::ros_const_ptr_t ros_const_ptr_t;
34
    typedef BindedSignal<sot_t, BUFFER_SIZE> BindedSignal_t;
35
    typedef typename BindedSignal_t::Signal_t Signal_t;
36
37
    // Initialize the bindedSignal object.
38
    BindedSignal_t* bs = new BindedSignal_t(&rosSubscribe);
39
    SotToRos<T>::setDefault(bs->last);
40
41
    // Initialize the signal.
42
    boost::format signalName("RosQueuedSubscribe(%1%)::output(%2%)::%3%");
43
    signalName % rosSubscribe.getName() % type % signal;
44
45
    bs->signal.reset(new Signal_t(signalName.str()));
46
    bs->signal->setFunction(boost::bind(&BindedSignal_t::reader, bs, _1, _2));
47
    rosSubscribe.signalRegistration(*bs->signal);
48
49
    // Initialize the subscriber.
50
    typedef boost::function<void(const ros_const_ptr_t& data)> callback_t;
51
    callback_t callback =
52
        boost::bind(&BindedSignal_t::template writer<ros_const_ptr_t>, bs, _1);
53
54
    // Keep 50 messages in queue, but only 20 are sent every 100ms
55
    // -> No message should be lost because of a full buffer
56
    bs->subscriber = boost::make_shared<ros::Subscriber>(
57
        rosSubscribe.nh().subscribe(topic, BUFFER_SIZE, callback));
58
59
    RosQueuedSubscribe::bindedSignal_t bindedSignal(bs);
60
    rosSubscribe.bindedSignal()[signal] = bindedSignal;
61
    rosSubscribe.topics()[signal] = topic;
62
  }
63
};
64
65
// template <typename T, typename R>
66
template <typename T, int N>
67
template <typename R>
68
void BindedSignal<T, N>::writer(const R& data) {
69
  // synchronize with method clear
70
  boost::mutex::scoped_lock lock(wmutex);
71
  if (full()) {
72
    rmutex.lock();
73
    frontIdx = (frontIdx + 1) % N;
74
    rmutex.unlock();
75
  }
76
  converter(buffer[backIdx], data);
77
  // No need to synchronize with reader here because:
78
  // - if the buffer was not empty, then it stays not empty,
79
  // - if it was empty, then the current value will be used at next time. It
80
  //   means the transmission bandwidth is too low.
81
  if (!init) {
82
    last = buffer[backIdx];
83
    init = true;
84
  }
85
  backIdx = (backIdx + 1) % N;
86
}
87
88
template <typename T, int N>
89
T& BindedSignal<T, N>::reader(T& data, int time) {
90
  // synchronize with method clear:
91
  // If reading from the list cannot be done, then return last value.
92
  boost::mutex::scoped_lock lock(rmutex, boost::try_to_lock);
93
  if (!lock.owns_lock() || entity->readQueue_ == -1 ||
94
      time < entity->readQueue_) {
95
    data = last;
96
  } else {
97
    if (empty())
98
      data = last;
99
    else {
100
      data = buffer[frontIdx];
101
      frontIdx = (frontIdx + 1) % N;
102
      last = data;
103
    }
104
  }
105
  return data;
106
}
107
}  // end of namespace internal.
108
109
template <typename T>
110
void RosQueuedSubscribe::add(const std::string& type, const std::string& signal,
111
                             const std::string& topic) {
112
  internal::Add<T>()(*this, type, signal, topic);
113
}
114
}  // end of namespace dynamicgraph.
115
116
#endif  //! DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HXX