user1057741

Reputation: 265

Algorithm to efficiently send requests to nodes

I want to distribute traffic across various nodes according to each node's configuration. There can be at most 100 nodes, and the percentage of traffic to be sent to each node is configurable.


So, say there are 4 nodes:

node 1 - 20
node 2 - 50
node 3 - 10
node 4 - 20
------------
sum    - 100
------------

The sum of the values of all nodes should be 100. For example:

node 1 - 50
node 2 - 1
node 3 - 1
node 4 - 1
.
.
.
node 100 - 1

In the above configuration there are 51 nodes in total: node 1 is configured to 50 and the remaining 50 nodes are configured to 1.

In one scenario, requests could be distributed in the following pattern: node1, node2, node3, node4, node5, ..., node51, node1, node1, node1, node1, node1, node1, node1, ...

The above distribution is inefficient because we send too much traffic to node1 in a row, which may cause node1 to reject requests.

In another scenario, requests could be distributed in the following pattern: node1, node2, node1, node3, node1, node4, node1, node5, node1, node6, node1, node7, node1, node8, ...

In this scenario the requests are distributed more efficiently.

I found the code below, but I am not able to understand the idea behind it.

int func(void)
{
  int cand = 0;
  long myval;

  for (int itr = 1; itr <= total_requests + 1; itr++)
  {
      myval = 0;

      // Search for the node whose count needs to be incremented
      // to best approach the configured rates of all nodes.
      for (int j = 0; j < Total_nodes; j++)
      {
         // Skip nodes already at or above their target percentage,
         // nodes lagging less than the best candidate so far, and
         // nodes with no configured share.
         if ((nodes[j].count * 100 / itr > nodes[j].value) ||
             ((nodes[j].value - nodes[j].count * 100 / itr) < myval) ||
             (nodes[j].value == 0 && nodes[j].count == 0))
              continue;

         cand = j;
         myval = abs((long)(nodes[j].count * 100 / itr - nodes[j].value));
      }
      nodes[cand].count++;
  }

  return nodes[cand].nodeID;
}

In the above code, total_requests is the total number of requests received so far. The total_requests variable is incremented on every request; consider it a global value for the purpose of understanding.

Total_nodes is the total number of nodes configured, and nodes is an array in which each node is represented by the following struct:

struct node{
  int count;
  int value;
  int nodeID;
};

For example, if 4 nodes are configured:
node 1 - 20
node 2 - 50
node 3 - 10
node 4 - 20
------------
sum    - 100
------------

An array nodes[4] will be created with the following values:

node1{
  count = 0;
  value = 20;
  nodeID = 1;
};

node2{
  count = 0;
  value = 50;
  nodeID = 2;
};

node3{
  count = 0;
  value = 10;
  nodeID = 3;
};

node4{
  count = 0;
  value = 20;
  nodeID = 4;
};

Could you please explain the algorithm, or the idea behind how it actually distributes the requests efficiently?

Upvotes: 1

Views: 237

Answers (2)

user3793679

Reputation:

Hmmm. It seems that as you reach 100 nodes, they must each take 1% of the traffic?

I honestly have no idea what the function you provided is doing. I assume it's trying to find the node which is furthest behind its long-term average load. But if total_requests is the total to date, then I don't get what the outer for(int itr=1;itr<=total_requests+1;itr++) loop is doing, unless that's actually part of some test to show how it distributes load?

Anyway, basically what you are doing is similar to constructing a non-uniform random sequence. With up to 100 nodes, if I can assume (for a moment) that 0..999 gives sufficient resolution, then you could use an "id_vector[]" with 1000 node-ids, which is filled with n1 copies of node-1's ID, n2 copies of node-2's ID, and so on -- where node-1 is to receive n1/1000 of the traffic, and so on. The decision process is then very, very simple -- pick id_vector[random() % 1000]. Over time, the nodes will receive about the right amount of traffic.
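A minimal sketch of that idea (weights[], build_id_vector() and pick_node() are names of my own invention, and I assume the weights have been scaled so they sum to 1000):

#include <stdlib.h>

enum { n_nodes = 4, w_res = 1000 } ;

/* Weights scaled to sum to w_res -- 20%, 50%, 10%, 20% of traffic.   */
static const int weights[n_nodes] = { 200, 500, 100, 200 } ;

static int id_vector[w_res] ;   /* one node-id per 1/1000 of traffic  */

/* Fill id_vector with weights[id] copies of each node's id (1 origin).
 */
static void
build_id_vector(void)
{
  int iv = 0 ;
  for (int id = 0 ; id < n_nodes ; ++id)
    for (int i = 0 ; i < weights[id] ; ++i)
      id_vector[iv++] = id + 1 ;
}

/* Pick a node at random -- over time each node gets about its share.
 */
static int
pick_node(void)
{
  return id_vector[random() % w_res] ;
}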

If you are unhappy with randomly distributing traffic, then you seed the id_vector with node-ids so that you can select by "round-robin" and get a suitable frequency for each node. One way to do that would be to randomly shuffle the id_vector constructed as above (and, perhaps, reshuffle occasionally, so that if one shuffle is "bad", you aren't stuck with it). Or you could do a one-time leaky-bucket thing and fill the id_vector from that. Each time around the id_vector this guarantees each node will receive its allotted number of requests.

The finer grain you make the id_vector, the better control you get over the short term frequency of requests to each node.

Mind you, all the above rather assumes that the relative load for the nodes is constant. If not, then you would need to (every now and then?) adjust the id_vector.


Edit to add more detail as requested...

...suppose we have just 5 nodes, but we express the "weight" of each one as n/1000, to allow for up to 100 nodes. Suppose they have IDs 1..5, and weights:

  ID=1, weight = 100
  ID=2, weight = 375
  ID=3, weight = 225
  ID=4, weight = 195
  ID=5, weight = 105

Which, obviously, add up to 1000.

So we construct an id_vector[1000] so that:

  id_vector[  0.. 99] = 1   -- first 100 entries = ID 1
  id_vector[100..474] = 2   -- next  375 entries = ID 2
  id_vector[475..699] = 3   -- next  225 entries = ID 3
  id_vector[700..894] = 4   -- next  195 entries = ID 4
  id_vector[895..999] = 5   -- last  105 entries = ID 5

And now if we shuffle id_vector[] we get a random sequence of node selections, but over 1000 requests, the right average frequency of requests to each node.

For the amusement value I had a go at a "leaky bucket" to see how well it could maintain a steady frequency of requests to each node, by filling the id_vector using one leaky bucket for each node. The code to do this, and see how well it does, and how well the simple random version does, is enclosed below.

Each leaky bucket has a cc count of the number of requests that should be sent (to other nodes) before the next request is sent to this one. Each time a request is dispatched, all buckets have their cc count decremented, and the node whose bucket has the smallest cc (or smallest id, if the cc are equal) is sent the request, and at that point the node's bucket's cc is recharged. (Each request causes all the buckets to drip once, and the bucket for the chosen node is recharged.)

The cc is the integer part of the bucket's "contents". The initial value for cc is q = 1000 / w, where w is the node's weight. And each time the bucket is recharged, q is added to cc. To do things precisely, however, we need to deal with the remainder r = 1000 % w... or to put it another way, the "contents" have a fractional part -- which is where cr comes in. The true value of the contents is cc + cr/w (where cr/w is a true fraction, not an integer division). The initial value of that is cc = q and cr = r. Each time the bucket is recharged, q is added to cc, and r is added to cr. When cr/w >= 1/2, we round up, so cc += 1 and cr -= w (adding one to the integer part is balanced by subtracting 1 -- ie w/w -- from the fractional part). To test for cr/w >= 1/2, the code actually tests (cr * 2) >= w. Hopefully the bucket_recharge() function will make sense (now).
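For example (my arithmetic, taking ID=2 with w = 375 from the table above):

  q = 1000 / 375 = 2,  r = 1000 % 375 = 250

  first charge  : cc = 2, cr = 250 ; (250 * 2) >= 375,
                  so round up: cc = 3, cr = 250 - 375 = -125
                  true contents = 3 + (-125/375) = 2.667 -- ie 1000/375
  next recharge : cc += 2, cr = -125 + 250 = 125 ; (125 * 2) < 375,
                  so no round-up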

The leaky-bucket is run 1000 times to fill the id_vector[]. The little bit of testing shows that this maintains a pretty steady frequency for all nodes, and an exact number of packets per node every time around the id_vector[] cycle.

The little bit of testing shows that the random() shuffle approach has much more variable frequency, within each id_vector[] cycle, but still provides an exact number of packets per node for each cycle.

The steadiness of the leaky bucket assumes a steady stream of incoming requests. Which may be an entirely unrealistic assumption. If the requests arrive in bursts that are large compared to the id_vector[] cycle (1000 in this example), then the variability of the (simple) random() shuffle approach may be dwarfed by the variability in request arrival!


#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

enum
{
  n_nodes  =    5,        /* number of nodes      */
  w_res    = 1000,        /* weight resolution    */
} ;

struct node_bucket
{
  int   id ;            /* 1 origin                 */

  int   cc ;            /* current count            */
  int   cr ;            /* current remainder        */

  int   q ;             /* recharge -- quotient     */
  int   r ;             /* recharge -- remainder    */

  int   w ;             /* weight                   */
} ;

static void bucket_recharge(struct node_bucket* b) ;
static void node_checkout(int weights[], int id_vector[], bool rnd) ;
static void node_shuffle(int id_vector[]) ;

/*------------------------------------------------------------------------------
 * To begin at the beginning...
 */
int
main(int argc, char* argv[])
{
  int node_weights[n_nodes] = { 100, 375, 225, 195, 105 } ;
  int id_vector[w_res] ;
  int cx ;

  struct node_bucket buckets[n_nodes] ;

  /* Initialise the buckets -- charged
   */
  cx = 0 ;
  for (int id = 0 ; id < n_nodes ; ++id)
    {
      struct node_bucket* b ;

      b = &buckets[id] ;

      b->id = id + 1 ;              /* 1 origin     */
      b->w  = node_weights[id] ;

      cx += b->w ;

      b->q  = w_res / b->w ;
      b->r  = w_res % b->w ;

      b->cc = 0 ;
      b->cr = 0 ;

      bucket_recharge(b) ;
    } ;

  assert(cx == w_res) ;

  /* Run the buckets for one cycle to fill the id_vector
   */
  for (int i = 0 ; i < w_res ; ++i)
    {
      int id ;

      id = 0 ;
      buckets[id].cc -= 1 ;         /* drip     */

      for (int jd = 1 ; jd < n_nodes ; ++jd)
        {
          buckets[jd].cc -= 1 ;     /* drip     */

          if (buckets[jd].cc < buckets[id].cc)
            id = jd ;
        } ;

      id_vector[i] = id + 1 ;       /* '1' origin   */

      bucket_recharge(&buckets[id]) ;
    } ;

  /* Diagnostics and checking
   *
   * First, check that the id_vector contains exactly the right number of
   * each node, and that the bucket state at the end is the same (apart from
   * cr) as it is at the beginning.
   */
  int nf[n_nodes] = { 0 } ;

  for (int i = 0 ; i < w_res ; ++i)
    nf[id_vector[i] - 1] += 1 ;

  for (int id = 0 ; id < n_nodes ; ++id)
    {
      struct node_bucket* b ;

      b = &buckets[id] ;

      printf("ID=%2d weight=%3d freq=%3d  (cc=%3d  cr=%+4d  q=%3d  r=%3d)\n",
                                b->id, b->w, nf[id], b->cc, b->cr, b->q, b->r) ;
    } ;

  node_checkout(node_weights, id_vector, false /* not random */) ;

  /* Try the random version -- with shuffled id_vector.
   */
  int iv ;

  iv = 0 ;
  for (int id = 0 ; id < n_nodes ; ++id)
    {
      for (int i = 0 ; i < node_weights[id] ; ++i)
        id_vector[iv++] = id + 1 ;
    } ;
  assert(iv == w_res) ;

  for (int s = 0 ; s < 17 ; ++s)
    node_shuffle(id_vector) ;

  node_checkout(node_weights, id_vector, true /* random */) ;

  return 0 ;
} ;

static void
bucket_recharge(struct node_bucket* b)
{
  b->cc += b->q ;
  b->cr += b->r ;

  if ((b->cr * 2) >= b->w)
    {
      b->cc += 1 ;
      b->cr -= b->w ;
    } ;
} ;

static void
node_checkout(int weights[], int id_vector[], bool rnd)
{
  struct node_test
  {
    int   last_t ;
    int   count ;
    int   cycle_count ;
    int   intervals[w_res] ;
  } ;

  struct node_test tests[n_nodes] = { { 0 } } ;

  printf("\n---Test Run: %s ---\n", rnd ? "Random Shuffle" : "Leaky Bucket") ;

  /* Test run
   */
  int s ;
  s = 0 ;
  for (int t = 1 ; t <= (w_res * 5) ; ++t)
    {
      int id ;

      id = id_vector[s++] - 1 ;

      if (tests[id].last_t != 0)
        tests[id].intervals[t - tests[id].last_t] += 1 ;

      tests[id].count += 1 ;
      tests[id].last_t = t ;

      if (s == w_res)
        {
          printf("At time %4d\n", t) ;

          for (id = 0 ; id < n_nodes ; ++id)
            {
              struct node_test*   nt ;
              long   total_intervals ;

              nt = &tests[id] ;

              total_intervals = 0 ;
              for (int i = 0 ; i < w_res ; ++i)
                total_intervals += (long)i * nt->intervals[i] ;

              printf("  ID=%2d weight=%3d count=%4d(+%3d)  av=%6.2f vs %6.2f\n",
                        id+1, weights[id], nt->count, nt->count - nt->cycle_count,
                                          (double)total_intervals / nt->count,
                                          (double)w_res / weights[id]) ;
              nt->cycle_count = nt->count ;

              for (int i = 0 ; i < w_res ; ++i)
                {
                  if (nt->intervals[i] != 0)
                    {
                      int h ;

                      printf("  %6d x %4d ", i, nt->intervals[i]) ;

                      h = ((nt->intervals[i] * 75) + ((nt->count + 1) / 2))/
                                                                     nt->count ;
                      while (h-- != 0)
                        printf("=") ;
                      printf("\n") ;
                    } ;
                } ;
            } ;

          if (rnd)
            node_shuffle(id_vector) ;

          s = 0 ;
        } ;
    } ;
} ;

static void
node_shuffle(int id_vector[])
{
  for (int iv = 0 ; iv < (w_res - 1) ; ++iv)
    {
      int is, s ;

      is = (int)(random() % (w_res - iv)) + iv ;

      s             = id_vector[iv] ;
      id_vector[iv] = id_vector[is] ;
      id_vector[is] = s ;
    } ;
} ;

Upvotes: 1

David Eisenstat

Reputation: 65427

nodes[j].count*100/itr is the floor of the percentage of requests that node j has answered so far. nodes[j].value is the percentage of requests that node j should answer. The code that you posted looks for the node lagging the furthest behind its target percentage (more or less, subject to the wobbliness of integer division) and assigns it the next request.
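In other words, each request is sent to the node with the largest shortfall between its target percentage and the percentage it has actually served. A stripped-down sketch of just that selection step (pick_lagging_node is my own name; it reuses struct node from the question and omits the original's extra guards):

/* Send the next request to the node whose served percentage lags
 * furthest behind its configured target.
 *
 * requests_so_far is the number of requests dispatched before this one.
 */
int pick_lagging_node(struct node nodes[], int n, int requests_so_far)
{
  int itr = requests_so_far + 1;    /* total, counting this request */
  int cand = 0;
  int best_deficit = -101;          /* below any possible deficit   */

  for (int j = 0; j < n; j++)
    {
      int served  = nodes[j].count * 100 / itr;   /* floored %      */
      int deficit = nodes[j].value - served;

      if (deficit > best_deficit)
        {
          cand = j;
          best_deficit = deficit;
        }
    }

  nodes[cand].count++;              /* count the request against it */
  return nodes[cand].nodeID;
}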

Upvotes: 1
