Reputation: 6572
I have an existing Apache Beam project with Java 8, Apache Beam 2.27.0, Maven and Dagger 2.
I migrated this project to Kotlin: Kotlin for JDK 8 (kotlin-stdlib-jdk8), version 1.5.0.
I used Kotlin 1.5.0 because 1.4.30 had an issue with Beam and the Kotlin Maven plugin (Could not read class: VirtualFile; see "Kotlin 1.4.30 Apache beam compilation error").
Everything seems to work except the built-in MapElements or FlatMapElements used with a TypeDescriptor and a lambda expression.
Part of my pom.xml file:
<properties>
  <beam.version>2.27.0</beam.version>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <kotlin.code.style>official</kotlin.code.style>
  <kotlin.compiler.jvmTarget>1.8</kotlin.compiler.jvmTarget>
  <kotlin.compiler.incremental>true</kotlin.compiler.incremental>
  <kotlin.version>1.5.0</kotlin.version>
  <serialization.version>1.2.0</serialization.version>
  <java.version>1.8</java.version>
  <dagger.version>2.35.1</dagger.version>
  <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
  <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
  <maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.jetbrains.kotlin</groupId>
    <artifactId>kotlin-stdlib-jdk8</artifactId>
    <version>${kotlin.version}</version>
  </dependency>
  <dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-serialization-json</artifactId>
    <version>${serialization.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
    <version>${beam.version}</version>
    <scope>runtime</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>${beam.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-redis</artifactId>
    <version>${beam.version}</version>
  </dependency>
  <dependency>
    <groupId>org.jetbrains.kotlin</groupId>
    <artifactId>kotlin-test-junit</artifactId>
    <version>${kotlin.version}</version>
    <scope>test</scope>
  </dependency>
</dependencies>
<build>
  <plugins>
    <plugin>
      <groupId>org.jetbrains.kotlin</groupId>
      <artifactId>kotlin-maven-plugin</artifactId>
      <version>${kotlin.version}</version>
      <executions>
        <execution>
          <id>kapt</id>
          <goals>
            <goal>kapt</goal>
          </goals>
          <configuration>
            <sourceDirs>
              <sourceDir>src/main/kotlin</sourceDir>
            </sourceDirs>
            <annotationProcessorPaths>
              <annotationProcessorPath>
                <groupId>com.google.dagger</groupId>
                <artifactId>dagger-compiler</artifactId>
                <version>${dagger.version}</version>
              </annotationProcessorPath>
            </annotationProcessorPaths>
          </configuration>
        </execution>
        <execution>
          <id>compile</id>
          <phase>process-sources</phase>
          <goals>
            <goal>compile</goal>
          </goals>
          <configuration>
            <sourceDirs>
              <sourceDir>src/main/kotlin</sourceDir>
            </sourceDirs>
          </configuration>
        </execution>
        <execution>
          <id>test-kapt</id>
          <goals>
            <goal>test-kapt</goal>
          </goals>
          <configuration>
            <sourceDirs>
              <sourceDir>src/test/kotlin</sourceDir>
            </sourceDirs>
            <annotationProcessorPaths>
              <annotationProcessorPath>
                <groupId>com.google.dagger</groupId>
                <artifactId>dagger-compiler</artifactId>
                <version>${dagger.version}</version>
              </annotationProcessorPath>
            </annotationProcessorPaths>
          </configuration>
        </execution>
        <execution>
          <id>test-compile</id>
          <goals>
            <goal>test-compile</goal>
          </goals>
          <configuration>
            <sourceDirs>
              <sourceDir>src/test/kotlin</sourceDir>
              <sourceDir>target/generated-sources/kapt/test</sourceDir>
            </sourceDirs>
          </configuration>
        </execution>
      </executions>
      <configuration>
        <compilerPlugins>
          <plugin>kotlinx-serialization</plugin>
        </compilerPlugins>
      </configuration>
      <dependencies>
        <dependency>
          <groupId>org.jetbrains.kotlin</groupId>
          <artifactId>kotlin-maven-serialization</artifactId>
          <version>${kotlin.version}</version>
        </dependency>
      </dependencies>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-surefire-plugin</artifactId>
      <version>${maven-surefire-plugin.version}</version>
      <dependencies>
        <dependency>
          <groupId>org.apache.maven.surefire</groupId>
          <artifactId>surefire-junit47</artifactId>
          <version>${maven-surefire-plugin.version}</version>
        </dependency>
      </dependencies>
    </plugin>
    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>exec-maven-plugin</artifactId>
      <version>${maven-exec-plugin.version}</version>
      <configuration>
        <cleanupDaemonThreads>false</cleanupDaemonThreads>
      </configuration>
    </plugin>
  </plugins>
</build>
An object that implements Serializable (java.io):
data class MyObject(
    val field: String = ""
) : Serializable
Basically, I want to run FlatMapElements with a TypeDescriptor and a lambda (behind the scenes, a SerializableFunction):
class MyTransform(private val redisConnectionConf: RedisConnectionConfiguration) :
    PTransform<PBegin, PCollection<MyObject>>() {

    override fun expand(input: PBegin): PCollection<MyObject> {
        return input
            .apply(RedisIO.read().withConnectionConfiguration(redisConnectionConf).withKeyPattern("my-pattern*"))
            .apply(
                // The SAM-converted lambda below is what fails to serialize with Kotlin 1.5.0.
                FlatMapElements.into(TypeDescriptor.of(MyObject::class.java))
                    .via(SerializableFunction<KV<String, String>, List<MyObject>> { toMyObjects(it) })
            )
    }

    // Parses the JSON value of a Redis entry into a list of MyObject.
    fun toMyObjects(entry: KV<String, String>): List<MyObject> {
        val key = entry.key
        val value = entry.value
        val ref = object : TypeReference<List<MyObject>>() {}
        return OBJECT_MAPPER.readValue(value, ref)
    }
}
I deliberately moved part of the code into the toMyObjects method to show as much as possible. OBJECT_MAPPER is a Jackson ObjectMapper.
With Java 8 and Beam 2.27.0 this basic code works perfectly fine.
With Kotlin this code doesn't work and fails with the following error:
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray (SerializableUtils.java:59)
at org.apache.beam.runners.core.construction.ParDoTranslation.translateDoFn (ParDoTranslation.java:692)
at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator$1.translateDoFn (PrimitiveParDoSingleFactory.java:218)
at org.apache.beam.runners.core.construction.ParDoTranslation.payloadForParDoLike (ParDoTranslation.java:814)
at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.payloadForParDoSingle (PrimitiveParDoSingleFactory.java:214)
at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.translate (PrimitiveParDoSingleFactory.java:163)
at org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate (PTransformTranslation.java:429)
at org.apache.beam.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:239)
at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform (SdkComponents.java:175)
at org.apache.beam.runners.core.construction.PipelineTranslation$1.visitPrimitiveTransform (PipelineTranslation.java:87)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:587)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239)
at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:59)
at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:933)
at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:196)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
at myPackage.MyApp.main (MyApp.kt:44)
at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:748)
Caused by: java.io.NotSerializableException: Non-serializable lambda
at mypackage.MyTransform$$Lambda$783/1784079343.writeObject (Unknown Source)
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project my-project:
An exception occured while executing the Java class. unable to serialize
DoFnWithExecutionInformation{doFn=org.apache.beam.sdk.transforms.FlatMapElements$2@23402e70,
mainOutputTag=Tag<org.apache.beam.sdk.values.PCollection.<init>:402#6929f09b03d242ca>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}: Non-serializable lambda -> [Help 1]
The SerializableUtils.serializeToByteArray method in the Beam SDK throws this error: java.io.NotSerializableException: Non-serializable lambda
MyObject is Serializable and the lambda is wrapped in a Beam SerializableFunction (a function interface that extends Serializable).
Normally in this case, Beam uses a SerializableCoder for the Serializable object. I don't understand why Beam sees the lambda as non-serializable.
I don't get this behaviour with plain Java.
To be precise: if I replace the FlatMapElements/TypeDescriptor/lambda combination with ParDo.of(DoFn), it works fine, but in some cases, for conciseness and readability, I want to use the built-in MapElements and FlatMapElements with lambda expressions.
Thanks in advance for your help.
Upvotes: 0
Views: 601
Reputation: 11806
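If you want to stay on Kotlin 1.5, try this workaround: pass the -Xsam-conversions=class compiler argument to the Kotlin Maven plugin.
Kotlin 1.5.0 compiles SAM conversions through invokedynamic by default; this flag switches back to the older scheme that generates a class for each conversion.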
<plugins>
  <plugin>
    <groupId>org.jetbrains.kotlin</groupId>
    <artifactId>kotlin-maven-plugin</artifactId>
    <version>${kotlin.version}</version>
    <configuration>
      <args>
        <arg>-Xsam-conversions=class</arg>
      </args>
    </configuration>
  </plugin>
</plugins>
Reference: https://youtrack.jetbrains.com/issue/KT-46359#focus=Comments-27-4862857.0-0
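One way to check whether the flag took effect, without launching a whole pipeline, is to push the function object through plain Java serialization, which is where the NotSerializableException in the question's stack trace comes from. The following is only a sketch under assumptions: `SerFn` is a hypothetical stand-in for Beam's `SerializableFunction` (it is a Kotlin interface, whereas Beam's is a Java one, so for a faithful check substitute the real Beam interface in your project), and `isJavaSerializable` is a helper name made up for this example.

```kotlin
import java.io.ByteArrayOutputStream
import java.io.NotSerializableException
import java.io.ObjectOutputStream
import java.io.Serializable

// Hypothetical stand-in for Beam's SerializableFunction, so the sketch needs no Beam dependency.
fun interface SerFn<I, O> : Serializable {
    fun apply(input: I): O
}

// Returns true if Java serialization accepts the object,
// false if it throws NotSerializableException.
fun isJavaSerializable(candidate: Any): Boolean =
    try {
        ObjectOutputStream(ByteArrayOutputStream()).use { it.writeObject(candidate) }
        true
    } catch (e: NotSerializableException) {
        false
    }

fun main() {
    // SAM-converted lambda: the result depends on the Kotlin version
    // and on the -Xsam-conversions setting, so no output is asserted here.
    val length = SerFn<String, Int> { it.length }
    println("lambda serializable: ${isJavaSerializable(length)}")
}
```

Running this once with and once without the compiler argument should show whether the build configuration is actually being picked up.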
Upvotes: 1
Reputation: 6572
I finally found a solution: I downgraded Kotlin (dependencies and plugin) to 1.4.21.
With this version the non-serializable lambda problem disappears, and the Kotlin Maven plugin no longer hits the VirtualFile problem at compile time (see "Kotlin 1.4.30 Apache beam compilation error").
This issue helped me a lot, thanks: https://youtrack.jetbrains.com/issue/KT-45067
Hopefully the Kotlin Maven plugin will work correctly with 1.4.x versions later than 1.4.21 in the future.
Beam developers using Kotlin and Maven should be careful with this issue: 1.4.32 doesn't compile with Beam, and 1.5.0 fails at runtime with the non-serializable lambda error.
Upvotes: 1
Reputation: 6572
When I replace the lambda with a class that implements SerializableFunction, it works:
class MapString : SerializableFunction<KV<String, String>, List<MyObject>> {
    override fun apply(input: KV<String, String>): List<MyObject> {
        ....
    }
}
I'm keeping the question open because I want a solution that works with lambda expressions.
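A middle ground between a named class and a lambda is an anonymous object expression. Kotlin compiles object expressions to ordinary classes rather than routing them through invokedynamic, so the instance can be serialized as long as the interface extends Serializable and everything the object captures is serializable too. A minimal sketch of the idea, where `SerFn` is a hypothetical stand-in for Beam's `SerializableFunction` and `roundTrip` is a helper name invented for this example:

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.io.Serializable

// Hypothetical stand-in for Beam's SerializableFunction (assumption, keeps the sketch Beam-free).
interface SerFn<I, O> : Serializable {
    fun apply(input: I): O
}

// Anonymous object expression: compiled to a real class, not to an invokedynamic lambda.
val splitOnComma: SerFn<String, List<String>> =
    object : SerFn<String, List<String>> {
        override fun apply(input: String): List<String> = input.split(",")
    }

// Serializes and then deserializes a value with plain Java serialization.
@Suppress("UNCHECKED_CAST")
fun <T : Serializable> roundTrip(value: T): T {
    val bytes = ByteArrayOutputStream()
    ObjectOutputStream(bytes).use { it.writeObject(value) }
    return ObjectInputStream(ByteArrayInputStream(bytes.toByteArray())).use { it.readObject() as T }
}

fun main() {
    val restored = roundTrip(splitOnComma)
    println(restored.apply("a,b,c")) // prints [a, b, c]
}
```

Inside a transform, an object expression that calls a member such as toMyObjects captures the enclosing instance, so that instance and its fields must be serializable as well.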
Upvotes: 0