Reputation: 61
I am trying to create a DataStream of type bytes in PyFlink. The following code produces an error:
#!/bin/python
# -*- coding: utf-8 -*-
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
# create Stream environment for execution (DAG)
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
message = 'Python is fun'
# convert string to bytes
byte_message = bytes(message, 'utf-8')
ds = env.from_collection(collection=[byte_message], type_info=Types.PRIMITIVE_ARRAY(Types.BYTE())).print()
env.execute()
Error:
ds = env.from_collection([byte_message], type_info=Types.PRIMITIVE_ARRAY(Types.BYTE())).print()
File "/home/user/Flink/Python_Projects/pyflink20/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py", line 1009, in from_collection
return self._from_collection(collection, type_info)
File "/home/user/Flink/Python_Projects/pyflink20/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py", line 1026, in _from_collection
j_objs = gateway.jvm.PythonBridgeUtils.readPythonObjects(temp_file.name)
File "/home/user/Flink/Python_Projects/pyflink20/lib/python3.10/site-packages/py4j/java_gateway.py", line 1322, in __call__
return_value = get_return_value(
File "/home/user/Flink/Python_Projects/pyflink20/lib/python3.10/site-packages/pyflink/util/exceptions.py", line 146, in deco
return f(*a, **kw)
File "/home/user/Flink/Python_Projects/pyflink20/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.api.common.python.PythonBridgeUtils.readPythonObjects.
: java.lang.ClassCastException: class [B cannot be cast to class [Ljava.lang.Object; ([B and [Ljava.lang.Object; are in module java.base of loader 'bootstrap')
at org.apache.flink.api.common.python.PythonBridgeUtils.getObjectArrayFromUnpickledData(PythonBridgeUtils.java:83)
at org.apache.flink.api.common.python.PythonBridgeUtils.lambda$readPythonObjects$0(PythonBridgeUtils.java:125)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.LinkedList$LLSpliterator.forEachRemaining(LinkedList.java:1242)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
at org.apache.flink.api.common.python.PythonBridgeUtils.readPythonObjects(PythonBridgeUtils.java:131)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:569)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:840)
However, the official Flink documentation indicates that this type mapping is correct: https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/python/datastream/data_types/
| PyFlink Array Type | Python Type | Java Type |
|---|---|---|
| Types.PRIMITIVE_ARRAY(Types.BYTE()) | bytes | byte[] |
I am using Python 3.10.13 and Java 17. What am I missing, or is this really a bug?
Upvotes: 0
Views: 42
Reputation: 682
You probably need to remove the [] around the collection:
ds = env.from_collection(collection=message, type_info=Types.PRIMITIVE_ARRAY(Types.BYTE()))
ds.print()
will give:
[B@1f85c447
[B@3d8db04e
[B@27941843
[B@41247101
[B@7032229e
[B@206df316
[B@27bcb158
[B@457260d7
[B@24359c35
[B@6b0f2ee6
[B@29fb74ae
[B@7f59dd9e
[B@77052274
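The thirteen lines above match the thirteen characters of 'Python is fun', which suggests the string was consumed one character at a time. The following is a minimal pure-Python sketch of that iteration behaviour, with no Flink involved. It assumes that from_collection simply iterates whatever collection it receives, which is an inference from the output and not something confirmed against the PyFlink source.

```python
message = "Python is fun"
byte_message = bytes(message, "utf-8")

# A list holding the bytes object is a collection with exactly one element.
wrapped = list([byte_message])

# A str is itself iterable: iterating it yields one element per character.
per_character = list(message)

# A bytes object is iterable too, but it yields integers, not bytes.
per_byte = list(byte_message)

print(len(wrapped))        # 1
print(len(per_character))  # 13
print(per_byte[:3])        # [80, 121, 116]
```

So dropping the [] changes what counts as an element: one whole message versus one entry per character.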
Alternatively, wrap each value in a Row:
ds = env.from_collection(collection=[(1, message), (2, message), (3, message)], type_info=Types.ROW([Types.INT(), Types.PRIMITIVE_ARRAY(Types.BYTE())]))
ds.print()
Result:
+I[1,b'Python is fun']
+I[2,b'Python is fun']
+I[3,b'Python is fun']
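The Row variant can also be written as a complete script. The sketch below is a hedged restatement of the snippet above, not a verified fix: it assumes the payload is a bytes object (the printed result shows b'Python is fun'), and the helper names make_rows and run_job are hypothetical. The PyFlink imports sit inside run_job so the pure-Python helper can be checked without a Flink installation.

```python
def make_rows(payload: bytes, count: int) -> list:
    """Build (id, payload) tuples with ids 1..count (hypothetical helper)."""
    return [(i, payload) for i in range(1, count + 1)]


def run_job(rows: list) -> None:
    """Print the rows through a PyFlink DataStream (requires pyflink)."""
    # Imported lazily so that make_rows works without PyFlink installed.
    from pyflink.common import Types
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    # Same type_info as in the answer: an INT id plus a byte[] payload.
    row_type = Types.ROW([Types.INT(), Types.PRIMITIVE_ARRAY(Types.BYTE())])
    ds = env.from_collection(collection=rows, type_info=row_type)
    ds.print()
    env.execute()
```

Calling run_job(make_rows(bytes('Python is fun', 'utf-8'), 3)) would submit the job. Keeping ds separate from the print() call also avoids assigning the sink to ds.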
By the way, the type_info parameter is optional. If it is not specified, the output type of the returned DataStream will be Types.PICKLED_BYTE_ARRAY(), so you can simply do:
ds = env.from_collection([message])
ds.print()
and you'll get:
Python is fun
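As the name Types.PICKLED_BYTE_ARRAY() suggests, elements without an explicit type_info appear to travel as pickled Python objects. The sketch below uses only the standard-library pickle module to show that such a round trip preserves the Python type, for str and bytes alike. It is an illustration under that assumption, not Flink's actual serializer.

```python
import pickle

message = "Python is fun"
byte_message = bytes(message, "utf-8")

for value in (message, byte_message):
    payload = pickle.dumps(value)     # serialized form is always bytes
    restored = pickle.loads(payload)  # original Python type comes back
    print(type(payload).__name__, "->", type(restored).__name__, restored)
```

If that assumption holds, leaving type_info out should hand downstream Python operators the same Python objects that went in, including a bytes value.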
Upvotes: 0