Reputation: 83
I am practicing some Kafka producing / consuming and am trying to set up a simulated 'stream' of data. I have tried looping with time.sleep(0.0000001), but it is too slow to catch the entries. Here is what I am trying to do:
offsets = acc_df.offset.unique()
time_start = time.time()
for x in offsets:
    while time.time() - time_start != x:
        if time.time() - time_start == x:
            send_df = acc_df[acc_df['offset'] == x].to_dict()
            for x in send_df:
                send_data('accelerations', x)
        else:
            time.sleep(0.0000001)
If I am going about this the wrong way, please let me know!! Basically, once the elapsed time has reached the offset, I want to dump those rows to my Kafka topic.
Thanks in advance!
Edit: Here is some raw data that was requested instead of a picture
id ride_id uuid timestamp offset x y z timelapse filename
58682c5d48cad9d9e103431d773615bf c9a2b46c9aa515b632eddc45c4868482 19b9aa10588646b3bf22c9b4865a7995 25:03.9 0.822061 -0.994 0.045 -0.036 FALSE e2f795a7-6a7d-4500-b5d7-4569de996811.mov
58682c5d48cad9d9e103431d773615bf c9a2b46c9aa515b632eddc45c4868482 19b9aa10588646b3bf22c9b4865a7995 25:03.9 0.842061 -0.998 0.046 -0.04 FALSE e2f795a7-6a7d-4500-b5d7-4569de996811.mov
58682c5d48cad9d9e103431d773615bf c9a2b46c9aa515b632eddc45c4868482 19b9aa10588646b3bf22c9b4865a7995 25:03.9 0.862061 -0.999 0.047 -0.036 FALSE e2f795a7-6a7d-4500-b5d7-4569de996811.mov
58682c5d48cad9d9e103431d773615bf c9a2b46c9aa515b632eddc45c4868482 19b9aa10588646b3bf22c9b4865a7995 25:03.9 0.882061 -0.999 0.045 -0.034 FALSE e2f795a7-6a7d-4500-b5d7-4569de996811.mov
58682c5d48cad9d9e103431d773615bf c9a2b46c9aa515b632eddc45c4868482 19b9aa10588646b3bf22c9b4865a7995 25:03.9 0.902061 -0.999 0.048 -0.033 FALSE e2f795a7-6a7d-4500-b5d7-4569de996811.mov
Upvotes: 0
Views: 446
Reputation: 191738
Try playing with this example.
My computer runs around 500 empty iterations before reaching the first "offset", then about 15-20 loop iterations between each. Notice that I use < rather than ==, since the timestamp subtraction would need to be rounded to get micro-second precision...
import time

# order doesn't matter... the list will be iterated each time through the while loop
offsets = [0.822061, 0.842061, 0.862061, 0.882061, 0.902061]
time_start = time.time()
counter = 0  # for debugging
running = True
while running:
    to_send = [o for o in offsets if o < time.time() - time_start]
    if to_send == offsets:
        running = False
    print(f'{counter}:{to_send}')  # TODO: replace with Kafka producer
    time.sleep(1 / 1000)
    counter += 1
Example output
1:[]
...
657:[]
658:[]
659:[]
660:[]
661:[0.822061]
662:[0.822061]
663:[0.822061]
...
675:[0.822061]
676:[0.822061]
677:[0.822061, 0.842061]
678:[0.822061, 0.842061]
679:[0.822061, 0.842061]
...
692:[0.822061, 0.842061]
693:[0.822061, 0.842061, 0.862061]
694:[0.822061, 0.842061, 0.862061]
...
720:[0.822061, 0.842061, 0.862061, 0.882061]
721:[0.822061, 0.842061, 0.862061, 0.882061]
722:[0.822061, 0.842061, 0.862061, 0.882061]
723:[0.822061, 0.842061, 0.862061, 0.882061, 0.902061]
If you want to limit the output to data between two timestamps, you'll need to update the reference timestamp each time around the loop. For example, when you reach 0.822061, pop it and subtract that value from the remaining values in the list (e.g. 0.842061 - 0.822061 = 0.02), then reset the start time. The loop keeps running, and after 0.02 seconds you'll pop the next event exactly as you did for the first one, and so on.
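For illustration, a minimal sketch of that pop-and-rebase idea (my wording above; it assumes unique offset values and keeps the original offset alongside the remaining delay so re-basing doesn't lose it):

import time

offsets = [0.822061, 0.842061, 0.862061, 0.882061, 0.902061]

# (remaining delay, original offset) pairs so the original value survives re-basing
pending = [(o, o) for o in sorted(offsets)]
time_start = time.time()
while pending:
    delay, original = pending[0]
    if delay < time.time() - time_start:
        pending.pop(0)
        # subtract the consumed delay from what is left and reset the clock
        pending = [(d - delay, orig) for d, orig in pending]
        time_start = time.time()
        print(original)  # TODO: replace with Kafka producer
    time.sleep(1 / 1000)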
I couldn't make a good example of that without duplicating offset values, so deleting the sent elements from the list is another solution:
import time

offsets = [0.822061, 0.842061, 0.862061, 0.882061, 0.902061]
time_start = time.time()
to_send = []  # buffer data between time.time() calls
counter = 0  # for debugging
running = True
while running:
    if not offsets:
        running = False
        break
    to_send.clear()
    for o in offsets[:]:  # iterate over a copy so removing elements is safe
        if o < time.time() - time_start:
            # build the buffer and remove from the main list
            to_send.append(o)
            offsets.remove(o)
    print(f'{counter}:{to_send}')  # TODO: replace with Kafka producer
    time.sleep(1 / 1000)
    counter += 1
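To connect this back to the question, the print in either loop can be swapped for a real send. A minimal sketch using the kafka-python client (assuming a broker on localhost:9092, the acc_df DataFrame from the question, and JSON-serialized values; adjust to your own client and serialization):

import json

import pandas as pd
from kafka import KafkaProducer  # kafka-python client

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',  # assumed local broker
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

def send_offset_rows(df: pd.DataFrame, offset: float, topic: str = 'accelerations') -> None:
    """Send every row of df whose 'offset' column matches the given offset."""
    for record in df[df['offset'] == offset].to_dict(orient='records'):
        producer.send(topic, value=record)

Inside the loop, the print(f'{counter}:{to_send}') line then becomes something like for o in to_send: send_offset_rows(acc_df, o), with a producer.flush() once the loop finishes.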
Regarding Kafka itself - the Datagen source connector can simulate streaming data loads.
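As a rough sketch, registering a Datagen connector through the Kafka Connect REST API could look like the following (assuming Connect is running on localhost:8083 and the kafka-connect-datagen plugin is installed; the connector name, topic, and intervals are placeholders):

import json

import requests

# Placeholder Datagen source connector config; 'quickstart' picks one of the bundled schemas
connector = {
    "name": "datagen-accelerations",
    "config": {
        "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
        "kafka.topic": "accelerations",
        "quickstart": "users",     # bundled example schema; a custom schema file is also possible
        "max.interval": 1000,      # ms between generated records
        "iterations": 1000000,
        "tasks.max": "1",
    },
}

resp = requests.post(
    "http://localhost:8083/connectors",  # assumed Kafka Connect REST endpoint
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector),
)
resp.raise_for_status()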
Upvotes: 1