Liam McIntyre
Liam McIntyre

Reputation: 364

Can this pandas workflow be converted to dask?

Please be nice - I'm not a proper programmer, I'm a scientist and I've read as many docs on this as I can find (they're a bit sparse).

I'm trying to convert this pandas code into dash because my input file is ~0.5TB with gz and it loads too slowly in native pandas. I have a 3 TB machine, btw.

This is an example of what I'm doing with pandas:

df = pd.DataFrame([['chr1',33329,17,'''33)'6'4?1&AB=?+..''','''X%&=E&!%,0("&"Y&!'''],
                    ['chr1',33330,15,'''6+'/7=1@><C1*'*''','''X%=E!%,("&"Y&&!'''],
                    ['chr1',33331,13,'''2*3A@/9@CC3--''','''X%E!%,("&"Y&!'''],
                    ['chr1',33332,1,'''4**(,:3)+7-@<(0-''','''X%&E&!%,0("&"Y&!'''],
                    ['chr1',33333,2,'''66(/C=*42A:.&*''','''X%=&!%0("&"&&!''']],
                 columns = ['chrom','pos','depth','phred','map'])

df.loc[:,'phred'] = [(sum(map(ord,i))-len(i)*33)/len(i) for i in df.loc[:,"phred"]]
df.loc[:,"map"] = [(sum(map(ord,i)))/len(i) for i in df.loc[:,"map"]]
df = df.astype({'phred': 'int32', 'map': 'int32'})
df.query('(depth < 10) | (phred < 7) | (map < 10)', inplace=True)
for chrom, df_tmp in df.groupby('chrom'):
    df_end = df_tmp[~((df_tmp.pos.shift(0) == df_tmp.pos.shift(-1)-1))]
    df_start = df_tmp[~((df_tmp.pos.shift(0) == df_tmp.pos.shift(+1)+1))]
    for start, end in zip(df_start.pos, df_end.pos):
        print (start, end)

Gives

33332 33333

This works (to find regions of a cancer genome with no data) and it's optimised as much as I know how.

I load the real thing like:

df = pd.read_csv(
    '/Users/liamm/Downloads/test_head33333.tsv.gz',
     sep='\t',
    header=None,
    index_col=None,
    usecols=[0,1,3,5,6],
    names = ['chrom','pos','depth','phred','map']
)

and I can do the same with Dask (way faster!):

df = dd.read_csv(
    '/Users/liamm/Downloads/test_head33333.tsv.gz',
     sep='\t',
    header=None,
    usecols=[0,1,3,5,6],
    compression='gzip',
    blocksize=None,
    names = ['chrom','pos','depth','phred','map']
)

but i'm stuck here:

ff=[(sum(map(ord,i))-len(i)*33)/len(i) for i in df.loc[:,"phred"]]
df['phred'] = ff

Error: Column assignment doesn't support type list

Question - is this sort of thing possible? If so are there good tutes somewhere? I need to convert the whole block of pandas code above.

Thanks in advance!

Upvotes: 1

Views: 147

Answers (2)

jsmart
jsmart

Reputation: 3001

@rpanai noted that you could eliminate the for loops. The following example uses groupby() (and a couple helper columns) to find the start and end position for each contiguous sequence of positions.

Using only pandas built-in functions should be compatible with Dask (and fast).

First, create demo data frame with multiple chromosomes and multiple contiguous blocks of positions:

data1 = {
    'chrom' : 'chrom_1',
    'pos' : [1000, 1001, 1002, 
             2000, 2001, 2002, 2003]}
data2 = {
    'chrom' : 'chrom_2',
    'pos' : [30000, 30001, 30002, 30003, 30004, 
             40000, 40001, 40002, 40003, 40004, 40005]}
df = pd.DataFrame(data1).append( pd.DataFrame(data2) )

Second, create two helper functions:

  • rank is a sequential counter for each group;
  • key is constant for positions in a contiguous 'run' of positions.
df['rank'] = df.groupby('chrom')['pos'].rank(method='first')
df['key'] = df['pos'] - df['rank']

Third, group by chrom and key to create a groupby object for each contiguous block of positions, then use min and max to find start and end value for the positions.

result = (df.groupby(['chrom', 'key'])['pos']
            .agg(['min', 'max'])
            .droplevel('key')
            .rename(columns={'min': 'start', 'max': 'end'})
         )
print(result)

         start    end
chrom                
chrom_1   1000   1002
chrom_1   2000   2003
chrom_2  30000  30004
chrom_2  40000  40005

Upvotes: 1

jsmart
jsmart

Reputation: 3001

You created list comprehensions to transform 'Fred' and 'map'; I converted these list comps to functions, and wrapped the functions in np.vectorize().

def func_p(p):
    return (sum(map(ord, p)) - len(p) * 33) / len(p)

def func_m(m):
    return (sum(map(ord, m)))  / len(m)

vec_func_p = np.vectorize(func_p)
vec_func_m = np.vectorize(func_m)

np.vectorize() does not make code faster, but it does let you write a function with scalar inputs and outputs, and convert it to a function that takes array inputs and outputs.

The benefit is that we can now pass pandas Series to these functions (I also added the type conversion to this step):

df.loc[:, 'phred'] = vec_func_p( df.loc[:, 'phred']).astype(np.int32)
df.loc[:, 'map'] = vec_func_m( df.loc[:, 'map']).astype(np.int32)

Replacing the list comprehensions with these new functions gives the same results as your version (33332 33333).

Upvotes: 1

Related Questions