ROS2: Template to program an Action Server: Unterschied zwischen den Versionen
Zur Navigation springen
Zur Suche springen
Die Seite wurde neu angelegt: „Kategorie:ROS2 zurück zum Hauptartikel <source line lang="python" style="font-size:small"> #!/usr/bin/env python # this specifies the interpreter of this script """ Generic ROS 2 Action Server Example (Python) This example shows in detail: - How an Action Server is created - How goals are received and processed - How feedback is sent to a client - How another Action Server can be called - How i…“ |
Keine Bearbeitungszusammenfassung |
||
| Zeile 55: | Zeile 55: | ||
# This creates an action endpoint (similar to a service, but asynchronous) | # This creates an action endpoint (similar to a service, but asynchronous) | ||
# Parameters: | # Parameters: | ||
# | # 1) self -> current node | ||
# | # 2) Fibonacci -> action type (Goal, Result, Feedback definition) | ||
# | # 3) 'example_action' -> name of the action topic | ||
# self.execute_callback -> function that is executed when a goal is received | # self.execute_callback -> function that is executed when a goal is received | ||
# !! 1)-3) must be the same in ActionClient and ActionServer | |||
self.action_server = ActionServer( | self.action_server = ActionServer( | ||
self, | self, | ||
Aktuelle Version vom 26. März 2026, 16:03 Uhr
#!/usr/bin/env python # this specifies the interpreter of this script
"""
Generic ROS 2 Action Server Example (Python)
This example shows in detail:
- How an Action Server is created
- How goals are received and processed
- How feedback is sent to a client
- How another Action Server can be called
- How internal state (e.g. odometry) can be used
IMPORTANT:
Replace "Fibonacci" with your own custom action type.
"""
#!/usr/bin/env python3
# --- Standard Python libraries ---
import time # Provides basic time functions (e.g., sleep).
from math import sqrt # Used for distance calculations.
# --- ROS 2 core libraries ---
import rclpy # Main ROS 2 Python client library.
from rclpy.node import Node # Base class for all ROS 2 nodes.
from rclpy.action import ActionServer, ActionClient # Enables action server/client functionality.
# --- ROS 2 message and action types ---
from action_msgs.msg import GoalStatus # Contains action result status constants.
from nav_msgs.msg import Odometry # Provides robot position and velocity data.
from example_interfaces.action import Fibonacci # Example action definition (replace in real use).
# Main class
class GenericActionServer(Node):
"""
A reusable template for a ROS 2 Action Server node.
This node:
- Accepts incoming action goals
- Processes them asynchronously
- Optionally calls another action server
- Publishes feedback during execution
"""
def __init__(self):
# Initialize the ROS 2 node with a unique name
super().__init__('generic_action_server')
# --- Action Server (handles incoming goals from clients) ---
# This creates an action endpoint (similar to a service, but asynchronous)
# Parameters:
# 1) self -> current node
# 2) Fibonacci -> action type (Goal, Result, Feedback definition)
# 3) 'example_action' -> name of the action topic
# self.execute_callback -> function that is executed when a goal is received
# !! 1)-3) must be the same in ActionClient and ActionServer
self.action_server = ActionServer(
self,
Fibonacci,
'example_action',
self.execute_callback
)
# --- Action Client (used to call another Action Server) ---
# This allows this node to act as a client to another action
self.external_action_client = ActionClient(
self,
Fibonacci,
'external_action'
)
# --- Subscriber (example: listen to odometry data) ---
# This demonstrates how sensor/state data can be integrated
# into your action logic
self.create_subscription(
Odometry, # message type
'/odom', # topic name
self.odom_callback,# callback function
10 # queue size
)
# --- Internal state variables ---
# Store the current robot position (updated via odometry)
self.current_position = None
# Store the previous position to calculate distance increments
self.previous_position = None
# Accumulated traveled distance
self.total_distance = 0.0
# Store current goal handle (needed to send feedback)
self.goal_handle = None
self.get_logger().info("Generic Action Server is ready.")
# --- Subscriber callback ---
def odom_callback(self, msg):
"""
This function is called every time a new Odometry message arrives.
Purpose:
- Track the robot's movement
- Compute the traveled distance over time
"""
# Extract current position from odometry message
position = msg.pose.pose.position
self.current_position = position
# If we already have a previous position, compute distance delta
if self.previous_position is not None:
distance_step = sqrt(
(position.x - self.previous_position.x) ** 2 +
(position.y - self.previous_position.y) ** 2
)
# Add incremental distance to total distance
self.total_distance += distance_step
# Update previous position for next iteration
self.previous_position = position
# --- Action execution ---
async def execute_callback(self, goal_handle):
"""
This function is automatically called when a client sends a goal.
Workflow:
1. Accept goal
2. Initialize/reset state
3. Perform the task (optionally calling other systems)
4. Return a result
"""
self.get_logger().info("Received new goal")
# Store goal handle so we can send feedback later
self.goal_handle = goal_handle
# Reset tracking variables for this new goal
self.total_distance = 0.0
self.previous_position = None
# Extract goal parameters from request
# (depends on your custom action definition)
target_value = goal_handle.request.order
self.get_logger().info(f"Processing goal: {target_value}")
# Call another Action Server (optional, example use case)
success = await self.call_external_action(target_value)
# --- Prepare result message ---
result = Fibonacci.Result()
if success:
# Mark goal as successfully completed
goal_handle.succeed()
# Fill result message with meaningful data
result.sequence = [0, 1, 2] # Replace with real result
self.get_logger().info("Goal succeeded")
else:
# Mark goal as failed
goal_handle.abort()
result.sequence = []
self.get_logger().info("Goal failed")
# Return result to client
return result
# --- Call another Action Server ---
async def call_external_action(self, value):
"""
Sends a goal to another Action Server and waits for the result.
This demonstrates how to chain actions together.
"""
# Create goal message for external action
goal_msg = Fibonacci.Goal()
goal_msg.order = value
# Wait until the external server is available
self.get_logger().info("Waiting for external server...")
self.external_action_client.wait_for_server()
# Send goal asynchronously (non-blocking)
send_goal_future = self.external_action_client.send_goal_async(goal_msg)
# Wait until the goal is accepted/rejected
goal_handle = await send_goal_future
if not goal_handle.accepted:
self.get_logger().warn("External goal rejected")
return False
self.get_logger().info("External goal accepted")
# Start a timer to periodically send feedback to the client
self.feedback_timer = self.create_timer(1.0, self.publish_feedback)
# Wait for the external action result
result_future = goal_handle.get_result_async()
result = await result_future
# Stop feedback timer (task finished)
self.feedback_timer.cancel()
# Check result status
if result.status == GoalStatus.STATUS_SUCCEEDED:
self.get_logger().info("External action succeeded")
return True
else:
self.get_logger().warn(f"External action failed: {result.status}")
return False
# --- Feedback publishing ---
def publish_feedback(self):
"""
Sends periodic feedback to the Action Client.
Feedback is useful to:
- Show progress
- Provide intermediate results
- Keep long-running tasks observable
"""
# If no active goal exists, do nothing
if self.goal_handle is None:
return
# Create feedback message
feedback_msg = Fibonacci.Feedback()
# Fill with example data (replace with meaningful values)
feedback_msg.partial_sequence = [1, 2, 3]
# Publish feedback to client
self.goal_handle.publish_feedback(feedback_msg)
self.get_logger().info("Feedback sent")
# --- Cleanup ---
def destroy_node(self):
"""
Called when the node is shutting down.
Used for cleanup if needed.
"""
super().destroy_node()
# --- Main entry point ---
def main(args=None):
# Initialize ROS 2 Python client library
rclpy.init(args=args)
# Create node instance
node = GenericActionServer()
try:
# Keep node alive and processing callbacks
rclpy.spin(node)
except KeyboardInterrupt:
# Graceful shutdown on CTRL+C
pass
# Destroy node explicitly
node.destroy_node()
# Shutdown ROS 2
rclpy.shutdown()
if __name__ == '__main__':
main()