Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for topic QOS for ros2topic bw, delay and hz #935

Open
wants to merge 3 commits into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions ros2topic/ros2topic/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

from rclpy.duration import Duration
from rclpy.expand_topic_name import expand_topic_name
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSPresetProfiles
from rclpy.qos import QoSReliabilityPolicy
from rclpy.topic_or_service_is_hidden import topic_or_service_is_hidden
from rclpy.validate_full_topic_name import validate_full_topic_name
from ros2cli.node.strategy import NodeStrategy
Expand Down Expand Up @@ -253,3 +256,80 @@ def add_qos_arguments(parser: ArgumentParser, subscribe_or_publish: str, default
help=(
f'Quality of service liveliness lease duration setting to {subscribe_or_publish} '
'with (overrides liveliness lease duration value of --qos-profile option'))


def extract_qos_arguments(args):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not understand why this is needed here. it does not do anything but passing all variables into another class object, can you elaborate this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's definitely useless. I had a few alternatives in mind but none that I was satisfied with:

  • just pass args to the _rostopic_verb functions as the qos parameter. But in that case I might as well remove the other arguments from _rostopic_verb since args has all of them
  • split each qos argument into their own function parameters but that would have added 6 parameters to both _rostopic_verb and choose_qos
    I'm not sure what would best fit the style of this package. What do you think ?

class QosArgs:
pass

qos = QosArgs()
qos.qos_profile = args.qos_profile
qos.qos_reliability = args.qos_reliability
qos.qos_durability = args.qos_durability
qos.qos_depth = args.qos_depth
qos.qos_history = args.qos_history
qos.qos_liveliness = args.qos_liveliness
qos.qos_liveliness_lease_duration_seconds = args.qos_liveliness_lease_duration_seconds

return qos


def choose_qos(node, topic_name: str, qos_args):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think moving this to ros2topic/api is nice to do here aligned with add_qos_arguments.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

besides that, i see choose_qos function is doing like adaptive qos configuration based on the concerned publisher information for durability and reliability to avoid incompatible QoS setting when it starts echo. I think this can be also applied to ros2 topic pub without this adaptive qos checking for the publisher? that does a bit more refactoring and cleaning for ros2topic qos functions.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you mean. From what I understand apart from the adaptive QoS configuration choose_qos just calls qos_profile_from_short_keys which ends up calling the same function profile_configure_short_keys that is called by pub ?

if (qos_args.qos_reliability is not None or
qos_args.qos_durability is not None or
qos_args.qos_depth is not None or
qos_args.qos_history is not None or
qos_args.qos_liveliness is not None or
qos_args.qos_liveliness_lease_duration_seconds is not None):

return qos_profile_from_short_keys(
qos_args.qos_profile,
reliability=qos_args.qos_reliability,
durability=qos_args.qos_durability,
depth=qos_args.qos_depth,
history=qos_args.qos_history,
liveliness=qos_args.qos_liveliness,
liveliness_lease_duration_s=qos_args.qos_liveliness_lease_duration_seconds)

qos_profile = QoSPresetProfiles.get_from_short_key(qos_args.qos_profile)
reliability_reliable_endpoints_count = 0
durability_transient_local_endpoints_count = 0

pubs_info = node.get_publishers_info_by_topic(topic_name)
publishers_count = len(pubs_info)
if publishers_count == 0:
return qos_profile

for info in pubs_info:
if (info.qos_profile.reliability == QoSReliabilityPolicy.RELIABLE):
reliability_reliable_endpoints_count += 1
if (info.qos_profile.durability == QoSDurabilityPolicy.TRANSIENT_LOCAL):
durability_transient_local_endpoints_count += 1

# If all endpoints are reliable, ask for reliable
if reliability_reliable_endpoints_count == publishers_count:
qos_profile.reliability = QoSReliabilityPolicy.RELIABLE
else:
if reliability_reliable_endpoints_count > 0:
print(
'Some, but not all, publishers are offering '
'QoSReliabilityPolicy.RELIABLE. Falling back to '
'QoSReliabilityPolicy.BEST_EFFORT as it will connect '
'to all publishers'
)
qos_profile.reliability = QoSReliabilityPolicy.BEST_EFFORT

