Skip to content

Commit

Permalink
WIP: decode everything
Browse files Browse the repository at this point in the history
QoS parsing moved
MCAP code already drafted
  • Loading branch information
roncapat committed Oct 24, 2023
1 parent e19c113 commit 5be9891
Show file tree
Hide file tree
Showing 14 changed files with 127 additions and 100 deletions.
3 changes: 3 additions & 0 deletions rosbag2_storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ find_package(ament_cmake REQUIRED)
find_package(pluginlib REQUIRED)
find_package(rcpputils REQUIRED)
find_package(rcutils REQUIRED)
find_package(rclcpp REQUIRED)
find_package(yaml_cpp_vendor REQUIRED)

add_library(
${PROJECT_NAME}
SHARED
src/rosbag2_storage/qos.cpp
src/rosbag2_storage/default_storage_id.cpp
src/rosbag2_storage/metadata_io.cpp
src/rosbag2_storage/ros_helper.cpp
Expand All @@ -45,6 +47,7 @@ target_link_libraries(${PROJECT_NAME}
pluginlib::pluginlib
rcpputils::rcpputils
rcutils::rcutils
rclcpp::rclcpp
yaml-cpp
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_TRANSPORT__QOS_HPP_
#define ROSBAG2_TRANSPORT__QOS_HPP_
#ifndef ROSBAG2_STORAGE__QOS_HPP_
#define ROSBAG2_STORAGE__QOS_HPP_

#include <map>
#include <string>
#include <vector>

#include "rclcpp/node_interfaces/node_graph_interface.hpp"
#include "rclcpp/qos.hpp"
#include "yaml-cpp/yaml.h"

#include "rosbag2_transport/visibility_control.hpp"
#include "rosbag2_storage/yaml.hpp"

namespace rosbag2_transport
#include "rosbag2_storage/visibility_control.hpp"

namespace rosbag2_storage
{
/// Simple wrapper around rclcpp::QoS to provide a default constructor for YAML deserialization.
class ROSBAG2_TRANSPORT_PUBLIC Rosbag2QoS : public rclcpp::QoS
class ROSBAG2_STORAGE_PUBLIC Rosbag2QoS : public rclcpp::QoS
{
public:
Rosbag2QoS()
Expand All @@ -45,7 +46,7 @@ class ROSBAG2_TRANSPORT_PUBLIC Rosbag2QoS : public rclcpp::QoS

// Create an adaptive QoS profile to use for subscribing to a set of offers from publishers.
/**
* - Uses rosbag2_transport defaults for History since they do not affect compatibility.
* - Uses rosbag2_storage defaults for History since they do not affect compatibility.
* - Adapts Durability and Reliability, falling back to the least strict publisher when there
* is a mixed offer. This behavior errs on the side of forming connections with all publishers.
* - Does not specify Lifespan, Deadline, or Liveliness to be maximally compatible, because
Expand All @@ -67,56 +68,80 @@ class ROSBAG2_TRANSPORT_PUBLIC Rosbag2QoS : public rclcpp::QoS
const std::string & topic_name,
const std::vector<Rosbag2QoS> & profiles);
};
} // namespace rosbag2_transport
} // namespace rosbag2_storage

namespace YAML
{

/// Pass metadata version to the sub-structs of BagMetadata for deserializing.
/**
* Encoding should always use the current metadata version, so it does not need this value.
* We cannot extend the YAML::Node class to include this, so we must call it
* as a function with the node as an argument.
*/

template<typename T>
T decode_for_version(const Node & node, int version)
{
static_assert(
std::is_default_constructible<T>::value,
"Type passed to decode_for_version that has is not default constructible.");
if (!node.IsDefined()) {
throw TypedBadConversion<T>(node.Mark());
}
T value{};
if (convert<T>::decode(node, value, version)) {
return value;
}
throw TypedBadConversion<T>(node.Mark());
}

template<>
struct ROSBAG2_TRANSPORT_PUBLIC convert<rmw_qos_history_policy_t>
struct ROSBAG2_STORAGE_PUBLIC convert<rmw_qos_history_policy_t>
{
static Node encode(const rmw_qos_history_policy_t & policy);
static bool decode(const Node & node, rmw_qos_history_policy_t & policy);
};

template<>
struct ROSBAG2_TRANSPORT_PUBLIC convert<rmw_qos_reliability_policy_t>
struct ROSBAG2_STORAGE_PUBLIC convert<rmw_qos_reliability_policy_t>
{
static Node encode(const rmw_qos_reliability_policy_t & policy);
static bool decode(const Node & node, rmw_qos_reliability_policy_t & policy);
};

template<>
struct ROSBAG2_TRANSPORT_PUBLIC convert<rmw_qos_durability_policy_t>
struct ROSBAG2_STORAGE_PUBLIC convert<rmw_qos_durability_policy_t>
{
static Node encode(const rmw_qos_durability_policy_t & policy);
static bool decode(const Node & node, rmw_qos_durability_policy_t & policy);
};

template<>
struct ROSBAG2_TRANSPORT_PUBLIC convert<rmw_qos_liveliness_policy_t>
struct ROSBAG2_STORAGE_PUBLIC convert<rmw_qos_liveliness_policy_t>
{
static Node encode(const rmw_qos_liveliness_policy_t & policy);
static bool decode(const Node & node, rmw_qos_liveliness_policy_t & policy);
};

template<>
struct ROSBAG2_TRANSPORT_PUBLIC convert<rmw_time_t>
struct ROSBAG2_STORAGE_PUBLIC convert<rmw_time_t>
{
static Node encode(const rmw_time_t & time);
static bool decode(const Node & node, rmw_time_t & time);
};

template<>
struct ROSBAG2_TRANSPORT_PUBLIC convert<rosbag2_transport::Rosbag2QoS>
struct ROSBAG2_STORAGE_PUBLIC convert<rosbag2_storage::Rosbag2QoS>
{
static Node encode(const rosbag2_transport::Rosbag2QoS & qos);
static bool decode(const Node & node, rosbag2_transport::Rosbag2QoS & qos, int version);
static Node encode(const rosbag2_storage::Rosbag2QoS & qos);
static bool decode(const Node & node, rosbag2_storage::Rosbag2QoS & qos, int version);
};

template<>
struct ROSBAG2_TRANSPORT_PUBLIC convert<std::vector<rosbag2_transport::Rosbag2QoS>>
struct ROSBAG2_STORAGE_PUBLIC convert<std::vector<rosbag2_storage::Rosbag2QoS>>
{
static Node encode(const std::vector<rosbag2_transport::Rosbag2QoS> & rhs)
static Node encode(const std::vector<rosbag2_storage::Rosbag2QoS> & rhs)
{
Node node{NodeType::Sequence};
for (const auto & value : rhs) {
Expand All @@ -126,25 +151,25 @@ struct ROSBAG2_TRANSPORT_PUBLIC convert<std::vector<rosbag2_transport::Rosbag2Qo
}

static bool decode(
const Node & node, std::vector<rosbag2_transport::Rosbag2QoS> & rhs, int version)
const Node & node, std::vector<rosbag2_storage::Rosbag2QoS> & rhs, int version)
{
if (!node.IsSequence()) {
return false;
}

rhs.clear();
for (const auto & value : node) {
auto temp = decode_for_version<rosbag2_transport::Rosbag2QoS>(value, version);
auto temp = decode_for_version<rosbag2_storage::Rosbag2QoS>(value, version);
rhs.push_back(temp);
}
return true;
}
};

template<>
struct ROSBAG2_TRANSPORT_PUBLIC convert<std::map<std::string, rosbag2_transport::Rosbag2QoS>>
struct ROSBAG2_STORAGE_PUBLIC convert<std::map<std::string, rosbag2_storage::Rosbag2QoS>>
{
static Node encode(const std::map<std::string, rosbag2_transport::Rosbag2QoS> & rhs)
static Node encode(const std::map<std::string, rosbag2_storage::Rosbag2QoS> & rhs)
{
Node node{NodeType::Sequence};
for (const auto & [key, value] : rhs) {
Expand All @@ -154,20 +179,20 @@ struct ROSBAG2_TRANSPORT_PUBLIC convert<std::map<std::string, rosbag2_transport:
}

static bool decode(
const Node & node, std::map<std::string, rosbag2_transport::Rosbag2QoS> & rhs, int version)
const Node & node, std::map<std::string, rosbag2_storage::Rosbag2QoS> & rhs, int version)
{
if (!node.IsMap()) {
return false;
}

rhs.clear();
for (const auto & element : node) {
auto temp = decode_for_version<rosbag2_transport::Rosbag2QoS>(element.second, version);
auto temp = decode_for_version<rosbag2_storage::Rosbag2QoS>(element.second, version);
rhs[element.first.as<std::string>()] = temp;
}
return true;
}
};
} // namespace YAML

#endif // ROSBAG2_TRANSPORT__QOS_HPP_
#endif // ROSBAG2_STORAGE__QOS_HPP_
5 changes: 2 additions & 3 deletions rosbag2_storage/include/rosbag2_storage/topic_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#define ROSBAG2_STORAGE__TOPIC_METADATA_HPP_

#include <string>
#include "rclcpp/qos.hpp"

namespace rosbag2_storage
{
Expand All @@ -25,11 +26,9 @@ struct TopicMetadata
std::string name;
std::string type;
std::string serialization_format;
// Serialized std::vector<rclcpp::QoS> as a YAML string
std::string offered_qos_profiles;
std::vector<rclcpp::QoS> offered_qos_profiles;
// REP-2011 type description hash if available for topic, "" otherwise.
std::string type_description_hash;
int version = 9;

bool operator==(const rosbag2_storage::TopicMetadata & rhs) const
{
Expand Down
41 changes: 14 additions & 27 deletions rosbag2_storage/include/rosbag2_storage/yaml.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#endif

#include "rosbag2_storage/bag_metadata.hpp"
#include "rosbag2_storage/qos.hpp"

namespace YAML
{
Expand Down Expand Up @@ -64,28 +65,6 @@ struct convert<std::unordered_map<std::string, std::string>>
}
};

/// Pass metadata version to the sub-structs of BagMetadata for deserializing.
/**
* Encoding should always use the current metadata version, so it does not need this value.
* We cannot extend the YAML::Node class to include this, so we must call it
* as a function with the node as an argument.
*/
template<typename T>
T decode_for_version(const Node & node, int version)
{
static_assert(
std::is_default_constructible<T>::value,
"Type passed to decode_for_version that has is not default constructible.");
if (!node.IsDefined()) {
throw TypedBadConversion<T>(node.Mark());
}
T value{};
if (convert<T>::decode(node, value, version)) {
return value;
}
throw TypedBadConversion<T>(node.Mark());
}

template<>
struct convert<rosbag2_storage::TopicMetadata>
{
Expand All @@ -95,21 +74,29 @@ struct convert<rosbag2_storage::TopicMetadata>
node["name"] = topic.name;
node["type"] = topic.type;
node["serialization_format"] = topic.serialization_format;
node["offered_qos_profiles"] = topic.offered_qos_profiles;
std::vector<rosbag2_storage::Rosbag2QoS> to_encode;
to_encode.reserve(topic.offered_qos_profiles.size());
std::transform(
topic.offered_qos_profiles.begin(), topic.offered_qos_profiles.end(), to_encode.begin(),
[](auto & qos) {return static_cast<rosbag2_storage::Rosbag2QoS>(qos);});
node["offered_qos_profiles"] = convert<std::vector<rosbag2_storage::Rosbag2QoS>>::encode(
to_encode);
node["type_description_hash"] = topic.type_description_hash;
return node;
}

static bool decode(const Node & node, rosbag2_storage::TopicMetadata & topic, int version)
{
topic.version = version;
topic.name = node["name"].as<std::string>();
topic.type = node["type"].as<std::string>();
topic.serialization_format = node["serialization_format"].as<std::string>();
if (version >= 4) {
topic.offered_qos_profiles = node["offered_qos_profiles"].as<std::string>();
} else {
topic.offered_qos_profiles = "";
auto decoded =
decode_for_version<std::vector<rosbag2_storage::Rosbag2QoS>>(
node["offered_qos_profiles"],
version);
topic.offered_qos_profiles.reserve(decoded.size());
std::copy(decoded.begin(), decoded.end(), topic.offered_qos_profiles.begin());
}
if (version >= 7) {
topic.type_description_hash = node["type_description_hash"].as<std::string>();
Expand Down
1 change: 1 addition & 0 deletions rosbag2_storage/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
<depend>pluginlib</depend>
<depend>rcpputils</depend>
<depend>rcutils</depend>
<depend>rclcpp</depend>
<depend>yaml_cpp_vendor</depend>

<test_depend>ament_cmake_gtest</test_depend>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
#include <vector>


#include "rosbag2_transport/qos.hpp"
#include "logging.hpp"
#include "rosbag2_storage/qos.hpp"
#include "rosbag2_storage/logging.hpp"
#include "rmw/qos_string_conversions.h"

namespace
Expand Down Expand Up @@ -131,8 +131,8 @@ bool convert<rmw_time_t>::decode(const Node & node, rmw_time_t & time)
return true;
}

Node convert<rosbag2_transport::Rosbag2QoS>::encode(
const rosbag2_transport::Rosbag2QoS & qos)
Node convert<rosbag2_storage::Rosbag2QoS>::encode(
const rosbag2_storage::Rosbag2QoS & qos)
{
const auto & p = qos.get_rmw_qos_profile();
Node node;
Expand All @@ -148,8 +148,8 @@ Node convert<rosbag2_transport::Rosbag2QoS>::encode(
return node;
}

bool convert<rosbag2_transport::Rosbag2QoS>::decode(
const Node & node, rosbag2_transport::Rosbag2QoS & qos, int version)
bool convert<rosbag2_storage::Rosbag2QoS>::decode(
const Node & node, rosbag2_storage::Rosbag2QoS & qos, int version)
{
if (version <= 8) {
auto history = static_cast<rmw_qos_history_policy_t>(node["history"].as<int>());
Expand Down Expand Up @@ -179,7 +179,7 @@ bool convert<rosbag2_transport::Rosbag2QoS>::decode(
}
} // namespace YAML

namespace rosbag2_transport
namespace rosbag2_storage
{
Rosbag2QoS Rosbag2QoS::adapt_request_to_offers(
const std::string & topic_name, const std::vector<rclcpp::TopicEndpointInfo> & endpoints)
Expand Down Expand Up @@ -210,7 +210,7 @@ Rosbag2QoS Rosbag2QoS::adapt_request_to_offers(
request_qos.reliable();
} else {
if (reliability_reliable_endpoints_count > 0) {
ROSBAG2_TRANSPORT_LOG_WARN_STREAM(
ROSBAG2_STORAGE_LOG_WARN_STREAM(
"Some, but not all, publishers on topic \"" << topic_name << "\" "
"are offering RMW_QOS_POLICY_RELIABILITY_RELIABLE. "
"Falling back to RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT "
Expand All @@ -226,7 +226,7 @@ Rosbag2QoS Rosbag2QoS::adapt_request_to_offers(
request_qos.transient_local();
} else {
if (durability_transient_local_endpoints_count > 0) {
ROSBAG2_TRANSPORT_LOG_WARN_STREAM(
ROSBAG2_STORAGE_LOG_WARN_STREAM(
"Some, but not all, publishers on topic \"" << topic_name << "\" "
"are offering RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL. "
"Falling back to RMW_QOS_POLICY_DURABILITY_VOLATILE "
Expand Down Expand Up @@ -296,10 +296,10 @@ Rosbag2QoS Rosbag2QoS::adapt_offer_to_recorded_offers(
return result.default_history();
}

ROSBAG2_TRANSPORT_LOG_WARN_STREAM(
ROSBAG2_STORAGE_LOG_WARN_STREAM(
"Not all original publishers on topic " << topic_name << " offered the same QoS profiles. "
"Rosbag2 cannot yet choose an adapted profile to offer for this mixed case. "
"Falling back to the rosbag2_transport default publisher offer.");
"Falling back to the rosbag2_storage default publisher offer.");
return Rosbag2QoS{};
}
} // namespace rosbag2_transport
} // namespace rosbag2_storage
Loading

0 comments on commit 5be9891

Please sign in to comment.