rocksNwaves

Reputation: 6163

What do I get from Queue.get() (Python)

Overall question: How do I know what I am getting from a Queue object when I call Queue.get()? How do I sort it, or identify it? Can you get specific items from the Queue and leave others?

Context:

I wanted to learn a little about multiprocessing (threading?) to make solving a matrix equation more efficient.

To illustrate, below is my working code for solving the matrix equation Ax = b without taking advantage of multiple cores. The solution is [1,1,1].

import numpy as np

def jacobi(A, b, x_k):

    N = len(x_k)
    x_kp1 = np.copy(x_k)
    E_rel = 1
    iteration = 0

    if (N != A.shape[0] or N != A.shape[1]):
        raise ValueError('Matrix/vector dimensions do not match.')

    while E_rel > ((10**(-14)) * (N**(1/2))):
        for i in range(N):

            sum = 0

            for j in range(N):
                if j != i:
                    sum = sum + A[i,j] * x_k[j]

            x_kp1[i] =(1 / A[i,i]) * (b[i] - sum)

        E_rel = 0
        for n in range(N):
            E_rel = E_rel + abs(x_kp1[n] - x_k[n]) / ((abs(x_kp1[n]) + abs(x_k[n])) / 2)

        iteration += 1
        # print("relative error for this iteration:", E_rel)
        if iteration < 11:
            print("iteration ", iteration, ":", x_kp1)

        x_k = np.copy(x_kp1)

    return x_kp1

if __name__ == '__main__':

    A = np.matrix([[12.,7,3],[1,5,1],[2,7,-11]])
    b = np.array([22.,7,-2])
    x = np.array([1.,2,1])

    print("Jacobi Method:")
    x_1 = jacobi(A, b, x)

Ok, so I wanted to convert this code following this nice example: https://p16.praetorian.com/blog/multi-core-and-distributed-programming-in-python

So I got some code that runs and converges to the correct solution in the same number of iterations! That's really great, but what is the guarantee that this happens? It seems like Queue.get() just grabs whatever result from whatever process finished first (or last?). I was actually very surprised when my code ran, as I expected

    for i in range(N):
        x_update[i] = q.get(True)

to jumble up the elements of the vector.

Here is my code updated using the multi-processing library:

import numpy as np
import multiprocessing as mu

np.set_printoptions(precision=15)

def Jacobi_step(index, initial_vector, q):
    N = len(initial_vector)
    sum = 0
    for j in range(N):
        if j != index:
            sum = sum + A[index, j] * initial_vector[j]

    # this result is the updated element at given index of our solution vector.
    q.put((1 / A[index, index]) * (b[index] - sum))


if __name__ == '__main__':

    A = np.matrix([[12.,7,3],[1,5,1],[2,7,-11]])
    b = np.array([22.,7,-2])
    x = np.array([1.,2,1])
    q = mu.Queue()
    N = len(x)
    x_update = np.copy(x)
    p = []
    error = 1
    iteration = 0

    while error > ((10**(-14)) * (N**(1/2))):

        # assign a process to each element in the vector x, 
        # update one element with a single Jacobi step
        for i in range(N):
            process = mu.Process(target=Jacobi_step(i, x, q))
            p.append(process)
            process.start()

        # fill in the updated vector with each new element acquired by the last step
        for i in range(N):
            x_update[i] = q.get(True)

        # check for convergence 
        error = 0
        for n in range(N):
            error = error + abs(x_update[n] - x[n]) / ((abs(x_update[n]) + abs(x[n])) / 2)

        # wait for every process started in this iteration to finish
        for process in p:
            process.join()

        x = np.copy(x_update)

        iteration += 1
        print("iteration ", iteration, ":", x)
        del p[:]

Upvotes: 0

Views: 1116

Answers (1)

nosklo

Reputation: 223152

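A Queue is first-in, first-out: the first element inserted is the first element retrieved, in order of insertion. With several processes putting into the same Queue, that insertion order is simply the order in which the processes happen to call put().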
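To illustrate, here is a minimal sketch (not taken from the question) in which each worker sleeps for a made-up delay before putting its own index on the Queue. The names worker, drain, and delays are hypothetical. The list printed at the end reflects the order in which the puts arrived, which need not match the order in which the processes were started.

```python
import multiprocessing as mp
import time


def worker(index, delay, q):
    """Sleep for `delay` seconds, then put this worker's index on the queue."""
    time.sleep(delay)
    q.put(index)


def drain(q, n):
    """Return the next n items from q in the order get() yields them."""
    return [q.get() for _ in range(n)]


if __name__ == "__main__":
    q = mp.Queue()
    # Hypothetical delays: worker 0 is started first but sleeps the longest.
    delays = [0.6, 0.3, 0.0]
    procs = [mp.Process(target=worker, args=(i, d, q))
             for i, d in enumerate(delays)]
    for p in procs:
        p.start()
    # Arrival order, which depends on timing rather than on start order.
    print(drain(q, len(delays)))
    for p in procs:
        p.join()
```

Within a single producer the order is preserved; it is only across processes that timing decides who gets in first.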

Since you have no way to control which process puts its result first, I suggest you insert tuples in the Queue, each containing the value and some identifying object (such as the index) that can be used to sort the results or relate them to the original computation.

result = (1 / A[index, index]) * (b[index] - sum)
q.put((index, result))

This example puts the index in the Queue together with the result, so that when you .get() later you get the index too and use it to know which computation this is for:

i, x_i = q.get(True)
x_update[i] = x_i

Or something like that.
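A fuller sketch of one Jacobi step along these lines might look like the following. It uses the matrix from the question but plain Python lists instead of NumPy to stay dependency-free, and the names jacobi_step and collect are my own, not from the question. One thing worth noting: in the question, mu.Process(target=Jacobi_step(i, x, q)) calls Jacobi_step right away in the parent process and hands its return value (None) to Process, so the results were put on the Queue in loop order. That is very likely why nothing got jumbled. The sketch passes the function and its arguments separately so the work actually runs in the child processes.

```python
import multiprocessing as mp


def jacobi_step(index, A, b, x, q):
    """Compute element `index` of the next Jacobi iterate and put (index, value) on q."""
    off_diagonal = sum(A[index][j] * x[j] for j in range(len(x)) if j != index)
    q.put((index, (b[index] - off_diagonal) / A[index][index]))


def collect(q, n):
    """Read n (index, value) pairs from q and store each value at its own index."""
    x_update = [0.0] * n
    for _ in range(n):
        i, x_i = q.get(True)
        x_update[i] = x_i
    return x_update


if __name__ == "__main__":
    A = [[12.0, 7.0, 3.0], [1.0, 5.0, 1.0], [2.0, 7.0, -11.0]]
    b = [22.0, 7.0, -2.0]
    x = [1.0, 2.0, 1.0]
    q = mp.Queue()

    # target= gets the function itself; args= gets its arguments.
    procs = [mp.Process(target=jacobi_step, args=(i, A, b, x, q))
             for i in range(len(x))]
    for p in procs:
        p.start()

    # Drain the queue first, then join the processes.
    x_update = collect(q, len(x))
    for p in procs:
        p.join()

    print(x_update)
```

Because every result carries its index, collect() fills the vector correctly no matter which process finishes first. Passing A and b as arguments, rather than relying on globals defined under the __main__ guard, also keeps the worker usable on platforms where child processes do not inherit the parent's variables.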

Upvotes: 1
