Reputation: 4505
Given an array [x1, x2, x3, ..., xk] where xi is the number of items in box i, how can I redistribute the items so that no box contains more than N items? N is close to sum(xi)/k -- that is, N is close to every box having the same number of items. Intermediate boxes shouldn't be used to carry items: if x1 has a surplus and x2 and x3 have deficits, x1 should send some items to x2 and some to x3, but it should not send everything to x2 and then let x2 resolve the surplus.
The actual problem is: each compute node has a set of samples, and after a resampling step some nodes might have a surplus while others have a deficit, so I'd like to redistribute the samples while minimizing communication.
I imagine this sort of problem has a name.
Upvotes: 7
Views: 665
Reputation: 50927
I usually just see this called data redistribution, with the understanding that if you're redistributing it, you want the distribution to be optimal under some metric, like evenness between tasks.
This does come up in scientific/technical computing when you're trying to do computational load balancing. Even if you're doing computation in several dimensions, if you're redistributing spatial data that you assign to processors by a space-filling curve, this exact problem comes up, and there you often do want the data to be evenly divided.
The procedure is pretty straightforward; you start by taking an exclusive prefix sum of the xi so that you know how many items are to the "left" of you. E.g., for Noxville's example above, if you had the data
[9, 6, 1, 6, 2]
the prefix sums would be
[0, 9, 15, 16, 22]
and you'd find (from the last processor's sum plus how many it has) that there are 24 items in total.
Then you figure out how big your ideal partitions would be - say, ceil(totitems / nprocs). You can do this however you like as long as every processor will agree on what all of the partition sizes will be.
Now, you have a few ways to proceed. If the data items are large in some sense and you can't have two copies of them in memory, then you can start shifting data just to your nearest neighbours. You know the number of items to your left and the "excess" or "deficit" in that direction, and you also know how many you have (and will have once you've done your part to fix the excess or deficit). So you keep sending data to your left and right neighbours, and receiving data from them, until the processors to your left collectively have the right number of items and so do you.
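As a rough illustration of the bookkeeping this approach needs (a minimal sketch of my own, not part of the code below), the net number of items that has to cross each processor boundary falls straight out of the exclusive prefix sums and the target partition sizes:

#include <stdio.h>

int main(void) {
    int counts[]  = {9, 6, 1, 6, 2};   /* example data from above */
    int targets[] = {5, 5, 5, 5, 4};   /* ceil(24/5) = 5 each, last rank takes the remainder */
    int nprocs = 5;

    int prefix = 0, targetprefix = 0;
    for (int i = 1; i < nprocs; i++) {
        prefix       += counts[i-1];    /* items actually to the left of rank i  */
        targetprefix += targets[i-1];   /* items that should be to the left of i */
        /* positive: rank i-1 sends this many items rightward; negative: it receives */
        printf("boundary %d|%d : net flow right = %d\n", i-1, i, prefix - targetprefix);
    }
    return 0;
}

For the example data this prints a rightward flow of 4, 5, 1, and 2 items across the four boundaries, which is exactly what the neighbour-to-neighbour shifting has to accomplish.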
But if you can afford to have two copies of the data, then you can take another approach which minimizes the number of messages sent. You can think of the number of items to your left as the starting index of your local data in the "global" array. Since you know how many items each processor will end up with, you can figure out directly which process those items will end up on, and send them there directly. (For instance, in the example above, processor 0 - which has items 0..8 - knows that if every processor but the last is going to end up with 5 data items, then values 5-8 can be sent to processor 1.) Once those are sent, you simply receive until you have the amount of data you're expecting, and you're done.
Below is a simple example of doing this in C and MPI, but the basic approach should work pretty much anywhere. MPI's prefix scan operation generates inclusive sums, so we have to subtract off our own number of values to get the exclusive sum:
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <time.h>

/* give each rank a random number of characters, all the same letter */
void initdata(const int rank, const int maxvals, char **data, int *nvals) {
    time_t t;
    unsigned seed;

    t = time(NULL);
    seed = (unsigned)(t * (rank + 1));
    srand(seed);

    *nvals = (rand() % (maxvals-1)) + 1;
    *data = malloc((*nvals+1) * sizeof(char));
    for (int i=0; i<*nvals; i++) {
        (*data)[i] = 'A' + (rank % 26);
    }
    (*data)[*nvals] = '\0';
}

/* which rank owns a given global index under the agreed partitioning */
int assignrank(const int globalid, const int totvals, const int size) {
    int nvalsperrank = (totvals + size - 1)/size;
    return (globalid/nvalsperrank);
}

void redistribute(char **data, const int totvals, const int curvals, const int globalstart,
                  const int rank, const int size, int *newnvals) {
    const int stag = 1;

    /* everyone but the last rank ends up with ceil(totvals/size) values */
    int nvalsperrank = (totvals + size - 1)/size;
    *newnvals = nvalsperrank;
    if (rank == size-1) *newnvals = totvals - (size-1)*nvalsperrank;

    char *newdata = malloc((*newnvals+1) * sizeof(char));
    newdata[(*newnvals)] = '\0';

    MPI_Request requests[curvals];
    int nmsgs=0;

    /* figure out whose data we have, redistribute it */
    int start=0;
    int newrank = assignrank(globalstart, totvals, size);
    for (int val=1; val<curvals; val++) {
        int nextrank = assignrank(globalstart+val, totvals, size);
        if (nextrank != newrank) {
            MPI_Isend(&((*data)[start]), (val-1)-start+1, MPI_CHAR, newrank, stag, MPI_COMM_WORLD, &(requests[nmsgs]));
            nmsgs++;
            start = val;
            newrank = nextrank;
        }
    }
    MPI_Isend(&((*data)[start]), curvals-start, MPI_CHAR, newrank, stag, MPI_COMM_WORLD, &(requests[nmsgs]));
    nmsgs++;

    /* now receive all of our data */
    int newvalssofar= 0;
    int count;
    MPI_Status status;
    while (newvalssofar != *newnvals) {
        MPI_Recv(&(newdata[newvalssofar]), *newnvals - newvalssofar, MPI_CHAR, MPI_ANY_SOURCE, stag, MPI_COMM_WORLD, &status);
        MPI_Get_count(&status, MPI_CHAR, &count);
        newvalssofar += count;
    }

    /* wait until all of our sends have been received */
    MPI_Status statuses[curvals];
    MPI_Waitall(nmsgs, requests, statuses);

    /* now we can get rid of data and replace it with newdata */
    free(*data);
    *data = newdata;
}

int main(int argc, char **argv) {
    const int maxvals=30;
    int size, rank;
    char *data;
    int mycurnvals, mylvals, myfinalnvals;
    int totvals;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    initdata(rank, maxvals, &data, &mycurnvals);

    /* inclusive scan, then subtract our own count to get the exclusive prefix sum */
    MPI_Scan( &mycurnvals, &mylvals, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD );
    if (rank == size-1) totvals = mylvals;
    mylvals -= mycurnvals;

    MPI_Bcast( &totvals, 1, MPI_INT, size-1, MPI_COMM_WORLD );

    printf("%3d : %s %d\n", rank, data, mylvals);
    redistribute(&data, totvals, mycurnvals, mylvals, rank, size, &myfinalnvals);

    printf("%3d after: %s\n", rank, data);
    free(data);
    MPI_Finalize();
    return 0;
}
Running this, you get the expected behaviour. Note that the way I've determined the "desired" partitioning (using ceil(totvals/nprocs)) means the final processor will generally be under-loaded. Also, I've made no attempt to ensure that order is preserved in the redistribution (although that's easy enough to do if order is important):
$ mpirun -np 13 ./distribute
0 : AAAAAAAAAAA 0
1 : BBBBBBBBBBBB 11
2 : CCCCCCCCCCCCCCCCCCCCCCCCCC 23
3 : DDDDDDD 49
4 : EEEEEEEEE 56
5 : FFFFFFFFFFFFFFFFFF 65
6 : G 83
7 : HHHHHHH 84
8 : IIIIIIIIIIIIIIIIIIIII 91
9 : JJJJJJJJJJJJJJJJJJ 112
10 : KKKKKKKKKKKKKKKKKKKK 130
11 : LLLLLLLLLLLLLLLLLLLLLLLLLLLL 150
12 : MMMMMMMMMMMMMMMMMM 178
0 after: AAAAAAAAAAABBBBB
1 after: BBBBBBBCCCCCCCCC
2 after: CCCCCCCCCCCCCCCC
3 after: DDDDDDDCEEEEEEEE
4 after: EFFFFFFFFFFFFFFF
5 after: FFFHHHHHHHIIIIIG
6 after: IIIIIIIIIIIIIIII
7 after: JJJJJJJJJJJJJJJJ
8 after: JJKKKKKKKKKKKKKK
9 after: LLLLLLLLLLKKKKKK
10 after: LLLLLLLLLLLLLLLL
11 after: LLMMMMMMMMMMMMMM
12 after: MMMM
Upvotes: 1
Reputation: 24647
I think the question should be clarified in this way:
With M surplus nodes and K deficit nodes, we should redistribute samples between nodes until there are 0 surplus nodes and (possibly) some deficit nodes. Samples should be exchanged in packs, and the number of these packs should be minimized.
Or, mathematically: we have an M*K matrix, each cell of which gives the number of samples to pass from one surplus node to one deficit node, with a given sum of elements in each row and a given maximum for the sum of elements in each column. The goal is to minimize the number of nonzero cells.
This is a kind of constraint satisfaction problem, and it is NP-complete. I found two classical problems that are similar to this question: "Set packing" and "Generalized exact cover".
To reduce the problem to "Set packing", we need to (temporarily) add several surplus nodes with N+1 samples each, so that after the redistribution there are no deficit nodes left. Then, for each node, we need to generate all possible partitions of its surplus elements and of its deficit elements. Then, to the Cartesian product of the surplus and deficit partitions, apply "Set packing" in its "optimization" version, which finds the minimum number of subsets.
To reduce the problem to "Generalized exact cover", for each node we need to generate all possible partitions of its surplus elements and of its deficit elements. Then we should add M, M+1, ... optimization nodes to minimize the number of subsets in the cover, and apply "Generalized exact cover" to the Cartesian product of the surplus and deficit partitions together with the optimization nodes. For too small a number of optimization nodes the algorithm will fail; for some greater number it will find the minimum number of subsets.
"Generalized exact cover" may be solved by "Knuth's Algorithm X". I don't know any algorithm for "Set packing".
All these solutions give an exact answer, but they have tremendous computational complexity; it is very unlikely that anyone uses them in real schedulers. The practical approach is to use some heuristics and a greedy algorithm: just sort both the surplus nodes and the deficit nodes and apply a "best fit" strategy, as sketched below.
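As a very rough sketch of that heuristic (the example amounts and the tie-breaking rule here are my own choices, not from any real scheduler): sort both lists, and for each surplus repeatedly ship items to the deficit whose remaining capacity is closest to the remaining surplus.

#include <stdio.h>
#include <stdlib.h>

/* Sort surpluses and deficits, then for each surplus repeatedly pick the
 * deficit node whose remaining capacity best fits the remaining surplus. */
static int cmp_desc(const void *a, const void *b) {
    return *(const int *)b - *(const int *)a;
}

int main(void) {
    int surplus[] = {7, 3};        /* items above the bound on each surplus node */
    int deficit[] = {4, 3, 2, 1};  /* spare capacity on each deficit node        */
    int ns = 2, nd = 4;

    qsort(surplus, ns, sizeof(int), cmp_desc);
    qsort(deficit, nd, sizeof(int), cmp_desc);

    for (int i = 0; i < ns; i++) {
        while (surplus[i] > 0) {
            int best = -1;
            for (int j = 0; j < nd; j++) {
                if (deficit[j] == 0) continue;
                if (best < 0 ||
                    abs(deficit[j] - surplus[i]) < abs(deficit[best] - surplus[i]))
                    best = j;       /* "best fit": closest remaining deficit */
            }
            if (best < 0) break;    /* nothing left to fill */
            int amount = surplus[i] < deficit[best] ? surplus[i] : deficit[best];
            printf("send %d items: surplus node %d -> deficit node %d\n", amount, i, best);
            surplus[i] -= amount;
            deficit[best] -= amount;
        }
    }
    return 0;
}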
Upvotes: 1
Reputation: 11051
I don't believe your problem (as stated) is complex enough to have attracted independent study. If the machine count (call it C) is in the thousands, and your sample counts are even trillions large, then it is trivial to send the sample counts to a coordinating master node.
The master can then trivially (in O(C)) compute N, identify the nodes violating this bound, and by how much. Notice that the sum of the excesses over the bound is exactly the minimum amount of communication required. By inserting a slack parameter when calculating N (i.e. by accepting some imbalance), you can control this quantity.
Using a sort, the ordering of nodes by sample count can be had in O(C log C). Walk two cursors, one from either end, towards the middle. At each step, schedule a transfer from the large node to the small node, sized at the minimum of the remaining excess in the large node and the remaining slack in the small one. Advance the cursor of whichever node was the active constraint in that step, and repeat until the large end has no excess left. (I believe this is the greedy algorithm @Noxville was getting at.)
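A minimal sketch of that two-cursor walk, assuming the counts are already sorted ascending and N is the agreed bound (the node numbering here refers to positions in the sorted order):

#include <stdio.h>

/* Two-cursor greedy: "small" walks up from the least-loaded end, "large" walks
 * down from the most-loaded end; each step schedules one transfer sized at
 * min(excess of large node, slack of small node).                            */
int main(void) {
    int counts[] = {1, 2, 6, 6, 9};  /* sorted sample counts, the question's example */
    int n = 5;
    int N = 5;                       /* the agreed per-node bound */

    int small = 0, large = n - 1;
    while (small < large && counts[large] > N) {
        int excess = counts[large] - N;
        int slack  = N - counts[small];
        if (slack <= 0) { small++; continue; }
        int amount = excess < slack ? excess : slack;
        printf("transfer %d samples: node %d -> node %d\n", amount, large, small);
        counts[large] -= amount;
        counts[small] += amount;
        if (counts[large] <= N) large--;   /* large node no longer has excess */
        if (counts[small] >= N) small++;   /* small node has no slack left    */
    }
    return 0;
}

On the question's data ([1, 2, 6, 6, 9] once sorted) this schedules three transfers moving six samples in total, which matches the sum of the excesses over the bound.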
Assuming N is greater than the mean count of samples per node, it is trivial to see this is ideal w.r.t. minimum communication.
If your network of machines has any constraints, either as absent links or maximum flow or variable cost across an edge, then you need to break out the graph flow stuff. However you didn't mention anything like this, and computer clusters within the same data center typically have none.
Upvotes: 3
Reputation: 2624
This problem could be modeled as an instance of minimum-cost flow.
Let G be a directed graph with vertices s, t, a1, …, ak, b1, …, bk and arcs s→ai of capacity xi and cost 0, arcs ai→bj of infinite capacity and cost 0 if i = j and cost 1 if i ≠ j, and arcs bj→t of capacity N and cost 0. We want the minimum-cost flow that moves ∑i xi units from s to t. The idea is that if y units flow ai→bj, then y items are moved from box i to box j (if i ≠ j; if i = j, then no move occurs).
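To make the construction concrete, here is a small sketch that just emits the arc list for that network given x and N; hooking it up to an actual min-cost-flow solver is left out.

#include <stdio.h>

/* Emit the arcs of the model described above:
 *   s -> a_i    capacity x_i,        cost 0
 *   a_i -> b_j  capacity "infinite", cost 0 if i == j else 1
 *   b_j -> t    capacity N,          cost 0
 * Vertex numbering: 0 = s, 1..k = a_1..a_k, k+1..2k = b_1..b_k, 2k+1 = t. */
int main(void) {
    int x[] = {9, 6, 1, 6, 2};
    int k = 5, N = 5;
    int INF = 1000000000;
    int s = 0, t = 2*k + 1;

    for (int i = 1; i <= k; i++)
        printf("arc %d -> %d  cap %d cost 0\n", s, i, x[i-1]);
    for (int i = 1; i <= k; i++)
        for (int j = 1; j <= k; j++)
            printf("arc %d -> %d  cap %d cost %d\n", i, k + j, INF, (i == j) ? 0 : 1);
    for (int j = 1; j <= k; j++)
        printf("arc %d -> %d  cap %d cost 0\n", k + j, t, N);
    return 0;
}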
Using minimum-cost flow to solve this simple problem is of course using a sledgehammer to crack a nut, but several variants can be modeled as network flow problems.
If a pair of nodes i, j is not directly connected, remove the ai→bj arc.
We can start up and shut down nodes by giving them vertices on only the "a" side or the "b" side.
We can model differences in communication speeds by adjusting the costs from uniform.
We can limit the number of items two nodes exchange by limiting the capacity of their arc.
We can introduce more interior vertices and change the complete bipartite topology of the connections to model the effects of network contention.
Upvotes: 3
Reputation: 1499
I think this redistribution problem is a bit different from load balancing in computing.
The term "load balancing algorithm" usually means a collection of policies/heuristics that ensure a relatively even distribution of FUTURE load (not the current load).
In this case, a load balancer would not redistribute the load already on the existing servers/systems; rather, each new request that comes in is assigned using some policy (e.g. least-loaded, round-robin, etc.), which keeps the servers relatively evenly loaded in the long run.
http://en.wikipedia.org/wiki/Load_balancing_(computing)
The load redistribution in this question can perhaps be implemented by iteratively moving items from the most-loaded box to the least-loaded box, as in the sketch below.
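A minimal sketch of that idea (moving one item per step, with a stopping rule of "spread of at most one item" -- both choices are mine):

#include <stdio.h>

/* Repeatedly move one item from the most-loaded box to the least-loaded box.
 * Easy to implement, but it makes many small transfers, so it is not
 * communication-optimal.                                                    */
int main(void) {
    int boxes[] = {9, 6, 1, 6, 2};
    int n = 5;

    for (;;) {
        int hi = 0, lo = 0;
        for (int i = 1; i < n; i++) {
            if (boxes[i] > boxes[hi]) hi = i;
            if (boxes[i] < boxes[lo]) lo = i;
        }
        if (boxes[hi] - boxes[lo] <= 1) break;
        boxes[hi]--;                        /* move one item hi -> lo */
        boxes[lo]++;
    }
    for (int i = 0; i < n; i++) printf("%d ", boxes[i]);
    printf("\n");
    return 0;
}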
Upvotes: 0
Reputation: 544
Basically you want to go from
[9, 6, 1, 6, 2]
to
[5, 5, 4, 5, 5]
I think the best way to do this is to calculate floor(sum(array)/len(array)) and then work out the differential required to get each box to that target. In this case, floor((9+6+1+6+2) / 5) = 4, hence we're looking at an initial differential of [-5, -2, +3, -2, +2] (negative meaning a surplus, positive a deficit). You can then greedily settle adjoining pairs where the signs differ (such as transferring 2 items from array[1] -> array[2] and 2 from array[3] -> array[4]). You're then left with [-5, 0, +1, 0, 0], and from here you can greedily assign the remaining bits.
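Here's a small sketch of that procedure (the second, catch-all pass for "the remaining bits" is my own reading of it). Note that with a floor target the leftover remainder simply stays on whichever box still has surplus at the end, so one more pass handing out the extra items one per box would be needed to actually land on [5, 5, 4, 5, 5]:

#include <stdio.h>

/* Differentials relative to floor(sum/len): negative = surplus, positive = deficit.
 * First pass: greedily settle adjoining pairs with differing signs.
 * Second pass: pair any remaining deficit with any remaining surplus.             */
int main(void) {
    int a[] = {9, 6, 1, 6, 2};
    int n = 5, sum = 0;
    for (int i = 0; i < n; i++) sum += a[i];
    int target = sum / n;                                   /* floor(24/5) = 4 */

    int diff[5];
    for (int i = 0; i < n; i++) diff[i] = target - a[i];    /* [-5, -2, +3, -2, +2] */

    for (int i = 0; i + 1 < n; i++) {
        if (diff[i] < 0 && diff[i+1] > 0) {
            int m = (-diff[i] < diff[i+1]) ? -diff[i] : diff[i+1];
            printf("move %d items: box %d -> box %d\n", m, i, i+1);
            diff[i] += m;  diff[i+1] -= m;
        } else if (diff[i] > 0 && diff[i+1] < 0) {
            int m = (diff[i] < -diff[i+1]) ? diff[i] : -diff[i+1];
            printf("move %d items: box %d -> box %d\n", m, i+1, i);
            diff[i] -= m;  diff[i+1] += m;
        }
    }
    for (int i = 0; i < n; i++) {
        for (int j = 0; j < n && diff[i] > 0; j++) {
            if (diff[j] < 0) {
                int m = (diff[i] < -diff[j]) ? diff[i] : -diff[j];
                printf("move %d items: box %d -> box %d\n", m, j, i);
                diff[i] -= m;  diff[j] += m;
            }
        }
    }
    return 0;
}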
Upvotes: 0
Reputation: 3684
It sounds like you want to use consistent hashing
http://en.wikipedia.org/wiki/Consistent_hashing
Basically, using a good random hash function lets you assign your samples unique ids that are evenly distributed. It is then easy to distribute the samples across a set of nodes consistently.
See
http://www.lexemetech.com/2007/11/consistent-hashing.html
http://www.tomkleinpeter.com/2008/03/17/programmers-toolbox-part-3-consistent-hashing/
for more information.
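For what it's worth, here is a toy sketch of the idea (the mixing hash is just a stand-in, and real implementations place many virtual nodes per physical node on the ring to even out the distribution):

#include <stdio.h>
#include <stdint.h>

/* Toy consistent-hashing ring: each node gets a position on a 32-bit ring,
 * and a sample is assigned to the first node clockwise from its own hash.  */
static uint32_t mix(uint32_t x) {          /* stand-in hash, not a library call */
    x ^= x >> 16; x *= 0x7feb352dU;
    x ^= x >> 15; x *= 0x846ca68bU;
    x ^= x >> 16;
    return x;
}

static int owner(uint32_t samplehash, const uint32_t *nodepos, int nnodes) {
    int best = 0;
    uint32_t bestdist = nodepos[0] - samplehash;   /* clockwise distance, wraps */
    for (int i = 1; i < nnodes; i++) {
        uint32_t dist = nodepos[i] - samplehash;
        if (dist < bestdist) { bestdist = dist; best = i; }
    }
    return best;
}

int main(void) {
    int nnodes = 5;
    uint32_t nodepos[5];
    for (int i = 0; i < nnodes; i++) nodepos[i] = mix(1000 + i);

    int counts[5] = {0};
    for (uint32_t sample = 0; sample < 10000; sample++)
        counts[owner(mix(sample), nodepos, nnodes)]++;

    for (int i = 0; i < nnodes; i++)
        printf("node %d gets %d samples\n", i, counts[i]);
    return 0;
}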
Upvotes: 1