- Description: ROS Python clients,
rospy (ROS 1) first then rclpy (ROS 2). Nodes, publishers/subscribers, services, timers, parameters, the bag APIs (rosbag and rosbag2_py), and migration notes
- My Notion Note ID: K2A-D2-3
- Created: 2023-04-05
- Updated: 2026-05-11
- License: Reuse is very welcome. Please credit Yu Zhang and link back to the original on yuzhang.io
Table of Contents
1. ROS 1 vs ROS 2 in Brief
- ROS 1 (Indigo, Kinetic, Melodic, Noetic), custom XML-RPC + TCPROS stack, central master. Final release Noetic EOL 2025-05-31.
- ROS 2 (Foxy, Humble, Jazzy, …), built on DDS, no central master, native QoS, real Windows/macOS support, lifecycle nodes. Active development.
- Use ROS 2 for new robots; many fleets still maintain ROS 1
ros1_bridge lets the two stacks talk during migration
2. ROS 1 with rospy
2.1 Node and init_node
- Every ROS process is a node
rospy.init_node registers it with the master
import rospy
if __name__ == "__main__":
rospy.init_node("listener", anonymous=False)
rospy.loginfo("listener started")
rospy.spin()
anonymous=True, random suffix so multiple instances of the same script can coexist
- Logging:
rospy.loginfo, logwarn, logerr, logfatal, logdebug
2.2 Publisher and Subscriber
from std_msgs.msg import String
pub = rospy.Publisher("/chatter", String, queue_size=10)
rate = rospy.Rate(10)
while not rospy.is_shutdown():
msg = String(data="hello")
pub.publish(msg)
rate.sleep()
def callback(msg: String) -> None:
rospy.loginfo("got: %s", msg.data)
rospy.Subscriber("/chatter", String, callback, queue_size=10)
rospy.spin()
queue_size is the outbound (publisher) or inbound (subscriber) buffer in messages, set explicitly to avoid the unbounded-queue warning
2.3 Services
- Synchronous request/reply over a typed
.srv interface
from my_pkg.srv import AddTwoInts, AddTwoIntsResponse
def handle(req):
return AddTwoIntsResponse(req.a + req.b)
rospy.Service("add_two_ints", AddTwoInts, handle)
rospy.spin()
rospy.wait_for_service("add_two_ints")
add = rospy.ServiceProxy("add_two_ints", AddTwoInts)
resp = add(2, 3)
2.4 Parameters
rospy.set_param("/robot/name", "yuzhang_bot")
name = rospy.get_param("/robot/name", "default")
rospy.delete_param("/robot/name")
rospy.has_param("/robot/name")
- Parameters live on the master, global, not per-node
2.5 Time and Rate
now = rospy.Time.now()
duration = rospy.Duration(secs=1, nsecs=500_000_000)
t_after = now + duration
r = rospy.Rate(20)
while not rospy.is_shutdown():
do_step()
r.sleep()
- Sim time (
/use_sim_time=true + /clock publisher), rospy.Time.now() returns simulator time
2.6 rosbag (ROS 1)
import rosbag
with rosbag.Bag("recorded.bag", "r") as bag:
for topic, msg, t in bag.read_messages(topics=["/chatter", "/imu"]):
process(topic, msg, t)
with rosbag.Bag("out.bag", "w") as bag:
bag.write("/chatter", String(data="hello"), rospy.Time.now())
- Bag closed on
with exit
- CLI
rosbag info out.bag summarizes it
3. ROS 2 with rclpy
3.1 Node Class
- Idiomatic ROS 2, subclass
Node
import rclpy
from rclpy.node import Node
class Listener(Node):
def __init__(self):
super().__init__("listener")
self.get_logger().info("listener started")
def main():
rclpy.init()
node = Listener()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()
get_logger(), node-scoped logger; .debug/.info/.warning/.error/.fatal
3.2 Publisher and Subscriber
from std_msgs.msg import String
class Talker(Node):
def __init__(self):
super().__init__("talker")
self.pub = self.create_publisher(String, "chatter", 10)
self.timer = self.create_timer(0.1, self.tick)
def tick(self):
self.pub.publish(String(data="hello"))
class Listener(Node):
def __init__(self):
super().__init__("listener")
self.create_subscription(String, "chatter", self.cb, 10)
def cb(self, msg):
self.get_logger().info(f"got: {msg.data}")
- Topic name has no leading
/ by convention, names resolved relative to the node's namespace
3.3 QoS Profiles
- DDS QoS replaces ROS 1's
queue_size
- A QoS depth integer is the shortcut; full profiles control reliability, history, durability
from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy, HistoryPolicy
qos = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
history=HistoryPolicy.KEEP_LAST,
depth=10,
)
self.create_publisher(String, "chatter", qos)
- Publishers and subscribers must agree on QoS to connect
rclpy.qos_event, callbacks for QoS mismatches
3.4 Services and Clients
from example_interfaces.srv import AddTwoInts
class Server(Node):
def __init__(self):
super().__init__("adder")
self.srv = self.create_service(AddTwoInts, "add_two_ints", self.handle)
def handle(self, request, response):
response.sum = request.a + request.b
return response
class Client(Node):
def __init__(self):
super().__init__("adder_client")
self.cli = self.create_client(AddTwoInts, "add_two_ints")
self.cli.wait_for_service()
async def call(self, a, b):
req = AddTwoInts.Request(a=a, b=b)
return await self.cli.call_async(req)
call_async returns a Future, await it (asyncio-compatible executor) or rclpy.spin_until_future_complete
3.5 Parameters
self.declare_parameter("rate_hz", 10.0)
rate = self.get_parameter("rate_hz").get_parameter_value().double_value
add_on_set_parameters_callback, validate/react to runtime changes
3.6 Executors and Multi-Threading
from rclpy.executors import MultiThreadedExecutor
executor = MultiThreadedExecutor(num_threads=4)
executor.add_node(node_a)
executor.add_node(node_b)
try:
executor.spin()
finally:
executor.shutdown()
- Use
ReentrantCallbackGroup for concurrent callbacks on the same node
- Default group serializes them
3.7 rosbag2_py
import rosbag2_py
from rclpy.serialization import deserialize_message
from rosidl_runtime_py.utilities import get_message
reader = rosbag2_py.SequentialReader()
storage = rosbag2_py.StorageOptions(uri="path/to/bag", storage_id="sqlite3")
converter = rosbag2_py.ConverterOptions("", "")
reader.open(storage, converter)
type_map = {t.name: t.type for t in reader.get_all_topics_and_types()}
while reader.has_next():
topic, raw, t = reader.read_next()
msg_cls = get_message(type_map[topic])
msg = deserialize_message(raw, msg_cls)
process(topic, msg, t)
writer = rosbag2_py.SequentialWriter()
writer.open(storage, converter)
writer.create_topic(rosbag2_py.TopicMetadata(
name="/chatter", type="std_msgs/msg/String", serialization_format="cdr",
))
- Default storage backend:
sqlite3 (Foxy/Humble)
- Newer distros also offer
mcap
4. Migration: rospy → rclpy
| Concept |
rospy |
rclpy |
| Init |
rospy.init_node(name) |
rclpy.init(); class Node |
| Spin |
rospy.spin() |
rclpy.spin(node) |
| Publisher |
rospy.Publisher(topic, type, queue_size=) |
self.create_publisher(type, topic, qos) |
| Subscriber |
rospy.Subscriber(topic, type, cb) |
self.create_subscription(type, topic, cb, qos) |
| Service |
rospy.Service(name, srv, handler) |
self.create_service(srv, name, handler) |
| Param |
rospy.get_param(name) (global) |
self.declare_parameter(name); self.get_parameter(name) (per-node) |
| Time |
rospy.Time, rospy.Duration |
rclpy.time.Time, rclpy.duration.Duration |
| Rate |
rospy.Rate(hz) |
self.create_timer(period, cb) |
| Bag |
rosbag.Bag |
rosbag2_py.SequentialReader/Writer |
| Logging |
rospy.loginfo |
self.get_logger().info |
- Message type imports also change:
from std_msgs.msg import String works in both
- In rclpy the package path includes the kind explicitly when introspecting (
std_msgs/msg/String)
5. References