Reputation: 3911
In the usual structured_kafka_wordcount.py code,
when I split lines into words with a udf,
like below,
from pyspark.sql.functions import explode, split, udf
from pyspark.sql.types import ArrayType, StringType

my_split = udf(lambda x: x.split(' '), ArrayType(StringType()))

words = lines.select(
    explode(
        my_split(lines.value)
    )
)
the following warning keeps showing up:
WARN CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread. It may hang when CachedKafkaConsumer's methods are interrupted because of KAFKA-1894
On the other hand, when I split the lines into words with pyspark.sql.functions.split, everything works well.
words = lines.select(
    explode(
        split(lines.value, ' ')
    )
)
Why does this happen, and how can I fix the warning?
This is the code I am trying to execute in practice:
import re

pattern = "(.+) message repeated (\\d) times: \\[ (.+)\\]"
prog = re.compile(pattern)

def _unfold(x):
    ret = []
    result = prog.match(x)
    if result:
        log = " ".join((result.group(1), result.group(3)))
        times = result.group(2)
        for _ in range(int(times)):
            ret.append(log)
    else:
        ret.append(x)
    return ret

_udf = udf(lambda x: _unfold(x), ArrayType(StringType()))
lines = lines.withColumn('value', explode(_udf(lines['value'])))
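For reference, lines in the snippets above comes from a Kafka source, along the lines of the standard structured_kafka_wordcount.py example (the bootstrap server and topic below are just placeholders, not my actual values):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StructuredKafkaWordCount").getOrCreate()

# Placeholder bootstrap servers and topic; substitute your own.
lines = (spark
         .readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "localhost:9092")
         .option("subscribe", "logs")
         .load()
         .selectExpr("CAST(value AS STRING)"))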
Upvotes: 8
Views: 6153
Reputation: 35229
Other than rejecting Python UDFs *, there is nothing you can do about this problem in your code. As you can read in the warning message, UninterruptibleThread
is a workaround for a Kafka bug (KAFKA-1894) and is designed to prevent an infinite loop when interrupting KafkaConsumer
.
It is not used with PythonUDFRunner
(it probably wouldn't make sense to introduce a special case there).
Personally I wouldn't worry about it unless you experience some related issues. Your Python code will never interact directly with KafkaConsumer
. And if you do experience any issues, they should be fixed upstream - in that case I recommend creating a JIRA ticket.
* Your _unfold
function can be rewritten with SQL functions, but it will be a hack. Add the message count as an integer:
from pyspark.sql.functions import concat_ws, col, expr, coalesce, lit, regexp_extract, when

p = "(.+) message repeated (\\d) times: \\[ (.+)\\]"

lines = spark.createDataFrame(
    ["asd message repeated 3 times: [ 12]", "some other message"], "string"
)

lines_with_count = lines.withColumn(
    "message_count", coalesce(regexp_extract("value", p, 2).cast("int"), lit(1)))
Use it to explode:
exploded = lines_with_count.withColumn(
    "i",
    expr("explode(split(repeat('1', message_count - 1), ''))")
).drop("message_count", "i")
and extract:
exploded.withColumn(
    "value",
    when(
        col("value").rlike(p),
        concat_ws(" ", regexp_extract("value", p, 1), regexp_extract("value", p, 3))
    ).otherwise(col("value"))).show(4, False)

# +------------------+
# |value             |
# +------------------+
# |asd 12            |
# |asd 12            |
# |asd 12            |
# |some other message|
# +------------------+
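Putting the pieces together, a sketch of a UDF-free replacement for your streaming transformation (assuming lines is the streaming DataFrame from your question) could look like this:

from pyspark.sql.functions import (
    coalesce, col, concat_ws, expr, lit, regexp_extract, when
)

p = "(.+) message repeated (\\d) times: \\[ (.+)\\]"

# Same three steps as above, chained: derive the repeat count,
# explode one row per repetition, then rewrite matched messages.
lines = (lines
    .withColumn(
        "message_count",
        coalesce(regexp_extract("value", p, 2).cast("int"), lit(1)))
    .withColumn(
        "i", expr("explode(split(repeat('1', message_count - 1), ''))"))
    .drop("message_count", "i")
    .withColumn(
        "value",
        when(
            col("value").rlike(p),
            concat_ws(" ", regexp_extract("value", p, 1),
                      regexp_extract("value", p, 3)))
        .otherwise(col("value"))))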
Upvotes: 6