# If all endpoints are transient_local, ask for transient_local
if durability_transient_local_endpoints_count == publishers_count:
qos_profile.durability = QoSDurabilityPolicy.TRANSIENT_LOCAL
else:
if durability_transient_local_endpoints_count > 0:
print(
'Some, but not all, publishers are offering '
'QoSDurabilityPolicy.TRANSIENT_LOCAL. Falling back to '
'QoSDurabilityPolicy.VOLATILE as it will connect '
'to all publishers'
)
qos_profile.durability = QoSDurabilityPolicy.VOLATILE

return qos_profile
18 changes: 12 additions & 6 deletions ros2topic/ros2topic/verb/bw.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@
import traceback

import rclpy
from rclpy.qos import qos_profile_sensor_data
from ros2cli.node.direct import add_arguments as add_direct_node_arguments
from ros2cli.node.direct import DirectNode
from ros2topic.api import add_qos_arguments
from ros2topic.api import choose_qos
from ros2topic.api import extract_qos_arguments
from ros2topic.api import get_msg_class
from ros2topic.api import positive_int
from ros2topic.api import TopicNameCompleter
Expand All @@ -62,19 +64,21 @@ class BwVerb(VerbExtension):

def add_arguments(self, parser, cli_name):
arg = parser.add_argument(
'topic',
'topic_name',
Comment on lines -65 to +67
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to change this? Is this related to what this PR is fixing? just checking.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not needed but I initially implemented this feature by removing all parameters from _rostopic_verb and just passing it args and noticed that different verbs used different argument names so I harmonized them.

help='Topic name to monitor for bandwidth utilization')
arg.completer = TopicNameCompleter(
include_hidden_topics_key='include_hidden_topics')
add_qos_arguments(parser, 'subscribe', 'sensor_data')
parser.add_argument(
'--window', '-w', type=positive_int, default=DEFAULT_WINDOW_SIZE,
'--window', '-w', dest='window_size', type=positive_int, default=DEFAULT_WINDOW_SIZE,
Comment on lines -70 to +73
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, is this required for this PR?

help='maximum window size, in # of messages, for calculating rate '
f'(default: {DEFAULT_WINDOW_SIZE})', metavar='WINDOW')
add_direct_node_arguments(parser)

def main(self, *, args):
qos_args = extract_qos_arguments(args)
with DirectNode(args) as node:
_rostopic_bw(node.node, args.topic, window_size=args.window)
_rostopic_bw(node.node, args.topic_name, qos_args, window_size=args.window_size)


class ROSTopicBandwidth(object):
Expand Down Expand Up @@ -150,20 +154,22 @@ def print_bw(self):
print(f'{bw} from {n} messages\n\tMessage size mean: {mean} min: {min_s} max: {max_s}')


def _rostopic_bw(node, topic, window_size=DEFAULT_WINDOW_SIZE):
def _rostopic_bw(node, topic, qos, window_size=DEFAULT_WINDOW_SIZE):
"""Periodically print the received bandwidth of a topic to console until shutdown."""
# pause bw until topic is published
msg_class = get_msg_class(node, topic, blocking=True, include_hidden_topics=True)
if msg_class is None:
node.destroy_node()
return

qos_profile = choose_qos(node, topic_name=topic, qos_args=qos)

rt = ROSTopicBandwidth(node, window_size)
node.create_subscription(
msg_class,
topic,
rt.callback,
qos_profile_sensor_data,
qos_profile,
raw=True
)

Expand Down
20 changes: 13 additions & 7 deletions ros2topic/ros2topic/verb/delay.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@

import rclpy

from rclpy.qos import qos_profile_sensor_data
from rclpy.time import Time
from ros2cli.node.direct import add_arguments as add_direct_node_arguments
from ros2cli.node.direct import DirectNode
from ros2topic.api import add_qos_arguments
from ros2topic.api import choose_qos
from ros2topic.api import extract_qos_arguments
from ros2topic.api import get_msg_class
from ros2topic.api import positive_int
from ros2topic.api import TopicNameCompleter
Expand All @@ -50,12 +52,13 @@ class DelayVerb(VerbExtension):

def add_arguments(self, parser, cli_name):
arg = parser.add_argument(
'topic',
'topic_name',
help='Topic name to calculate the delay for')
arg.completer = TopicNameCompleter(
include_hidden_topics_key='include_hidden_topics')
add_qos_arguments(parser, 'subscribe', 'sensor_data')
parser.add_argument(
'--window', '-w', type=positive_int, default=DEFAULT_WINDOW_SIZE,
'--window', '-w', dest='window_size', type=positive_int, default=DEFAULT_WINDOW_SIZE,
help='window size, in # of messages, for calculating rate, '
'string to (default: %d)' % DEFAULT_WINDOW_SIZE)
add_direct_node_arguments(parser)
Expand All @@ -65,9 +68,9 @@ def main(self, *, args):


