John Stud
John Stud

Reputation: 1779

PySpark: Generating Data in Pandas Very Slow

I need to generate some data in PySpark and I am currently using PySpark pandas to do it. What I have found is that when I want to use .repeat() to scale my data generating process, it is very, very slow (tens of minutes).

Are there any other alternatives that I can use to generate a dataframe of sorts like as follows?

import pyspark.pandas as ps

# params
start_time = '2022-04-01'
end_time = '2022-07-01'
IDs = [1, 2, 3, 4, 5, 6, 7, 8, ...]
dStates = ['A', 'B', 'C', 'D', ....]

# delta time
delta_time = (ps.to_datetime(end_time).month - ps.to_datetime(start_time).month)

# create DF
timeSet = ps.date_range(start=start_time, end=end_time, freq='MS').repeat(  len(dStates) * len(IDs)  )
stateSet = ps.Series( dStates * ( delta_time + 1 ) * len(IDs) )
nodeSet = ps.Series(IDs).repeat( len(dStates) * ( delta_time + 1 ) ).reset_index(drop=True)

# combine
tseries = ps.DataFrame({'monthlyTrend': timeSet.astype(str),
                   'FromState': stateSet,
                  'ID': nodeSet})

Upvotes: 2

Views: 312

Answers (1)

viggnah
viggnah

Reputation: 1879

Usually numpy functions are more optimized, so you could try using numpy.repeat(). I have tweaked the below code to generate dates day by day in a range and adjust IDs and dStates according to the timeList's length:

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# params
start_time = '2022-04-01'
end_time = '2022-07-01'
IDs = [1, 2, 3, 4, 5, 6, 7, 8]
dStates = ['A', 'B', 'C', 'D']

# Generate data based on params
timeList = np.arange(datetime(2022, 4, 1), datetime(2022, 7, 1), timedelta(days=1)).astype(datetime)

stateList = np.repeat(dStates, len(timeList)//len(dStates))
stateList = np.append(stateList, dStates[:len(timeList)%len(dStates)]) # this ensures the lengths remain the same

nodeList = np.repeat(IDs, len(timeList)//len(IDs))
nodeList = np.append(nodeList, IDs[:len(timeList)%len(IDs)])

# combine
tseries = pd.DataFrame({
    'monthlyTrend': timeList.astype(str),
    'FromState': stateList,
     'ID': nodeList
})

df = spark.createDataFrame(tseries)

Update

Here is another approach that uses explode() and array_repeat to achieve the above using only pyspark functions. We first create a dataframe that is as long as your longest list of params (in the example it's IDs). Then use pyspark functions to expand it.

from pyspark.sql import functions as F
import pyspark.pandas as ps

# params
start_time = '2022-04-01'
end_time = '2022-07-01'
delta_time = (ps.to_datetime(end_time).month - ps.to_datetime(start_time).month)
timeSet = ps.date_range(start=start_time, end=end_time, freq='MS').tolist()

IDs = [1, 2, 3, 4, 5, 6, 7, 8]
dStates = ['A', 'B', 'C', 'D']

# create a minimum length DF aligned to the longest list of params
longest_list = IDs
timeSet = ps.concat([ps.Series(timeSet * (len(longest_list)//len(timeSet))), ps.Series(timeSet[:len(longest_list)%len(timeSet)])], ignore_index=True)
stateSet = ps.concat([ps.Series(dStates * (len(longest_list)//len(dStates))), ps.Series(dStates[:len(longest_list)%len(dStates)])], ignore_index=True)
nodeSet = ps.Series(IDs)

# combine
df_tseries = ps.DataFrame({
    'monthlyTrend': timeSet,
    'FromState': stateSet,
    'ID': nodeSet}).to_spark()

# expand the df with explode and array_repeat
no_of_repeats = 10
df_tseries = df_tseries.withColumn("ID", F.explode(F.array_repeat("ID", no_of_repeats)))

Upvotes: 1

Related Questions