Hi, I'm Andrew
A developer exploring Canada
Setting Kafka's pace with Broadway
Using Broadway to parallelise and stay in control
Recently a colleague, Matt, and I were faced with a new Kafka experience: roughly a million messages placed on a highly partitioned Kafka topic, and a consumer which makes a simple database query for each message. What followed was some impressively fast processing, a thrashed database, and some unhappy users. Below is the story of how we wrangled Kafka using Broadway, in an environment where we were unable to change the Kafka configuration.
KafkaEx Background
We use KafkaEx for consuming the Kafka service in our microservice applications. It’s generally easy to set up and get going with, and we have few complaints about it.
In order to fully consume messages off a Kafka topic, KafkaEx will create at least one connection per partition. Each connection is in fact a GenConsumer, which is responsible for consuming messages. The implication of this is that if you have an application consuming a topic with 50 partitions, you will automatically end up with a minimum of 50 concurrent consumers. If you naively scale your application horizontally, especially without clustering, you can multiply your instance count by 50 in this case.
Having `instances * partitions` consumers per topic isn’t necessarily an issue in and of itself. However, the kicker with KafkaEx is that unless you want to manually set up each of your consumers, you’re going to use a ConsumerGroup, which doesn’t provide any throttling mechanism other than back-pressure.
The flow of data becomes ‘push’ rather than ‘pull’ from the perspective of our application
Recall the experience we faced, where a million messages were coming through, each doing a simple database query. The result was `instances * consumers` processes all competing for a database connection over and over, which caused a huge load on a small database and prevented APIs from getting access to a database connection in a timely fashion.
Cue the next feature
While a one-off(ish) bout of database starvation isn’t the end of the world for this particular application, we had a feature coming up that would increase the complexity of this message processor. Instead of a simple database call, it would have to double the database calls and also make an outbound API call to another service.
Having the processing rate unlimited would now be unacceptable as it may interfere with the performance of other systems
At this point I knew we needed to introduce some artificial back-pressure to make sure neither our application nor the external services would be overwhelmed. KafkaEx will only grab more messages when it thinks it has finished processing the messages it already has in hand. If we can convince it that it’s not done for a little longer, we can slow processing down to a controlled rate.
Another option at this point could be to use broadway_kafka, but we have built some systems around the use of KafkaEx which would make this a non-trivial transition.
Enter Broadway
I am a big fan of Flow, which builds on top of GenStage, so I had at least a little awareness of Broadway. For those not familiar, Broadway is a library used to “Build concurrent and multi-stage data ingestion and data processing pipelines with Elixir.”
Broadway offers some key functionalities to us that make it perfect for solving our performance problem:
- Back-pressure
- Automatic acknowledgements at the end of the pipeline
- Batching
- Fault tolerance with minimal data loss
- Graceful shutdown
- Built-in testing
- Custom failure handling
- Ordering and partitioning
- Rate-limiting
- Metrics
The components of a Broadway pipeline
A Broadway pipeline needs a producer and a consumer. For us, we already produce data through KafkaEx, and we can easily consume that data. The problem with hooking KafkaEx up to Broadway is that KafkaEx in a `ConsumerGroup` configuration moves data through a push mechanism, whereas Broadway works on the principle of downstream stages pulling data.
We can start a simple Broadway pipeline with just the below code, defining the consumer (processor) inline. This is well explained in the Broadway documentation:
defmodule BroadwayPipeline do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: BroadwayPipeline,
      producer: [
        module: {Producer, []},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 1]
      ]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    IO.puts("Processing #{inspect(message)}")
    # Call the database, call some APIs, etc
    message
  end
end
The part that’s missing is `Producer`; the glue between KafkaEx and Broadway.
The glue
As I mentioned before, we need to turn the data flow from push to pull, and we need to force back-pressure on the Kafka consumer. One way to achieve this is a simple GenServer which queues messages from Kafka, and forces Kafka to wait before putting more messages into the queue.
defmodule BlockingBuffer do
@moduledoc """
A process which holds a queue and blocks when the length of the queue
exceeds a predefined number (100)
"""
use GenServer
@max_buffer 100 # Tweak as required, or move into opts
# GenServer startup
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
  def init(_opts) do
{:ok, {:queue.new(), 0}}
end
# Public API
@spec push(any) :: :ok | no_return
def push(message) do
case GenServer.call(__MODULE__, {:push, message}) do
      :wait ->
        wait_time = 100 # ms - consider tweaking and adding jitter
        :timer.sleep(wait_time) # Calling process (KafkaEx consumer) sleeps
        push(message) # Try again!
:pushed ->
:ok
end
end
@spec drain(pos_integer) :: list(any)
def drain(demand) do
GenServer.call(__MODULE__, {:drain, demand})
end
# Message callbacks
@impl true
def handle_call({:push, _message}, _from, {_queue, count} = state) when count > @max_buffer do
{:reply, :wait, state}
end
def handle_call({:push, message}, _from, {queue, count}) do
{:reply, :pushed, {:queue.in(message, queue), count + 1}}
end
def handle_call({:drain, demand}, _from, {queue, count}) when demand > 0 do
{items, queue} =
      Enum.reduce(1..demand, {[], queue}, fn _i, {items, queue} ->
case :queue.out(queue) do
{{:value, value}, queue} ->
{[value | items], queue}
{:empty, queue} ->
{items, queue}
end
end)
{:reply, Enum.reverse(items), {queue, count - length(items)}}
end
end
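Before wiring it up, here’s a quick sketch of how the buffer behaves in isolation (the message shapes are hypothetical; in the real application it would be started under a supervisor):

{:ok, _pid} = BlockingBuffer.start_link([])

# Pushes return :ok immediately while the queue is below @max_buffer;
# beyond that, push/1 sleeps and retries, blocking the calling process
:ok = BlockingBuffer.push(%{offset: 1, value: "hello"})

# Draining returns up to `demand` messages, oldest first
[%{offset: 1, value: "hello"}] = BlockingBuffer.drain(10)
[] = BlockingBuffer.drain(10)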
Now that we’ve got a process converting push to pull, we can write a producer which simply pulls from this queue when downstream stages ask it for demand.
We also need to store how much demand the producer has been asked for, so that if there are no Kafka messages for a while and then some start arriving again, we can kickstart the pipeline once more.
defmodule Producer do
use GenStage
@behaviour Broadway.Producer
def init(_opts) do
{:producer, 0}
end
def handle_demand(demand, stored_demand) do
total_demand = demand + stored_demand
messages =
total_demand
|> BlockingBuffer.drain()
|> Enum.map(&%Broadway.Message{data: &1, acknowledger: {Broadway.NoopAcknowledger, nil, nil}})
# ^ Create a broadway message from the kafka message
message_count = length(messages)
# If we've run out of messages, we need to kickstart the pipeline again
# so, try drain again soon
if message_count < total_demand do
Process.send_after(self(), :kick_start, 1_000)
end
{:noreply, messages, total_demand - message_count}
end
  def handle_info(:kick_start, 0) do
    # No outstanding demand; don't call drain/1 with zero demand
    {:noreply, [], 0}
  end

  def handle_info(:kick_start, stored_demand) do
    handle_demand(0, stored_demand)
  end
end
This is great! Now we can change our Kafka consumer to use `BlockingBuffer.push(message)`, and update `BroadwayPipeline.handle_message/3` to do the processing of the Kafka message. This moves all the concurrent processing into the Broadway pipeline. As our configuration stands, we’re going to be processing in a single process, at an unlimited rate… but Broadway lets us change this easily in configuration. Let’s alter the way we start the Broadway pipeline. Imagine we want to process roughly a million messages a day, and we want to restrict the open database connections per instance to 5 - we can easily configure that like below:
def start_link(_opts) do
Broadway.start_link(__MODULE__,
name: BroadwayPipeline,
producer: [
module: {Producer, []},
concurrency: 1,
rate_limiting: [allowed_messages: 11, interval: 1000] # 11/s is just over 950k / day
],
processors: [
default: [concurrency: 5]
]
)
end
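For completeness, the remaining glue might look something like the sketch below, assuming KafkaEx’s GenConsumer API; `ExampleApp` and the group and topic names are hypothetical. The consumer pushes each message into the buffer, and the blocking behaviour of `BlockingBuffer.push/1` is what actually slows KafkaEx down.

defmodule ExampleApp.KafkaConsumer do
  use KafkaEx.GenConsumer

  # push/1 sleeps and retries while the buffer is full, so KafkaEx
  # won't fetch more messages until Broadway has drained the queue
  @impl true
  def handle_message_set(message_set, state) do
    Enum.each(message_set, &BlockingBuffer.push/1)
    {:async_commit, state}
  end
end

# In the application supervision tree (the buffer must start before the rest):
children = [
  BlockingBuffer,
  BroadwayPipeline,
  %{
    id: KafkaEx.ConsumerGroup,
    start:
      {KafkaEx.ConsumerGroup, :start_link,
       [ExampleApp.KafkaConsumer, "example_group", ["example_topic"], []]}
  }
]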
Summary
Usually people look to Elixir to parallelise their workload, but sometimes it’s easy to over-parallelise and shoot yourself in the foot. By making use of Broadway, we can easily parallelise while staying in control. This is just a taste of what Broadway offers - I’d encourage you to check out their documentation to learn more about its functionality and use cases.
- Andrew
Using Dataloader.KV to call APIs lazily
Dataloader is a library for loading data in batches. It integrates very well with Absinthe, a GraphQL library for Elixir.
My background with Absinthe & Dataloader
I’ve been working a bit with GraphQL (Absinthe/Elixir) microservices in the past few months. For the most part, those services have been able to exist in their realm and not need to query outside of it. However, from time to time, there’s a need to incorporate a field - an association - within a schema which calls APIs outside of that realm. Sometimes those APIs are friendly to batch queries, and sometimes they’re not. Either way, I’ve found that `Dataloader.KV` can be used to effectively manage batching requests to those services.
When I first came across the problem of calling other APIs efficiently in Absinthe, I only knew of Dataloader.KV. I read a bit of the documentation and asked around the community for more information. Oddly, there seemed to be few resources on getting it going in a simple case. I hope this post helps other people jump-start their use of it in the future.
Getting started
Let’s skip ahead to the result first. I find myself making use of `Dataloader.KV` in two ways.
Pretend we have a `User` with an association, `posts`, which is hosted on an external service we’ll call `postal`:
# `dataloader/1` and `on_load/2` come from Absinthe.Resolution.Helpers
object :user do
  field :posts, list_of(:post), resolve: dataloader(:postal)
end

# Or, resolving manually:
field :posts, list_of(:post) do
  resolve fn parent, _args, %{context: %{loader: loader}} ->
    loader
    |> Dataloader.load(:postal, :posts, parent)
    |> on_load(fn loader ->
      loader
      |> Dataloader.get(:postal, :posts, parent)
      |> do_something_with_the_result()
    end)
  end
end
For those using Dataloader already, both of those usages should look familiar.
The question now is how to make it work… First, we’re going to create a module that will house the code for calling the external API.
We will need to define a function to give to `Dataloader.KV` that will take the association and parent records and resolve the data; `dataloader_postal_loader` (arbitrarily named). I, however, factor the actual resolving of data out of this function, and just let it manage the incoming options.
Note: one thing that’s important to getting this right is that you will receive the parent records as a MapSet, and Dataloader expects you to return a map from those records to their results. The keys of that map must be exactly the records as they came in; if the records are Ecto structs that are missing associations you need to load before resolving this external data, you will need to hold onto the original key alongside your record with loaded associations, so you can return the original as the key in the result map.
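For example, that bookkeeping might look something like this sketch, where `:profile` and `resolve_posts_using/1` are hypothetical stand-ins:

defp load_with_required_associations(users) do
  users
  |> Enum.map(fn original -> {original, Repo.preload(original, :profile)} end)
  |> Map.new(fn {original, preloaded} ->
    # Resolve using the preloaded struct, but key the result map by the
    # struct exactly as Dataloader handed it to us
    {original, {:ok, resolve_posts_using(preloaded)}}
  end)
end

With that caveat in mind, here’s the module itself: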
defmodule UserAppWeb.Sources.Postal do
  @spec dataloader_postal_loader() :: ({:posts, keyword} | :posts, MapSet.t() -> map)
  def dataloader_postal_loader do
    fn
      # Signature for use with `dataloader` helper function
      {:posts, _opts}, users ->
        load_posts_for_users(users)

      # Signature for use with manual dataloading (without opts)
      :posts, users ->
        load_posts_for_users(users)
    end
  end

  # Example - just call an API which accepts bulk arguments.
  # You could also use Flow to concurrently make external calls for non-bulk APIs.
  @spec load_posts_for_users(MapSet.t()) :: map
  defp load_posts_for_users(users) do
    user_ids =
      users
      |> Stream.map(& &1.id)
      |> Enum.join(",")

    # Don't hard pattern match {:ok, ...} in the real world.
    {:ok, %HTTPoison.Response{body: body}} =
      HTTPoison.get("http://example.com/posts?user_ids=#{user_ids}")

    # Perhaps returns a JSON map of user_id -> [post]; Jason is one decoding option
    posts_by_user_id = Jason.decode!(body)

    # Wrapping each result in an ok tuple means `nil` is returned for the field
    # if no result was found for the user. JSON object keys are strings.
    users
    |> Stream.map(fn original_user ->
      {original_user, {:ok, Map.get(posts_by_user_id, to_string(original_user.id))}}
    end)
    |> Map.new()
  end
end
Finally, you will need to add a Dataloader source to make use of this functionality, probably somewhere like your main schema.ex:
def context(ctx) do
postal_loader =
UserAppWeb.Sources.Postal.dataloader_postal_loader()
|> Dataloader.KV.new()
loader =
Dataloader.new()
|> Dataloader.add_source(..., ...) # Your other sources
|> Dataloader.add_source(:postal, postal_loader)
Map.put(ctx, :loader, loader)
end
You should now be able to make GraphQL queries which lazily call the external API.
Further considerations
The above example touches on the very simple case. There are some issues with managing errors from your external APIs. Instead of returning `{:ok, value}` or `{:ok, nil}` for every result, you can in fact return an error tuple; however, you will need to specially manage this in your field resolver. I found that I couldn’t get the ok or error out without first changing the dataloader `get_policy` setting to `:tuples` (see dataloader options). This looks a bit like:
field :posts, list_of(:post) do
  resolve fn parent, _args, %{context: %{loader: loader}} ->
loader
|> Dataloader.load(:postal, :posts, parent)
|> on_load(fn loader ->
loader
|> Map.put(:options, Keyword.put(loader.options, :get_policy, :tuples)) # So that we can get errors out
|> Dataloader.get(:postal, :posts, parent)
|> case do
{:ok, {:ok, _value} = success} ->
success
{:ok, nil} ->
{:ok, nil}
{:error, _error} = error ->
error
end
end)
end
end
If you have any better ways of managing that issue, I’d love to hear them - please get in touch!
- Andrew
Better Phoenix APIs - featuring Ecto, Absinthe & GraphQL - Part 1
At Abletech, I’ve been using Elixir full time for almost 9 months. During that time I have authored or been involved in more than 10 separate codebases, mostly using Phoenix to serve JSON APIs. I have come away from those codebases with a few opinions around how best to manage data. Full disclosure: I’m not an expert, and much of what I’ve done has been driven through conversations with talented developers in the Elixir community.
Part 1 doesn’t get into the code specifics of using Absinthe or GraphQL. Stay tuned for that.
Associations, scoping, consistency and boilerplate
One of the best ways of getting an application going quickly with Phoenix and Ecto is to use the built-in `phx` generators. These are useful for basic CRUD, and will usually work well while you’ve got only a few schemas involved in your application.
However, complexity increases very quickly when you add authentication, multiple associations and scoping - in particular, when different scenarios require different associations to be preloaded and scoped.
My first low-tech solution for this is based off of the recommended usage for `dataloader`, a package I’ll get into more later.
Let’s take a look at some code and how it might progress naturally in a simple application. Consider listing users and their posts for a simple blog.
- The generated boilerplate
@spec list_users() :: list(User.t)
def list_users do
Repo.all(User)
end
This works perfectly for a simple concept.
- The associated schema
If we add in posts, we can simply load them for each user by preloading like so.
@spec list_users() :: list(User.t)
def list_users do
User
|> Repo.all()
|> Repo.preload([:posts])
end
However, chances are we don’t always want to preload the posts each time we load users, so we add a second function or clause;
@spec list_users() :: list(User.t)
def list_users do
Repo.all(User)
end
@spec list_users_with_posts() :: list(User.t)
def list_users_with_posts do
User
|> Repo.all()
|> Repo.preload([:posts])
end
This works, but it is not sustainable if we start adding other associations.
- The preload list
That brings us to adding a list with the associations we want to preload.
def list_users(preloads \\ []) when is_list(preloads) do
User
|> Repo.all()
|> Repo.preload(preloads)
end
This works well until you need to do a search for users’ names. Perhaps in this scenario, you don’t want to preload associations - or only preload a subset of them.
- Keyword opts
First, a totally naive implementation:
def list_users(opts \\ []) do
users = User
users = if Keyword.has_key?(opts, :name) do
name = opts[:name]
if is_nil(name) do
from u in users, where: is_nil(u.name)
else
from u in users, where: u.name == ^name
end
else
users
end
users = if Keyword.has_key?(opts, :preloads) do
preloads = opts[:preloads]
from u in users, preload: ^preloads
else
users
end
Repo.all(users)
end
This is obviously not a good functional approach to the problem. Since we’re always building on the queryable depending on the next opt, we can simplify this using `Enum.reduce`:
def list_users(opts) do
users = Enum.reduce(opts, User, fn
{:name, nil}, users ->
from u in users, where: is_nil(u.name)
{:name, name}, users when is_binary(name) ->
      from u in users, where: u.name == ^name
{:preloads, preloads}, users ->
from u in users, preload: ^preloads
end)
Repo.all(users)
end
Bam! This is much more elegant and takes a functional approach. This isn’t something that came to me intuitively - this is a tip I received from @dpehrson on the Elixir Slack. This tip has helped pave the way to how I currently handle ecto queries. More on that later.
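Usage then looks like this (the filter values are hypothetical):

# All users named "Andrew", with their posts preloaded
list_users(name: "Andrew", preloads: [:posts])

# All users with no name set
list_users(name: nil)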
At some point, it’s likely we want to apply these filters and options to a query where we want only one result - something like a `get_user` function. Let’s extract out all that option handling and let `list_users` and `get_user` handle their real responsibilities.
query/2
Introducing the `query/2` function. Totally derived from how `dataloader` is used - it did not make sense to me at first. However, its value is clear after going through this process once or twice.
@spec query(Ecto.Queryable.t, keyword) :: Ecto.Queryable.t
def query(queryable, opts \\ [])
def query(User, opts) do
Enum.reduce(opts, User, fn
{:name, nil}, users ->
from u in users, where: is_nil(u.name)
{:name, name}, users when is_binary(name) ->
      from u in users, where: u.name == ^name
{:preloads, preloads}, users ->
from u in users, preload: ^preloads
end)
end
@spec list_users(keyword) :: {:ok, list(User.t)}
def list_users(opts \\ []) do
users =
User
|> query(opts)
|> Repo.all()
{:ok, users}
end
@spec get_user(keyword) :: {:ok, User.t} | {:error, {:not_found, {:user, keyword}}}
def get_user(opts \\ []) do
user =
User
|> query(opts)
|> Repo.one()
case user do
%User{} = user ->
{:ok, user}
nil ->
{:error, {:not_found, {:user, opts}}}
end
end
Now we’ve factored the option handling out so that we can easily apply arbitrary filtering, pagination, scoping or otherwise in a single location. I have found this to be considerably more flexible and easier to work with, and it’s worth the small amount of extra work to me as soon as one association is introduced. Not only that, but it works just fine with multiple schemas handled in the same context, by pattern matching on the `queryable` argument in `query/2`.
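As a sketch of that multi-schema point - with a hypothetical `Post` schema and `:published` field - supporting another schema in the same context is just another `query/2` clause:

def query(Post, opts) do
  Enum.reduce(opts, Post, fn
    {:published, published?}, posts ->
      from p in posts, where: p.published == ^published?

    {:preloads, preloads}, posts ->
      from p in posts, preload: ^preloads
  end)
end

A `list_posts/1` or `get_post/1` can then delegate to it exactly as the user functions do.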
In part 2, we’ll introduce authorisation to the equation, and start discussing how Absinthe can help us solve some issues.
- Andrew
I'm a Kiwi-Canadian taking a working holiday in Calgary, Canada.
As a developer, I mainly work with backend technologies such as Ruby and Elixir. I have a weak spot for Elixir right now, and jump at the chance to use it. I've got some experience with React.js and Angular 2 (4+) but don't usually have a need to get into that realm.
Contact me at andrew [at] pett [dot] nz