前提条件
确保你的 Conda 环境中安装了 rclpy 包。如果没有安装,可以使用以下命令进行安装:
创建 ROS 2 发布节点
创建一个名为 simple_ros2_node.py 的 Python 文件,并添加以下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| import rclpy from rclpy.node import Node from std_msgs.msg import String
class MyRobotNode(Node): def __init__(self): super().__init__('robot_brain') self.publisher_ = self.create_publisher(String, 'chatter', 10) self.timer = self.create_timer(3, self.timer_callback)
def timer_callback(self): msg = String() msg.data = 'Robot is walking...' self.publisher_.publish(msg) self.get_logger().info('Publishing: "%s"' % msg.data)
def main(): rclpy.init() node = MyRobotNode() rclpy.spin(node) rclpy.shutdown()
if __name__ == '__main__': main()
|
创建 ROS 2 监听节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| import rclpy from rclpy.node import Node from std_msgs.msg import String
class MinimalSubscriber(Node):
def __init__(self): super().__init__('minimal_subscriber') self.subscription = self.create_subscription( String, 'chatter', self.listener_callback, 10) self.subscription
def listener_callback(self, msg): self.get_logger().info('I heard: "%s"' % msg.data)
def main(args=None): rclpy.init(args=args)
minimal_subscriber = MinimalSubscriber()
rclpy.spin(minimal_subscriber)
minimal_subscriber.destroy_node() rclpy.shutdown()
if __name__ == '__main__': main()
|
运行节点
在终端中运行以下命令来启动你的 ROS 2 节点:
此时,你运行 ros2 topic list 会看到 /chatter。

监听 node_one.py 发出的东西
在终端中运行以下命令:
1
| ros2 topic echo /chatter
|
你就会看到 data: Robot is walking...
或者
你就会看到 [INFO] [1770962759.315087388] [minimal_subscriber]: I heard: "Robot is walking..."
源码
GitHub: IsaacSimPlusROS2/An-Easy-Ros-2-Program