Sunday, July 8, 2018

Approaches to IO

I'm currently looking at ROS, which, despite its name (Robot Operating System), is neither an operating system nor, at its core, particularly specific to robotics. ROS includes libraries that help you model and program robots, but fundamentally it is a framework for distributed applications.

A quick overview: a ROS application consists of several nodes (processes) that communicate via topics (broadcast) and services (remote procedure calls). Using a message definition language, you can define which data is sent between nodes. There's also a master node which provides name resolution, so that clients can find the topics and services they're interested in. Libraries can provide you with message definitions and utilities, and you will usually use third-party nodes in your application. I would say these nodes still count as libraries, but unlike a traditional library, you don't call their code inside your own process. Instead, the node runs as its own process and you use topics and services to communicate with it.

This may sound familiar because it's basically the same as a microservice architecture or probably dozens of other approaches to distributed applications. What follows is applicable to a lot of different systems, but I'll use ROS to present my code samples.

Topics, publisher:
import rospy
from std_msgs.msg import String

pub = rospy.Publisher('chatter', String, queue_size=10)
rospy.init_node('talker', anonymous=True)
rate = rospy.Rate(10) # 10hz
while not rospy.is_shutdown():
    hello_str = "hello world %s" % rospy.get_time()
    rospy.loginfo(hello_str)
    pub.publish(hello_str)
    rate.sleep()
Topics, subscriber:
import rospy
from std_msgs.msg import String

def callback(data):
    rospy.loginfo(rospy.get_caller_id() + "I heard %s", data.data)

rospy.init_node('listener', anonymous=True)
rospy.Subscriber("chatter", String, callback)

# spin() simply keeps python from exiting until this node is stopped
rospy.spin()
The code is simplified and taken from the ROS Wiki. It should be pretty easy to understand: the publisher sends a message to the chatter topic ten times a second, and the subscriber registers a callback to handle these messages. There can be any number of publishers and subscribers on the same topic.
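To make the broadcast semantics concrete without a ROS installation, here is a toy in-process sketch. The `Broker` class and its `subscribe`/`publish` methods are hypothetical names, not rospy API; unlike ROS, there is no networking and callbacks run synchronously in the publisher's thread. It only shows that every subscriber on a topic sees every message from every publisher.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List


class Broker:
    """Toy in-process stand-in for ROS topics (hypothetical, not rospy)."""

    def __init__(self) -> None:
        # topic name -> callbacks of all subscribers on that topic
        self._subscribers: DefaultDict[str, List[Callable[[str], None]]] = defaultdict(list)

    def subscribe(self, topic: str, callback: Callable[[str], None]) -> None:
        self._subscribers[topic].append(callback)

    def publish(self, topic: str, message: str) -> None:
        # Broadcast: every subscriber on the topic receives the message.
        for callback in self._subscribers[topic]:
            callback(message)


broker = Broker()
received_a: List[str] = []
received_b: List[str] = []
broker.subscribe("chatter", received_a.append)
broker.subscribe("chatter", received_b.append)

# Two independent "publishers" on the same topic, plus one unrelated topic.
broker.publish("chatter", "hello from talker 1")
broker.publish("chatter", "hello from talker 2")
broker.publish("gossip", "nobody is listening")

print(received_a)
print(received_b)
```

Both subscribers end up with both chatter messages, and the gossip message goes nowhere because nobody subscribed to it.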

A different approach to the same problem might be a more traditional blocking I/O API:
import rospy
from std_msgs.msg import String

rospy.init_node('listener', anonymous=True)
sub = rospy.Subscriber("chatter", String)  # no callback: messages are fetched explicitly

while not rospy.is_shutdown():
    data = sub.receive()  # hypothetical: blocks until the next message arrives
    rospy.loginfo(rospy.get_caller_id() + "I heard %s", data.data)
This code is of course only hypothetical. The idea is that receive() would block until a new message arrived. What are the upsides and downsides? One upside is that it is perfectly clear where the message is handled: right in the main thread. The original code only had a callback, and we had basically no idea where it is called. Does each subscription spawn a new thread? Are the callbacks called inside spin()? If so, what happens when we replace spin() with a busy loop? (For the record, my tests suggest one thread per subscription, but maybe ROS is smarter when the number of subscriptions rises. Or maybe not, which would be another downside of the original approach.)
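Since rospy has no receive(), here is a plain-Python sketch of the same blocking style, assuming a subscription can be modeled as a thread-safe `queue.Queue` that a publisher thread fills. `Queue.get()` blocks the calling thread until an item is available, so the message is handled visibly in the main thread, exactly where the loop is.

```python
import queue
import threading
import time

# Hypothetical stand-in for a subscription: a thread-safe queue of messages.
chatter: "queue.Queue[str]" = queue.Queue()


def talker(count: int) -> None:
    """Simulated publisher running in another thread."""
    for i in range(count):
        time.sleep(0.01)
        chatter.put(f"hello world {i}")


threading.Thread(target=talker, args=(3,), daemon=True).start()

heard = []
for _ in range(3):
    # Blocking receive: the main thread sleeps until a message is available,
    # and the message is then handled right here, in plain sight.
    data = chatter.get()
    heard.append(data)
    print("I heard", data)
```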

But of course the original code has upsides too: for one, it hides the logic of receiving messages from the user. What you're interested in is the message payload, not the control flow you need to get it! And the most serious consideration: what if you have two subscriptions? You can't simply call receive() on both, because it blocks. While subscription A isn't getting new messages, it won't let subscription B's messages be handled, no matter how many there are.
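The starvation problem can be reproduced with two queues standing in for the subscriptions (again a plain-Python assumption, not ROS). A real blocking receive would wait forever on the silent subscription A; the timeout here exists only so the demonstration terminates.

```python
import queue

# Two hypothetical subscriptions, modeled as queues.
sub_a: "queue.Queue[str]" = queue.Queue()
sub_b: "queue.Queue[str]" = queue.Queue()

# Plenty of traffic on B, none on A.
for i in range(3):
    sub_b.put(f"gossip {i}")

handled = []
try:
    # Receive from A first, then B. With a true blocking call we would never
    # get past the first line; the timeout just lets the example finish.
    handled.append(sub_a.get(timeout=0.2))
    handled.append(sub_b.get(timeout=0.2))
except queue.Empty:
    print("stuck on A while", sub_b.qsize(), "messages wait on B")
```

None of B's messages are handled, even though all three were available the whole time.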

This is a classic problem with blocking I/O, and the naive solution is multithreading: give each subscription a dedicated thread that can block without holding up anything else. For robotics, this is probably fine, but spawning and switching between operating system threads is relatively expensive, which makes this approach less suitable for servers under high load.
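A minimal sketch of the thread-per-subscription workaround, again modeling subscriptions as `queue.Queue` objects (an assumption for illustration; `worker` and the `STOP` sentinel are hypothetical names). Each worker blocks only its own thread, so silence on one source no longer stalls the other. Note a second cost besides thread overhead: handlers now run concurrently, so shared state needs a lock.

```python
import queue
import threading
from typing import List, Optional

STOP = None  # sentinel that tells a worker thread to exit


def worker(name: str, source: "queue.Queue[Optional[str]]",
           log: List[str], lock: threading.Lock) -> None:
    """Dedicated thread per subscription: it may block on its own source
    without holding up any other subscription."""
    while True:
        data = source.get()  # blocks only this thread
        if data is STOP:
            return
        with lock:  # handlers run concurrently, so guard shared state
            log.append(f"{name}: {data}")


sub_a: "queue.Queue[Optional[str]]" = queue.Queue()
sub_b: "queue.Queue[Optional[str]]" = queue.Queue()
log: List[str] = []
lock = threading.Lock()

threads = [
    threading.Thread(target=worker, args=("chatter", sub_a, log, lock)),
    threading.Thread(target=worker, args=("gossip", sub_b, log, lock)),
]
for t in threads:
    t.start()

# A stays silent, yet B's messages are still handled.
for i in range(3):
    sub_b.put(f"message {i}")
sub_a.put(STOP)
sub_b.put(STOP)
for t in threads:
    t.join()

print(log)
```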

One step in the right direction would be a nonblocking receive:
import rospy
from std_msgs.msg import String

rospy.init_node('listener', anonymous=True)
sub1 = rospy.Subscriber("chatter", String)
sub2 = rospy.Subscriber("gossip", String)

while not rospy.is_shutdown():
    data = sub1.receive()  # hypothetical nonblocking receive: None if no message is queued
    if data is not None:
        rospy.loginfo(rospy.get_caller_id() + "I heard %s", data.data)
    data = sub2.receive()
    if data is not None:
        rospy.loginfo(rospy.get_caller_id() + "I heard %s", data.data)
Here, instead of blocking, receive() immediately returns None if there is no message yet. If you haven't spotted it, the downside is that this is a busy loop: whether or not there are messages, the CPU constantly iterates through this code. With the blocking code, the operating system would simply put the thread aside until the socket that receive() is waiting on has new bytes. Now that no longer happens, and the OS keeps running the thread even though it has nothing useful to do.
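The same busy-loop behavior shows up with real OS sockets. In this sketch a `socket.socketpair()` stands in for a subscription's connection (an assumption; `try_receive` is a hypothetical helper). With `setblocking(False)`, `recv()` raises `BlockingIOError` instead of waiting, which the helper turns into `None`, mirroring the hypothetical receive() above.

```python
import socket
from typing import Optional

# A connected socket pair stands in for a subscription's network connection.
reader, writer = socket.socketpair()
reader.setblocking(False)  # recv() now returns immediately instead of waiting


def try_receive(sock: socket.socket) -> Optional[bytes]:
    """Nonblocking receive: the available bytes, or None if there are none yet."""
    try:
        return sock.recv(1024)
    except BlockingIOError:
        return None


# Nothing has been sent yet, so every one of these iterations is wasted CPU time.
wasted = 0
for _ in range(10_000):
    if try_receive(reader) is None:
        wasted += 1

writer.sendall(b"hello")
data = None
while data is None:  # a busy loop: spin until the bytes show up
    data = try_receive(reader)
print(wasted, "empty iterations before receiving", data)

reader.close()
writer.close()
```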

What we really want is some kind of multi-source polling. Let me show just one possible variant:
import rospy
from std_msgs.msg import String

rospy.init_node('listener', anonymous=True)
sub1 = rospy.Subscriber("chatter", String)
sub2 = rospy.Subscriber("gossip", String)

while not rospy.is_shutdown():
    rospy.poll(sub1, sub2)  # hypothetical: blocks until sub1 or sub2 has a message
    data = sub1.receive()
    if data is not None:
        rospy.loginfo(rospy.get_caller_id() + "I heard %s", data.data)
    data = sub2.receive()
    if data is not None:
        rospy.loginfo(rospy.get_caller_id() + "I heard %s", data.data)
Here we still assume receive() is nonblocking, but at the start of each iteration we call poll(), which blocks until data is available on at least one of the subscriptions. Afterwards, we still need to check each subscription ourselves, but the thread no longer burns CPU while it waits.
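Operating systems offer exactly this kind of multi-source wait (on Unix, for example `select`, `poll` and `epoll`), and Python's standard `selectors` module wraps them. In this sketch, two socket pairs stand in for the chatter and gossip subscriptions (an assumption, not ROS). `sel.select()` plays the role of the hypothetical rospy.poll(): it blocks until at least one registered socket is readable and, as a bonus, reports which ones are.

```python
import selectors
import socket

sel = selectors.DefaultSelector()

# Two socket pairs stand in for the "chatter" and "gossip" subscriptions.
chatter_r, chatter_w = socket.socketpair()
gossip_r, gossip_w = socket.socketpair()
for sock, topic in ((chatter_r, "chatter"), (gossip_r, "gossip")):
    sock.setblocking(False)
    sel.register(sock, selectors.EVENT_READ, data=topic)

gossip_w.sendall(b"psst")  # only one source has data

heard = []
# Block (here for at most 1 s) until at least one socket is readable;
# only the ready sockets are returned, so no source is read in vain.
for key, _events in sel.select(timeout=1.0):
    payload = key.fileobj.recv(1024)
    heard.append((key.data, payload))

print(heard)

sel.close()
for s in (chatter_r, chatter_w, gossip_r, gossip_w):
    s.close()
```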

It turns out that this kind of primitive, or at least its principle, is a rather useful thing for the operating system to provide: many different abstractions can be built on top of it efficiently. receive() alone is not enough, but poll() is.
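As an illustration of building a higher-level abstraction on the polling primitive, here is a toy callback dispatcher in the spirit of Subscriber callbacks plus spin(). `EventLoop`, `subscribe` and `spin_once` are hypothetical names, and this is not a claim about how rospy is implemented; it only shows that a single thread blocked in `select()` can drive callbacks for any number of sources.

```python
import selectors
import socket
from typing import Callable, List


class EventLoop:
    """Toy callback dispatcher built on the OS readiness primitive."""

    def __init__(self) -> None:
        self._sel = selectors.DefaultSelector()

    def subscribe(self, sock: socket.socket, callback: Callable[[bytes], None]) -> None:
        sock.setblocking(False)
        self._sel.register(sock, selectors.EVENT_READ, data=callback)

    def spin_once(self, timeout: float) -> int:
        """Wait for readable sockets, run their callbacks, return how many ran."""
        ran = 0
        for key, _events in self._sel.select(timeout):
            key.data(key.fileobj.recv(1024))
            ran += 1
        return ran

    def close(self) -> None:
        self._sel.close()


loop = EventLoop()
a_r, a_w = socket.socketpair()
b_r, b_w = socket.socketpair()
log: List[str] = []
loop.subscribe(a_r, lambda data: log.append("chatter: " + data.decode()))
loop.subscribe(b_r, lambda data: log.append("gossip: " + data.decode()))

a_w.sendall(b"hello")
b_w.sendall(b"psst")
for _ in range(10):  # bounded stand-in for spin(): one thread serves both sources
    if len(log) >= 2:
        break
    loop.spin_once(timeout=1.0)

print(sorted(log))

loop.close()
for s in (a_r, a_w, b_r, b_w):
    s.close()
```

The order in which ready sockets are reported is not guaranteed, hence the `sorted()`. The same selector could just as well back a blocking receive() on a single source, which is the point: one primitive, several abstractions.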
