Reputation: 2490
I have this resulting correlation matrix:
id | row | col | corr | target_corr |
---|---|---|---|---|
0 | a | b | 0.95 | 0.2 |
1 | a | c | 0.7 | 0.2 |
2 | a | d | 0.2 | 0.2 |
3 | b | a | 0.95 | 0.7 |
4 | b | c | 0.35 | 0.7 |
5 | b | d | 0.65 | 0.7 |
6 | c | a | 0.7 | 0.6 |
7 | c | b | 0.35 | 0.6 |
8 | c | d | 0.02 | 0.6 |
9 | d | a | 0.2 | 0.3 |
10 | d | b | 0.65 | 0.3 |
11 | d | c | 0.02 | 0.3 |
After filtering the highly correlated variables based on the "corr" column, I try to add a new column that decides whether to mark the variable from "row" as "keep" (it is the one least correlated to the target) or as "drop" (it is the one most correlated to the target), using the "target_corr" column. In other words, from the correlated variable pairs matching cut > 0.5, select the one least correlated to the target ("target_corr"):
Expected result:
id | row | col | corr | target_corr | drop/keep |
---|---|---|---|---|---|
0 | a | b | 0.95 | 0.2 | keep |
1 | a | c | 0.7 | 0.2 | keep |
2 | b | a | 0.95 | 0.7 | drop |
3 | b | d | 0.65 | 0.7 | drop |
4 | c | a | 0.7 | 0.6 | drop |
5 | d | b | 0.65 | 0.3 | keep |
This approach has to work with very large dataframes, so the resulting corr matrix is for example > 100k x 100k and is generated using pyspark:
import time
import numpy as np
import pandas as pd

def corrwith_matrix_no_save(df, data_cols=None, select_targets=None, method='pearson'):
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.stat import Correlation
    start_time = time.time()
    vector_col = "corr_features"
    if data_cols is None and select_targets is None:
        data_cols = df.columns
        select_targets = list(df.columns)
    # assemble all feature columns into a single vector column and compute the correlation matrix
    assembler = VectorAssembler(inputCols=data_cols, outputCol=vector_col)
    df_vector = assembler.transform(df).select(vector_col)
    matrix = Correlation.corr(df_vector, vector_col, method)
    # the result column is named after the chosen method, e.g. "pearson(corr_features)"
    result = matrix.collect()[0]["{}({})".format(method, vector_col)].values
    final_df = pd.DataFrame(result.reshape(-1, len(data_cols)), columns=data_cols, index=data_cols)
    final_df = final_df.apply(lambda x: x.abs() if np.issubdtype(x.dtype, np.number) else x)
    corr_df = final_df[select_targets]
    # corr_df.columns = [str(col) + '_corr' for col in corr_df.columns]
    corr_df['column_names'] = corr_df.index
    print('Execution time for correlation_matrix function:', time.time() - start_time)
    return corr_df
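For context, a hypothetical call looks like this (spark here is assumed to be an existing SparkSession, and the column names and values are only placeholders):

sdf = spark.createDataFrame(
    [(0.1, 1.2, 0.7, 3.1, 0.5),
     (0.3, 1.0, 0.9, 2.8, 0.7),
     (0.2, 1.5, 0.4, 3.0, 0.6)],
    ['a', 'b', 'c', 'd', 'target'])

corr_df = corrwith_matrix_no_save(sdf)  # pandas DataFrame of absolute correlations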
I created the dataframe from the upper triangle with numpy.triu and stack, and added the target column by merging the 2 resulting dataframes (I can provide the code if required, but it would increase the content a lot, so I will add it only if clarification is needed).
def corrX_to_ls(corr_mtx):
    # Split off the target correlations and keep only the upper triangle of the correlation matrix
    df_target = corr_mtx['target']
    corr_df = corr_mtx.drop(columns='target')
    up = corr_df.where(np.triu(np.ones(corr_df.shape), k=1).astype(bool))
    print('This is triu: \n', up)
    # long format: one row per (row, col) pair
    df = up.stack().reset_index()
    df.columns = ['row', 'col', 'corr']
    df_lsDF = df.query("row != col")
    # attach the target correlation of the 'row' variable
    df_target_corr = df_target.reset_index()
    df_target_corr.columns = ['target_col', 'target_corr']
    sample_df = df_lsDF.merge(df_target_corr, how='left', left_on='row', right_on='target_col')
    sample_df = sample_df.drop(columns='target_col')
    return sample_df
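As a small check on the sample from above (with each variable's correlation to the target added as a "target" column), this returns the upper-triangle pairs with the "row" variable's target correlation attached:

import pandas as pd

corr = pd.DataFrame(
    [[1.00, 0.95, 0.70, 0.20],
     [0.95, 1.00, 0.35, 0.65],
     [0.70, 0.35, 1.00, 0.02],
     [0.20, 0.65, 0.02, 1.00]],
    index=list('abcd'), columns=list('abcd'))
corr['target'] = [0.2, 0.7, 0.6, 0.3]

sample_df = corrX_to_ls(corr)
#   row col  corr  target_corr
#     a   b  0.95          0.2
#     a   c  0.70          0.2
#     ...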
Now, after filtering the resulting dataframe based on df['corr'] > cut with cut = 0.50, I got stuck at marking which variable to keep and which to drop (I only want to mark them, then select the variables into a list)... so help on solving it will be greatly appreciated and will also benefit the community when working on distributed systems.
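For the small sample above, the rule I have in mind could be written roughly like this (keep the side of each correlated pair that is less correlated to the target; target_of is a lookup I built by hand from the sample), but I need this marking to work on the full > 100k x 100k result without building such things manually:

import numpy as np
import pandas as pd

# pairwise dataframe after the cut > 0.5 filter, copied from the sample above
pairs = pd.DataFrame({
    'row':         ['a', 'a', 'b', 'b', 'c', 'd'],
    'col':         ['b', 'c', 'a', 'd', 'a', 'b'],
    'corr':        [0.95, 0.70, 0.95, 0.65, 0.70, 0.65],
    'target_corr': [0.2, 0.2, 0.7, 0.7, 0.6, 0.3],
})

# each variable's correlation to the target, taken from the same sample
target_of = {'a': 0.2, 'b': 0.7, 'c': 0.6, 'd': 0.3}

# keep the 'row' variable when it is less correlated to the target than its partner in 'col'
pairs['drop_keep'] = np.where(pairs['target_corr'] < pairs['col'].map(target_of), 'keep', 'drop')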
Note: I am looking for an example/solution that scales, so I can distribute the operations on executors, e.g. process lists or groups/subsets of the dataframe in parallel and avoid loops; numpy.vectorize, threading and/or multiprocessing approaches are what I am looking for.
Additional "thinking" from top of my mind: I do think on grouping by
"row" column
so can distribute processing each group on executors or by using lists distribute processing in parallel on executors so each list will generate a job for each thread fromThreadPool
( I done done this approach for column vectors but for very large matrix/dataframes can become inefficient so for rows I think will work).
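Very roughly, the shape I have in mind is something like the sketch below (mark_group is a hypothetical placeholder for the per-group keep/drop logic):

from concurrent.futures import ThreadPoolExecutor
import pandas as pd

def mark_group(group):
    # hypothetical placeholder: the keep/drop marking for one 'row' group would go here
    return group

def process_by_row_groups(sample_df, max_workers=8):
    # split the pairwise dataframe by 'row' and process each group in a separate thread
    groups = [g for _, g in sample_df.groupby('row')]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(mark_group, groups))
    return pd.concat(results, ignore_index=True)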
Upvotes: 1
Views: 742
Reputation: 150735
Given final_df as the sample input, you can try:
import numpy as np

# filter
output = final_df.query('corr > target_corr').copy()

# assign drop/keep
output['drop_keep'] = np.where(output['corr'] > 2 * output['target_corr'],
                               'keep', 'drop')
Output:
id row col corr target_corr drop_keep
0 0 a b 0.95 0.2 keep
1 1 a c 0.70 0.2 keep
3 3 b a 0.95 0.7 drop
6 6 c a 0.70 0.6 drop
10 10 d b 0.65 0.3 keep
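If you also need the final list of variables to keep (as mentioned in the question), one option is:

keep_vars = output.loc[output['drop_keep'] == 'keep', 'row'].unique().tolist()
# ['a', 'd']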
Upvotes: 1