ROS2: Template to program an Action Server

Aus HSHL Mechatronik
Zur Navigation springen Zur Suche springen


zurück zum Hauptartikel

#!/usr/bin/env python # this specifies the interpreter of this script

"""
Generic ROS 2 Action Server Example (Python)

This example shows in detail:
- How an Action Server is created
- How goals are received and processed
- How feedback is sent to a client
- How another Action Server can be called
- How internal state (e.g. odometry) can be used

IMPORTANT:
Replace "Fibonacci" with your own custom action type.
"""

#!/usr/bin/env python3

# --- Standard Python libraries ---
import time  # Provides basic time functions (e.g., sleep).
from math import sqrt  # Used for distance calculations.

# --- ROS 2 core libraries ---
import rclpy  # Main ROS 2 Python client library.
from rclpy.node import Node  # Base class for all ROS 2 nodes.
from rclpy.action import ActionServer, ActionClient  # Enables action server/client functionality.

# --- ROS 2 message and action types ---
from action_msgs.msg import GoalStatus  # Contains action result status constants.
from nav_msgs.msg import Odometry  # Provides robot position and velocity data.
from example_interfaces.action import Fibonacci  # Example action definition (replace in real use).

# Main class
class GenericActionServer(Node):
    """
    A reusable template for a ROS 2 Action Server node.

    This node:
    - Accepts incoming action goals
    - Processes them asynchronously
    - Optionally calls another action server
    - Publishes feedback during execution
    """

    def __init__(self):
        # Initialize the ROS 2 node with a unique name
        super().__init__('generic_action_server')

        # --- Action Server (handles incoming goals from clients) ---
        # This creates an action endpoint (similar to a service, but asynchronous)
        # Parameters:
        # 1) self              -> current node
        # 2) Fibonacci         -> action type (Goal, Result, Feedback definition)
        # 3) 'example_action'  -> name of the action topic
        #   self.execute_callback -> function that is executed when a goal is received
        # !! 1)-3) must be the same in ActionClient and ActionServer
        self.action_server = ActionServer(
            self,
            Fibonacci,
            'example_action',
            self.execute_callback
        )

        # --- Action Client (used to call another Action Server) ---
        # This allows this node to act as a client to another action
        self.external_action_client = ActionClient(
            self,
            Fibonacci,
            'external_action'
        )

        # --- Subscriber (example: listen to odometry data) ---
        # This demonstrates how sensor/state data can be integrated
        # into your action logic
        self.create_subscription(
            Odometry,          # message type
            '/odom',           # topic name
            self.odom_callback,# callback function
            10                 # queue size
        )

        # --- Internal state variables ---
        # Store the current robot position (updated via odometry)
        self.current_position = None

        # Store the previous position to calculate distance increments
        self.previous_position = None

        # Accumulated traveled distance
        self.total_distance = 0.0

        # Store current goal handle (needed to send feedback)
        self.goal_handle = None

        self.get_logger().info("Generic Action Server is ready.")

    # --- Subscriber callback ---
    def odom_callback(self, msg):
        """
        This function is called every time a new Odometry message arrives.

        Purpose:
        - Track the robot's movement
        - Compute the traveled distance over time
        """

        # Extract current position from odometry message
        position = msg.pose.pose.position
        self.current_position = position

        # If we already have a previous position, compute distance delta
        if self.previous_position is not None:
            distance_step = sqrt(
                (position.x - self.previous_position.x) ** 2 +
                (position.y - self.previous_position.y) ** 2
            )

            # Add incremental distance to total distance
            self.total_distance += distance_step

        # Update previous position for next iteration
        self.previous_position = position

    # --- Action execution ---
    async def execute_callback(self, goal_handle):
        """
        This function is automatically called when a client sends a goal.

        Workflow:
        1. Accept goal
        2. Initialize/reset state
        3. Perform the task (optionally calling other systems)
        4. Return a result
        """

        self.get_logger().info("Received new goal")

        # Store goal handle so we can send feedback later
        self.goal_handle = goal_handle

        # Reset tracking variables for this new goal
        self.total_distance = 0.0
        self.previous_position = None

        # Extract goal parameters from request
        # (depends on your custom action definition)
        target_value = goal_handle.request.order

        self.get_logger().info(f"Processing goal: {target_value}")

        # Call another Action Server (optional, example use case)
        success = await self.call_external_action(target_value)

        # --- Prepare result message ---
        result = Fibonacci.Result()

        if success:
            # Mark goal as successfully completed
            goal_handle.succeed()

            # Fill result message with meaningful data
            result.sequence = [0, 1, 2]  # Replace with real result

            self.get_logger().info("Goal succeeded")
        else:
            # Mark goal as failed
            goal_handle.abort()

            result.sequence = []

            self.get_logger().info("Goal failed")

        # Return result to client
        return result

    # --- Call another Action Server ---
    async def call_external_action(self, value):
        """
        Sends a goal to another Action Server and waits for the result.

        This demonstrates how to chain actions together.
        """

        # Create goal message for external action
        goal_msg = Fibonacci.Goal()
        goal_msg.order = value

        # Wait until the external server is available
        self.get_logger().info("Waiting for external server...")
        self.external_action_client.wait_for_server()

        # Send goal asynchronously (non-blocking)
        send_goal_future = self.external_action_client.send_goal_async(goal_msg)

        # Wait until the goal is accepted/rejected
        goal_handle = await send_goal_future

        if not goal_handle.accepted:
            self.get_logger().warn("External goal rejected")
            return False

        self.get_logger().info("External goal accepted")

        # Start a timer to periodically send feedback to the client
        self.feedback_timer = self.create_timer(1.0, self.publish_feedback)

        # Wait for the external action result
        result_future = goal_handle.get_result_async()
        result = await result_future

        # Stop feedback timer (task finished)
        self.feedback_timer.cancel()

        # Check result status
        if result.status == GoalStatus.STATUS_SUCCEEDED:
            self.get_logger().info("External action succeeded")
            return True
        else:
            self.get_logger().warn(f"External action failed: {result.status}")
            return False

    # --- Feedback publishing ---
    def publish_feedback(self):
        """
        Sends periodic feedback to the Action Client.

        Feedback is useful to:
        - Show progress
        - Provide intermediate results
        - Keep long-running tasks observable
        """

        # If no active goal exists, do nothing
        if self.goal_handle is None:
            return

        # Create feedback message
        feedback_msg = Fibonacci.Feedback()

        # Fill with example data (replace with meaningful values)
        feedback_msg.partial_sequence = [1, 2, 3]

        # Publish feedback to client
        self.goal_handle.publish_feedback(feedback_msg)

        self.get_logger().info("Feedback sent")

    # --- Cleanup ---
    def destroy_node(self):
        """
        Called when the node is shutting down.
        Used for cleanup if needed.
        """
        super().destroy_node()


# --- Main entry point ---
def main(args=None):
    # Initialize ROS 2 Python client library
    rclpy.init(args=args)

    # Create node instance
    node = GenericActionServer()

    try:
        # Keep node alive and processing callbacks
        rclpy.spin(node)
    except KeyboardInterrupt:
        # Graceful shutdown on CTRL+C
        pass

    # Destroy node explicitly
    node.destroy_node()

    # Shutdown ROS 2
    rclpy.shutdown()


if __name__ == '__main__':
    main()