Amit_Hora


Unable to convert Streamz Kafka Stream to Dask Stream

I am unable to convert a Streamz stream generated from a Kafka source into a Dask stream. Here is my code:

from streamz import Stream
from streamz.dataframe import Random
from streamz.dataframe import DataFrame
import json
from dask.distributed import Client
client = Client()
source = Stream.from_kafka(['logs'],
       {'bootstrap.servers': 'kafkaXX:9092',
        'group.id': 'streamz'}) 
source.scatter().map(json.loads).buffer(8).gather().sink(print)
source.start()

I am getting this error message:

ValueError: Two different event loops active

Answers (1)

mdurant

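Unless told otherwise, the Kafka source starts its own event loop in a separate thread. The call to Client() also starts one, so the pipeline ends up with two different loops, which is what raises the error. To make the Kafka source use the client's loop instead, pass it in explicitly: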

Stream.from_kafka(..., loop=client.loop)

Note that .scatter() also needs explicit access to an event loop, but because it is Dask-specific, it automatically uses the loop of whichever client is currently active.
