One time I had a job interview with a live-coding challenge. It went badly, because I was too anxious to do well. The challenge asked about optimally allocating test jobs across CI workers. In essence, a bin packing problem.

OK, so, test job allocation.

Suppose you have a set of test jobs and you want to allocate them to a set of workers for execution. What is the optimal way to do this?

At moments like this I wish I had taken the CS class that covers queueing theory and bin packing problems. If you never took that class, there’s no good way to deduce it all from scratch in a random job interview.

Well anyway, that interview went badly. I did not produce any working implementation, and I was, unsurprisingly, not hired. But I went home and wrote the code I wished I had been relaxed enough to write. It compares several naive allocation algorithms, starting from a definition of what we’re optimizing for.

What counts as optimal?

We want to allocate job load as evenly as possible among workers, such that each worker finishes at the same time. We don’t want to have worker 1 finish first and sit idle, while worker 2 keeps working twice as long.

As the jobs have randomly large durations, there is no absolute guarantee that all workers can finish at the same moment. If you have two workers and two jobs, and job J1 takes 1 minute while J2 takes 3 minutes, then one worker is going to finish 2 minutes sooner than the other, no matter how you allocate them.

But in principle, especially as the individual job durations approach zero relative to the total length of the queue, you can divide the jobs among workers so evenly that they all finish simultaneously. I like the analogy of dividing a large volume of water into buckets: if we divide a continuous volume of V liters among N buckets, each bucket should ideally contain exactly the same quantity of water, namely V/N liters.

Of course, jobs do have discrete individual durations, so the analogy with dividing a fluid is inexact. But it still gives us an excellent target to measure against. For any allocation of a finite set of jobs across a given set of workers, we can measure how closely our allocation approaches the ideal case of perfect division across a worker pool.

Say that the total duration of a set of jobs is D seconds and the number of workers is N. Ideally, each worker should finish processing after D/N seconds. Let’s call this the optimal target duration for each worker, or t(optimal) for short.

You can then trivially measure the variance from the optimal outcome for each worker with:

100 * (t(actual) - t(optimal)) / t(optimal)

(Multiply by 100 to get an output in percent, so you can say that a particular worker finished 1% slower than optimal, or 30% faster, etc.)
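For example, with made-up numbers (not drawn from the benchmark below):

```ruby
# Hypothetical example: 4 jobs split across 2 workers
jobs = [60, 45, 30, 15]               # durations in seconds (invented values)
n_workers = 2

t_optimal = jobs.sum.to_f / n_workers # 75.0 seconds per worker, ideally

# Suppose one worker ends up with the 60s and 30s jobs:
t_actual = 60 + 30

variance = 100 * (t_actual - t_optimal) / t_optimal
# variance is 20.0: this worker finished 20% slower than optimal
```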

Some naive allocation algorithms

Let’s suppose that before allocating jobs, we sort the input job set into a list ordered by historical job duration, so we can take the longest jobs from one end of the list and the shortest from the other.

We want to explore algorithms for distributing those jobs into N worker queues. For simplicity, let’s stipulate that N must be a power of 2.

I tested some simple allocation algorithms:

  1. Allocate from ends: Go through each worker in round-robin fashion. Push the longest available test job onto each queue. If there are any more test jobs, also push the shortest available test job onto each queue. My idea here was to balance out the slowest and fastest jobs across the available workers (so each queue gets part of both the short and long tails).
  2. Allocate to one worker until full: Compute t(optimal) for a given set of workers. Allocate jobs to the first worker until its total duration reaches or exceeds t(optimal). Then allocate jobs to the second worker until it reaches or exceeds t(optimal), and so on for N workers. (I believe this is fairly similar to the next-fit bin packing algorithm.)
  3. Recursive partitioning: Create two buckets of jobs, B1 and B2. Iterate through all the jobs, assigning the next job to whichever bucket is currently emptier. (This handles the case where you put a large job into B1 and then you put a number of smaller jobs into B2 until they converge.) Then recursively repeat the process, dividing each bucket into 2 more buckets via the same process. Stop when you have allocated jobs into N buckets, equivalent to the available number of workers.
  4. Rebalanced recursive allocation: Follow the process for recursive allocation, but before recursing, also try to rebalance the buckets by moving an item between queues in such a way as to bring the total durations of the buckets closer together. Suppose that bucket B1 has duration 150 and bucket B2 has duration 130. We can rebalance them by looking for a job in B1 with duration (150-130)/2 = 10 and moving it to B2. If we can’t find a job whose duration is exactly 10, we use the closest approximation we can find. We don’t move anything if no move would improve the balance between buckets.

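To make the core step of algorithm 3 concrete, here is a minimal sketch of a single two-way split, using invented durations (the full recursive version appears in the code at the end of this post):

```ruby
# Walk the jobs from longest to shortest, assigning each one to
# whichever half currently has the smaller total duration.
jobs = [90, 70, 40, 30, 20, 10]  # sorted descending by duration (invented)

left, right = [], []
left_total, right_total = 0, 0
jobs.each do |duration|
  if left_total <= right_total
    left << duration
    left_total += duration
  else
    right << duration
    right_total += duration
  end
end
# For this input the split happens to be perfect:
# left  = [90, 30, 10] (total 130)
# right = [70, 40, 20] (total 130)
```

Recursing on each half then yields 4, 8, 16... buckets, which is why the implementation requires a power-of-2 worker count.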
Intuitively, I expect the recursive algorithms to work better than the round-robin or next-fit approaches.

Measuring the results

As the total duration of all jobs t(total) grows very large compared to the maximum duration of any individual job t(individual_max), we should be able to converge on solutions that approach the optimal (continuous) target t(optimal) mentioned above.
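A back-of-the-envelope way to see why (this bound is my own rough assumption, not something the benchmark measures): with greedy allocation, each worker overshoots t(optimal) by at most one job, so the variance should be roughly bounded by 100 * t(individual_max) / t(optimal). The sample generator in the code below draws durations uniformly from 1 to 250, so the mean job duration is about 125:

```ruby
# Rough variance bound for each benchmark configuration below.
t_individual_max = 250.0  # max single-job duration used by the generator

bounds = [[4, 40], [16, 250], [32, 20_000]].map do |workers, jobs|
  t_optimal = jobs * 125.0 / workers  # expected total duration / workers
  (100 * t_individual_max / t_optimal).round(2)
end
# bounds == [20.0, 12.8, 0.32]: the achievable variance shrinks rapidly
# as the job count grows relative to the worker count
```

These bounds are loose, but they line up with the trend in the measured results: max variances of a few percent at small and medium scale, and a fraction of a percent at large scale.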

I wrote a Ruby script to generate random input sets and test each allocation algorithm against them at various sizes. The results were as follows:

Small scale: 4 workers, 40 jobs
Ran 150 rounds of allocate_from_ends with 40 jobs in 4 workers
      => Duration 3.1ms
      => Mean variance from optimal allocation: 0.912%
      => Max variance from optimal allocation: 4.831%
Ran 150 rounds of allocate_until_full with 40 jobs in 4 workers
      => Duration 3.5ms
      => Mean variance from optimal allocation: 2.237%
      => Max variance from optimal allocation: 7.23%
Ran 150 rounds of recursive_allocation with 40 jobs in 4 workers
      => Duration 4.1ms
      => Mean variance from optimal allocation: 4.999%
      => Max variance from optimal allocation: 9.178%
Ran 150 rounds of optimized_recursive_allocation with 40 jobs in 4 workers
      => Duration 7.8ms
      => Mean variance from optimal allocation: 0.568%
      => Max variance from optimal allocation: 2.807%
===================
Medium scale: 16 workers, 250 jobs
Ran 150 rounds of allocate_from_ends with 250 jobs in 16 workers
      => Duration 17.2ms
      => Mean variance from optimal allocation: 3.921%
      => Max variance from optimal allocation: 11.217%
Ran 150 rounds of allocate_until_full with 250 jobs in 16 workers
      => Duration 17.8ms
      => Mean variance from optimal allocation: 1.274%
      => Max variance from optimal allocation: 4.754%
Ran 150 rounds of recursive_allocation with 250 jobs in 16 workers
      => Duration 23.4ms
      => Mean variance from optimal allocation: 3.223%
      => Max variance from optimal allocation: 5.441%
Ran 150 rounds of optimized_recursive_allocation with 250 jobs in 16 workers
      => Duration 39.3ms
      => Mean variance from optimal allocation: 0.194%
      => Max variance from optimal allocation: 1.078%
===================
Large scale: 32 workers, 20000 jobs
Ran 30 rounds of allocate_from_ends with 20000 jobs in 32 workers
      => Duration 229.9ms
      => Mean variance from optimal allocation: 0.16%
      => Max variance from optimal allocation: 0.175%
Ran 30 rounds of allocate_until_full with 20000 jobs in 32 workers
      => Duration 263.5ms
      => Mean variance from optimal allocation: 0.006%
      => Max variance from optimal allocation: 0.048%
Ran 30 rounds of recursive_allocation with 20000 jobs in 32 workers
      => Duration 342.9ms
      => Mean variance from optimal allocation: 0.039%
      => Max variance from optimal allocation: 0.056%
Ran 30 rounds of optimized_recursive_allocation with 20000 jobs in 32 workers
      => Duration 350.6ms
      => Mean variance from optimal allocation: 0.0%
      => Max variance from optimal allocation: 0.001%

TLDR: the rebalanced recursive algorithm won at every scale, at the cost of being the slowest.

Takeaways

  1. optimized_recursive_allocation produced the lowest mean and max variance at every scale, but also took the longest to run (roughly 1.5-2.5x the simplest approach).
  2. The ranking of the simpler algorithms was not stable across scales: allocate_from_ends was the runner-up at small scale but the worst performer at medium and large scale.
  3. As predicted by the fluid analogy, every algorithm converged toward the optimal allocation as the job count grew: with 20000 jobs, even the worst performer stayed within 0.2% of optimal.

Coda

The experimental code was as follows.


############################
# Experiment - a class that tests a given allocation strategy for a given number of rounds,
# using a specified number of buckets and a specified volume of randomly generated tests
# After the experiment is done, it outputs its results to stdout.
############################
class Experiment
  def initialize(strategy, rounds:, buckets:, tests:)
    total_variance = 0
    max_variance = 0
    start = Time.now

    rounds.times do
      test_mean, test_max = run_test(strategy, buckets, sample(tests))
      total_variance += test_mean
      max_variance = test_max if test_max > max_variance
    end
    duration = (Time.now - start)*1000 # ms
    puts "Ran #{rounds} rounds of #{strategy} with #{tests} tests in #{buckets} buckets"
    puts "      => Duration #{duration.round(1)}ms"
    puts "      => Mean variance from optimal allocation: #{(total_variance/rounds).round(3)}%"
    puts "      => Max variance from optimal allocation: #{max_variance.round(3)}%"
  end

  def run_test(strategy, n, test_files)
    sorted_tests = test_files.to_a.sort_by(&:last)

    # optimally, we should have (sum(total_test_time)/n) in each bucket
    total_time = sorted_tests.map(&:last).inject(&:+)
    target = total_time.to_f / n

    buckets = (1..n).map { Bucket.new(target) }

    puts "Allocating #{test_files.count} tests into #{n} buckets, optimal bucket size: #{target}" if ENV['VERBOSE']

    buckets = Allocator.new.send strategy, sorted_tests, buckets, target

    # Report results
    puts buckets.map(&:status) if ENV['VERBOSE']

    # Compute mean and max variance across buckets
    mean_variance = buckets.map {|b| b.variance.abs}.sum / buckets.size
    max_variance = buckets.map {|b| b.variance.abs}.max
    puts "  ... mean variance #{mean_variance}, max variance #{max_variance}" if ENV['VERBOSE']
    [mean_variance, max_variance]
  end

  # Generates a random test data set
  def sample(size)
    (1..size).map { |i| ["test-#{i}.rb".to_sym, Integer(rand * 250) + 1]}.to_h
  end
end

############################
# Allocator - a class implementing several possible algorithms for allocating input
#   items to buckets.
# The inputs to these methods are called `sorted_tests` because the original spec was for
#   each item to represent a test file with an integer duration in seconds, and we expect
#   the Experiment class to start out by sorting the inputs before invoking these methods.
############################
class Allocator
  # Allocate sorted tests into buckets, pulling from the ends of the array at each iteration
  def allocate_from_ends(sorted_tests, buckets, target)
    current_bucket = 0
    while sorted_tests.count > 0
      current = buckets[current_bucket % buckets.size]
      current.add sorted_tests.pop
      current.add(sorted_tests.shift) if sorted_tests.size > 0 # we might run out of tests
      current_bucket += 1
    end
    buckets
  end

  # Allocate sorted tests into buckets, removing buckets from rotation once they reach capacity
  def allocate_until_full(sorted_tests, buckets, target)
    current_bucket = 0
    full_buckets = 0
    while sorted_tests.count > 0 && full_buckets < buckets.count
      current = buckets[current_bucket % buckets.size]
      if current.total > target # don't add to already full buckets
        if !current.full
          current.full = true
          full_buckets += 1

          puts " ... just filled up a bucket: #{current.status}" if ENV['VERBOSE']
        end
      else
        current.add sorted_tests.pop
      end
      current_bucket += 1
    end
    buckets
  end

  # Repeatedly divide the list into two as-close-to-equal-halves as we can
  def recursive_allocation(sorted_tests, buckets, target)
    recursion_levels = Math.log2(buckets.count) # log2 is exact for powers of 2, unlike Math.log(n, 2)
    raise "Must pass number of buckets that are a power of 2" unless recursion_levels % 1 == 0

    # Don't reuse the buckets we were given, we'll regenerate them in the recursion process
    buckets.clear
    buckets.concat recursive_list_division(sorted_tests, recursion_levels.to_i, target, optimize: false).flatten
  end

  # Same as recursive_allocation, but also make an effort to balance out imbalances between left and right halves
  def optimized_recursive_allocation(sorted_tests, buckets, target)
    recursion_levels = Math.log2(buckets.count) # log2 is exact for powers of 2, unlike Math.log(n, 2)
    raise "Must pass number of buckets that are a power of 2" unless recursion_levels % 1 == 0

    # Don't reuse the buckets we were given, we'll regenerate them in the recursion process
    buckets.clear
    buckets.concat recursive_list_division(sorted_tests, recursion_levels.to_i, target, optimize: true).flatten
  end

  # Actually does the work for the recursive strategy
  def recursive_list_division(items, levels_remaining, target, optimize: false)
    left = Bucket.new(target)
    right = Bucket.new(target)

    # Allocate the next item to whichever list is currently emptier
    while items.size > 0
      if left.total <= right.total
        left.add items.pop
      else
        right.add items.pop
      end
    end
    puts "Balanced two lists: #{left.total} vs #{right.total}" if ENV['VERBOSE']

    # Try to balance the two lists if optimization is enabled
    if optimize
      25.times do
        left, right = recursive_list_balance(left, right)
      end
    end

    if levels_remaining > 1
      [ recursive_list_division(left.items, levels_remaining - 1, target, optimize: optimize),
        recursive_list_division(right.items, levels_remaining - 1, target, optimize: optimize)]
    else
      [left, right]
    end
  end

  # Attempts to balance out gaps between two unequal lists
  def recursive_list_balance(left, right)
    # Compute the gap between left and right -- that's what we hope to improve
    difference = (left.total - right.total).abs
    # Don't bother doing anything if the two lists happen to be identical
    return [left, right] if difference == 0

    # Now search the larger list for the element whose duration is closest
    # to half the difference between lists: moving an item of roughly half
    # the difference narrows the gap the most
    larger = left.total > right.total ? left.items : right.items
    candidate = (0...larger.size).min_by { |i| (larger[i][1] - difference / 2.0).abs }
    return [left, right] if candidate.nil?

    # Let's only do the swap if it actually improves the balance between buckets
    if larger[candidate][1] < difference
      puts "Balancing lists: moving item of size #{larger[candidate][1]} to balance a gap of size #{difference}" if ENV['VERBOSE']
      if left.total > right.total
        right.add left.remove(candidate)
      else
        left.add right.remove(candidate)
      end
    else
      puts "Best candidate was #{larger[candidate][1]} to balance a gap of size #{difference}, skipping" if ENV['VERBOSE']
    end
    [left, right]
  end
end

############################
# Bucket - a utility class that contains a bucket of items and calculates their total size.
#  Initialize it with a target size, and then it can calculate the
#    difference between its actual total and the optimal target result size.
#    Let's call this difference the `variance`.
#  It implements two list operations, `add` and `remove`.
#  It has a flag called `full` that allocation algorithms might use
#    to keep track of buckets that have exceeded their optimal capacity.
############################
class Bucket
  attr_accessor :total
  attr_accessor :full
  attr_accessor :target
  attr_accessor :items

  def initialize(target=0)
    @total = 0
    @items = []
    @target = target
    @full = false
  end

  def add(item)
    @total += item[1]
    @items << item
  end

  def remove(item_index)
    raise "can't remove invalid item with index #{item_index}, total items #{@items.size}" if @items[item_index].nil?
    item = @items.delete_at(item_index)
    @total -= item[1]
    item
  end

  def status
    "#{variance}% variance, #{@items.size} items, #{@total} total, #{Integer(@total - @target)} over target"
  end

  # how far off we are from the target bucket size, in percent
  def variance
    (((@total - @target).to_f / @target) * 100).round(4)
  end
end



############################
# Imperative script code:
############################

# little wrapper function to kick off each set of experiments
def run_experiments(label, rounds:, buckets:, tests:)
  puts "==================="
  puts "#{label}: #{buckets} buckets, #{tests} tests"
  Experiment.new :allocate_from_ends, rounds: rounds, buckets: buckets, tests: tests
  Experiment.new :allocate_until_full, rounds: rounds, buckets: buckets, tests: tests
  Experiment.new :recursive_allocation, rounds: rounds, buckets: buckets, tests: tests
  Experiment.new :optimized_recursive_allocation, rounds: rounds, buckets: buckets, tests: tests
end

run_experiments 'Two buckets', rounds: 150, buckets: 2, tests: 40
run_experiments 'Small scale', rounds: 150, buckets: 4, tests: 40
run_experiments 'Medium scale', rounds: 150, buckets: 16, tests: 250
run_experiments 'Large scale', rounds: 30, buckets: 32, tests: 20000
# The large scale one is run for fewer rounds because it gets slow

Posted under: programming queues