def main(args):
qos_args = extract_qos_arguments(args)
with DirectNode(args) as node:
_rostopic_delay(
node.node, args.topic, window_size=args.window)
_rostopic_delay(node.node, args.topic_name, qos_args, window_size=args.window_size)


class ROSTopicDelay(object):
Expand Down Expand Up @@ -155,11 +158,12 @@ def print_delay(self):
% (delay * 1e-9, min_delta * 1e-9, max_delta * 1e-9, std_dev * 1e-9, window))


def _rostopic_delay(node, topic, window_size=DEFAULT_WINDOW_SIZE):
def _rostopic_delay(node, topic, qos, window_size=DEFAULT_WINDOW_SIZE):
"""
Periodically print the publishing delay of a topic to console until shutdown.

:param topic: topic name, ``str``
:param qos: qos configuration of the subscriber
:param window_size: number of messages to average over, ``unsigned_int``
:param blocking: pause delay until topic is published, ``bool``
"""
Expand All @@ -170,12 +174,14 @@ def _rostopic_delay(node, topic, window_size=DEFAULT_WINDOW_SIZE):
node.destroy_node()
return

qos_profile = choose_qos(node, topic_name=topic, qos_args=qos)

rt = ROSTopicDelay(node, window_size)
node.create_subscription(
msg_class,
topic,
rt.callback_delay,
qos_profile_sensor_data)
qos_profile)

timer = node.create_timer(1, rt.print_delay)
while rclpy.ok():
Expand Down
70 changes: 4 additions & 66 deletions ros2topic/ros2topic/verb/echo.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,16 @@
from rclpy.event_handler import SubscriptionEventCallbacks
from rclpy.event_handler import UnsupportedEventTypeError
from rclpy.node import Node
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSPresetProfiles
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy
from rclpy.task import Future
from ros2cli.helpers import unsigned_int
from ros2cli.node.strategy import add_arguments as add_strategy_node_arguments
from ros2cli.node.strategy import NodeStrategy
from ros2topic.api import add_qos_arguments
from ros2topic.api import choose_qos
from ros2topic.api import extract_qos_arguments
from ros2topic.api import get_msg_class
from ros2topic.api import positive_float
from ros2topic.api import qos_profile_from_short_keys
from ros2topic.api import TopicNameCompleter
from ros2topic.verb import VerbExtension
from rosidl_runtime_py import message_to_csv
Expand Down Expand Up @@ -108,67 +106,6 @@ def add_arguments(self, parser, cli_name):
'--include-message-info', '-i', action='store_true',
help='Shows the associated message info.')

def choose_qos(self, node, args):

if (args.qos_reliability is not None or
args.qos_durability is not None or
args.qos_depth is not None or
args.qos_history is not None or
args.qos_liveliness is not None or
args.qos_liveliness_lease_duration_seconds is not None):

return qos_profile_from_short_keys(
args.qos_profile,
reliability=args.qos_reliability,
durability=args.qos_durability,
depth=args.qos_depth,
history=args.qos_history,
liveliness=args.qos_liveliness,
liveliness_lease_duration_s=args.qos_liveliness_lease_duration_seconds)

qos_profile = QoSPresetProfiles.get_from_short_key(args.qos_profile)
reliability_reliable_endpoints_count = 0
durability_transient_local_endpoints_count = 0

pubs_info = node.get_publishers_info_by_topic(args.topic_name)
publishers_count = len(pubs_info)
if publishers_count == 0:
return qos_profile

for info in pubs_info:
if (info.qos_profile.reliability == QoSReliabilityPolicy.RELIABLE):
reliability_reliable_endpoints_count += 1
if (info.qos_profile.durability == QoSDurabilityPolicy.TRANSIENT_LOCAL):
durability_transient_local_endpoints_count += 1

# If all endpoints are reliable, ask for reliable
if reliability_reliable_endpoints_count == publishers_count:
qos_profile.reliability = QoSReliabilityPolicy.RELIABLE
else:
if reliability_reliable_endpoints_count > 0:
print(
'Some, but not all, publishers are offering '
'QoSReliabilityPolicy.RELIABLE. Falling back to '
'QoSReliabilityPolicy.BEST_EFFORT as it will connect '
'to all publishers'
)
qos_profile.reliability = QoSReliabilityPolicy.BEST_EFFORT

