ROS2: Template to program a Client: Unterschied zwischen den Versionen

Aus HSHL Mechatronik
Zur Navigation springen Zur Suche springen
Die Seite wurde neu angelegt: „Kategorie:ROS2 zurück zum Hauptartikel
 
Keine Bearbeitungszusammenfassung
 
Zeile 2: Zeile 2:


[[Overview_and_the_most_important_commands_of_ROS2|zurück zum Hauptartikel]]
[[Overview_and_the_most_important_commands_of_ROS2|zurück zum Hauptartikel]]
Note: the asynchronous mode is used so that the request does not block anything. Therefore this is the recommended way to do it.
<source line lang="python" style="font-size:small">
#!/usr/bin/env python # this specifies the interpreter of this script
# ros-core-library for standard ros2-functionality (this is the one for python)
import rclpy
# use ROS-nodes
from rclpy.node import Node
# Import the Trigger module from std_srvs Service interface of type std_srvs/srv/Trigger
from std_srvs.srv import Trigger
# create a Node "MyNode" with the name "text_recognition_client"
class MyNode(Node):
    def __init__(self):
        # Initialize the node with the name 'text_recognition_client'
        super().__init__('MyNodeName')
       
        # Create the Service Client object
        # This defines the name ('user_def_service') and type (Trigger) of the Service Server to connect to.
        name_service = '/user_def_service'
        self.client = self.create_client(Trigger, name_service)
       
        # Wait for the service to be available (checks every second)
        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('Service '+name_service+' not available, waiting again...')
        # Create an empty Trigger request
        self.req = Trigger.Request()
    def send_request(self):
        # Send the request asynchronously
        self.future = self.client.call_async(self.req)
def main(args=None):
    # Initialize the ROS communication
    rclpy.init(args=args)
   
    # Declare the node constructor
    client = MyNode()
   
    # Run the send_request() method
    client.send_request()
    while rclpy.ok():
        # Spin once to check for a service response
        rclpy.spin_once(client)
       
        if client.future.done():
            try:
                # Check if a response from the service was received
                response = client.future.result()
            except Exception as e:
                # Log any exceptions
                client.get_logger().info(f'Service call failed: {e}')
            else:
                # Log the service response
                client.get_logger().info(f'Success: {response.success}, Detected text: {response.message}')
            break
    # Destroy the client node
    client.destroy_node()
   
    # Shutdown the ROS communication
    rclpy.shutdown()
if __name__ == '__main__':
    main()
</source>

Aktuelle Version vom 29. Januar 2026, 17:01 Uhr


zurück zum Hauptartikel

Note: the asynchronous mode is used so that the request does not block anything. Therefore this is the recommended way to do it.

#!/usr/bin/env python # this specifies the interpreter of this script

# ros-core-library for standard ros2-functionality (this is the one for python)
import rclpy
# use ROS-nodes
from rclpy.node import Node
# Import the Trigger module from std_srvs Service interface of type std_srvs/srv/Trigger
from std_srvs.srv import Trigger

# create a Node "MyNode" with the name "text_recognition_client" 
class MyNode(Node):

    def __init__(self):
        # Initialize the node with the name 'text_recognition_client'
        super().__init__('MyNodeName')
        
        # Create the Service Client object
        # This defines the name ('user_def_service') and type (Trigger) of the Service Server to connect to.
        name_service = '/user_def_service'
        self.client = self.create_client(Trigger, name_service)
        
        # Wait for the service to be available (checks every second)
        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('Service '+name_service+' not available, waiting again...')

        # Create an empty Trigger request
        self.req = Trigger.Request()

    def send_request(self):
        # Send the request asynchronously
        self.future = self.client.call_async(self.req)


def main(args=None):
    # Initialize the ROS communication
    rclpy.init(args=args)
    
    # Declare the node constructor
    client = MyNode()
    
    # Run the send_request() method
    client.send_request()

    while rclpy.ok():
        # Spin once to check for a service response
        rclpy.spin_once(client)
        
        if client.future.done():
            try:
                # Check if a response from the service was received
                response = client.future.result()
            except Exception as e:
                # Log any exceptions
                client.get_logger().info(f'Service call failed: {e}')
            else:
                # Log the service response
                client.get_logger().info(f'Success: {response.success}, Detected text: {response.message}')
            break

    # Destroy the client node
    client.destroy_node()
    
    # Shutdown the ROS communication
    rclpy.shutdown()


if __name__ == '__main__':
    main()