Module 4 Practical Exercises
Overview
This section contains hands-on exercises to reinforce your understanding of Vision-Language-Action (VLA) systems, including voice-to-action capabilities built on OpenAI Whisper and cognitive planning with Large Language Models (LLMs) that translate natural language commands into ROS 2 actions. The exercises provide practical experience integrating the vision, language, and action components needed for humanoid robot control.
Exercise 1: Whisper-Based Voice Recognition Setup
Objective
Set up and configure OpenAI Whisper for voice command recognition in a humanoid robot system.
Instructions
- Install OpenAI Whisper and required dependencies
- Configure audio input for voice capture
- Test Whisper with various voice commands
- Integrate Whisper with ROS 2 messaging system
- Validate recognition accuracy and performance
Required Components
- OpenAI Whisper installation
- Audio recording setup
- ROS 2 integration
- Command validation system
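Before wiring Whisper into ROS 2, it is worth verifying the installation in isolation. The following is a minimal standalone sketch; the audio file name is a placeholder for any short recording you make yourself, and file input requires ffmpeg on your PATH.

# Standalone sanity check for the Whisper installation (no ROS 2 required).
# "test_command.wav" is a placeholder -- record any short voice clip yourself.
# Whisper decodes audio files via ffmpeg, so ffmpeg must be installed.
import whisper

model = whisper.load_model("base")  # downloads the model weights on first use
result = model.transcribe("test_command.wav", fp16=False)
print("Transcription:", result["text"].strip())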
Code Template
import whisper
import torch
import pyaudio
import numpy as np
from std_msgs.msg import String
import rclpy
from rclpy.node import Node

class WhisperVoiceNode(Node):
    def __init__(self):
        super().__init__('whisper_voice_node')
        # Initialize Whisper model
        self.get_logger().info("Loading Whisper model...")
        self.model = whisper.load_model("base")
        # Audio configuration
        self.rate = 16000
        self.chunk = 1024
        self.format = pyaudio.paInt16
        self.channels = 1
        # Initialize PyAudio
        self.audio = pyaudio.PyAudio()
        # Publisher for recognized text
        self.recognized_pub = self.create_publisher(String, '/recognized_speech', 10)
        # Timer for continuous listening
        self.listen_timer = self.create_timer(5.0, self.listen_and_recognize)
        self.get_logger().info("Whisper Voice Node initialized")

    def record_audio(self, duration=5):
        """Record audio from microphone"""
        stream = self.audio.open(
            format=self.format,
            channels=self.channels,
            rate=self.rate,
            input=True,
            frames_per_buffer=self.chunk
        )
        frames = []
        for _ in range(0, int(self.rate / self.chunk * duration)):
            data = stream.read(self.chunk)
            frames.append(data)
        stream.stop_stream()
        stream.close()
        # Convert raw bytes to a float32 array in [-1.0, 1.0], as Whisper expects
        audio_data = b''.join(frames)
        audio_np = np.frombuffer(audio_data, dtype=np.int16)
        audio_np = audio_np.astype(np.float32) / 32768.0
        return audio_np

    def listen_and_recognize(self):
        """Listen for voice and recognize speech"""
        try:
            # Record audio
            self.get_logger().info("Recording audio...")
            audio_data = self.record_audio(duration=3)
            # Transcribe using Whisper
            self.get_logger().info("Transcribing audio...")
            audio_tensor = torch.from_numpy(audio_data)
            result = self.model.transcribe(audio_tensor, fp16=False)
            recognized_text = result["text"].strip()
            self.get_logger().info(f"Recognized: {recognized_text}")
            # Publish recognized text
            msg = String()
            msg.data = recognized_text
            self.recognized_pub.publish(msg)
        except Exception as e:
            self.get_logger().error(f"Error in voice recognition: {e}")

    def destroy_node(self):
        """Clean up resources"""
        self.audio.terminate()
        super().destroy_node()

def main(args=None):
    rclpy.init(args=args)
    node = WhisperVoiceNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()
Expected Output
- Whisper model loads successfully
- Audio recording works properly
- Speech recognition produces text output
- ROS 2 messages published correctly
Evaluation Criteria
- Whisper installation and configuration
- Audio recording functionality
- Recognition accuracy
- ROS 2 integration
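To score recognition accuracy objectively, compare Whisper's output against known reference transcripts, for example with word error rate (WER). Below is a minimal standard-library sketch; the sample sentences are illustrative.

# Minimal word-error-rate (WER) helper for scoring recognition accuracy.
# Pure standard library; the reference/hypothesis pair is an illustrative example.
def word_error_rate(reference: str, hypothesis: str) -> float:
    ref, hyp = reference.lower().split(), hypothesis.lower().split()
    # Levenshtein distance over words via dynamic programming
    d = [[0] * (len(hyp) + 1) for _ in range(len(ref) + 1)]
    for i in range(len(ref) + 1):
        d[i][0] = i
    for j in range(len(hyp) + 1):
        d[0][j] = j
    for i in range(1, len(ref) + 1):
        for j in range(1, len(hyp) + 1):
            cost = 0 if ref[i - 1] == hyp[j - 1] else 1
            d[i][j] = min(d[i - 1][j] + 1,          # deletion
                          d[i][j - 1] + 1,          # insertion
                          d[i - 1][j - 1] + cost)   # substitution
    return d[len(ref)][len(hyp)] / max(len(ref), 1)

print(word_error_rate("move forward two meters", "move forward to meters"))  # 0.25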
Exercise 2: Cognitive Planning with LLMs
Objective
Implement cognitive planning using a Large Language Model to translate natural language commands into robot action sequences.
Instructions
- Set up LLM API access (OpenAI GPT, Anthropic Claude, etc.)
- Create a function to process natural language commands
- Implement task decomposition logic
- Generate executable action sequences
- Integrate with ROS 2 action execution
Code Template
import openai
import json
import math
import time
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from geometry_msgs.msg import Twist

class CognitivePlanningNode(Node):
    def __init__(self, api_key):
        super().__init__('cognitive_planning_node')
        # Set up LLM (this template uses the legacy openai-python (<1.0) ChatCompletion API)
        openai.api_key = api_key
        self.model = "gpt-4"
        # Publishers and subscribers
        self.command_sub = self.create_subscription(
            String,
            '/natural_language_command',
            self.command_callback,
            10
        )
        self.action_pub = self.create_publisher(
            Twist,
            '/cmd_vel',
            10
        )
        # Available robot actions
        self.robot_actions = {
            "move_forward": {
                "description": "Move robot forward",
                "parameters": {"distance": "float", "speed": "float"}
            },
            "turn_left": {
                "description": "Turn robot left",
                "parameters": {"angle": "float", "speed": "float"}
            },
            "turn_right": {
                "description": "Turn robot right",
                "parameters": {"angle": "float", "speed": "float"}
            },
            "stop": {
                "description": "Stop robot movement",
                "parameters": {}
            }
        }
        self.get_logger().info("Cognitive Planning Node initialized")

    def command_callback(self, msg):
        """Process natural language command"""
        command = msg.data
        self.get_logger().info(f"Received command: {command}")
        # Generate plan using LLM
        plan = self.generate_plan(command)
        # Execute plan
        self.execute_plan(plan)

    def generate_plan(self, command):
        """Generate action plan using LLM"""
        prompt = f"""
Given the natural language command: "{command}"
Available robot actions: {json.dumps(self.robot_actions)}
Please decompose this command into a sequence of specific robot actions.
Each action should be one of the available actions with appropriate parameters.
Respond with the action sequence in JSON format:
{{
    "actions": [
        {{
            "action": "action_name",
            "parameters": {{"param1": "value1", "param2": "value2"}}
        }}
    ]
}}
"""
        try:
            response = openai.ChatCompletion.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "You are a robot task planner. Generate executable action sequences from natural language commands."},
                    {"role": "user", "content": prompt}
                ]
            )
            result = json.loads(response.choices[0].message.content)
            return result.get("actions", [])
        except Exception as e:
            self.get_logger().error(f"Error generating plan: {e}")
            return [{"action": "stop", "parameters": {}}]

    def execute_plan(self, plan):
        """Execute the generated action plan"""
        self.get_logger().info(f"Executing plan with {len(plan)} actions")
        for action in plan:
            self.execute_single_action(action)

    def execute_single_action(self, action):
        """Execute a single action"""
        action_name = action["action"]
        parameters = action.get("parameters", {})
        self.get_logger().info(f"Executing: {action_name} with {parameters}")
        if action_name == "move_forward":
            self.move_forward(parameters.get("distance", 1.0), parameters.get("speed", 0.5))
        elif action_name == "turn_left":
            self.turn_left(parameters.get("angle", 90.0), parameters.get("speed", 0.3))
        elif action_name == "turn_right":
            self.turn_right(parameters.get("angle", 90.0), parameters.get("speed", 0.3))
        elif action_name == "stop":
            self.stop_robot()

    def move_forward(self, distance, speed):
        """Move robot forward"""
        cmd = Twist()
        cmd.linear.x = speed if distance > 0 else -speed
        cmd.angular.z = 0.0
        # Publish for a duration derived from distance and speed
        duration = abs(distance) / abs(speed) if speed != 0 else 0
        self.publish_for_duration(cmd, duration)

    def turn_left(self, angle, speed):
        """Turn robot left"""
        cmd = Twist()
        cmd.linear.x = 0.0
        cmd.angular.z = speed
        # Convert the angle (degrees) to a turn time at the given angular speed (rad/s)
        duration = math.radians(abs(angle)) / speed if speed != 0 else 0
        self.publish_for_duration(cmd, duration)

    def turn_right(self, angle, speed):
        """Turn robot right"""
        cmd = Twist()
        cmd.linear.x = 0.0
        cmd.angular.z = -speed
        duration = math.radians(abs(angle)) / speed if speed != 0 else 0
        self.publish_for_duration(cmd, duration)

    def stop_robot(self):
        """Stop robot movement"""
        cmd = Twist()
        cmd.linear.x = 0.0
        cmd.angular.z = 0.0
        self.action_pub.publish(cmd)

    def publish_for_duration(self, cmd, duration):
        """Publish command for specified duration"""
        start_time = time.time()
        while time.time() - start_time < duration:
            self.action_pub.publish(cmd)
            time.sleep(0.1)  # ~10 Hz; in a real node, prefer a ROS timer or rate
        # Stop the robot so it does not coast on the last published command
        self.stop_robot()

def main(args=None, api_key="your-api-key-here"):
    rclpy.init(args=args)
    node = CognitivePlanningNode(api_key)
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()
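To exercise the planner without a voice front-end, you can publish a test command directly to its input topic. A minimal one-shot publisher sketch (the command text is just an example):

# One-shot test publisher for the planner (hypothetical test script).
# Publishes a natural-language command to /natural_language_command, then exits.
import time
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

def send_test_command(text="move forward two meters"):
    rclpy.init()
    node = Node('nl_command_tester')
    pub = node.create_publisher(String, '/natural_language_command', 10)
    time.sleep(0.5)  # give DDS discovery a moment to match the subscriber
    msg = String()
    msg.data = text
    pub.publish(msg)
    node.get_logger().info(f"Sent: {text}")
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    send_test_command()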
Expected Output
- LLM processes natural language commands
- Action sequences generated correctly
- Robot actions executed via ROS 2
- Proper task decomposition
Exercise 3: Voice-to-Action Integration
Objective
Integrate voice recognition with cognitive planning to create a complete voice-controlled robot system.
Instructions
- Combine Whisper voice recognition with LLM cognitive planning
- Create a pipeline from voice input to robot action
- Implement error handling for recognition failures
- Add safety checks and validation (a minimal velocity-clamping sketch follows this list)
- Test with various voice commands
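For the safety checks above, one simple building block is a gate that clamps every outgoing velocity command before it reaches /cmd_vel. A minimal sketch, with limits that are illustrative assumptions rather than tuned values:

# Minimal safety gate: clamp commanded velocities before publishing.
# The limits below are illustrative assumptions, not tuned for any real robot.
from geometry_msgs.msg import Twist

MAX_LINEAR = 0.5    # m/s, assumed safe limit
MAX_ANGULAR = 1.0   # rad/s, assumed safe limit

def clamp_twist(cmd: Twist) -> Twist:
    safe = Twist()
    safe.linear.x = max(-MAX_LINEAR, min(MAX_LINEAR, cmd.linear.x))
    safe.angular.z = max(-MAX_ANGULAR, min(MAX_ANGULAR, cmd.angular.z))
    return safe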
Code Template
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from geometry_msgs.msg import Twist
import whisper
import openai
import pyaudio
import numpy as np
import threading
import queue
import time

class VoiceToActionNode(Node):
    def __init__(self, api_key):
        super().__init__('voice_to_action_node')
        # Initialize components (legacy openai-python (<1.0) API, as in Exercise 2)
        self.whisper_model = whisper.load_model("base")
        openai.api_key = api_key
        self.llm_model = "gpt-4"
        # Audio configuration
        self.audio = pyaudio.PyAudio()
        self.rate = 16000
        self.chunk = 1024
        self.format = pyaudio.paInt16
        self.channels = 1
        # Publishers and subscribers
        self.action_pub = self.create_publisher(Twist, '/cmd_vel', 10)
        self.status_pub = self.create_publisher(String, '/voice_status', 10)
        # Audio processing queue
        self.audio_queue = queue.Queue()
        self.processing_active = True
        # Start audio recording thread
        self.recording_thread = threading.Thread(target=self.record_audio_continuous)
        self.recording_thread.daemon = True
        self.recording_thread.start()
        # Timer for processing audio
        self.process_timer = self.create_timer(2.0, self.process_audio)
        # Robot actions definition
        self.robot_actions = {
            "move_forward": {"linear": (0.5, 0.0), "duration": 2.0},
            "move_backward": {"linear": (-0.5, 0.0), "duration": 2.0},
            "turn_left": {"angular": 0.5, "duration": 1.0},
            "turn_right": {"angular": -0.5, "duration": 1.0},
            "stop": {"linear": (0.0, 0.0), "duration": 0.1}
        }
        self.get_logger().info("Voice-to-Action Node initialized")

    def record_audio_continuous(self):
        """Continuously record audio in a separate thread"""
        stream = self.audio.open(
            format=self.format,
            channels=self.channels,
            rate=self.rate,
            input=True,
            frames_per_buffer=self.chunk
        )
        frames = []
        while self.processing_active:
            # exception_on_overflow=False prevents crashes if the input buffer overruns
            data = stream.read(self.chunk, exception_on_overflow=False)
            frames.append(data)
            # Queue roughly every 3 seconds of audio
            if len(frames) * self.chunk >= self.rate * 3:
                audio_data = b''.join(frames)
                self.audio_queue.put(audio_data)
                frames = []
        stream.stop_stream()
        stream.close()

    def process_audio(self):
        """Process audio from queue"""
        try:
            while True:  # Process all available audio
                audio_data = self.audio_queue.get_nowait()
                # Convert to float32 in [-1.0, 1.0]
                audio_np = np.frombuffer(audio_data, dtype=np.int16)
                audio_np = audio_np.astype(np.float32) / 32768.0
                # Transcribe
                result = self.whisper_model.transcribe(audio_np, fp16=False)
                recognized_text = result["text"].strip()
                if recognized_text:
                    self.get_logger().info(f"Recognized: {recognized_text}")
                    # Process with cognitive planner
                    self.process_command_with_llm(recognized_text)
        except queue.Empty:
            pass  # No more audio to process

    def process_command_with_llm(self, command):
        """Process command using LLM cognitive planning"""
        if not command.strip():
            return
        # Publish status
        status_msg = String()
        status_msg.data = f"Processing: {command}"
        self.status_pub.publish(status_msg)
        # Create LLM prompt for action planning
        prompt = f"""
Natural language command: "{command}"
Available robot actions:
- move_forward: Move robot forward
- move_backward: Move robot backward
- turn_left: Turn robot left
- turn_right: Turn robot right
- stop: Stop robot movement
Please convert this command to the most appropriate robot action.
Respond with just the action name, nothing else.
"""
        try:
            response = openai.ChatCompletion.create(
                model=self.llm_model,
                messages=[
                    {"role": "system", "content": "You are a robot command interpreter. Respond with only the action name from the available actions."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.1
            )
            action_name = response.choices[0].message.content.strip().lower()
            # Validate action
            if action_name in self.robot_actions:
                self.execute_action(action_name)
                self.get_logger().info(f"Executed action: {action_name}")
            else:
                self.get_logger().warn(f"Unrecognized action: {action_name}")
                self.execute_action("stop")  # Safety fallback
        except Exception as e:
            self.get_logger().error(f"Error processing command with LLM: {e}")
            self.execute_action("stop")  # Safety fallback

    def execute_action(self, action_name):
        """Execute a robot action"""
        if action_name not in self.robot_actions:
            self.get_logger().error(f"Unknown action: {action_name}")
            return
        action_def = self.robot_actions[action_name]
        cmd = Twist()
        if "linear" in action_def:
            linear_speed, angular_speed = action_def["linear"]
            cmd.linear.x = float(linear_speed)
            cmd.angular.z = float(angular_speed)
        elif "angular" in action_def:
            cmd.angular.z = float(action_def["angular"])
        # Publish command for specified duration
        duration = action_def["duration"]
        self.publish_command_for_duration(cmd, duration)

    def publish_command_for_duration(self, cmd, duration):
        """Publish command for specified duration"""
        start_time = time.time()
        while time.time() - start_time < duration:
            self.action_pub.publish(cmd)
            time.sleep(0.1)  # 10Hz update rate
        # Stop robot after action
        stop_cmd = Twist()
        self.action_pub.publish(stop_cmd)

    def destroy_node(self):
        """Clean up resources"""
        self.processing_active = False
        self.audio.terminate()
        super().destroy_node()

def main(args=None, api_key="your-api-key-here"):
    rclpy.init(args=args)
    node = VoiceToActionNode(api_key)
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()
Expected Output
- Continuous audio recording and processing
- Voice commands recognized and processed
- Appropriate robot actions executed
- Safety fallbacks implemented
Exercise 4: VLA System Integration and Testing
Objective
Create a complete Vision-Language-Action system that integrates voice recognition, cognitive planning, and robot execution in a simulated environment.
Instructions
- Set up a complete VLA system with all components
- Integrate with Gazebo simulation environment
- Test with various complex commands
- Implement error handling and recovery
- Validate system performance and safety
Launch File Template
<?xml version="1.0"?>
<launch>
  <!-- Gazebo simulation -->
  <include file="$(find-pkg-share gazebo_ros)/launch/gazebo.launch.py">
    <arg name="world" value="$(find-pkg-share vla_exercises)/worlds/vla_test.world"/>
  </include>

  <!-- Robot state publisher -->
  <!-- Note: robot_description must hold the URDF contents (e.g. loaded via xacro),
       not just the path to the file -->
  <node pkg="robot_state_publisher" exec="robot_state_publisher" name="robot_state_publisher">
    <param name="robot_description" value="$(find-pkg-share vla_exercises)/urdf/test_robot.urdf"/>
  </node>

  <!-- Voice recognition node -->
  <node pkg="vla_exercises" exec="whisper_voice_node" name="whisper_voice" output="screen">
    <param name="model_size" value="base"/>
  </node>

  <!-- Cognitive planning node -->
  <node pkg="vla_exercises" exec="cognitive_planning_node" name="cognitive_planner" output="screen">
    <param name="llm_model" value="gpt-4"/>
  </node>

  <!-- Voice-to-action integration node -->
  <node pkg="vla_exercises" exec="voice_to_action_node" name="vla_system" output="screen">
    <param name="api_key" value="your-api-key"/>
  </node>

  <!-- Navigation stack -->
  <include file="$(find-pkg-share nav2_bringup)/launch/navigation_launch.py">
    <arg name="params_file" value="$(find-pkg-share vla_exercises)/config/vla_nav_params.yaml"/>
  </include>

  <!-- RViz for visualization -->
  <node pkg="rviz2" exec="rviz2" name="rviz2" args="-d $(find-pkg-share vla_exercises)/rviz/vla_config.rviz"/>
</launch>
Testing Scenarios
# Test scenarios for VLA system
test_scenarios = [
    {
        "command": "move forward",
        "expected": "Robot moves forward",
        "validation": "Check robot position change"
    },
    {
        "command": "turn left",
        "expected": "Robot turns left",
        "validation": "Check robot orientation change"
    },
    {
        "command": "go to kitchen",
        "expected": "Robot navigates to kitchen area",
        "validation": "Check robot reaches kitchen location"
    },
    {
        "command": "find the red ball",
        "expected": "Robot searches for red ball",
        "validation": "Check if ball is detected in camera feed"
    },
    {
        "command": "stop",
        "expected": "Robot stops all movement",
        "validation": "Check robot velocity is zero"
    }
]

def run_vla_tests():
    """Run comprehensive VLA system tests"""
    import time
    for i, scenario in enumerate(test_scenarios):
        print(f"Running test {i+1}: {scenario['command']}")
        # In simulation, send the command to the system
        # This would typically involve publishing to the command topic
        print(f"  Expected: {scenario['expected']}")
        print(f"  Validation: {scenario['validation']}")
        # Wait for execution
        time.sleep(5)  # Adjust based on expected execution time
        print(f"  Test {i+1} completed\n")

if __name__ == "__main__":
    run_vla_tests()
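To drive these scenarios through the running system rather than just printing them, one option is a small ROS 2 test driver that publishes each command. A sketch, assuming the VLA nodes subscribe on /natural_language_command (the topic name used in Exercise 2):

# Sketch of a ROS 2 test driver (assumes the VLA nodes are running and
# subscribe to /natural_language_command; the topic name is an assumption).
import time
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

def run_vla_tests_ros():
    rclpy.init()
    node = Node('vla_test_driver')
    pub = node.create_publisher(String, '/natural_language_command', 10)
    time.sleep(0.5)  # allow DDS discovery to match subscribers
    for scenario in test_scenarios:
        msg = String()
        msg.data = scenario["command"]
        pub.publish(msg)
        node.get_logger().info(f"Sent: {scenario['command']}")
        time.sleep(5)  # allow time for execution; adjust per scenario
    node.destroy_node()
    rclpy.shutdown()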
Performance Metrics
class VLASystemMetrics:
    def __init__(self):
        self.response_times = []
        self.recognition_accuracy = []
        self.execution_success = []
        self.safety_violations = []

    def record_response_time(self, time_ms):
        """Record system response time (milliseconds)"""
        self.response_times.append(time_ms)

    def record_recognition_accuracy(self, accuracy):
        """Record speech recognition accuracy (percent)"""
        self.recognition_accuracy.append(accuracy)

    def record_execution_success(self, success):
        """Record action execution success (1 = success, 0 = failure)"""
        self.execution_success.append(success)

    def record_safety_violation(self, violation_type):
        """Record safety violation"""
        self.safety_violations.append(violation_type)

    def get_performance_report(self):
        """Generate performance report"""
        if not self.response_times:
            return "No data collected"
        avg_response = sum(self.response_times) / len(self.response_times)
        avg_accuracy = sum(self.recognition_accuracy) / len(self.recognition_accuracy) if self.recognition_accuracy else 0
        # Success values are 0/1, so scale the mean to a percentage
        success_rate = 100.0 * sum(self.execution_success) / len(self.execution_success) if self.execution_success else 0
        return f"""
VLA System Performance Report:
- Average Response Time: {avg_response:.2f}ms
- Average Recognition Accuracy: {avg_accuracy:.2f}%
- Execution Success Rate: {success_rate:.2f}%
- Safety Violations: {len(self.safety_violations)}
"""
Expected Output
- Complete VLA system running in simulation
- Successful execution of test scenarios
- Performance metrics collected and reported
- Safety constraints enforced
Exercise 5: Autonomous Humanoid Capstone Integration
Objective
Integrate all VLA components into the complete Autonomous Humanoid capstone system and test end-to-end functionality.
Instructions
- Combine all previous exercises into a complete system
- Implement the Autonomous Humanoid capstone project
- Test with complex multi-step commands
- Validate system robustness and reliability
- Document lessons learned and improvements
Complete System Integration
import json
import re
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from geometry_msgs.msg import Twist, Pose
from sensor_msgs.msg import Image, Imu
from rclpy.action import ActionClient
from nav2_msgs.action import NavigateToPose
from action_msgs.msg import GoalStatus
import whisper
import openai
import pyaudio
import numpy as np
class AutonomousHumanoidSystem(Node):
    def __init__(self, api_key):
        super().__init__('autonomous_humanoid_system')
        # Initialize all components
        self.whisper_model = whisper.load_model("base")
        openai.api_key = api_key
        self.llm_model = "gpt-4"
        # Audio setup
        self.audio = pyaudio.PyAudio()
        self.rate = 16000
        self.chunk = 1024
        self.format = pyaudio.paInt16
        self.channels = 1
        # Publishers
        self.cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)
        self.speech_pub = self.create_publisher(String, '/robot_speech', 10)
        self.status_pub = self.create_publisher(String, '/system_status', 10)
        # Subscribers
        self.camera_sub = self.create_subscription(Image, '/camera/rgb/image_raw', self.camera_callback, 10)
        self.imu_sub = self.create_subscription(Imu, '/imu/data', self.imu_callback, 10)
        # Action clients
        self.nav_client = ActionClient(self, NavigateToPose, 'navigate_to_pose')
        # System state
        self.world_state = {
            "robot_position": (0, 0, 0),
            "robot_orientation": 0.0,
            "objects": {},
            "locations": ["kitchen", "living_room", "bedroom", "office"]
        }
        # Execution state
        self.current_plan = []
        self.plan_index = 0
        self.is_executing = False
        self.listening_for_command = True
        # Timers
        self.voice_timer = self.create_timer(5.0, self.check_for_voice_command)
        self.status_timer = self.create_timer(1.0, self.publish_system_status)
        self.get_logger().info("Autonomous Humanoid System initialized")
    def check_for_voice_command(self):
        """Check for and process voice commands"""
        if not self.listening_for_command:
            return
        try:
            # Record voice command
            command_text = self.record_voice_command(duration=3)
            if command_text.strip():
                self.get_logger().info(f"Heard: {command_text}")
                # Process with cognitive planning
                plan = self.cognitive_plan_command(command_text)
                if plan:
                    self.execute_plan(plan)
                else:
                    self.speak_response("I didn't understand that command.")
        except Exception as e:
            self.get_logger().error(f"Error in voice processing: {e}")

    def record_voice_command(self, duration=3):
        """Record voice command from microphone"""
        stream = self.audio.open(
            format=self.format,
            channels=self.channels,
            rate=self.rate,
            input=True,
            frames_per_buffer=self.chunk
        )
        frames = []
        for _ in range(0, int(self.rate / self.chunk * duration)):
            data = stream.read(self.chunk)
            frames.append(data)
        stream.stop_stream()
        stream.close()
        # Convert to a float32 array in [-1.0, 1.0]
        audio_data = b''.join(frames)
        audio_np = np.frombuffer(audio_data, dtype=np.int16)
        audio_np = audio_np.astype(np.float32) / 32768.0
        # Transcribe using Whisper
        result = self.whisper_model.transcribe(audio_np, fp16=False)
        return result["text"].strip()
    def cognitive_plan_command(self, command_text):
        """Generate execution plan using cognitive planning"""
        prompt = f"""
Natural language command: "{command_text}"
Current world state:
- Robot position: {self.world_state['robot_position']}
- Known locations: {self.world_state['locations']}
- Known objects: {list(self.world_state['objects'].keys())}
Available robot capabilities:
- Navigation: move to locations, avoid obstacles
- Manipulation: pick up and place objects
- Perception: detect objects in environment
- Interaction: speak responses
Please decompose this command into a sequence of specific robot actions.
Consider the current world state and robot capabilities.
Respond with the action sequence in JSON format:
{{
    "actions": [
        {{
            "action": "action_type",
            "parameters": {{"param1": "value1"}}
        }}
    ]
}}
"""
        try:
            response = openai.ChatCompletion.create(
                model=self.llm_model,
                messages=[
                    {"role": "system", "content": "You are an advanced robot task planner. Generate detailed action sequences for humanoid robots."},
                    {"role": "user", "content": prompt}
                ]
            )
            result = response.choices[0].message.content
            # Extract the first JSON object from the response
            # (a real implementation should use stricter parsing and validation)
            json_match = re.search(r'\{.*\}', result, re.DOTALL)
            if json_match:
                plan_data = json.loads(json_match.group())
                return plan_data.get("actions", [])
            else:
                return []
        except Exception as e:
            self.get_logger().error(f"Error in cognitive planning: {e}")
            return []
    def execute_plan(self, plan):
        """Execute the generated plan"""
        if not plan:
            self.speak_response("I couldn't create a plan for that command.")
            return
        self.current_plan = plan
        self.plan_index = 0
        self.is_executing = True
        self.listening_for_command = False  # Pause listening during execution
        self.get_logger().info(f"Executing plan with {len(plan)} actions")
        self.speak_response("Starting to execute your command.")
        self.execute_next_action()

    def execute_next_action(self):
        """Execute the next action in the plan"""
        if (not self.current_plan or
                self.plan_index >= len(self.current_plan) or
                not self.is_executing):
            # Plan completed
            self.is_executing = False
            self.listening_for_command = True
            self.current_plan = []
            self.plan_index = 0
            self.speak_response("Task completed successfully.")
            return
        action = self.current_plan[self.plan_index]
        action_type = action.get("action", "")
        parameters = action.get("parameters", {})
        self.get_logger().info(f"Executing action {self.plan_index + 1}: {action_type}")
        # Execute based on action type; each handler advances plan_index itself
        if action_type == "navigate_to_location":
            self.execute_navigation_action(parameters)
        elif action_type == "detect_objects":
            self.execute_detection_action(parameters)
        elif action_type == "speak":
            self.execute_speak_action(parameters)
        else:
            self.get_logger().warn(f"Unknown action type: {action_type}")
            self.plan_index += 1
            if self.is_executing:
                self.execute_next_action()
    def execute_navigation_action(self, parameters):
        """Execute navigation action"""
        location = parameters.get("location", "unknown")
        # In a real system, you would look up coordinates for the location
        # For this exercise, we'll use a simple mapping
        location_map = {
            "kitchen": (2.0, 1.0, 0.0),
            "living_room": (1.0, 2.0, 0.0),
            "bedroom": (4.0, 3.0, 0.0),
            "office": (2.0, 4.0, 0.0)
        }
        if location in location_map:
            target_pos = location_map[location]
            target_pose = Pose()
            target_pose.position.x = target_pos[0]
            target_pose.position.y = target_pos[1]
            target_pose.position.z = target_pos[2]
            target_pose.orientation.w = 1.0  # Default orientation
            self.send_navigation_goal(target_pose)
        else:
            self.get_logger().error(f"Unknown location: {location}")
            self.plan_index += 1
            if self.is_executing:
                self.execute_next_action()
    def send_navigation_goal(self, pose):
        """Send navigation goal to Nav2"""
        goal_msg = NavigateToPose.Goal()
        goal_msg.pose.header.frame_id = 'map'
        goal_msg.pose.pose = pose
        self.nav_client.wait_for_server()
        # send_goal_async resolves to a goal handle, not the final result
        send_goal_future = self.nav_client.send_goal_async(goal_msg)
        send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        """Handle goal acceptance, then wait asynchronously for the result"""
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().error('Navigation goal rejected')
            self.is_executing = False
            self.listening_for_command = True
            self.speak_response("Navigation failed. I couldn't reach the destination.")
            return
        result_future = goal_handle.get_result_async()
        result_future.add_done_callback(self.navigation_result_callback)

    def navigation_result_callback(self, future):
        """Handle navigation result"""
        try:
            status = future.result().status
            if status == GoalStatus.STATUS_SUCCEEDED:
                self.get_logger().info('Navigation succeeded')
                self.plan_index += 1
                if self.is_executing:
                    self.execute_next_action()
            else:
                self.get_logger().error('Navigation failed')
                self.is_executing = False
                self.listening_for_command = True
                self.speak_response("Navigation failed. I couldn't reach the destination.")
        except Exception as e:
            self.get_logger().error(f'Error in navigation result callback: {e}')
            self.is_executing = False
            self.listening_for_command = True
    def execute_detection_action(self, parameters):
        """Execute object detection action"""
        target_objects = parameters.get("target_objects", ["object"])
        self.get_logger().info(f"Detecting objects: {target_objects}")
        # In a real system, this would trigger computer vision modules
        # For this exercise, we'll simulate detection
        detected = f"Detected: {', '.join(target_objects)}"
        self.speak_response(detected)
        # Update world state with detected objects
        for obj in target_objects:
            self.world_state["objects"][obj] = {"position": (0, 0, 0), "detected": True}
        self.plan_index += 1
        if self.is_executing:
            self.execute_next_action()

    def execute_speak_action(self, parameters):
        """Execute speech action"""
        text = parameters.get("text", "Hello")
        self.speak_response(text)
        self.plan_index += 1
        if self.is_executing:
            self.execute_next_action()

    def speak_response(self, text):
        """Make robot speak"""
        self.get_logger().info(f"Robot says: {text}")
        msg = String()
        msg.data = text
        self.speech_pub.publish(msg)

    def publish_system_status(self):
        """Publish system status"""
        status_msg = String()
        status_msg.data = f"Executing: {self.is_executing}, Listening: {self.listening_for_command}, Actions in plan: {len(self.current_plan) if self.current_plan else 0}"
        self.status_pub.publish(status_msg)

    def camera_callback(self, msg):
        """Handle camera data"""
        # Process camera feed for object detection
        pass

    def imu_callback(self, msg):
        """Handle IMU data"""
        # Update robot orientation (simplified: stores the raw quaternion z component)
        self.world_state["robot_orientation"] = msg.orientation.z

    def destroy_node(self):
        """Clean up resources"""
        self.audio.terminate()
        super().destroy_node()

def main(args=None, api_key="your-api-key-here"):
    rclpy.init(args=args)
    node = AutonomousHumanoidSystem(api_key)
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()
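For reference, a plan the LLM might return for a command like "go to the kitchen and tell me what you see" could look like the dictionary below; actual model output will vary and should always be validated against the known action types before execution.

# Illustrative plan the LLM might return (actual output will vary;
# action names match the handlers defined in the system above).
example_plan = {
    "actions": [
        {"action": "navigate_to_location", "parameters": {"location": "kitchen"}},
        {"action": "detect_objects", "parameters": {"target_objects": ["cup", "bottle"]}},
        {"action": "speak", "parameters": {"text": "I can see a cup and a bottle."}}
    ]
}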
Expected Output
- Complete autonomous humanoid system operational
- Voice commands processed and executed
- Multi-step tasks completed successfully
- System status monitoring active
Assessment Questions
1. How does the integration of Whisper with cognitive planning enhance robot capabilities?
2. What are the main challenges in translating natural language to executable robot actions?
3. How do you ensure safety when using LLMs for robot control?
4. What performance metrics are important for VLA systems?
5. How would you extend this system for real-world deployment?
Submission Requirements
For each exercise, submit:
- Source code for all implemented components
- Configuration files and launch files
- Test results and performance metrics
- Screenshots of successful execution
- A detailed report documenting your implementation, challenges faced, and solutions implemented
Evaluation Rubric
- Functionality (40%): Systems work as expected and meet requirements
- Integration (25%): Components work together seamlessly
- Innovation (20%): Creative solutions and advanced features
- Documentation (15%): Clear explanations and proper documentation
Complete all exercises to gain comprehensive experience with Vision-Language-Action systems for Physical AI and humanoid robotics applications.