Hi, I'm Andrew
A developer exploring Canada

Setting Kafka's pace with Broadway

2021/05/12 - Code

Recently a colleague, Matt, and I were faced with a new Kafka experience: roughly a million messages placed on a highly partitioned Kafka topic, and a consumer which makes a simple database query for each message. What followed was some impressively fast processing, a thrashed database, and some unhappy users. Below is the story of how we wrangled Kafka using Broadway, in an environment where we are unable to change the Kafka configuration.

KafkaEx Background

We use KafkaEx to consume our Kafka service in our microservice applications. It’s generally easy to set up and get going with, and we have few complaints about it.

In order to fully consume messages off a Kafka topic, KafkaEx will create at least one connection per partition. Each connection is in fact a GenConsumer, which is responsible for consuming messages. The implication of this is that if you have an application consuming a topic with 50 partitions, you will automatically end up with a minimum of 50 concurrent consumers. If you naively scale your application horizontally, especially without clustering, you can multiply your instance count by 50 in this case.

Having instances * partitions consumers per topic isn’t necessarily an issue in and of itself. The kicker with KafkaEx, however, is that unless you want to manually set up each of your consumers, you’re going to use a ConsumerGroup, which doesn’t provide any throttling mechanism other than back-pressure.
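
To make that concrete, a typical setup looks something like the sketch below. This is a hedged illustration rather than our actual code: the module, group and topic names are made up, and the exact message struct and child spec depend on your KafkaEx version.

# A minimal sketch of the kind of consumer we had (names are illustrative only)
defmodule MyApp.EventConsumer do
  use KafkaEx.GenConsumer

  alias KafkaEx.Protocol.Fetch.Message

  # Messages are delivered in batches, one consumer per partition
  def handle_message_set(message_set, state) do
    for %Message{value: value} <- message_set do
      # One simple database query per message - fine in isolation,
      # painful when instances * partitions consumers do it at full speed
      MyApp.Repo.get_by(MyApp.Thing, external_id: value)
    end

    {:async_commit, state}
  end
end

# Started somewhere in the supervision tree, for example:
# KafkaEx.ConsumerGroup.start_link(MyApp.EventConsumer, "my-group", ["my-topic"], [])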

The flow of data becomes ‘push’ rather than ‘pull’ from the perspective of our application. Recall the experience we faced: a million messages coming through, each triggering a simple database query. The result was instances * partitions consumers all competing for a database connection over and over, which put a huge load on a small database and prevented our APIs from getting a database connection in a timely fashion.

Cue the next feature

While a one-off(ish) bout of database starvation isn’t the end of the world for this particular application, we had a feature coming up that would increase the complexity of this message processor. Instead of a single simple database call, it would have to make two database calls plus an outbound API call to another service.

Having an unlimited processing rate would now be unacceptable, as it could interfere with the performance of other systems.

At this point I knew we needed to introduce some artificial back-pressure, to make sure neither our application nor the external services were being overwhelmed. KafkaEx will only grab more messages when it thinks it has finished processing the messages it already has in hand. If we can convince it that it’s not done for a little longer, we can slow processing down to a controlled rate.

Another option would have been to use broadway_kafka at this point, but we have built some systems around KafkaEx which would make that a non-trivial transition.

Enter Broadway

I am a big fan of Flow, which builds on top of GenStage, so I had at least a little awareness of Broadway. For those not familiar, Broadway is a library used to “Build concurrent and multi-stage data ingestion and data processing pipelines with Elixir.”

Broadway offers some key functionalities to us that make it perfect for solving our performance problem:

  • Back-pressure
  • Automatic acknowledgements at the end of the pipeline
  • Batching
  • Fault tolerance with minimal data loss
  • Graceful shutdown
  • Built-in testing
  • Custom failure handling
  • Ordering and partitioning
  • Rate-limiting
  • Metrics

The components of a Broadway pipeline

A Broadway pipeline needs a producer and a consumer. For us, we already produce data through KafkaEx, and we can easily consume that data. The problem with hooking KafkaEx up to Broadway is that KafkaEx in a ConsumerGroup configuration moves data through a push mechanism, whereas Broadway works on the principle of downstream stages pulling data.

We can start a simple Broadway pipeline with just the below code, defining the consumer (processor) inline. This is well explained in the Broadway documentation:

defmodule BroadwayPipeline do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: BroadwayPipeline,
      producer: [
        module: {Producer, []},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 1]
      ]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    IO.inspect(message, label: "Processing")

    # Call the database, call some APIs, etc

    message
  end
end

The part that’s missing is Producer: the glue between KafkaEx and Broadway.

The glue

As I mentioned before, we need to turn the data flow from push to pull, and we need to force back-pressure on the Kafka consumer. One way to achieve this is a simple GenServer which queues messages from Kafka, and forces the Kafka consumer to wait before putting more messages onto the queue.

defmodule BlockingBuffer do
  @moduledoc """
  A process which holds a queue and blocks when the length of the queue
  exceeds a predefined number (100)
  """

  use GenServer

  @max_buffer 100 # Tweak as required, or move into opts

  # GenServer startup
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    {:ok, {:queue.new(), 0}}
  end

  # Public API
  @spec push(any) :: :ok | no_return
  def push(message) do
    case GenServer.call(__MODULE__, {:push, message}) do
      :wait ->
        wait_time = 100 # ms - consider tweaking and adding jitter
        :timer.sleep(wait_time) # Calling process (KafkaEx consumer) sleeps
        push(message) # Try again!

      :pushed ->
        :ok
    end
  end

  @spec drain(pos_integer) :: list(any)
  def drain(demand) do
    GenServer.call(__MODULE__, {:drain, demand})
  end

  # Message callbacks
  @impl true
  def handle_call({:push, _message}, _from, {_queue, count} = state) when count > @max_buffer do
    {:reply, :wait, state}
  end

  def handle_call({:push, message}, _from, {queue, count}) do
    {:reply, :pushed, {:queue.in(message, queue), count + 1}}
  end

  def handle_call({:drain, demand}, _from, {queue, count}) when demand > 0 do
    {items, queue} =
      Enum.reduce(1..demand, {[], queue}, fn _i, {items, queue} ->
        case :queue.out(queue) do
          {{:value, value}, queue} ->
            {[value | items], queue}

          {:empty, queue} ->
            {items, queue}
        end
      end)

    {:reply, Enum.reverse(items), {queue, count - length(items)}}
  end
end
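
Before wiring things up, it’s worth noting where these processes live. Below is a minimal sketch of the supervision order, assuming a conventional application module (names are illustrative): the buffer and the pipeline need to be running before anything calls BlockingBuffer.push/1.

# In the (assumed) MyApp.Application module
def start(_type, _args) do
  children = [
    MyApp.Repo,             # database access used by the processors
    BlockingBuffer,         # the queue converting push to pull
    {BroadwayPipeline, []}  # the pipeline that drains the buffer
    # ... the KafkaEx consumer group is started after these
  ]

  Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end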

Now that we’ve got a process that converts push to pull, we can write a producer which simply pulls from this queue when downstream stages ask it for demand. We also need to store how much demand the producer has been asked for, so that if there are no Kafka messages for a time and some start being received again, we can kickstart the pipeline once more.

defmodule Producer do
  use GenStage

  @behaviour Broadway.Producer

  @impl true
  def init(_opts) do
    {:producer, 0}
  end

  @impl true
  def handle_demand(demand, stored_demand) do
    total_demand = demand + stored_demand

    # Create a Broadway message from each Kafka message
    messages =
      total_demand
      |> BlockingBuffer.drain()
      |> Enum.map(&%Broadway.Message{data: &1, acknowledger: {Broadway.NoopAcknowledger, nil, nil}})

    message_count = length(messages)

    # If we've run out of messages, we need to kickstart the pipeline again
    # so, try drain again soon
    if message_count < total_demand do
      Process.send_after(self(), :kick_start, 1_000)
    end

    {:noreply, messages, total_demand - message_count}
  end

  @impl true
  def handle_info(:kick_start, stored_demand) do
    handle_demand(0, stored_demand)
  end
end

This is great - now we can change our Kafka consumer to call BlockingBuffer.push(message), and update BroadwayPipeline.handle_message/3 to do the processing of the Kafka message. This moves all the concurrent processing into the Broadway pipeline. As our configuration stands, we’re going to be processing in a single process at an unlimited rate… but Broadway lets us change this easily in configuration. Let’s alter the way we start the Broadway pipeline: imagine we want to process a million messages a day, and we want to restrict the open database connections per instance to 5 - we can configure that like below:

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: BroadwayPipeline,
      producer: [
        module: {Producer, []},
        concurrency: 1,
        rate_limiting: [allowed_messages: 11, interval: 1000] # 11/s is just over 950k / day
      ],
      processors: [
        default: [concurrency: 5]
      ]
    )
  end
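
On the Kafka side, the only change is that the consumer hands messages to the buffer instead of processing them inline. Here’s a sketch, using the same assumed names as earlier; note that with :async_commit the offset is committed once a message is buffered rather than once it’s fully processed, which is a trade-off to be aware of.

# Sketch only: the GenConsumer now just feeds the buffer. BlockingBuffer.push/1
# sleeps when the buffer is full, which is the artificial back-pressure that
# slows KafkaEx down.
defmodule MyApp.EventConsumer do
  use KafkaEx.GenConsumer

  alias KafkaEx.Protocol.Fetch.Message

  def handle_message_set(message_set, state) do
    for %Message{value: value} <- message_set do
      BlockingBuffer.push(value)
    end

    {:async_commit, state}
  end
end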

Summary

Usually people look to Elixir to parallelise their workload, but sometimes it’s easy to over-parallelise and shoot yourself in the foot. By making use of Broadway, we can easily parallelise while staying in control. This is just a taste of what Broadway offers - I’d encourage you to check out their documentation to learn more about its functionality and use cases.

- Andrew

Using Dataloader.KV to call APIs lazily

2019/11/04 - Code

Dataloader is a library for loading data in batches. It integrates very well with Absinthe, a GraphQL library for Elixir.

My background with Absinthe & Dataloader

I’ve been working a bit with GraphQL (Absinthe/Elixir) microservices in the past few months. For the most part, those services have been able to exist in their realm and not need to query outside of it. However, from time to time, there’s a need to incorporate a field - an association - within a schema which calls APIs outside of that realm. Sometimes those APIs are friendly to batch queries, and sometimes they’re not. Either way, I’ve found that Dataloader.KV can be used to effectively manage batching requests to those services.

When I first came across the problem of calling other APIs efficiently in Absinthe, I only knew of Dataloader.KV. I read a bit of the documentation and asked around the community for more information. Oddly, there seemed to be few resources on getting it going in a simple case. I hope this post can help other people jump-start into using it in the future.

Getting started

Let’s skip ahead to the result first. I find myself making use of Dataloader.KV in two ways.

Pretend we have a User with an association, posts, which is hosted on an external service we’ll call postal:

# 1. Using the dataloader/1 resolution helper
# (dataloader/1 and on_load/2 come from Absinthe.Resolution.Helpers)
object :user do
  field :posts, list_of(:post), resolve: dataloader(:postal)
end

# 2. Loading manually with load/on_load
field :posts, list_of(:post) do
  resolve fn parent, _args, %{context: %{loader: loader}} ->
    loader
    |> Dataloader.load(:postal, :posts, parent)
    |> on_load(fn loader ->
      loader
      |> Dataloader.get(:postal, :posts, parent)
      |> do_something_with_the_result()
    end)
  end
end

For those using Dataloader already, both of those usages should look familiar.

The question now is how to make it work… First, we’re going to create a module that will house the code for calling the external API.

We will need to define a function to give to Dataloader.KV that takes the association and the parent records and resolves the data; I’ve (arbitrarily) named it dataloader_postal_loader. I factor the actual resolving of data out of this function, however, and just let it manage the incoming options.

Note: one thing that’s important to getting this right is that you will receive the parent records as a MapSet. Dataloader expects you to return a map from those records to their results, and the keys of that map must be exactly the records as they came in. If the records are Ecto structs and are missing some associations you need to load before resolving the external data, you will need to hold onto the original key alongside your record with loaded associations, so you can return it in the result map.
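
As a purely hypothetical sketch of that last point (the :profile association and build_result/1 are made up), the trick is to carry the original struct along as the key:

# Keep the original struct as the Dataloader key while resolving with a copy
# that has the extra association preloaded (:profile and build_result/1 are
# hypothetical)
users
|> Enum.map(fn original -> {original, Repo.preload(original, :profile)} end)
|> Map.new(fn {original, preloaded} -> {original, {:ok, build_result(preloaded)}} end)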

defmodule UserAppWeb.Sources.Postal do
  @spec dataloader_postal_loader() :: ({:posts, keyword} | :posts, MapSet.t(User.t()) -> map)
  def dataloader_postal_loader do
    fn
      # Signature for use with `dataloader` helper function
      {:posts, _opts}, users ->
        load_posts_for_users(users)

      # Signature for use with manual dataloading (without opts)
      :posts, users ->
        load_posts_for_users(users)
    end
  end

  # Example - just call an API which accepts bulk arguments
  @spec load_posts_for_users(MapSet.t(User.t())) :: map
  defp load_posts_for_users(users) do
    user_ids =
      users
      |> Stream.map(& &1.id)
      |> Enum.join(",")

    # Perhaps returns a JSON map of user_id -> [string].
    # Don't hard pattern match on {:ok, ...} in the real world.
    # You could also use Flow to concurrently make external calls for non-bulk APIs.
    {:ok, %{body: body}} = HTTPoison.get("http://example.com/posts?user_ids=#{user_ids}")
    result = Jason.decode!(body)

    # Wrapping each result in an ok tuple means `nil` is returned for the field
    # if no result was found for the user. Decoded JSON keys are strings.
    users
    |> Stream.map(fn original_user -> {original_user, {:ok, Map.get(result, to_string(original_user.id))}} end)
    |> Map.new()
  end
end

Finally, you will need to add a Dataloader source to make use of this functionality - probably somewhere like your main schema.ex:

def context(ctx) do
  postal_loader =
    UserAppWeb.Sources.Postal.dataloader_postal_loader()
    |> Dataloader.KV.new()

  loader =
    Dataloader.new()
    |> Dataloader.add_source(..., ...) # Your other sources
    |> Dataloader.add_source(:postal, postal_loader)

  Map.put(ctx, :loader, loader)
end
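
One prerequisite that’s easy to forget (it may already be in your schema if you use Dataloader elsewhere): Absinthe needs the Dataloader plugin enabled so that the deferred loads actually get run.

# Also in the main schema module
def plugins do
  [Absinthe.Middleware.Dataloader] ++ Absinthe.Plugin.defaults()
end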

You should now be able to make GraphQL queries which lazily call the external API.

Further considerations

The above example touches on the very simple case. There are some issues with managing errors from your external APIs. Instead of returning {:ok, value} or {:ok, nil} for every result, you can in fact return an error tuple, but you will need to handle it specially in your field resolver. I found that I couldn’t get the ok or error tuple out without first changing the Dataloader get_policy setting to :tuples (see the dataloader options). This looks a bit like:

field :posts, list(:post) do
  resolve fn parent, _args, %{context: %{loader: loader}} ->
    loader
    |> Dataloader.load(:postal, :posts, parent)
    |> on_load(fn loader ->
      loader
      |> Map.put(:options, Keyword.put(loader.options, :get_policy, :tuples)) # So that we can get errors out
      |> Dataloader.get(:postal, :posts, parent)
      |> case do
        {:ok, {:ok, _value} = success} ->
          success
        {:ok, nil} ->
          {:ok, nil}
        {:error, _error} = error ->
          error
      end
    end)
  end
end

If you have any better ways of managing that issue, I’d love to hear them - please get in touch!

- Andrew

Better Phoenix APIs - featuring Ecto, Absinthe & GraphQL - Part 1

2018/10/12 - Code

At Abletech, I’ve been using Elixir full time for almost 9 months. During that time I have authored or been involved in more than 10 separate codebases, mostly using Phoenix to serve JSON APIs. I have come away from those codebases with a few opinions around how best to manage data. Full disclosure: I’m not an expert, and much of what I’ve done has been driven through conversations with talented developers in the Elixir community.

Part 1 doesn’t get into the code specifics of using Absinthe or GraphQL - stay tuned for that.

Associations, scoping, consistency and boilerplate

One of the best ways of getting an application going quickly with Phoenix and Ecto is to use the built-in phx generators. These are useful for basic CRUD, and will usually work well while you’ve got only a few schemas involved in your application.

However, complexity increases very quickly when you add authentication, multiple associations and scoping - in particular, when different scenarios require different associations to be preloaded and scoped.

My first low-tech solution for this is based on the recommended usage for dataloader, a package I’ll get into more later.

Let’s take a look at some code and how it might progress naturally in a simple application. Consider listing users and their posts for a simple blog.

  1. The generated boilerplate

@spec list_users() :: list(User.t())
def list_users do
  Repo.all(User)
end

This works perfectly for a simple concept.

  2. The associated schema

If we add in posts, we can simply load them for each user by preloading like so.

@spec list_users() :: list(User.t())
def list_users do
  User
  |> Repo.all()
  |> Repo.preload([:posts])
end

However, chances are we don’t always want to preload the posts each time we load users, so we add a second function or clause:

@spec list_users() :: list(User.t())
def list_users do
  Repo.all(User)
end

@spec list_users_with_posts() :: list(User.t())
def list_users_with_posts do
  User
  |> Repo.all()
  |> Repo.preload([:posts])
end

This works, but it is not sustainable if we start adding other associations.

  3. The preload list

That brings us to adding a list with the associations we want to preload.

def list_users(preloads \\ []) when is_list(preloads) do
  User
  |> Repo.all()
  |> Repo.preload(preloads)
end

That works well until you need to do something like search for users by name. Perhaps in this scenario, you don’t want to preload associations - or you only want to preload a subset of them.

  4. Keyword opts

First, a totally naive implementation:

def list_users(opts \\ []) do
  users = User
  users = if Keyword.has_key?(opts, :name) do
    name = opts[:name]

    if is_nil(name) do
      from u in users, where: is_nil(u.name)
    else
      from u in users, where: u.name == ^name
    end
  else
    users
  end
  
  users = if Keyword.has_key?(opts, :preloads) do
    preloads = opts[:preloads]
    from u in users, preload: ^preloads
  else
    users
  end
  
  Repo.all(users)
end

This is obviously not a good functional approach to the problem. Since we’re always building on the queryable depending on the next opt, we can simplify this using Enum.reduce:

def list_users(opts) do
  users = Enum.reduce(opts, User, fn
    {:name, nil}, users ->
      from u in users, where: is_nil(u.name)
    {:name, name}, users when is_binary(name) ->
      from u in users, where: u.name == ^name
    {:preloads, preloads}, users ->
      from u in users, preload: ^preloads
  end)
  
  Repo.all(users)
end

Bam! This is much more elegant and takes a functional approach. This isn’t something that came to me intuitively - it’s a tip I received from @dpehrson on the Elixir Slack, and it has helped pave the way to how I currently handle Ecto queries. More on that later.

At some point, it’s likely we’ll want to apply these filters and options to a query that returns only one result - something like a get_user function. Let’s extract all that option handling out, and let list_users and get_user handle their real responsibilities.

  5. query/2

Introducing the query/2 function. It’s totally derived from how dataloader is used, and it did not make sense to me at first. However, its value is clear after going through this process once or twice.

@spec query(Ecto.Queryable.t, keyword) :: Ecto.Queryable.t
def query(queryable, opts \\ [])
def query(User, opts) do
  Enum.reduce(opts, User, fn
    {:name, nil}, users ->
      from u in users, where: is_nil(u.name)
    {:name, name}, users when is_binary(name) ->
      from u in users, where: u.name == ^name
    {:preloads, preloads}, users ->
      from u in users, preload: ^preloads
  end)
end

@spec list_users(keyword) :: {:ok, list(User.t())}
def list_users(opts \\ []) do
  users =
    User
    |> query(opts)
    |> Repo.all()
    
  {:ok, users}
end

@spec get_user(keyword) :: {:ok, User.t()} | {:error, {:not_found, {:user, keyword}}}
def get_user(opts \\ []) do
  user =
    User
    |> query(opts)
    |> Repo.one()
    
  case user do
    %User{} = user ->
      {:ok, user}
    nil ->
      {:error, {:not_found, {:user, opts}}}
  end
end

Now that we’ve factored the option handling out, we can easily apply arbitrary filtering, pagination, scoping or otherwise in a single location. I have found this to be considerably more flexible and easier to work with, and to me it’s worth the small amount of extra work as soon as even one association is introduced. Not only that, but it works just fine with multiple schemas handled in the same context, by pattern matching on the queryable argument in query/2 - see the sketch below.
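
As a hypothetical illustration - assuming the same context also owns a Post schema with a published_at column - a second clause slots in right alongside query(User, opts):

# A second query/2 clause for another schema in the same context
# (Post and :published are made-up examples)
def query(Post, opts) do
  Enum.reduce(opts, Post, fn
    {:published, true}, posts ->
      from p in posts, where: not is_nil(p.published_at)

    {:preloads, preloads}, posts ->
      from p in posts, preload: ^preloads
  end)
end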

In part 2, we’ll introduce authorisation to the equation, and start discussing how Absinthe can help us solve some issues.

- Andrew

About

I'm a Kiwi-Canadian taking a working holiday in Calgary, Canada.

As a developer, I mainly work with backend technologies such as Ruby and Elixir. I have a weak spot for Elixir right now, and jump at the chance to use it. I've got some experience with React.js and Angular 2 (4+) but don't usually have a need to get into that realm.

Contact me at
andrew pett nz
