Reputation: 1779
I need to generate some data in PySpark and I am currently using pandas-on-Spark (pyspark.pandas) to do it. What I have found is that when I use .repeat() to scale up my data-generating process, it is very, very slow (tens of minutes).
Are there any other alternatives I can use to generate a dataframe like the following?
import pyspark.pandas as ps
# params
start_time = '2022-04-01'
end_time = '2022-07-01'
IDs = [1, 2, 3, 4, 5, 6, 7, 8, ...]
dStates = ['A', 'B', 'C', 'D', ....]
# delta time
delta_time = (ps.to_datetime(end_time).month - ps.to_datetime(start_time).month)
# create DF
timeSet = ps.date_range(start=start_time, end=end_time, freq='MS').repeat( len(dStates) * len(IDs) )
stateSet = ps.Series( dStates * ( delta_time + 1 ) * len(IDs) )
nodeSet = ps.Series(IDs).repeat( len(dStates) * ( delta_time + 1 ) ).reset_index(drop=True)
# combine
tseries = ps.DataFrame({'monthlyTrend': timeSet.astype(str),
                        'FromState': stateSet,
                        'ID': nodeSet})
Upvotes: 2
Views: 312
Reputation: 1879
Usually numpy functions are more optimized, so you could try using numpy.repeat(). I have tweaked the code below to generate dates day by day in the range and to adjust IDs and dStates according to timeList's length:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
# params
start_time = '2022-04-01'
end_time = '2022-07-01'
IDs = [1, 2, 3, 4, 5, 6, 7, 8]
dStates = ['A', 'B', 'C', 'D']
# Generate data based on params: one row per day in [start_time, end_time)
timeList = np.arange(datetime.fromisoformat(start_time), datetime.fromisoformat(end_time), timedelta(days=1)).astype(datetime)
stateList = np.repeat(dStates, len(timeList)//len(dStates))
stateList = np.append(stateList, dStates[:len(timeList)%len(dStates)]) # this ensures the lengths remain the same
nodeList = np.repeat(IDs, len(timeList)//len(IDs))
nodeList = np.append(nodeList, IDs[:len(timeList)%len(IDs)])
# combine
tseries = pd.DataFrame({
    'monthlyTrend': timeList.astype(str),
    'FromState': stateList,
    'ID': nodeList
})
df = spark.createDataFrame(tseries)
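As a quick sanity check of the padding logic above (purely illustrative, assuming the snippet has been run as-is and an active SparkSession named spark exists), all three arrays end up with one entry per day in the range:
# 91 days between 2022-04-01 and 2022-07-01 (end exclusive), so the columns line up row for row
print(len(timeList), len(stateList), len(nodeList))  # 91 91 91
df.show(5)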
Update
Here is another approach that uses explode() and array_repeat() to achieve the above using only PySpark functions. We first create a dataframe that is as long as your longest list of params (in the example it's IDs), then use PySpark functions to expand it.
from pyspark.sql import functions as F
import pyspark.pandas as ps
# params
start_time = '2022-04-01'
end_time = '2022-07-01'
timeSet = ps.date_range(start=start_time, end=end_time, freq='MS').tolist()
IDs = [1, 2, 3, 4, 5, 6, 7, 8]
dStates = ['A', 'B', 'C', 'D']
# create a minimum length DF aligned to the longest list of params
longest_list = IDs
timeSet = ps.concat([ps.Series(timeSet * (len(longest_list)//len(timeSet))), ps.Series(timeSet[:len(longest_list)%len(timeSet)])], ignore_index=True)
stateSet = ps.concat([ps.Series(dStates * (len(longest_list)//len(dStates))), ps.Series(dStates[:len(longest_list)%len(dStates)])], ignore_index=True)
nodeSet = ps.Series(IDs)
# combine
df_tseries = ps.DataFrame({
    'monthlyTrend': timeSet,
    'FromState': stateSet,
    'ID': nodeSet}).to_spark()
# expand the df with explode and array_repeat
no_of_repeats = 10
df_tseries = df_tseries.withColumn("ID", F.explode(F.array_repeat("ID", no_of_repeats)))
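For illustration only (using the variables defined above), the explode/array_repeat step multiplies the row count: the base frame has len(IDs) = 8 rows, so the expanded frame has 8 * no_of_repeats = 80 rows, with each original ID appearing no_of_repeats times:
# verify the expansion: 8 base rows * 10 repeats = 80 rows
print(df_tseries.count())  # 80
df_tseries.show(5)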
Upvotes: 1