Setting Kafka's pace with Broadway

2021/05/12

Recently a colleague, Matt, and I were faced with a new Kafka experience: roughly a million messages placed on a highly partitioned Kafka topic, and a consumer which makes a simple database query for each message. What followed was some impressively fast processing, a thrashed database, and some unhappy users. Below is the story of how we wrangled Kafka using Broadway, in an environment where we are unable to change Kafka configuration.

KafkaEx Background

We use KafkaEx for consuming Kafka topics in our microservice applications. It’s generally easy to set up and get going with, and we have few complaints about it.

In order to fully consume messages off a Kafka topic, KafkaEx will create at least one connection per partition. Each connection is in fact a GenConsumer, which is responsible for consuming messages. The implication is that an application consuming a topic with 50 partitions will automatically end up with a minimum of 50 concurrent consumers. If you naively scale your application horizontally, especially without clustering, each new instance brings another 50.
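
As a rough illustration (ExampleConsumer, the group and topic names, and query_database/1 are all made up for this sketch), a ConsumerGroup setup looks something like this, with one GenConsumer spawned per partition:

defmodule ExampleConsumer do
  use KafkaEx.GenConsumer

  alias KafkaEx.Protocol.Fetch.Message

  @impl true
  def handle_message_set(message_set, state) do
    # Called with each fetched batch, inside the GenConsumer that owns
    # this partition
    Enum.each(message_set, fn %Message{value: value} ->
      query_database(value) # one simple database query per message
    end)

    {:async_commit, state}
  end

  # Stand-in for the real per-message work
  defp query_database(_value), do: :ok
end

# Started from a supervisor with something like:
# KafkaEx.ConsumerGroup.start_link(ExampleConsumer, "example_group", ["busy_topic"], [])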

Having instances * partitions consumers per topic isn’t necessarily an issue in and of itself. The kicker with KafkaEx is that unless you want to set up each of your consumers manually, you’re going to use a ConsumerGroup, which provides no throttling mechanism beyond back-pressure.

The flow of data becomes ‘push’ rather than ‘pull’ from the perspective of our application. Recall the experience above: a million messages coming through, each doing a simple database query. The result was instances * partitions consumers all competing for a database connection over and over, which put a huge load on a small database and prevented our APIs from getting a connection in a timely fashion.

Cue the next feature

While a one-off(ish) bout of database starvation isn’t the end of the world for this particular application, we had a feature coming up that would increase the complexity of this message processor. Instead of a single simple database call, it would have to make two database calls plus an outbound API call to another service.

Having an unlimited processing rate would now be unacceptable, as it could interfere with the performance of other systems.

At this point I knew we needed to introduce some artificial back-pressure to make sure neither our application nor the external services would be overwhelmed. KafkaEx only grabs more messages when it thinks it has finished processing the messages it already has in hand. If we can convince it that it’s not done for a bit longer, we can slow processing down to a controlled rate.
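
In GenConsumer terms, nothing new is fetched for a partition while its handle_message_set/2 callback is still running, so anything that blocks inside that callback throttles the consumer. A crude illustration of the idea (a fixed sleep; process_message/1 is a stand-in):

  @impl true
  def handle_message_set(message_set, state) do
    Enum.each(message_set, &process_message/1)

    # KafkaEx won't fetch the next batch for this partition until we return,
    # so sleeping here caps this consumer's throughput
    :timer.sleep(100)

    {:async_commit, state}
  end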

Another option at this point would be broadway_kafka, but we have built some systems around the use of KafkaEx which would make that a non-trivial transition.

Enter Broadway

I am a big fan of Flow, which builds on top of GenStage, so I had at least a little awareness of Broadway. For those not familiar, Broadway is a library used to “Build concurrent and multi-stage data ingestion and data processing pipelines with Elixir.”

Broadway offers some key functionality that makes it perfect for solving our performance problem:

- demand-driven, pull-based message flow (it’s built on GenStage)
- configurable concurrency for each stage of the pipeline
- built-in rate limiting at the producer

The components of a Broadway pipeline

A Broadway pipeline needs a producer and a consumer. For us, data is already produced through KafkaEx, and we can easily consume it. The problem with hooking KafkaEx up to Broadway is that KafkaEx in a ConsumerGroup configuration moves data through a push mechanism, whereas Broadway works on the principle of downstream stages pulling data.

We can start a simple Broadway pipeline with just the below code, defining the consumer (processor) inline. This is well explained in the Broadway documentation:

defmodule BroadwayPipeline do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: BroadwayPipeline,
      producer: [
        module: {Producer, []},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 1]
      ]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    IO.puts("Processing #{inspect(message)}")
    
    # Call the database, call some APIs, etc
    
    message
  end
end

The part that’s missing is Producer: the glue between KafkaEx and Broadway.

The glue

As mentioned before, we need to turn the data flow from push to pull, and we need to force back-pressure on the Kafka consumer. One way to achieve this is a simple GenServer which queues messages from Kafka and forces Kafka to wait before putting more messages into the queue.

defmodule BlockingBuffer do
  @moduledoc """
  A process which holds a queue and blocks when the length of the queue
  exceeds a predefined number (100)
  """

  use GenServer
  
  @max_buffer 100 # Tweak as required, or move into opts

  # GenServer startup
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    {:ok, {:queue.new(), 0}}
  end

  # Public API
  @spec push(any) :: :ok
  def push(message) do
    case GenServer.call(__MODULE__, {:push, message}) do
      :wait ->
        wait_time = 100 # ms - consider tweaking and adding jitter
        :timer.sleep(wait_time) # Calling process (KafkaEx consumer) sleeps
        push(message) # Try again!

      :pushed ->
        :ok
    end
  end

  @spec drain(pos_integer) :: list(any) 
  def drain(demand) do
    GenServer.call(__MODULE__, {:drain, demand})
  end

  # Message callbacks
  @impl true
  def handle_call({:push, _message}, _from, {_queue, count} = state) when count > @max_buffer do
    {:reply, :wait, state}
  end

  def handle_call({:push, message}, _from, {queue, count}) do
    {:reply, :pushed, {:queue.in(message, queue), count + 1}}
  end

  def handle_call({:drain, demand}, _from, {queue, count}) when demand > 0 do
    {items, queue} =
      Enum.reduce(1..demand, {[], queue}, fn _i, {items, queue} ->
        case :queue.out(queue) do
          {{:value, value}, queue} ->
            {[value | items], queue}

          {:empty, queue} ->
            {items, queue}
        end
      end)

    {:reply, Enum.reverse(items), {queue, count - length(items)}}
  end
end

Now that we’ve got a process that converts push to pull, we can write a producer which simply pulls from this queue when downstream stages ask it for demand. We also need to store how much demand the producer has been asked for, so that if there are no Kafka messages for a while and then some start arriving again, we can kickstart the pipeline once more.

defmodule Producer do
  use GenStage

  @behaviour Broadway.Producer

  def init(_opts) do
    # State is the outstanding demand we haven't yet been able to satisfy
    {:producer, 0}
  end

  def handle_demand(demand, stored_demand) do
    total_demand = demand + stored_demand

    messages =
      total_demand
      |> BlockingBuffer.drain()
      |> Enum.map(fn kafka_message ->
        # Wrap each Kafka message in a Broadway message; there's no meaningful
        # ack step here, so use the no-op acknowledger
        %Broadway.Message{
          data: kafka_message,
          acknowledger: {Broadway.NoopAcknowledger, nil, nil}
        }
      end)
      
    message_count = length(messages)

    # If we've run out of messages, we need to kickstart the pipeline again
    # so, try drain again soon
    if message_count < total_demand do
      Process.send_after(self(), :kick_start, 1_000)
    end

    {:noreply, messages, total_demand - message_count}
  end
  
  def handle_info(:kick_start, 0) do
    # A stale kick_start can fire after demand was already satisfied;
    # draining zero would crash the buffer, so do nothing
    {:noreply, [], 0}
  end

  def handle_info(:kick_start, stored_demand) do
    handle_demand(0, stored_demand)
  end
end
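
For completeness, both the buffer and the pipeline need to be running before messages flow. A minimal supervision wiring, assuming the module names above, might look like:

# In the application supervisor: start the buffer before the pipeline and
# before the KafkaEx ConsumerGroup that pushes into it
children = [
  BlockingBuffer,
  BroadwayPipeline
  # ... plus the existing KafkaEx ConsumerGroup
]

Supervisor.start_link(children, strategy: :one_for_one)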

This is great - now we can change our Kafka consumer to call BlockingBuffer.push(message) and move the processing of each Kafka message into BroadwayPipeline.handle_message/3, shifting all the concurrent work into the Broadway pipeline (a sketch of the consumer-side change follows the configuration below). As our configuration stands, we’re going to be processing in a single process, at an unlimited rate… but Broadway lets us change this easily in configuration. Let’s imagine we want to process roughly a million messages a day, and we want to restrict the open database connections per instance to 5 - we can configure that like below:

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: BroadwayPipeline,
      producer: [
        module: {Producer, []},
        concurrency: 1,
        rate_limiting: [allowed_messages: 11, interval: 1000] # 11/s is just over 950k / day
      ],
      processors: [
        default: [concurrency: 5]
      ]
    )
  end
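
On the Kafka side, the consumer’s job then shrinks to feeding the buffer. A sketch, again using the handle_message_set/2 callback from KafkaEx.GenConsumer - the blocking push is what slows the fetch loop down:

  @impl true
  def handle_message_set(message_set, state) do
    # Blocks whenever the buffer is full, which stops this partition's
    # consumer from fetching more - back-pressure all the way to Kafka
    Enum.each(message_set, &BlockingBuffer.push/1)

    {:async_commit, state}
  end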

Summary

Usually people look to Elixir to parallelise their workload, but sometimes it’s easy to over-parallelise and shoot yourself in the foot. By making use of Broadway, we can easily parallelise while staying in control. This is just a taste of what Broadway offers - I’d encourage you to check out their documentation to learn more about its functionality and use cases.

- Andrew