diff --git a/ros2topic/ros2topic/verb/pub.py b/ros2topic/ros2topic/verb/pub.py index e2393b202..8227f8413 100644 --- a/ros2topic/ros2topic/verb/pub.py +++ b/ros2topic/ros2topic/verb/pub.py @@ -12,17 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import array -from functools import partial import time -from typing import Any -from typing import Dict -from typing import List from typing import Optional from typing import TypeVar -import numpy - import rclpy from rclpy.node import Node from rclpy.qos import QoSDurabilityPolicy @@ -34,91 +27,13 @@ from ros2topic.api import TopicNameCompleter from ros2topic.api import TopicTypeCompleter from ros2topic.verb import VerbExtension -from rosidl_parser.definition import AbstractNestedType -from rosidl_parser.definition import NamespacedType -from rosidl_runtime_py.convert import get_message_slot_types -from rosidl_runtime_py.import_message import import_message_from_namespaced_type +from rosidl_runtime_py import set_message_fields from rosidl_runtime_py.utilities import get_message import yaml MsgType = TypeVar('MsgType') -def set_message_fields_expanded( - msg: Any, values: Dict[str, str], expand_header_auto: bool = False, - expand_time_now: bool = False) -> List[Any]: - """ - Set the fields of a ROS message. - - :param msg: The ROS message to populate. - :param values: The values to set in the ROS message. The keys of the dictionary represent - fields of the message. - :param expand_header_auto: If enabled and 'auto' is passed as a value to a - 'std_msgs.msg.Header' field, an empty Header will be instantiated and a setter function - will be returned so that its 'stamp' field can be set to the current time. - :param expand_time_now: If enabled and 'now' is passed as a value to a - 'builtin_interfaces.msg.Time' field, a setter function will be returned so that - its value can be set to the current time. - :returns: A list of setter functions that can be used to update 'builtin_interfaces.msg.Time' - fields, useful for setting them to the current time. The list will be empty if the message - does not have any 'builtin_interfaces.msg.Time' fields, or if expand_header_auto and - expand_time_now are false. - :raises AttributeError: If the message does not have a field provided in the input dictionary. - :raises TypeError: If a message value does not match its field type. - """ - timestamp_fields = [] - - def set_message_fields_expanded_internal( - msg: Any, values: Dict[str, str], - timestamp_fields: List[Any]) -> List[Any]: - try: - items = values.items() - except AttributeError: - raise TypeError( - "Value '%s' is expected to be a dictionary but is a %s" % - (values, type(values).__name__)) - for field_name, field_value in items: - field = getattr(msg, field_name) - field_type = type(field) - qualified_class_name = '{}.{}'.format(field_type.__module__, field_type.__name__) - if field_type is array.array: - value = field_type(field.typecode, field_value) - elif field_type is numpy.ndarray: - value = numpy.array(field_value, dtype=field.dtype) - elif type(field_value) is field_type: - value = field_value - # We can't import these types directly, so we use the qualified class name to - # distinguish them from other fields - elif qualified_class_name == 'std_msgs.msg._header.Header' and \ - field_value == 'auto' and expand_header_auto: - timestamp_fields.append(partial(setattr, field, 'stamp')) - continue - elif qualified_class_name == 'builtin_interfaces.msg._time.Time' and \ - field_value == 'now' and expand_time_now: - timestamp_fields.append(partial(setattr, msg, field_name)) - continue - else: - try: - value = field_type(field_value) - except TypeError: - value = field_type() - set_message_fields_expanded_internal( - value, field_value, timestamp_fields) - rosidl_type = get_message_slot_types(msg)[field_name] - # Check if field is an array of ROS messages - if isinstance(rosidl_type, AbstractNestedType): - if isinstance(rosidl_type.value_type, NamespacedType): - field_elem_type = import_message_from_namespaced_type(rosidl_type.value_type) - for n in range(len(value)): - submsg = field_elem_type() - set_message_fields_expanded_internal( - submsg, value[n], timestamp_fields) - value[n] = submsg - setattr(msg, field_name, value) - set_message_fields_expanded_internal(msg, values, timestamp_fields) - return timestamp_fields - - def nonnegative_int(inval): ret = int(inval) if ret < 0: @@ -283,7 +198,7 @@ def publisher( msg = msg_module() try: - timestamp_fields = set_message_fields_expanded( + timestamp_fields = set_message_fields( msg, values_dictionary, expand_header_auto=True, expand_time_now=True) except Exception as e: return 'Failed to populate field: {0}'.format(e) diff --git a/ros2topic/test/test_pub.py b/ros2topic/test/test_pub.py deleted file mode 100644 index 4f79f28a8..000000000 --- a/ros2topic/test/test_pub.py +++ /dev/null @@ -1,144 +0,0 @@ -# Copyright 2022 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import builtins - -from builtin_interfaces.msg import Time -from ros2topic.verb.pub import set_message_fields_expanded -import rosidl_parser.definition -from std_msgs.msg import Header - - -class MockMessageStamped: - - __slots__ = [ - '_header', - ] - - _fields_and_field_types = { - 'header': 'std_msgs/Header', - } - - SLOT_TYPES = ( - rosidl_parser.definition.NamespacedType(['std_msgs', 'msg'], 'Header'), - ) - - def __init__(self): - self.header = Header() - - @builtins.property - def header(self): - return self._header - - @header.setter - def header(self, value): - self._header = value - - -class MockMessageWithStampFields: - - __slots__ = [ - '_timestamp1', - '_timestamp2', - ] - - _fields_and_field_types = { - 'timestamp1': 'builtin_interfaces/Time', - 'timestamp2': 'builtin_interfaces/Time', - } - - SLOT_TYPES = ( - rosidl_parser.definition.NamespacedType(['builtin_interfaces', 'msg'], 'Time'), - rosidl_parser.definition.NamespacedType(['builtin_interfaces', 'msg'], 'Time'), - ) - - def __init__(self): - self.timestamp1 = Time() - self.timestamp2 = Time() - - @builtins.property - def timestamp1(self): - return self._timestamp1 - - @timestamp1.setter - def timestamp1(self, value): - self._timestamp1 = value - - @builtins.property - def timestamp2(self): - return self._timestamp2 - - @timestamp2.setter - def timestamp2(self, value): - self._timestamp2 = value - - -def test_set_message_fields_expanded_header_auto(): - msg = MockMessageStamped() - values = {'header': 'auto'} - assert msg.header.stamp.sec == 0 and msg.header.stamp.nanosec == 0 - assert msg.header.frame_id == '' - timestamp_fields = set_message_fields_expanded( - msg, values, expand_header_auto=True, expand_time_now=True) - assert timestamp_fields is not None - for field_setter in timestamp_fields: - stamp = Time(sec=1, nanosec=2) - field_setter(stamp) - assert msg.header.stamp.sec == 1 and msg.header.stamp.nanosec == 2 - assert msg.header.frame_id == '' - - -def test_set_message_fields_expanded_stamp_now(): - msg = MockMessageStamped() - values = {'header': {'stamp': 'now'}} - assert msg.header.stamp.sec == 0 and msg.header.stamp.nanosec == 0 - assert msg.header.frame_id == '' - timestamp_fields = set_message_fields_expanded( - msg, values, expand_header_auto=True, expand_time_now=True) - assert timestamp_fields is not None - for field_setter in timestamp_fields: - stamp = Time(sec=1, nanosec=2) - field_setter(stamp) - assert msg.header.stamp.sec == 1 and msg.header.stamp.nanosec == 2 - assert msg.header.frame_id == '' - - -def test_set_message_fields_expanded_stamp_now_with_frame_id(): - msg = MockMessageStamped() - values = {'header': {'stamp': 'now', 'frame_id': 'hello'}} - assert msg.header.stamp.sec == 0 and msg.header.stamp.nanosec == 0 - assert msg.header.frame_id == '' - timestamp_fields = set_message_fields_expanded( - msg, values, expand_header_auto=True, expand_time_now=True) - assert timestamp_fields is not None - for field_setter in timestamp_fields: - stamp = Time(sec=1, nanosec=2) - field_setter(stamp) - assert msg.header.stamp.sec == 1 and msg.header.stamp.nanosec == 2 - assert msg.header.frame_id == 'hello' - - -def test_set_message_fields_expanded_stamp_now_with_timestamp_fields(): - msg = MockMessageWithStampFields() - values = {'timestamp1': 'now', 'timestamp2': 'now'} - assert msg.timestamp1.sec == 0 and msg.timestamp1.nanosec == 0 - assert msg.timestamp2.sec == 0 and msg.timestamp2.nanosec == 0 - timestamp_fields = set_message_fields_expanded( - msg, values, expand_header_auto=True, expand_time_now=True) - assert timestamp_fields is not None - for field_setter in timestamp_fields: - stamp = Time(sec=1, nanosec=2) - field_setter(stamp) - assert msg.timestamp1.sec == 1 and msg.timestamp1.nanosec == 2 - assert msg.timestamp2.sec == 1 and msg.timestamp2.nanosec == 2