I am trying to process data in 60-minute sessions using the Apache Beam Python SDK. However, when I run my application, the actual session lengths are not 60 minutes; they come out as 3:00:00, 1:01:00, or 1:50:00.
Could you help me fix this so that the data is processed in 60-minute sessions?
I built my pipeline as follows:
from apache_beam import GroupByKey, Map, ParDo, Pipeline, WindowInto
from apache_beam.io import ReadFromText
from apache_beam.transforms import window

with Pipeline(options=pipeline_options) as pipeline:
    (
        pipeline
        | "Read" >> ReadFromText(known_args.input, skip_header_lines=1)
        | "Convert" >> ParDo(Convert())
        | "Add Timestamp" >> Map(lambda x: window.TimestampedValue(x, get_timestamp_from_element(x).timestamp()))
        | "Use User ID As Key" >> Map(lambda x: (x["user_id"], x))
        | "Apply Session Window" >> WindowInto(window.Sessions(known_args.session_interval))
        | "Group" >> GroupByKey()
        | "Write To CSV" >> ParDo(WriteToCSV(known_args.output))
    )
# Note: leaving the `with` block already runs the pipeline and waits for it,
# so this explicit run() is redundant.
result = pipeline.run()
result.wait_until_finish()
The `session_interval` argument (60 minutes) is defined as follows:
parser.add_argument(
    "--session_interval",
    help="Interval of each session",
    default=60*60)  # 60 mins
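One related detail about the argument above: argparse converts a command-line value only when `type` is set, so without it `--session_interval=3600` would reach `window.Sessions` as the string "3600" rather than a number of seconds (the integer default is used only when the flag is omitted). A minimal sketch, assuming the interval is given in whole seconds and that any remaining arguments are passed on to Beam as pipeline options:

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "--session_interval",
    type=int,  # convert the command-line string to an int (seconds)
    default=60 * 60,  # 60 minutes
    help="Gap duration of each session, in seconds")

# Parse an explicit list instead of sys.argv so the sketch is self-contained;
# unknown flags (e.g. Beam pipeline options) end up in the second value.
known_args, pipeline_args = parser.parse_known_args(["--session_interval=1800"])
print(known_args.session_interval, type(known_args.session_interval).__name__)
# 1800 int
```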
The `WriteToCSV` DoFn processes the data for each session. I logged each session's duration, but it was not the 60 minutes I expected.
import logging

from apache_beam import DoFn


class WriteToCSV(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, element, window=DoFn.WindowParam):
        # After GroupByKey, each element is (key, iterable of grouped values).
        user_id, click_records = element
        click_records = list(click_records)
        window_start = window.start.to_utc_datetime()
        window_end = window.end.to_utc_datetime()
        duration = window_end - window_start
        logging.info(">>> new %s record(s) in %s session (start %s end %s)", len(click_records), duration, window_start, window_end)
        ...
When I ran the application locally with the DirectRunner, I got these log messages:
new 5 records in 3:00:00 session (start 2018-10-19 02:00:00 end 2018-10-19 05:00:00)
new 2 records in 1:01:00 session (start 2018-10-19 02:02:00 end 2018-10-19 03:03:00)
new 2 records in 1:50:00 session (start 2018-10-19 03:10:00 end 2018-10-19 05:00:00)
I also deployed the pipeline to Dataflow and got the same session durations:
new 2 record(s) in 1:50:00 session (start 2018-10-19 11:10:00 end 2018-10-19 13:00:00)
new 2 record(s) in 1:01:00 session (start 2018-10-19 10:02:00 end 2018-10-19 11:03:00)
new 5 record(s) in 3:00:00 session (start 2018-10-19 10:00:00 end 2018-10-19 13:00:00)
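In your Beam pipeline, the value `known_args.session_interval` passed to `window.Sessions` defines the gap duration, i.e. how long a key must go without new events before its session window is closed. It does not fix the length of a session: each event is assigned the window [timestamp, timestamp + gap), and overlapping windows for the same key are merged, so a session runs from its first event until one gap after its last event. Each session can therefore have a different start, end, and length, depending on the timestamps of the events the pipeline receives for a given key. This is explained pictorially here.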
For example, assuming a 60-minute gap:
Key 1 - 10:00 AM ----|
Key 1 - 10:45 AM     |
Key 1 - 11:30 AM     |====> One session window for Key 1, 10:00 AM to 02:00 PM (4 hours)
Key 1 - 12:15 PM     |
Key 1 - 01:00 PM ----|
Key 1 - 02:30 PM =========> Start of a new session window for Key 1 (more than 60 minutes after the previous event)
Key 2 - 10:00 AM ----|
Key 2 - 10:30 AM     |====> One session window for Key 2, 10:00 AM to 12:00 PM (2 hours)
Key 2 - 11:00 AM ----|
Key 2 - 12:30 PM =========> Start of a new session window for Key 2
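The merging behaviour can be checked with a small plain-Python sketch. It is a simplified model of session windowing for a single key, not Beam's implementation; `merge_sessions` and `at` are hypothetical helpers, and the event times are the Key 1 times from the diagram above plus two events one minute apart.

```python
from datetime import datetime, timedelta

# Session gap, playing the role of known_args.session_interval (60 minutes).
GAP = timedelta(minutes=60)


def merge_sessions(timestamps, gap=GAP):
    """Simplified session windowing for a single key.

    Each event is assigned the window [t, t + gap); windows that overlap
    are merged, so a session ends one gap after its last event.
    """
    sessions = []  # list of [start, end] pairs, kept in start order
    for t in sorted(timestamps):
        if sessions and t < sessions[-1][1]:
            # [t, t + gap) overlaps the open session: extend its end.
            sessions[-1][1] = max(sessions[-1][1], t + gap)
        else:
            # No overlap: this event starts a new session.
            sessions.append([t, t + gap])
    return [(start, end) for start, end in sessions]


def at(hour, minute):
    # Hypothetical helper that builds a timestamp on an arbitrary day.
    return datetime(2018, 10, 19, hour, minute)


# Key 1 from the diagram.
key1 = [at(10, 0), at(10, 45), at(11, 30), at(12, 15), at(13, 0), at(14, 30)]
for start, end in merge_sessions(key1):
    print(start.time(), end.time(), end - start)
# 10:00:00 14:00:00 4:00:00
# 14:30:00 15:30:00 1:00:00

# Two events one minute apart give a 1:01:00 session.
for start, end in merge_sessions([at(2, 2), at(2, 3)]):
    print(start.time(), end.time(), end - start)
# 02:02:00 03:03:00 1:01:00
```

The first Key 1 session ends at 02:00 PM, one gap after the 01:00 PM event, and the 02:30 PM event starts a new session because it arrives after that end. The second case has the same start, end, and duration as the two-record 1:01:00 session in the question's log.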
If you want to group and process events in fixed 60-minute intervals, use `FixedWindows` instead of `Sessions`.
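For comparison, here is a plain-Python sketch of how fixed windows assign timestamps, assuming Beam's default zero offset (windows aligned to the Unix epoch). `fixed_window` and `at` are hypothetical helpers, not Beam APIs; in the pipeline itself the change would be to the "Apply Session Window" step, e.g. `WindowInto(window.FixedWindows(known_args.session_interval))`.

```python
from datetime import datetime, timedelta, timezone

# Window size, playing the role of known_args.session_interval (60 minutes).
SIZE = timedelta(minutes=60)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def fixed_window(t, size=SIZE):
    """Return the [start, end) fixed window that contains timestamp t.

    Windows are aligned to the Unix epoch (zero offset), so each one has
    exactly the same length no matter when events arrive.
    """
    start = t - (t - EPOCH) % size
    return start, start + size


def at(hour, minute):
    # Hypothetical helper: a UTC timestamp on an arbitrary day.
    return datetime(2018, 10, 19, hour, minute, tzinfo=timezone.utc)


for t in [at(2, 0), at(2, 2), at(3, 10), at(4, 0)]:
    start, end = fixed_window(t)
    print(t.time(), "->", start.time(), end.time(), end - start)
# 02:00:00 -> 02:00:00 03:00:00 1:00:00
# 02:02:00 -> 02:00:00 03:00:00 1:00:00
# 03:10:00 -> 03:00:00 04:00:00 1:00:00
# 04:00:00 -> 04:00:00 05:00:00 1:00:00
```

Events at 02:00 and 02:02 fall into the same window, while 03:10 lands in the next one: the boundaries depend only on the clock, not on the gaps between events, so every window is exactly 60 minutes long.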