Alex Foglia

Reputation: 530

Test for conditional independence in python as part of the PC algorithm

I'm implementing the PC algorithm in Python. This algorithm constructs the graphical model of an n-variate Gaussian distribution. The graphical model is basically the skeleton of a directed acyclic graph, which means that if a structure like:

(x1)---(x2)---(x3)

is in the graph, then x1 is independent of x3 given x2. More generally, if A is the adjacency matrix of the graph and A(i,j) = A(j,i) = 0 (there is a missing edge between i and j), then i and j are conditionally independent given all the variables that appear in any path from i to j. For statistical and machine learning purposes, it is possible to "learn" the underlying graphical model. If we have enough observations of a jointly Gaussian n-variate random variable, we can use the PC algorithm, which works as follows:

given n as the number of observed variables, initialize the graph as G = K(n) (the complete graph on n nodes)
for each pair i,j of nodes:
    if there exists an edge e from i to j:
        take the set of neighbours of i
        if j is in the neighbours of i, remove j from that set
        call the resulting set k
        TEST whether i and j are independent given the set k; if TRUE:
             remove the edge e from i to j

This algorithm also computes the separating sets of the graph, which are used by another algorithm that constructs the DAG starting from the skeleton and the separation sets returned by the PC algorithm. This is what I've done so far:

import itertools

def _core_pc_algorithm(a, sigma_inverse):
    l = 0
    N = len(sigma_inverse[0])
    n = range(N)
    sep_set = [[set() for i in n] for j in n]
    act_g = complete(N)  # complete(N): adjacency matrix of the complete graph on N nodes (sketched below)
    z = lambda m, i, j: -m[i][j] / ((m[i][i] * m[j][j]) ** 0.5)
    while l < N:
        for (i, j) in itertools.permutations(n, 2):
            adjacents_of_i = adj(i, act_g)  # adj(i, g): neighbours of node i in graph g (sketched below)
            if j not in adjacents_of_i:
                continue
            else:
                adjacents_of_i.remove(j)
            if len(adjacents_of_i) >= l:
                for k in itertools.combinations(adjacents_of_i, l):
                    if N - len(k) - 3 < 0:
                        return (act_g, sep_set)
                    if test(sigma_inverse, z, i, j, l, a, k):
                        act_g[i][j] = 0
                        act_g[j][i] = 0
                        sep_set[i][j] |= set(k)
                        sep_set[j][i] |= set(k)
        l = l + 1
    return (act_g, sep_set)
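For reference (not the exact code I use, just the behaviour the snippet above assumes), the two omitted helpers complete() and adj() could be implemented like this:

def complete(n):
    # adjacency matrix of the complete graph K(n), no self-loops
    return [[0 if i == j else 1 for j in range(n)] for i in range(n)]

def adj(i, g):
    # set of nodes adjacent to node i in the graph with adjacency matrix g
    return {j for j in range(len(g)) if g[i][j] != 0}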

a is the tuning parameter alpha with which I test for conditional independence, and sigma_inverse is the inverse of the covariance matrix of the sampled observations. Moreover, my test is:

import numpy

def test(sigma_inverse, z, i, j, l, a, k):
    def erfinv(x):  # used to approximate the inverse of the gaussian cumulative distribution function
        sgn = 1
        a = 0.147
        PI = numpy.pi
        if x < 0:
            sgn = -1
        temp = 2/(PI*a) + numpy.log(1-x**2)/2
        add_1 = temp**2
        add_2 = numpy.log(1-x**2)/a
        add_3 = temp
        rt1 = (add_1-add_2)**0.5
        rtarg = rt1 - add_3
        return sgn*(rtarg**0.5)
    def indep_test_ijK(K):  # compute the partial correlation of i and j given ONE conditioning variable K
        part_corr_coeff_ij = z(sigma_inverse,i,j)  # partial correlation coefficient of i and j
        part_corr_coeff_iK = z(sigma_inverse,i,K)  # partial correlation coefficient of i and K
        part_corr_coeff_jK = z(sigma_inverse,j,K)  # partial correlation coefficient of j and K
        part_corr_coeff_ijK = (part_corr_coeff_ij - part_corr_coeff_iK*part_corr_coeff_jK)/((((1-part_corr_coeff_iK**2))**0.5) * (((1-part_corr_coeff_jK**2))**0.5))  # partial correlation coefficient of i and j given K
        return part_corr_coeff_ijK == 0  # i independent of j given K iff partial_correlation(i,j)|K == 0 (under the jointly gaussian assumption) [could check if abs is < alpha?]
    def indep_test():
        n = len(sigma_inverse[0])
        phi = lambda p: (2**0.5)*erfinv(2*p-1)
        root = (n-len(k)-3)**0.5
        return root*abs(z(sigma_inverse,i,j)) <= phi(1-a/2)
    if l == 0:
        return z(sigma_inverse,i,j) == 0  # i independent of j <=> partial_correlation(i,j) == 0 (under the jointly gaussian assumption) [could check if abs is < alpha?]
    elif l == 1:
        return indep_test_ijK(k[0])
    elif l == 2:
        return indep_test_ijK(k[0]) and indep_test_ijK(k[1])  # ASSUMING THAT i,j ARE INDEPENDENT GIVEN Y,Z <=> i,j INDEPENDENT GIVEN Y AND i,j INDEPENDENT GIVEN Z
    else:  # use the independence test with Fisher's z-transform
        return indep_test()

where z is a lambda that receives a matrix (the inverse of the covariance matrix), an integer i and an integer j, and computes the partial correlation of i and j given all the rest of the variables with the following rule (which I read in my teacher's slides):

corr(i,j)|REST = -var^-1(i,j)/sqrt(var^-1(i,i)*var^-1(j,j))
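To sanity-check that rule numerically, here is a minimal sketch with numpy (the chain x1 -> x2 -> x3 is just a toy generative model used for illustration): the entry of the inverse covariance matrix corresponding to the missing edge gives a partial correlation close to zero.

import numpy as np

rng = np.random.default_rng(0)
n_samples = 10000
x1 = rng.normal(size=n_samples)
x2 = x1 + rng.normal(size=n_samples)   # x1 -> x2
x3 = x2 + rng.normal(size=n_samples)   # x2 -> x3
data = np.column_stack([x1, x2, x3])

sigma_inverse = np.linalg.inv(np.cov(data, rowvar=False))  # precision matrix var^-1

def partial_corr_given_rest(m, i, j):
    # corr(i,j)|REST = -var^-1(i,j) / sqrt(var^-1(i,i) * var^-1(j,j))
    return -m[i, j] / np.sqrt(m[i, i] * m[j, j])

print(partial_corr_given_rest(sigma_inverse, 0, 2))  # ~0: x1 and x3 are independent given x2
print(partial_corr_given_rest(sigma_inverse, 0, 1))  # clearly non-zero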

The core of this application is the indep_test() function:

    def indep_test():
        n = len(sigma_inverse[0])    
        phi = lambda p : (2**0.5)*erfinv(2*p-1)
        root = (n-len(k)-3)**0.5
        return root*abs(z(sigma_inverse,i,j)) <= phi(1-a/2)
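A side note on the quantile: phi(1-a/2), computed above through the erfinv approximation, is just the standard normal quantile at 1-a/2, so (assuming scipy is an acceptable dependency) it can be obtained directly and used as a sanity check:

from scipy.stats import norm

alpha = 0.05
# sqrt(2) * erfinv(2*p - 1) is the standard normal quantile at p
print(norm.ppf(1 - alpha / 2))  # ~1.96, the value that phi(1 - alpha/2) approximates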

indep_test() implements a statistical test based on Fisher's z-transform of estimated partial correlations. I am using this algorithm in two ways:

In both cases I do not always get correct results: either I know the DAG underlying a certain dataset, or I know the generative model, and it does not coincide with the one my algorithm learns. I know perfectly well that this is a non-trivial task, and I may have misunderstood theoretical concepts as well as made errors even in parts of the code I have omitted here; but first I'd like to know (from someone more experienced than me) whether the test I wrote is right, and also whether there are library functions that perform this kind of test; I tried searching but couldn't find any suitable function.

Upvotes: 5

Views: 3612

Answers (1)

Alex Foglia

Reputation: 530

I'll get to the point. The most critical issue in the above code regards the following error:

sqrt(n-len(k)-3)*abs(z(sigma_inverse[i][j])) <= phi(1-alpha/2)

I was mistaking the meaning of n: it is not the size of the precision matrix but the total number of multivariate observations (in my case, 10000 instead of 5). Another wrong assumption is that z(sigma_inverse[i][j]) has to provide the partial correlation of i and j given all the rest. That's not correct: z is Fisher's transform on a proper subset of the precision matrix, which estimates the partial correlation of i and j given K. The correct test is the following:

if len(K) == 0:   # CM is the correlation matrix; no conditioning variables (K has length 0)
    r = CM[i, j]  # r is the (unconditional) correlation of i and j
elif len(K) == 1: # one conditioning variable; I do not have to compute the correlation matrix since I start from it (pandas provides it on a DataFrame)
    r = (CM[i, j] - CM[i, K] * CM[j, K]) / math.sqrt((1 - math.pow(CM[j, K], 2)) * (1 - math.pow(CM[i, K], 2))) # r is the partial correlation of i and j given K
else:             # more than one conditioning variable
    CM_SUBSET = CM[np.ix_([i]+[j]+K, [i]+[j]+K)]  # subset of the correlation matrix I'm looking for
    PM_SUBSET = np.linalg.pinv(CM_SUBSET)         # precision matrix of the given subset
    r = -1 * PM_SUBSET[0, 1] / math.sqrt(abs(PM_SUBSET[0, 0] * PM_SUBSET[1, 1]))
r = min(0.999999, max(-0.999999, r))              # clamp r away from +/-1 to keep the transform finite
res = math.sqrt(n - len(K) - 3) * 0.5 * math.log1p((2*r)/(1-r))  # test statistic based on Fisher's z-transform of the partial correlation
return 2 * (1 - norm.cdf(abs(res)))               # two-sided p-value
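For reference, a possible self-contained packaging of the snippet above (imports included; the function name and signature are my own choice, not a fixed API):

import math
import numpy as np
from scipy.stats import norm

def gauss_ci_pvalue(CM, n, i, j, K):
    """p-value for the null 'partial correlation of i and j given K is zero',
    where CM is the correlation matrix of the data and n the number of observations."""
    K = list(K)
    if len(K) == 0:
        r = CM[i, j]
    elif len(K) == 1:
        k = K[0]
        r = (CM[i, j] - CM[i, k] * CM[j, k]) / math.sqrt(
            (1 - CM[j, k] ** 2) * (1 - CM[i, k] ** 2))
    else:
        CM_SUBSET = CM[np.ix_([i, j] + K, [i, j] + K)]
        PM_SUBSET = np.linalg.pinv(CM_SUBSET)  # precision matrix of the subset
        r = -PM_SUBSET[0, 1] / math.sqrt(abs(PM_SUBSET[0, 0] * PM_SUBSET[1, 1]))
    r = min(0.999999, max(-0.999999, r))       # keep the transform finite
    z_r = 0.5 * math.log1p((2 * r) / (1 - r))  # Fisher's z-transform of r
    stat = math.sqrt(n - len(K) - 3) * abs(z_r)
    return 2 * (1 - norm.cdf(stat))            # two-sided p-value

In the skeleton search, the returned p-value is then compared against alpha: if it is greater than alpha, the null hypothesis of zero partial correlation is not rejected, so i and j are treated as conditionally independent given K and the edge between them is removed.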

I hope someone finds this helpful.

Upvotes: 4
