Setting Kafka's pace with Broadway
Recently a colleague, Matt, and I were faced with a new Kafka experience: roughly a million messages placed on a highly partitioned Kafka topic, and a consumer which does a simple database query for each message. What followed was some impressively fast processing, a thrashed database, and some unhappy users. Below is the story of how we wrangled Kafka using Broadway in an environment where we are unable to change Kafka configuration.
KafkaEx Background
We use KafkaEx for consuming the Kafka service in our microservice applications. It’s generally easy to set up and get going with, and we have few complaints about it.
In order to fully consume messages off a Kafka topic, KafkaEx will create at least one connection per partition. Each connection is in fact a GenConsumer, which is responsible for consuming messages. The implication of this is that if you have an application consuming a topic with 50 partitions, you will automatically end up with a minimum of 50 concurrent consumers. If you naively scale your application horizontally, especially without clustering, the total number of consumers becomes your instance count multiplied by 50 in this case.
Having instances * partitions consumers per topic isn’t necessarily an issue in and of itself. The kicker with KafkaEx is that unless you want to manually set up each of your consumers, you’re going to use a ConsumerGroup, which doesn’t provide any mechanism for throttling other than back-pressure.
The flow of data becomes ‘push’ rather than ‘pull’ from the perspective of our application.
Recall the experience we faced where a million messages were coming through, each doing a simple database query. The result was that instances * consumers processes, all competing for a database connection over and over, caused a huge load on a small database and prevented APIs from getting a database connection in a timely fashion.
Cue the next feature
While a one-off (ish) bout of database starvation isn’t the end of the world for this particular application, we had a feature coming up that needed to increase the complexity of this message processor. Instead of a simple database call, it would have to make twice as many database calls and an outbound API call to another service.
Leaving the processing rate unlimited would now be unacceptable, as it may interfere with the performance of other systems.
At this point I knew we needed to introduce some artificial back-pressure in order to make sure neither our application nor the external services were being overwhelmed. KafkaEx will only grab more messages when it thinks it has finished processing the messages it already has in hand. If we can convince it that it’s not done for a bit longer, we can slow processing down to a rate we control.
Another option could be to use broadway_kafka at this point, but we have built some systems around the use of KafkaEx which would make this a non-trivial transition.
Enter Broadway
I am a big fan of flow, which builds on top of gen_stage, so I had at least a little awareness of Broadway. For those not familiar, Broadway is a library used to “Build concurrent and multi-stage data ingestion and data processing pipelines with Elixir.”
Broadway offers some key functionalities to us that make it perfect for solving our performance problem:
- Back-pressure
- Automatic acknowledgements at the end of the pipeline
- Batching
- Fault tolerance with minimal data loss
- Graceful shutdown
- Built-in testing
- Custom failure handling
- Ordering and partitioning
- Rate-limiting
- Metrics
The components of a Broadway pipeline
A Broadway pipeline needs a producer and a consumer. For us, we already produce data through KafkaEx, and we can easily consume that data. The problem with hooking up KafkaEx to Broadway is that KafkaEx in a ConsumerGroup configuration moves data through a push mechanism, whereas Broadway works on the principle of downstream stages pulling data.
We can start a simple Broadway pipeline with just the below code, defining the consumer (processor) inline. This is well explained in the Broadway documentation:
defmodule BroadwayPipeline do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: BroadwayPipeline,
      producer: [
        module: {Producer, []},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 1]
      ]
    )
  end

  @impl true
  # The processor name and context are unused here, hence the underscores
  def handle_message(_processor, message, _context) do
    IO.inspect("Processing #{inspect(message)}")
    # Call the database, call some APIs, etc
    message
  end
end
The part that’s missing is Producer: the glue between KafkaEx and Broadway.
The glue
As I mentioned before, we need to turn the data flow from push to pull, and we need to force back-pressure on the Kafka consumer. One way to achieve this is a simple GenServer which queues messages from Kafka, and forces the Kafka consumer to wait before putting more messages into the queue.
defmodule BlockingBuffer do
  @moduledoc """
  A process which holds a queue and blocks when the length of the queue
  exceeds a predefined number (100)
  """
  use GenServer

  @max_buffer 100 # Tweak as required, or move into opts

  # GenServer startup
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    # State is {queue, number of items currently in the queue}
    {:ok, {:queue.new(), 0}}
  end

  # Public API
  @spec push(any) :: :ok | no_return
  def push(message) do
    case GenServer.call(__MODULE__, {:push, message}) do
      :wait ->
        wait_time = 100 # ms - consider tweaking and adding jitter
        :timer.sleep(wait_time) # Calling process (KafkaEx consumer) sleeps
        push(message) # Try again!

      :pushed ->
        :ok
    end
  end

  @spec drain(non_neg_integer) :: list(any)
  def drain(demand) do
    GenServer.call(__MODULE__, {:drain, demand})
  end

  # Message callbacks
  @impl true
  def handle_call({:push, _message}, _from, {_queue, count} = state) when count > @max_buffer do
    {:reply, :wait, state}
  end

  def handle_call({:push, message}, _from, {queue, count}) do
    {:reply, :pushed, {:queue.in(message, queue), count + 1}}
  end

  # Nothing was asked for, so hand back nothing instead of crashing
  def handle_call({:drain, 0}, _from, state) do
    {:reply, [], state}
  end

  def handle_call({:drain, demand}, _from, {queue, count}) when demand > 0 do
    # 1..demand, so that at most `demand` items are taken (0..demand takes one too many)
    {items, queue} =
      Enum.reduce(1..demand, {[], queue}, fn _i, {items, queue} ->
        case :queue.out(queue) do
          {{:value, value}, queue} ->
            {[value | items], queue}

          {:empty, queue} ->
            {items, queue}
        end
      end)

    {:reply, Enum.reverse(items), {queue, count - length(items)}}
  end
end
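The drain clause above leans on Erlang's :queue module, so it can help to see that module behave on its own. The sketch below is not part of the pipeline: QueueDemo and take/2 are hypothetical names, and it uses :queue.split/2 as an alternative to popping items one at a time. Treat it as an illustration of the "take up to N, oldest first" idea rather than a replacement for the code above.

```elixir
defmodule QueueDemo do
  # Hypothetical helper, not part of BlockingBuffer: take up to `n` items
  # from an Erlang :queue, oldest first, and return the remainder.
  def take(queue, n) when is_integer(n) and n >= 0 do
    # :queue.split/2 raises if asked for more items than exist, so clamp first.
    count = min(n, :queue.len(queue))
    {taken, rest} = :queue.split(count, queue)
    {:queue.to_list(taken), rest}
  end
end

queue = :queue.from_list([:a, :b, :c])

{items, rest} = QueueDemo.take(queue, 2)
IO.inspect(items)                 # [:a, :b]
IO.inspect(:queue.to_list(rest))  # [:c]

# Asking for more than is available returns whatever is there
{items, _rest} = QueueDemo.take(queue, 10)
IO.inspect(items)                 # [:a, :b, :c]
```

Note that :queue.len/1 walks the queue, which is one reason BlockingBuffer keeps its own count in the state instead.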
Now we’ve got a process that converts push to pull, we can write a producer which simply pulls from this queue when downstream stages send it demand.
We also need to store how much demand the producer has been asked for but has not yet met, so that if there are no Kafka messages for a time and then some start arriving again, we can kickstart the pipeline once more.
defmodule Producer do
  use GenStage
  @behaviour Broadway.Producer

  @impl true
  def init(_opts) do
    # The state is the demand we have been asked for but not yet fulfilled
    {:producer, 0}
  end

  @impl true
  def handle_demand(demand, stored_demand) do
    total_demand = demand + stored_demand

    messages =
      total_demand
      |> BlockingBuffer.drain()
      |> Enum.map(&%Broadway.Message{data: &1, acknowledger: {Broadway.NoopAcknowledger, nil, nil}})

    # ^ Create a Broadway message from the Kafka message
    message_count = length(messages)

    # If we've run out of messages, we need to kickstart the pipeline again
    # so, try drain again soon
    if message_count < total_demand do
      Process.send_after(self(), :kick_start, 1_000)
    end

    {:noreply, messages, total_demand - message_count}
  end

  @impl true
  # The demand was met while the timer was running, so there is nothing to drain
  def handle_info(:kick_start, 0) do
    {:noreply, [], 0}
  end

  def handle_info(:kick_start, stored_demand) do
    handle_demand(0, stored_demand)
  end
end
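On the KafkaEx side, the only requirement is that the consumer callback hands each message to BlockingBuffer.push/1 and does not return until the buffer has accepted it. A minimal sketch of what that might look like, assuming the kafka_ex dependency and its KafkaEx.GenConsumer behaviour; ThrottledConsumer is a hypothetical module name and your existing consumer will differ:

```elixir
defmodule ThrottledConsumer do
  # Hypothetical module name. Assumes the kafka_ex dependency and the
  # BlockingBuffer module from this post are available.
  use KafkaEx.GenConsumer

  alias KafkaEx.Protocol.Fetch.Message

  # Messages arrive in batches, one call per fetched message set.
  def handle_message_set(message_set, state) do
    for %Message{value: value} <- message_set do
      # push/1 sleeps while the buffer is full, so this callback does not
      # return, and this consumer does not fetch again, until there is room.
      BlockingBuffer.push(value)
    end

    {:async_commit, state}
  end
end
```

One design consequence worth knowing about: offsets are committed once messages are in the buffer, not once they are processed, and the producer uses Broadway.NoopAcknowledger. Anything still sitting in the buffer or the pipeline when the application stops is therefore not redelivered.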
This is great - now we can change our Kafka consumer to use BlockingBuffer.push(message) and update BroadwayPipeline.handle_message/3 to do the processing of the Kafka message. This will move all the concurrent processing into the Broadway pipeline. As our configuration stands, we’re going to be processing in a single process, at an unlimited rate… but Broadway lets us change this easily in configuration. Let’s alter the way we start the Broadway pipeline now. Let’s imagine we want to process roughly a million messages a day, and we want to restrict the open database connections per instance to 5 - we can easily configure that like below:
def start_link(_opts) do
  Broadway.start_link(__MODULE__,
    name: BroadwayPipeline,
    producer: [
      module: {Producer, []},
      concurrency: 1,
      rate_limiting: [allowed_messages: 11, interval: 1000] # 11/s is just over 950k / day
    ],
    processors: [
      default: [concurrency: 5]
    ]
  )
end
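As a rough check on the rate_limiting numbers, the arithmetic can be written out. The sketch below is illustrative only: RateBudget and its functions are hypothetical names, not part of Broadway, and it assumes a steady stream of messages with a single running pipeline.

```elixir
defmodule RateBudget do
  # Hypothetical helper for sanity-checking rate_limiting values.
  @ms_per_day 24 * 60 * 60 * 1000

  # Messages per day if `allowed_messages` are let through every `interval_ms`.
  def per_day(allowed_messages, interval_ms) do
    div(allowed_messages * @ms_per_day, interval_ms)
  end

  # Smallest whole allowed_messages value that reaches `target_per_day`.
  def allowed_for(target_per_day, interval_ms) do
    # Integer ceiling division
    div(target_per_day * interval_ms + @ms_per_day - 1, @ms_per_day)
  end
end

IO.inspect(RateBudget.per_day(11, 1000))            # 950400
IO.inspect(RateBudget.allowed_for(1_000_000, 1000)) # 12
IO.inspect(RateBudget.per_day(12, 1000))            # 1036800
```

So 11 per second sits a little under a million a day and 12 a little over. As far as I understand, the limit applies to each running pipeline separately, so with several instances the combined rate is the per-instance figure multiplied by the instance count.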
Summary
Usually people look to Elixir to parallelise their workload, but sometimes it’s easy to over-parallelise and shoot yourself in the foot. By making use of Broadway, we can easily parallelise while staying in control. This is just a taste of what Broadway offers - I’d encourage you to check out their documentation to learn more about its functionality and use cases.
- Andrew