Reputation: 80
I'm new to Apache Beam and I'm working on a job that will run in GCP Dataflow. I need to fetch some data from BigQuery, transform it, and write a CSV file with headers as the result. But I've found myself in a funny scenario: the headers of my CSV file are dynamic; they depend on the data fetched from BigQuery. So when I'm constructing the pipeline and trying to define the headers, I run into a problem: I don't have the headers yet:
somePCollection.apply("writing stuff", TextIO.write()
    .to("gs://some_bucket/somefile_name")
    .withSuffix(".csv")
    .withHeader(/* I CAN'T SET THE HEADERS HERE BECAUSE I DON'T HAVE THEM */));
You're probably wondering what the data looks like. At this point my PCollection structure looks like this:
user_id, fst_name, lst_name, team_list
PCollection example:
1111,DANNY,CRUISE,TEAM34,TEAM12,TEAM4
2222,CARLOS,SMITH,TEAM34,TEAM44,TEAM12
33333,SASHA,CONOR,TEAM5,TEAM34,TEAM44
The expected CSV file with headers would look like this:
USER_ID,FST_NAME,LST_NAME,TEAM34,TEAM12,TEAM4,TEAM44,TEAM5
1111,DANNY,CRUISE,1,1,1,0,0
2222,CARLOS,SMITH,1,1,0,1,0
33333,SASHA,CONOR,1,0,0,1,1
As you can see, the headers need to include all the unique teams as columns (obviously the columns can vary between executions), and each row has a 1 or 0 depending on whether the user belongs to that team.
It looks like the headers can only be defined at pipeline construction time.
I've been trying to find a way to "cheat" Apache Beam and accomplish this in a single pipeline, but I'm starting to think the only way is to run a separate job/pipeline that computes the headers and writes them somewhere, so I can use them as input to another pipeline.
I refuse to think I'm the first person who has had to deal with this scenario, so I was wondering if somebody has any idea to solve this.
Doing this with plain Java is quite simple, but with Apache Beam it's another story. I appreciate any help.
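To make the goal concrete, here is the "plain Java" version of the transformation the question describes: collect the unique teams in first-seen order to build the header, then one-hot encode each row against it. The class and method names (`OneHotCsv`, `encode`) are hypothetical, and this is only a sketch of the logic, not a Beam solution:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class OneHotCsv {

    // Each input row: user_id, fst_name, lst_name, then a variable-length team list.
    static List<String> encode(List<List<String>> rows) {
        // Gather all unique team names, preserving first-seen order for the header.
        LinkedHashSet<String> teams = new LinkedHashSet<>();
        for (List<String> row : rows) {
            teams.addAll(row.subList(3, row.size()));
        }

        List<String> out = new ArrayList<>();
        out.add("USER_ID,FST_NAME,LST_NAME," + String.join(",", teams));

        // Emit one 1/0 flag per team column for each user.
        for (List<String> row : rows) {
            Set<String> rowTeams = new HashSet<>(row.subList(3, row.size()));
            String flags = teams.stream()
                    .map(t -> rowTeams.contains(t) ? "1" : "0")
                    .collect(Collectors.joining(","));
            out.add(String.join(",", row.subList(0, 3)) + "," + flags);
        }
        return out;
    }
}
```

The catch in Beam is that the first pass (building `teams`) needs to see the whole dataset before the header string exists, which is exactly what `TextIO.write().withHeader(...)` can't express at pipeline construction time.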
Upvotes: 2
Views: 782
Reputation: 1428
I don't think this can be accomplished with TextIO today. It sounds like you need a pre-processing step to gather all possible teams, which isn't per-record work, so it's not easy to pull off even by going more custom with FileIO.
The separate pipeline should work fine, but you'll be reading all the data twice.
I'm not too familiar with the Python SDK / Beam DataFrames yet, but what you are trying to do (one-hot encoding) sounds reasonable to do with pandas, and it's even mentioned in Data pipeline for ML, if a switch to Python is allowed.
Upvotes: 2