Live programming IoT systems with MsgFlo+Flowhub

Last weekend at FOSDEM I presented in the Internet of Things (IoT) devroom,
showing how one can use MsgFlo with Flowhub to visually live-program devices that talk MQTT.

If the video does not work, try the alternatives here. See also the full presentation notes, including example code.

Background

Since announcing MsgFlo in 2015, it has mostly been used to build scalable backend systems (“cloud”), using AMQP and RabbitMQ. At The Grid we’ve been processing hundreds of thousands of jobs each week, so that use case is pretty well tested by now.

However, MsgFlo was designed from the beginning to support multiple messaging systems (including MQTT), as well as other kinds of distributed systems – like networks of embedded devices working together (one aspect of “IoT”). And in MsgFlo 0.10 this is starting to work pretty nicely.

Visual system architecture

Typical MQTT devices have the topic names hidden in code, and any documentation is kept in sync (or not…) by hand.
MsgFlo lets you represent your devices and services as FBP/dataflow “components”, and a system as a connected graph of component instances. Each device periodically sends a discovery message to the broker. This message describes the role name, as well as what ports exist (including the MQTT topic names). This leads to a system architecture which can be visualized easily:

Imaginary solution to a typically Norwegian problem: avoiding your water pipes freezing in the winter.
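To give an idea of what such a discovery message could look like on the wire, here is a minimal sketch using paho-mqtt. The component, role and topic names are made up, and the exact message schema (and the default “fbp” discovery topic) are assumptions to check against the MsgFlo documentation:

import json
import paho.mqtt.client as mqtt

# Hypothetical temperature sensor announcing itself to MsgFlo.
# Field names follow the MsgFlo participant discovery format, but treat
# the exact schema and the "fbp" discovery topic as assumptions to verify.
discovery = {
    "protocol": "discovery",
    "command": "participant",
    "payload": {
        "component": "iot/TemperatureSensor",
        "role": "livingroom",
        "inports": [
            {"id": "interval", "type": "number", "queue": "livingroom/interval"},
        ],
        "outports": [
            {"id": "temperature", "type": "number", "queue": "livingroom/temperature"},
        ],
    },
}

client = mqtt.Client()
client.connect("localhost", 1883)
client.publish("fbp", json.dumps(discovery))
client.loop(2.0)  # give the message time to be sent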

Rewire live system

In most MQTT devices, output is sent directly to the input of another device, by using the same MQTT topic name. This hardcodes the system functionality, reducing encapsulation and reusability.
With MsgFlo, each device *should* instead send output and receive input on topics namespaced to that device.
Connections between devices are handled on the layer above, by the broker/router binding different topics together. With Flowhub, one can change these connections while the system is running.
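To make the contrast concrete, here is a rough sketch (topic names invented for illustration): with hardcoded wiring the sensor publishes straight to the display’s input topic, while with namespaced topics each device only talks on its own topics and the connection between them lives outside the devices – shown below as a trivial stand-in republisher, where in a real system MsgFlo manages the binding and Flowhub can rewire it at runtime:

import paho.mqtt.client as mqtt

# Hardcoded wiring (what we want to avoid):
#   sensor publishes directly to "display1/text"
#
# Namespaced wiring (the MsgFlo convention):
#   sensor publishes to "sensor1/temperature"
#   display subscribes to "display1/text"
# The edge sensor1/temperature -> display1/text lives outside the devices.

# Trivial stand-in for such a binding; in a real system MsgFlo does this
# routing for you, and Flowhub changes it while everything keeps running.
def on_message(client, userdata, msg):
    client.publish("display1/text", msg.payload)

client = mqtt.Client()
client.on_message = on_message
client.connect("localhost", 1883)
client.subscribe("sensor1/temperature")
client.loop_forever()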

Change program values on the fly

Changing a parameter or configuration of an embedded device usually requires changing the code and flashing it. This means recompiling and usually being connected to the device over USB. This makes the iteration cycle pretty slow and tedious.
In MsgFlo, devices can (and should!) expose their parameters on MQTT and declare them as inports.
Then they can be changed in Flowhub, with the device instantly reflecting the new values.

This is great for exploratory coding: quickly trying out different values to find the right one.
Examples include tweaking animations or game mechanics, where it is near impossible to know up front what feels right.
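On the device side this can be as simple as subscribing to a namespaced parameter topic and applying new values as they arrive. A minimal sketch with paho-mqtt, using a made-up LED strip device and a “speed” parameter:

import paho.mqtt.client as mqtt

# Hypothetical runtime-tunable parameter, e.g. the speed of an LED animation
params = {"speed": 1.0}

def on_message(client, userdata, msg):
    # The "speed" inport is just a namespaced MQTT topic, so Flowhub
    # (or anything else) can change the value while the device runs.
    if msg.topic == "ledstrip1/speed":
        params["speed"] = float(msg.payload)
        print("speed is now", params["speed"])

client = mqtt.Client()
client.on_message = on_message
client.connect("localhost", 1883)
client.subscribe("ledstrip1/speed")
client.loop_forever()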

Add components as adapters

MsgFlo encourages devices to be fairly stupid, focused on a single generally-useful task like providing sensor data, or a way to cause actions in the real world. This lets us define “applications” without touching the individual devices, and adapt the behavior of the system over time.

Imagine we have a device which periodically sends the current temperature, as a floating-point number in Celsius. And a display device which can show text (for instance a small OLED). To show the current temperature, we could wire them directly:

Our display would show something like “22.3333333”. Not very friendly – how does one know what this number means?

Better add a component to do some formatting.

Adding a Python component

Component formatting incoming temperature number to a friendly text string
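A rough sketch of such a component, written here against plain paho-mqtt so its shape is clear (the topic names are hypothetical; with the msgflo-python library the component would also announce its ports via the discovery message, so it shows up in Flowhub automatically):

import paho.mqtt.client as mqtt

# Hypothetical namespaced topics; in MsgFlo these would be declared as the
# component's inport and outport in its discovery message.
INPORT_TOPIC = "formattemperature/in"
OUTPORT_TOPIC = "formattemperature/out"

def on_message(client, userdata, msg):
    celsius = float(msg.payload)
    text = "Temperature: %.1f C" % celsius
    client.publish(OUTPORT_TOPIC, text)

client = mqtt.Client()
client.on_message = on_message
client.connect("localhost", 1883)
client.subscribe(INPORT_TOPIC)
client.loop_forever()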

And then insert it before the display. This will create a new process, and route the data through it.

Our display would now show “Temperature: 22.3 C”

Over time, maybe the system grows further.

Added another sensor; the output is now something like “Inside 22.2 C Outside: -5.5 C”.

Getting started with MsgFlo

If you have existing “things” that support MQTT, you can start using MsgFlo by either:
1) Modifying the code to also send the discovery message, or
2) Using the msgflo-foreign-participant tool to provide discovery without code changes.

If you have new things, using one of the MsgFlo libraries is a quick way to support MQTT and MsgFlo. Right now there are libraries for Python, C++11, Node.js, NoFlo and Arduino.

guv: Automatic scaling of Heroku workers

At The Grid we do a lot of computationally heavy work server-side, in order to produce websites from user-provided content. This includes image analytics (for understanding the content), constraint solving (for page layout) and image processing (optimization and filtering to achieve a particular look). Currently we serve some thousands of sites, with hundreds of thousands expected by the time we’ve completed beta – so scalability is a core concern.

All computationally intensive work is put as jobs into an AMQP/RabbitMQ message queue, and these jobs are consumed by Heroku workers. To make it easy to manage many queues and worker roles, we also use MsgFlo.
This provides us with the required flexibility to scale: the queues buffer the in-progress work, the broker distributes it evenly between available workers, and with Heroku we can change the number of workers with one command. But it still leaves us with the decision of how much compute capacity to provision. And when load is dynamic, it is tedious and inefficient to do this manually – especially as Heroku bills worker usage by the second.
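For context, each worker is essentially a process consuming jobs off a queue. A minimal sketch with the pika library (the queue name matches the config example further down; the job handling is made up):

import pika

def handle_job(channel, method, properties, body):
    # The computationally heavy work (image analytics, layout solving, ...)
    # would happen here.
    print("processing job:", body)
    channel.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.basic_consume(queue="analyze.IN", on_message_callback=handle_job)
channel.start_consuming()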

RabbitMQ and Heroku dashboards

Monitoring RabbitMQ queues and scaling Heroku workers manually when demand changes; not fun.

If we instead regulated this every 1-5 minutes based on demand, we would reduce costs. Or alternatively, with a fixed budget, provide a better quality of service. And most importantly, let developers worry about other things.

Of course, there already exist a number of solutions for this. However, some rely on particular metrics providers we were not using, some use metrics with an unclear relationship to the required number of workers (like the number of users), and some have unacceptable limitations (only one worker per service, or only running as a service with pay-by-number-of-workers pricing).

guv

guv 0.1 implements a simple proportional scaling model. Based on the current number of jobs in the queue, and an estimate of the job processing time, it calculates the number of workers required for all work to be completed within a configured deadline.

guv system model

The deadline is the maximum time you allow your users to wait for a completed job. The job processing time (average, deviation) can be calculated from metrics of previous jobs. And the number of jobs in the queue is read directly from RabbitMQ.
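In other words, the heart of the model is roughly this calculation (a simplified sketch; the actual guv implementation also factors in the processing-time deviation and the hysteresis described further down):

import math

def workers_needed(jobs_in_queue, processing_time, deadline, min_workers, max_workers):
    """Workers required for all queued jobs to complete within the deadline."""
    total_work = jobs_in_queue * processing_time  # seconds of queued work
    needed = math.ceil(total_work / deadline)
    return max(min_workers, min(max_workers, needed))

# With 40 queued jobs at ~20 seconds each and a 120 second deadline:
print(workers_needed(40, 20, 120.0, min_workers=1, max_workers=15))  # -> 7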

# A simple guv config for one worker role.
# One guv instance typically manages many worker roles
'*':
  app: my-heroku-app
analyze:
  queue: 'analyze.IN' # RabbitMQ queue name
  worker: analyzeworker # Heroku dyno role name
  process: 20 # estimated processing time per job, in seconds
  deadline: 120.0 # time budget for completing queued jobs, in seconds
  min: 1 # keep something always running
  max: 15 # budget limits

Now there are a couple of limitations to this model. Primarily, it is completely reactive; we do not attempt to predict how traffic will develop in the future. Prediction is, after all, a terribly tricky business – better not to go there if it can be avoided.
And since it takes a non-zero amount of time to spin up a new worker (about 45-60 seconds), a sudden spike in demand may cause some jobs to miss a tight deadline, as the workers can’t spin up fast enough. To compensate for this, there is some simple hysteresis: scale up more aggressively, and scale down a bit reluctantly – we might need the workers again in the next couple of minutes.
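One simple way to express that kind of hysteresis (purely illustrative, not guv’s actual code) is to apply a higher target immediately, but only scale down after the lower target has persisted for a few consecutive measurements:

def apply_hysteresis(current, target, low_streak, patience=3):
    """Scale up right away; scale down only after `patience` consecutive low readings.
    Returns the new worker count and the updated streak counter."""
    if target >= current:
        return target, 0            # scale up (or stay) immediately
    low_streak += 1
    if low_streak >= patience:
        return target, 0            # demand has stayed low, safe to scale down
    return current, low_streak      # hold on to the workers a little longer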

As a bonus, guv includes some integration with common metrics services: the statuspage.io ‘jobs-in-flight’ metric on status.thegrid.io comes directly from guv, and using New Relic Insights we can analyze how the scaling is performing.

Last 2 days of guv scaling history for some of the worker roles at The Grid.

If we had instead scaled manually, keeping a constant number of workers over the 48-hour period – say workers=35 (Max) – we would have paid at least 3-4 times more than we did with autoscaling (the difference between the area under Max and the area under the 10-minute line). Alternatively, we could have provisioned a lower number of workers, but then during spikes above that number our users would have suffered, because things would take longer than normal.

We’ve been running this in production since early June. Back then we had 25 users, whereas now we have several thousand. Apart from updating the configuration to reflect service changes, we do not deal with scaling – the minute-to-minute decisions are all done by guv. Not much is planned in terms of new features for guv, apart from some more tools for analyzing configuration. For more info on using guv, see the README.