Reputation: 2310
I'm trying to use Apache Beam for feature creation. I've looked around SO and the Beam DataFrame API docs, but I haven't found anything that addresses the issue I have.
From what I can tell from the docs, each row is a PCollection and is processed using a Transform. However, I'm looking to create new features with custom functions, and the input to those functions is not necessarily a single row. For instance:
import pandas as pd
from apache_beam.dataframe.io import read_csv

def create_rolling_features(rows: pd.DataFrame) -> pd.DataFrame:
    rows["new_col"] = rows["old_col"].rolling(10, min_periods=1).max()
    return rows

def create_another_feature(rows: pd.DataFrame) -> pd.DataFrame:
    rows["new_col_2"] = rows["old_col"] + rows["something_else"]
    return rows

with pipeline as p:
    df = p | read_csv(input_path)
    # each grouped ID can have multiple rows of variable size
    to_iter = df.groupby('someID')
    for _, row in to_iter.iterrows():
        rolled = create_rolling_features(row)
        calculated = create_another_feature(rolled)
    # some function to append all of them together
    final_df.to_csv(output_path)
The create_rolling_features and create_another_feature functions each output a new column. The input to these functions is also of variable size, depending on how many records fall into each group when groupby() is called.
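To make the desired behaviour concrete, this is roughly what I would do in plain pandas on a small in-memory frame (the column names and values here are just placeholders):

import pandas as pd

# toy input: two IDs with different numbers of rows
df = pd.DataFrame({
    "someID": ["a", "a", "a", "b", "b"],
    "old_col": [1, 2, 3, 4, 5],
    "something_else": [10, 20, 30, 40, 50],
})

def add_features(group: pd.DataFrame) -> pd.DataFrame:
    group = group.copy()
    group["new_col"] = group["old_col"].rolling(10, min_periods=1).max()
    group["new_col_2"] = group["old_col"] + group["something_else"]
    return group

# run the feature functions once per someID group, then stitch the pieces back together
final_df = pd.concat(add_features(group) for _, group in df.groupby("someID"))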
I know this currently will not work because iterrows() is not supported, but I haven't found a workaround.
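The closest thing I can think of is to fold both feature functions into a single per-group function and let the DataFrame API do the grouping itself, along the lines of the sketch below. I haven't tested this, and I'm not sure whether groupby(...).apply() with an arbitrary function is fully supported on Beam's deferred DataFrames, so treat it as a guess rather than a working solution:

import pandas as pd
from apache_beam.dataframe.io import read_csv

def add_features(group: pd.DataFrame) -> pd.DataFrame:
    # same per-group logic as above, applied to one someID's rows at a time
    group["new_col"] = group["old_col"].rolling(10, min_periods=1).max()
    group["new_col_2"] = group["old_col"] + group["something_else"]
    return group

with pipeline as p:
    df = p | read_csv(input_path)
    # the hope is that apply() runs add_features once per someID group
    final_df = df.groupby("someID").apply(add_features)
    final_df.to_csv(output_path)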
Is this something that's possible (or advisable) in Apache Beam?
Upvotes: 0
Views: 44