One time I had a job interview with a live-coding challenge. It went badly, because I was too anxious to do well. The challenge asked about optimally allocating test jobs across CI workers: in essence, a bin packing problem.
OK, so, test job allocation.
Suppose you have a set of test jobs and you want to allocate them to a set of workers for execution. What is the optimal way to do this?
At moments like this I wish I had taken the CS class that covers queueing theory and bin packing problems. If you never took that class, there’s no good way to deduce it all from scratch in a random job interview.
Well anyway, that interview went badly. I did not produce a working implementation, and I was obviously not hired. But I went home and wrote the code I wished I had been relaxed enough to write. It compares naive allocation algorithms under the following preconditions:
- We know the runtime duration of each test file at the start. (Obviously not true in practice in any significant CI system, but we can probably use expected duration or historical average duration as a proxy here.)
- The durations range from 1 to 250 seconds.
What counts as optimal?
We want to allocate job load as evenly as possible among workers, such that each worker finishes at the same time. We don’t want to have worker 1 finish first and sit idle, while worker 2 keeps working twice as long.
Because the jobs have discrete, random durations, there is no absolute guarantee that all workers can finish at the same moment. If you have two workers and two jobs, and job J1 takes 1 minute while J2 takes 3 minutes, then one worker is going to finish 2 minutes sooner than the other, no matter how you allocate them.
But in principle, especially as the individual job durations approach zero relative to the total length of the queue, you can evenly divide the jobs among workers and they will all finish simultaneously. I like to think here of the analogy with dividing a large volume of water into multiple buckets: if we were dividing a continuous volume V liters into N buckets, then each bucket should ideally contain the exact same quantity of water, namely V/N liters.
Of course, jobs do have discrete individual durations, so the analogy with dividing a fluid is inexact. But it still gives us an excellent target to measure against. For any allocation of a finite set of jobs across a given set of workers, we can measure how closely our allocation approaches the ideal case of perfect division across a worker pool.
Say that the total duration of a set of jobs is D seconds and the number of workers is N. Ideally, each worker should finish processing after D/N seconds. Let’s call this the optimal target duration for each worker, or t(optimal) for short.
You can then trivially measure the variance from the optimal outcome for each worker with:
100 * (t(actual) - t(optimal)) / t(optimal)
(Multiply by 100 to get an output in percent, so you can say that a particular worker finished 1% slower than optimal, or 30% faster, etc.)
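To make that concrete, with some invented numbers: suppose 4 workers share jobs totaling 400 seconds, so t(optimal) = 400/4 = 100 seconds. A worker that actually finishes after 103 seconds has a variance of 100 * (103 - 100) / 100 = 3%, meaning it ran 3% slower than optimal; a worker that finishes after 95 seconds has a variance of -5%.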
Some naive allocation algorithms
Let’s suppose that before allocating jobs, we sort the input job set by historical job duration, so that we can take the longest remaining job from one end of the list and the shortest from the other.
We want to explore algorithms for distributing jobs across N worker queues. For simplicity, let’s stipulate that N must be a power of 2 (the recursive strategies below split the job set in half at each level, giving 2^k queues after k levels).
I tested some simple allocation algorithms:
- Allocate from ends: Go through the workers round robin. Push the longest available test job onto each queue. If there are any test jobs left, also push the shortest available test job onto each queue. The idea here is to balance out the slowest and fastest jobs across the available workers, so each queue gets part of both the short and long tails.
- Allocate until full: Compute t(optimal) for a given set of workers. Go through the workers round robin, pushing the longest available job onto each queue, and remove a worker from the rotation once its total duration reaches or exceeds t(optimal). (I believe this is loosely similar in spirit to the next fit bin packing algorithm.)
- Recursive partitioning: Create two buckets of jobs, B1 and B2. Iterate through all the jobs, assigning the next job to whichever bucket currently has the smaller total. (This handles the case where you put a large job into B1 and then put a number of smaller jobs into B2 until the totals converge.) Then recursively repeat the process, dividing each bucket into 2 more buckets via the same process. Stop once the jobs are allocated into N buckets, one per available worker.
- Rebalanced recursive allocation: Follow the process for recursive partitioning, but before recursing, also try to rebalance the two buckets by moving an item between them in a way that brings their total durations closer together. Moving a job of duration s from the heavier bucket shrinks the gap d between the buckets to |d - 2s|, so the ideal job to move has duration d/2. Suppose that bucket B1 has duration 150 and bucket B2 has duration 130. We can rebalance them by looking for a job in B1 with duration (150-130)/2 = 10 and moving it to B2. If we can’t find a job whose duration is exactly 10, we use the closest approximation we can find, and we don’t move anything if no move would improve the balance (see the simplified sketch just below).
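Here’s a simplified, self-contained sketch of that rebalancing move, using plain arrays and invented durations. The real implementation in the Coda below operates on Bucket objects and scans for a candidate slightly differently, but the arithmetic is the same:

b1 = [70, 45, 25, 10]  # total duration 150
b2 = [60, 40, 30]      # total duration 130
gap = b1.sum - b2.sum  # 20, so the ideal job to move has duration 20 / 2 = 10
# Pick the job in the heavier bucket closest to the ideal duration
candidate = b1.min_by { |d| (d - gap / 2.0).abs }
# Moving a job of duration s changes the gap from d to |d - 2s|,
# which is an improvement only when s < d
if candidate < gap
  b2 << b1.delete(candidate)
end
p [b1.sum, b2.sum]     # => [140, 140]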
Intuitively, I expect the recursive algorithms to work better than the round-robin or next-fit approaches.
Measuring the results
As the total duration of all jobs t(total) grows very large compared to the maximum duration of any individual job t(individual_max), we should be able to converge on solutions that approach the optimal (continuous) target t(optimal) mentioned above.
I wrote a Ruby script to generate random input sets and test each allocation algorithm against them at various sizes. The results were as follows:
Small scale: 4 workers, 40 jobs
Ran 150 rounds of allocate_from_ends with 40 jobs in 4 workers
=> Duration 3.1ms
=> Mean variance from optimal allocation: 0.912%
=> Max variance from optimal allocation: 4.831%
Ran 150 rounds of allocate_until_full with 40 jobs in 4 workers
=> Duration 3.5ms
=> Mean variance from optimal allocation: 2.237%
=> Max variance from optimal allocation: 7.23%
Ran 150 rounds of recursive_allocation with 40 jobs in 4 workers
=> Duration 4.1ms
=> Mean variance from optimal allocation: 4.999%
=> Max variance from optimal allocation: 9.178%
Ran 150 rounds of optimized_recursive_allocation with 40 jobs in 4 workers
=> Duration 7.8ms
=> Mean variance from optimal allocation: 0.568%
=> Max variance from optimal allocation: 2.807%
===================
Medium scale: 16 workers, 250 jobs
Ran 150 rounds of allocate_from_ends with 250 jobs in 16 workers
=> Duration 17.2ms
=> Mean variance from optimal allocation: 3.921%
=> Max variance from optimal allocation: 11.217%
Ran 150 rounds of allocate_until_full with 250 jobs in 16 workers
=> Duration 17.8ms
=> Mean variance from optimal allocation: 1.274%
=> Max variance from optimal allocation: 4.754%
Ran 150 rounds of recursive_allocation with 250 jobs in 16 workers
=> Duration 23.4ms
=> Mean variance from optimal allocation: 3.223%
=> Max variance from optimal allocation: 5.441%
Ran 150 rounds of optimized_recursive_allocation with 250 jobs in 16 workers
=> Duration 39.3ms
=> Mean variance from optimal allocation: 0.194%
=> Max variance from optimal allocation: 1.078%
===================
Large scale: 32 workers, 20000 jobs
Ran 30 rounds of allocate_from_ends with 20000 jobs in 32 workers
=> Duration 229.9ms
=> Mean variance from optimal allocation: 0.16%
=> Max variance from optimal allocation: 0.175%
Ran 30 rounds of allocate_until_full with 20000 jobs in 32 workers
=> Duration 263.5ms
=> Mean variance from optimal allocation: 0.006%
=> Max variance from optimal allocation: 0.048%
Ran 30 rounds of recursive_allocation with 20000 jobs in 32 workers
=> Duration 342.9ms
=> Mean variance from optimal allocation: 0.039%
=> Max variance from optimal allocation: 0.056%
Ran 30 rounds of optimized_recursive_allocation with 20000 jobs in 32 workers
=> Duration 350.6ms
=> Mean variance from optimal allocation: 0.0%
=> Max variance from optimal allocation: 0.001%
TLDR:
- As expected, smaller input sizes generally correspond to higher variance from the optimal outcome.
- The worst observed performance in any single experiment was 11.2% variance from optimal, in the medium scale test of the “allocate from ends” strategy. Mean variance for that strategy at that scale was lower, at 3.9%.
- At large volumes, all of these algorithms come in well under 1% variance. So our expectation that at large scale we would converge on the optimal (continuous) division case was borne out.
- The optimized recursive strategy was the most successful across the board. Given a large input size of 20,000 jobs, it had at worst 0.001% variance from the optimal solution over 30 experimental trials.
Takeaways
- I would never try to implement an optimized allocation algorithm at work without having some quiet time to do research first. I didn’t love being asked to do it as a live-coding exercise.
- Bin packing theory seems like an interesting thing to look into further. Probably a good thing to catch up on, given that I never got a CS degree.
- In practice, at work, we just delegate this problem (CI job allocation) to a SaaS solution (Knapsack Pro). We don’t try to roll our own solutions here.
Coda
The experimental code was as follows.
############################
# Experiment - a class that tests a given allocation strategy for a given number of rounds,
# using a specified number of buckets and a specified volume of randomly generated tests
# After the experiment is done, it outputs its results to stdout.
############################
class Experiment
  def initialize(strategy, rounds:, buckets:, tests:)
    total_variance = 0
    max_variance = 0
    start = Time.now
    rounds.times do
      test_mean, test_max = run_test(strategy, buckets, sample(tests))
      total_variance += test_mean
      max_variance = test_max if test_max > max_variance
    end
    duration = (Time.now - start) * 1000 # ms
    puts "Ran #{rounds} rounds of #{strategy} with #{tests} tests in #{buckets} buckets"
    puts " => Duration #{duration.round(1)}ms"
    puts " => Mean variance from optimal allocation: #{(total_variance / rounds).round(3)}%"
    puts " => Max variance from optimal allocation: #{max_variance.round(3)}%"
  end

  def run_test(strategy, n, test_files)
    sorted_tests = test_files.to_a.sort_by(&:last)
    # Optimally, we should have sum(total_test_time)/n in each bucket
    total_time = sorted_tests.map(&:last).inject(&:+)
    target = total_time.to_f / n
    buckets = (1..n).map { Bucket.new(target) }
    puts "Allocating #{test_files.count} tests into #{n} buckets, optimal bucket size: #{target}" if ENV['VERBOSE']
    buckets = Allocator.new.send strategy, sorted_tests, buckets, target
    # Report results
    puts buckets.map(&:status) if ENV['VERBOSE']
    # Return the mean and max variance across all buckets
    mean_variance = buckets.map { |b| b.variance.abs }.sum / buckets.size
    max_variance = buckets.map { |b| b.variance.abs }.max
    puts " ... mean variance #{mean_variance}, max variance #{max_variance}" if ENV['VERBOSE']
    [mean_variance, max_variance]
  end

  # Generates a random test data set: durations are uniform in 1..250 seconds
  def sample(size)
    (1..size).map { |i| ["test-#{i}.rb".to_sym, Integer(rand * 250) + 1] }.to_h
  end
end
############################
# Allocator - a class implementing several possible algorithms for allocating input
# items to buckets.
# The inputs to these methods are called `sorted_tests` because the original spec was for
# each item to represent a test file with an integer duration in seconds, and we expect
# the Experiment class to start out by sorting the inputs before invoking these methods.
############################
class Allocator
  # Allocate sorted tests into buckets, pulling from both ends of the array at each iteration
  def allocate_from_ends(sorted_tests, buckets, target)
    current_bucket = 0
    while sorted_tests.count > 0
      current = buckets[current_bucket % buckets.size]
      current.add sorted_tests.pop # longest remaining test
      current.add(sorted_tests.shift) if sorted_tests.size > 0 # shortest remaining test; we might run out
      current_bucket += 1
    end
    buckets
  end

  # Allocate sorted tests into buckets round robin, removing buckets from rotation
  # once they reach capacity
  def allocate_until_full(sorted_tests, buckets, target)
    current_bucket = 0
    full_buckets = 0
    while sorted_tests.count > 0 && full_buckets < buckets.count
      current = buckets[current_bucket % buckets.size]
      if current.total > target # don't add to already full buckets
        if !current.full
          current.full = true
          full_buckets += 1
          puts " ... just filled up a bucket: #{current.status}" if ENV['VERBOSE']
        end
      else
        current.add sorted_tests.pop
      end
      current_bucket += 1
    end
    buckets
  end

  # Repeatedly divide the list into two as-close-to-equal halves as we can
  def recursive_allocation(sorted_tests, buckets, target)
    # Math.log2 is exact for powers of two, unlike Math.log(n, 2)
    recursion_levels = Math.log2(buckets.count)
    raise "Must pass a number of buckets that is a power of 2" unless recursion_levels % 1 == 0
    # Don't reuse the buckets we were given; we'll regenerate them in the recursion process
    buckets.clear
    buckets.concat recursive_list_division(sorted_tests, recursion_levels.to_i, target, optimize: false).flatten
  end

  # Same as recursive_allocation, but also try to balance out imbalances between left and right halves
  def optimized_recursive_allocation(sorted_tests, buckets, target)
    recursion_levels = Math.log2(buckets.count)
    raise "Must pass a number of buckets that is a power of 2" unless recursion_levels % 1 == 0
    # Don't reuse the buckets we were given; we'll regenerate them in the recursion process
    buckets.clear
    buckets.concat recursive_list_division(sorted_tests, recursion_levels.to_i, target, optimize: true).flatten
  end

  # Actually does the work for the recursive strategies
  def recursive_list_division(items, levels_remaining, target, optimize: false)
    left = Bucket.new(target)
    right = Bucket.new(target)
    # Allocate the next item to whichever list is currently emptier
    while items.size > 0
      if left.total <= right.total
        left.add items.pop
      else
        right.add items.pop
      end
    end
    puts "Balanced two lists: #{left.total} vs #{right.total}" if ENV['VERBOSE']
    # Try to balance the two lists if optimization is enabled
    if optimize
      25.times do
        left, right = recursive_list_balance(left, right)
      end
    end
    if levels_remaining > 1
      [recursive_list_division(left.items, levels_remaining - 1, target, optimize: optimize),
       recursive_list_division(right.items, levels_remaining - 1, target, optimize: optimize)]
    else
      [left, right]
    end
  end

  # Attempts to balance out gaps between two unequal lists
  def recursive_list_balance(left, right)
    # Compute the gap between left and right -- that's what we hope to improve
    difference = (left.total - right.total).abs
    # Don't bother doing anything if the two lists happen to be identical
    return [left, right] if difference == 0
    # Now search for an element in the larger list
    # that is about half the size of the difference between the lists:
    larger = left.total > right.total ? left.items : right.items
    candidate = nil
    (0..larger.size - 1).each do |i|
      if larger[i][1] > difference / 2
        candidate = i
        break
      end
    end
    return [left, right] if candidate.nil?
    # Either the candidate item or the item preceding it should be the best item to move
    if candidate > 0 && ((larger[candidate][1] - difference) > (larger[candidate - 1][1] - difference))
      candidate -= 1
    end
    # Only do the move if it actually improves the balance between buckets:
    # moving an item of size s changes the gap from d to |d - 2s|,
    # which is an improvement whenever s < d
    if larger[candidate][1] < difference
      puts "Balancing lists: moving item of size #{larger[candidate][1]} to balance a gap of size #{difference}" if ENV['VERBOSE']
      if left.total > right.total
        right.add left.remove(candidate)
      else
        left.add right.remove(candidate)
      end
    else
      puts "Best candidate was #{larger[candidate][1]} to balance a gap of size #{difference}, skipping" if ENV['VERBOSE']
    end
    [left, right]
  end
end
############################
# Bucket - a utility class that contains a bucket of items and calculates their total size.
# Initialize it with a target size, and then it can calculate the
# difference between its actual total and the optimal target result size.
# Let's call this difference the `variance`.
# It implements two list operations, `add` and `remove`.
# It has a flag called `full` that allocation algorithms might use
# to keep track of buckets that have exceeded their optimal capacity.
############################
class Bucket
  attr_accessor :total
  attr_accessor :full
  attr_accessor :target
  attr_accessor :items

  def initialize(target = 0)
    @total = 0
    @items = []
    @target = target
    @full = false
  end

  def add(item)
    @total += item[1]
    @items << item
  end

  def remove(item_index)
    raise "can't remove invalid item with index #{item_index}, total items #{@items.size}" if @items[item_index].nil?
    item = @items.delete_at(item_index)
    @total -= item[1]
    item
  end

  def status
    "#{variance}% variance, #{@items.size} items, #{@total} total, #{Integer(@total - @target)} over target"
  end

  # How far off we are from the target bucket size, in percent
  def variance
    (((@total - @target).to_f / @target) * 100).round(4)
  end
end
############################
# Imperative script code:
############################
# little wrapper function to kick off each set of experiments
def run_experiments(label, rounds:, buckets:, tests:)
  puts "==================="
  puts "#{label}: #{buckets} buckets, #{tests} tests"
  Experiment.new :allocate_from_ends, rounds: rounds, buckets: buckets, tests: tests
  Experiment.new :allocate_until_full, rounds: rounds, buckets: buckets, tests: tests
  Experiment.new :recursive_allocation, rounds: rounds, buckets: buckets, tests: tests
  Experiment.new :optimized_recursive_allocation, rounds: rounds, buckets: buckets, tests: tests
end
run_experiments 'Two buckets', rounds: 150, buckets: 2, tests: 40
run_experiments 'Small scale', rounds: 150, buckets: 4, tests: 40
run_experiments 'Medium scale', rounds: 150, buckets: 16, tests: 250
run_experiments 'Large scale', rounds: 30, buckets: 32, tests: 20000
# The large scale one is run for fewer rounds because it gets slow
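To reproduce the results, save the script to a file (the filename is up to you; mine isn’t part of the spec) and run it with plain `ruby`. Setting VERBOSE=1 in the environment makes the script print per-bucket allocation details as it runs, via the ENV['VERBOSE'] checks above.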