# If all endpoints are transient_local, ask for transient_local
if durability_transient_local_endpoints_count == publishers_count:
qos_profile.durability = QoSDurabilityPolicy.TRANSIENT_LOCAL
else:
if durability_transient_local_endpoints_count > 0:
print(
'Some, but not all, publishers are offering '
'QoSDurabilityPolicy.TRANSIENT_LOCAL. Falling back to '
'QoSDurabilityPolicy.VOLATILE as it will connect '
'to all publishers'
)
qos_profile.durability = QoSDurabilityPolicy.VOLATILE

return qos_profile

def main(self, *, args):

self.csv = args.csv
Expand Down Expand Up @@ -198,7 +135,8 @@ def main(self, *, args):

with NodeStrategy(args) as node:

qos_profile = self.choose_qos(node, args)
qos = extract_qos_arguments(args)
qos_profile = choose_qos(node, topic_name=args.topic_name, qos_args=qos)

if args.message_type is None:
message_type = get_msg_class(
Expand Down
19 changes: 14 additions & 5 deletions ros2topic/ros2topic/verb/hz.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@
from rclpy.clock import Clock
from rclpy.clock import ClockType
from rclpy.executors import ExternalShutdownException
from rclpy.qos import qos_profile_sensor_data
from ros2cli.node.direct import add_arguments as add_direct_node_arguments
from ros2cli.node.direct import DirectNode
from ros2topic.api import add_qos_arguments
from ros2topic.api import choose_qos
from ros2topic.api import extract_qos_arguments
from ros2topic.api import get_msg_class
from ros2topic.api import positive_int
from ros2topic.api import TopicNameCompleter
Expand All @@ -62,6 +64,7 @@ def add_arguments(self, parser, cli_name):
help="Names of the ROS topic to listen to (e.g. '/chatter')")
arg.completer = TopicNameCompleter(
include_hidden_topics_key='include_hidden_topics')
add_qos_arguments(parser, 'subscribe', 'sensor_data')
parser.add_argument(
'--window', '-w',
dest='window_size', type=positive_int, default=DEFAULT_WINDOW_SIZE,
Expand Down Expand Up @@ -93,9 +96,11 @@ def eval_fn(m):
else:
filter_expr = None

qos_args = extract_qos_arguments(args)

with DirectNode(args) as node:
_rostopic_hz(node.node, topics, window_size=args.window_size, filter_expr=filter_expr,
use_wtime=args.use_wtime)
_rostopic_hz(node.node, topics, qos_args, window_size=args.window_size,
filter_expr=filter_expr, use_wtime=args.use_wtime)


class ROSTopicHz(object):
Expand Down Expand Up @@ -278,11 +283,13 @@ def _get_ascii_table(header, cols):
return table


def _rostopic_hz(node, topics, window_size=DEFAULT_WINDOW_SIZE, filter_expr=None, use_wtime=False):
def _rostopic_hz(node, topics, qos, window_size=DEFAULT_WINDOW_SIZE, filter_expr=None,
use_wtime=False):
"""
Periodically print the publishing rate of a topic to console until shutdown.

:param topics: list of topic names, ``list`` of ``str``
:param qos: qos configuration of the subscriber
:param window_size: number of messages to average over, -1 for infinite, ``int``
:param filter_expr: Python filter expression that is called with m, the message instance
"""
Expand All @@ -299,11 +306,13 @@ def _rostopic_hz(node, topics, window_size=DEFAULT_WINDOW_SIZE, filter_expr=None
print('WARNING: failed to find message type for topic [%s]' % topic)
continue

qos_profile = choose_qos(node, topic_name=topic, qos_args=qos)

node.create_subscription(
msg_class,
topic,
functools.partial(rt.callback_hz, topic=topic),
qos_profile_sensor_data)
qos_profile)
if topics_len > 1:
print('Subscribed to [%s]' % topic)

Expand Down
2 changes: 1 addition & 1 deletion ros2topic/test/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,7 @@ def test_topic_bw(self):
assert topic_command.wait_for_output(functools.partial(
launch_testing.tools.expect_output, expected_lines=[
'Subscribed to [/defaults]',
re.compile(r'\d{2} B/s from \d+ messages'),
re.compile(r'\d{2,3} B/s from \d+ messages'),
re.compile(r'\s*Message size mean: \d{2} B min: \d{2} B max: \d{2} B')
], strict=True
), timeout=10)
Expand Down