guv: Automatic scaling of Heroku workers

At The Grid we do a lot of computationally heavy work server-side in order to produce websites from user-provided content. This includes image analytics (for understanding the content), constraint solving (for page layout) and image processing (optimization and filtering to achieve a particular look). Currently we serve a few thousand sites, with hundreds of thousands expected by the time we’ve completed the beta – so scalability is a core concern.

All computationally intensive work is added as jobs to AMQP/RabbitMQ message queues, which are consumed by Heroku workers. To make it easy to manage many queues and worker roles, we also use MsgFlo.
This provides the flexibility required to scale: the queues buffer pending work, the broker distributes it evenly between available workers, and with Heroku we can change the number of workers with one command. But it still leaves us with the decision of how much compute capacity to provision. And when load is dynamic, doing this manually is tedious and inefficient – especially as Heroku bills workers by the second.

RabbitMQ and Heroku dashboards

Monitoring RabbitMQ queues and scaling Heroku workers manually when demand changes; not fun.

If we instead regulated this every 1-5 minutes based on demand, we would reduce costs. Or alternatively, with a fixed budget, provide a better quality of service. And most importantly, let developers worry about other things.

Of course, a number of solutions for this already exist. However, some require particular metrics providers which we were not using, some use metrics with an unclear relationship to the required number of workers (like number of users), and others have unacceptable limitations (only one worker per service, or only available as a service priced per worker).

guv

guv 0.1 implements a simple proportional scaling model. Based on the current number of jobs in the queue and an estimate of the job processing time, it calculates the number of workers required for all work to be completed within a configured deadline.

guv system model

The deadline is the maximum time you allow your users to wait for a completed job. The job processing time (average and deviation) can be calculated from metrics of previous jobs. And the number of jobs in the queue is read directly from RabbitMQ.
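As a rough sketch of the idea (not guv’s actual code), the proportional model boils down to dividing the total queued work by the time budget:

import math

def workers_needed(jobs_in_queue, process_time, deadline, min_workers, max_workers):
    # Total queued work (seconds) divided by the time budget gives the number
    # of workers that must run in parallel to meet the deadline.
    # Simplified sketch: guv's real model also accounts for processing-time
    # deviation and worker spin-up time.
    if jobs_in_queue == 0:
        return min_workers
    ideal = math.ceil(jobs_in_queue * process_time / deadline)
    return max(min_workers, min(max_workers, ideal))

# Example, using the numbers from the config below:
# 60 queued jobs at ~20 s each, 120 s deadline -> 10 workers
print(workers_needed(60, process_time=20, deadline=120.0, min_workers=1, max_workers=15))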

# A simple guv config for one worker role.
# One guv instance typically manages many worker roles
'*':
  app: my-heroku-app
analyze:
  queue: 'analyze.IN' # RabbitMQ queue name
  worker: analyzeworker # Heroku dyno role name
  process: 20 # estimated processing time per job, in seconds
  deadline: 120.0 # maximum time from job queued until completed, in seconds
  min: 1 # keep something always running
  max: 15 # budget limits
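For illustration, here is a hedged sketch of how a computed worker count can be applied through the Heroku Platform API formation endpoint. guv handles this step for you, so this only shows the kind of call involved; the app name, role and token are placeholders, and guv’s own implementation may differ in details.

import requests

def scale_heroku_workers(app, role, quantity, api_token):
    # Set the number of dynos for one role via the Heroku Platform API.
    # Illustration only, not guv's code.
    resp = requests.patch(
        'https://api.heroku.com/apps/%s/formation/%s' % (app, role),
        json={'quantity': quantity},
        headers={
            'Accept': 'application/vnd.heroku+json; version=3',
            'Authorization': 'Bearer ' + api_token,
        },
    )
    resp.raise_for_status()

# scale_heroku_workers('my-heroku-app', 'analyzeworker', 10, api_token='...')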

Now, there are a couple of limitations to this model. Primarily, it is completely reactive; we do not attempt to predict how traffic will develop in the future. Prediction is, after all, a terribly tricky business – better not to go there if it can be avoided.
And since it takes a non-zero amount of time to spin up a new worker (about 45-60 seconds), a sudden spike in demand may cause some jobs to miss a tight deadline, as the workers can’t spin up fast enough. To compensate for this, there is some simple hysteresis: scale up aggressively, and scale down a bit reluctantly – we might need the workers again in the next couple of minutes.
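A minimal sketch of what such hysteresis can look like (simplified, not guv’s exact rules; the 120 second cooldown is an arbitrary illustration): scale up to the target immediately, but only scale down once the lower target has persisted for a while.

def apply_hysteresis(current, target, seconds_below_current, cooldown=120):
    # Spikes: react immediately by scaling up to the target.
    if target >= current:
        return target
    # Sustained drop: only scale down after the target has stayed
    # below the current level for `cooldown` seconds.
    if seconds_below_current >= cooldown:
        return target
    # Otherwise keep the workers around a bit longer.
    return current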

As a bonus, guv includes some integration with common metrics services: the statuspage.io metrics about ‘jobs-in-flight’ on status.thegrid.io come directly from guv, and using New Relic Insights we can analyze how the scaling is performing.

Last 2 days of guv scaling history on some of the worker roles at The Grid.

If we had scaled manually with a constant number of workers over this 48-hour period, workers=35 (Max), we would have paid at least 3-4 times more than we did with autoscaling (the difference between the area under Max and the area under the 10-minute line). Alternatively, we could have provisioned a lower number of workers, but then during spikes above that number our users would have suffered, because things would take longer than normal.
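The “area under the curve” comparison is just a sum of worker-minutes. A quick sketch with made-up numbers (the real data is in the graph above):

# Made-up 10-minute samples of how many workers guv kept running.
samples = [3, 4, 12, 35, 8, 5, 3, 2]
interval = 10  # minutes per sample

autoscaled = sum(samples) * interval                    # worker-minutes actually used
fixed_at_max = max(samples) * interval * len(samples)   # always running at Max=35

print(fixed_at_max / autoscaled)  # ~3.9x more expensive in this toy example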

We’ve been running this in production since early June. Back then we had 25 users, whereas now we have several thousand. Apart from updating the configuration to reflect service changes, we do not deal with scaling – the minute-to-minute decisions are all made by guv. Not much is planned in terms of new features for guv, apart from some more tools for analyzing configurations. For more info on using guv, see the README.

Announcing MsgFlo, a distributed FBP runtime

At The Grid we do a lot of CPU-intensive work on the backend as part of producing web pages. This includes content extraction, normalization, image analytics, webpage auto-layout using constraint solvers, webpage optimization (GSS to CSS compilation) and image processing.

The system runs on Heroku and is spread over some 10 different dyno roles, communicating with each other using AMQP message queues. Some of the dyno separation also deals with external APIs, allowing us to handle service failures and API rate limiting in a robust manner.

The majority of the workers are implemented using NoFlo, a flow-based programming runtime for Node.js (and the browser), with Flowhub as our IDE. This gives us a strictly encapsulated, visual, introspectable view of each worker, making for a testable and easy-to-understand architecture.

Inside a process: In NoFlo each node is a JavaScript class

However, NoFlo is only concerned with an individual worker process: it does not know that it is part of a bigger system.

Enter MsgFlo

MsgFlo is a new FBP runtime designed for distributed systems. Each node represents a separate process, and the connections (edges) between nodes are message queues in a broker process.
To make this distinction clearer, we’ve adopted the term participant for a node which participates in a MsgFlo network.
Because MsgFlo implements the same FBP runtime protocol and JSON graph format as NoFlo, imgflo and MicroFlo, we can use the same tools, including the .FBP DSL and the Flowhub IDE.
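To give an idea of that shared graph format, here is a minimal sketch of a two-role MsgFlo graph expressed as FBP JSON, built as a Python dict. Role, component and port names are made up for illustration.

import json

graph = {
    'properties': {'name': 'MyBackend'},
    'processes': {
        # Each process is a role; in MsgFlo every role runs as one or
        # more separate OS processes (participants).
        'analyze': {'component': 'my-project/Analyze'},
        'resize': {'component': 'my-project/Resize'},
    },
    'connections': [
        # An edge between two roles; in MsgFlo this is realized as a
        # message queue on the broker.
        {'src': {'process': 'analyze', 'port': 'out'},
         'tgt': {'process': 'resize', 'port': 'in'}},
    ],
}

print(json.dumps(graph, indent=2))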

Distributed MsgFlo system: HTTP frontends + workers in separate processes.

The graph above represents how different roles are wired together. There may be 1-N participants in the same role, for instance 10 dynos of the same dyno type on Heroku.
There can also be multiple participants in a single process. This can be useful to make different independent facets show up as independent nodes in a graph, even if they happen to be executing in the same process. One could use the same mechanism to implement a shared-nothing message-passing multithreading model, with the limitation that every message will pass through a broker.
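As a sketch of the multiple-participants-per-process idea, using the msgflo-python style shown later in this post: two independent participants defined side by side, each appearing as its own node in the graph. How they are attached to the broker and started is library-specific, so start_participants() below is a hypothetical helper, and the component logic is illustrative only.

import msgflo

class Extract(msgflo.Participant):
    def __init__(self, role='extract'):
        d = {'component': 'Extract', 'label': 'Extract content from input'}
        msgflo.Participant.__init__(self, d, role)

    def process(self, inport, msg):
        self.send('out', msg.data)  # real extraction logic would go here
        self.ack(msg)

class Normalize(msgflo.Participant):
    def __init__(self, role='normalize'):
        d = {'component': 'Normalize', 'label': 'Normalize extracted content'}
        msgflo.Participant.__init__(self, d, role)

    def process(self, inport, msg):
        self.send('out', msg.data)  # real normalization logic would go here
        self.ack(msg)

# start_participants([Extract(), Normalize()])  # hypothetical helper; see the msgflo-python docs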

Connections have pub-sub semantics, so generally each of the individual dynos will receive messages sent on the connection.
The special component msgflo/RoundRobin specifies that messages should be delivered in a round-robin fashion: a new message goes only to the next process in that role with available capacity. The RoundRobin component also supports dead-lettering, so failed jobs can be routed to another queue – for instance, to be re-processed automatically at a later point, or manually after developers have located and fixed the issue. This way one never loses pending work.
On AMQP, round-robin delivery and dead-lettering can be fulfilled by the broker (e.g. RabbitMQ), so there is no dedicated process for that node.
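To make the AMQP case concrete, here is a hedged sketch of the two pieces involved, using the pika client directly: a per-consumer prefetch limit so the broker only hands new messages to workers with free capacity, and a dead-letter exchange that catches rejected jobs. Queue and exchange names are made up for illustration; in a MsgFlo system this wiring is derived from the graph rather than written by hand.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Failed jobs are routed here instead of being lost.
channel.exchange_declare(exchange='jobs.DEADLETTER', exchange_type='fanout')
channel.queue_declare(queue='jobs.FAILED')
channel.queue_bind(queue='jobs.FAILED', exchange='jobs.DEADLETTER')

# The work queue: messages rejected by a worker go to the dead-letter exchange.
channel.queue_declare(queue='jobs.IN',
                      arguments={'x-dead-letter-exchange': 'jobs.DEADLETTER'})

# Only one unacknowledged message per worker, so the broker round-robins
# new work to the consumers that have free capacity.
channel.basic_qos(prefetch_count=1)

def handle_job(body):
    print('processing', body)  # stand-in for the real job processing

def on_message(ch, method, properties, body):
    try:
        handle_job(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        # Reject without requeue: the message is dead-lettered.
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

channel.basic_consume(queue='jobs.IN', on_message_callback=on_message)
channel.start_consuming()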

Messaging systems

People use different messaging systems. We’ve tried to make sure that the MsgFlo architecture and tools can be used with many of them. The format and delivery of discovery messages is specified, and the tools have a transport abstraction layer. Currently there is production-level support for AMQP 0-9-1 (tested with RabbitMQ). Basic support exists for MQTT, a simple protocol popular in distributed “Internet-of-Things”-type systems. Support for more transports can be added by implementing two classes.
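The exact interfaces depend on the tool, but conceptually a transport adapter needs roughly the following shape. This is a hypothetical Python sketch of the two classes, not the actual msgflo API – see the msgflo source for the real interfaces.

class Client:
    # Hypothetical interface sketch: the participant-side connection to a broker.
    def connect(self): ...
    def disconnect(self): ...
    def send_to(self, queue, message): ...
    def subscribe_to(self, queue, callback): ...
    def ack(self, message): ...
    def nack(self, message): ...

class MessageBroker(Client):
    # Hypothetical: adds the operations used when setting up the network.
    def create_queue(self, queue): ...
    def add_binding(self, from_queue, to_queue): ...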

Polyglot participation

MsgFlo itself only handles the discovery of participants and the setup of the connections between them, as well as providing debug capabilities like Flowhub endpoint support. Having participants in a particular language requires implementing the specified discovery messages on top of one of the supported transports. We provide a set of libraries that make this easy for popular languages:

Using noflo-runtime-msgflo makes it super simple to run NoFlo graphs or components as MsgFlo participants. The exported ports of the NoFlo graph or component (for instance ‘in’, ‘out’, and ‘error’) are automatically made available as queues in MsgFlo, and one can connect these into a bigger system.

noflo-runtime-msgflo --name compute_foo --graph project/MyGraph


If you have some plain Node.js code, you can use msgflo-nodejs, like in this real-life example from imgflo-server.

msgflo = require 'msgflo'

ProcessImageParticipant = (client, role) ->

  definition =
    component: 'imgflo-server/ProcessImage'
    icon: 'file-image-o'
    label: 'Executes image processing jobs'
    inports: [
      id: 'job'
      type: 'object'
    ]
    outports: [
      id: 'jobresult'
      type: 'object'
    ]

  func = (inport, job, send) ->
    throw new Error 'Unsupported port: ' + inport if inport != 'job'

    # XXX: use an error queue?
    @executor.doJob job, (result) ->
      send 'jobresult', null, result

  return new msgflo.participant.Participant client, definition, func, role

In addition to Node.js and NoFlo, there is basic participant support for Python and for C++ (with AMQP). It took about half a day and 200-300 lines of code, so adding support for more languages should be pretty simple. There are even tests you can reuse.

Example in Python using msgflo-python:

import msgflo

class Repeat(msgflo.Participant):
  def __init__(self, role):
    d = {
      'component': 'PythonRepeat',
      'label': 'Repeat input data without change',
    }
    msgflo.Participant.__init__(self, d, role)

  def process(self, inport, msg):
    self.send('out', msg.data)
    self.ack(msg)

The example becomes a bit more verbose in C++11, using msgflo-cpp.


class Repeat : public msgflo::Participant
{
    struct Def : public msgflo::Definition {
        Def(void) : msgflo::Definition()
        {
            component = "C++Repeat";
            label = "Repeats input on outport unchanged";
            outports = {
                { "out", "any", "" }
            };
        }
    };

public:
    Repeat(std::string role)
        : msgflo::Participant(role, Def())
    {
    }

private:
    virtual void process(std::string port, msgflo::Message msg)
    {
        std::cout << "Repeat.process()" << std::endl;
        msgflo::Message out;
        out.json = msg.json;
        send("out", out);
        ack(msg);
    }
};

Next

Since MsgFlo 0.3, we have been using MsgFlo in production for all workers across The Grid backends. After migrating, we’ve also moved more things into dedicated participants, because we now have tooling that makes managing that complexity easy. Our short-term focus now is more tools around MsgFlo, like deadline-based autoscaling and integration of data-driven testing using fbp-spec. Features planned for MsgFlo itself include live introspection of messages in Flowhub.

Looking further ahead, we would like to make more use of the polyglot capabilities, for instance by moving some of our image analytics out of NoFlo/Node.js participants (wrapping C/C++ libraries) into pure C++11 participants.
I also hope to do some fun projects with MQTT and MicroFlo – and validate MsgFlo for embedded/Internet-of-Things-type systems.