Reputation: 708
I am unable to convert a Streamz stream generated from a Kafka source into a Dask stream. Please find the code below:
from streamz import Stream
from streamz.dataframe import Random
from streamz.dataframe import DataFrame
import json
from dask.distributed import Client
client = Client()
source = Stream.from_kafka(['logs'],
                           {'bootstrap.servers': 'kafkaXX:9092',
                            'group.id': 'streamz'})
source.scatter().map(json.loads).buffer(8).gather().sink(print)
source.start()
I am getting this error message:
ValueError: Two different event loops active
Upvotes: 2
Views: 478
Reputation: 28684
The Kafka source, if not otherwise instructed, will start its own event loop in a thread. The call to Client() also does this. To pass the loop from one to the other, you can do:
Stream.from_kafka(..., loop=client.loop)
Note that the call to .scatter() also needs explicit access to an event loop, but since this is Dask-specific, it knows to use the loop of any client you have active.
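Putting this together, a minimal sketch of the question's pipeline with the shared loop might look like the following. The helper name build_pipeline and the results list are made up for illustration. The loop=client.loop keyword is the one suggested above, and this assumes your streamz version forwards it from from_kafka to the stream constructor.

```python
import json


def build_pipeline(client, topics, consumer_config, results):
    """Wire a Kafka source into Dask and back, reusing the client's event loop.

    `build_pipeline` and `results` are hypothetical names for this sketch.
    """
    # Imported inside the function so the helper can be defined
    # without loading streamz up front.
    from streamz import Stream

    # Hand the client's loop to the Kafka source so that it does not
    # start a second event loop in its own thread.
    source = Stream.from_kafka(topics, consumer_config, loop=client.loop)

    # scatter() moves items to the Dask workers, gather() brings them back.
    (source.scatter()
           .map(json.loads)
           .buffer(8)
           .gather()
           .sink(results.append))
    return source
```

With a running Dask cluster and Kafka broker, you would call something like source = build_pipeline(Client(), ['logs'], {'bootstrap.servers': 'kafkaXX:9092', 'group.id': 'streamz'}, results) and then source.start(), as in the question.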
Upvotes: 4