Reputation: 31
I am trying to deploy a Python Flink application on AWS Kinesis Data Analytics. I followed the official documentation on https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-python-creating.html
I want to create a source table using the Table API that reads from Confluent Kafka and deserializes the messages using the avro-confluent format.
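For reference, this is roughly the source table I am trying to define with PyFlink (the topic, bootstrap servers, schema registry URL and columns are placeholders, and the exact option keys may differ slightly between Flink versions):

from pyflink.table import EnvironmentSettings, TableEnvironment

# Table API environment in streaming mode.
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
table_env = TableEnvironment.create(env_settings)

# Source table using the 'kafka' connector and the 'avro-confluent' format.
# All names, addresses and URLs below are placeholders.
table_env.execute_sql("""
    CREATE TABLE my_source (
        `user_id` STRING,
        `event_time` TIMESTAMP(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'my-topic',
        'properties.bootstrap.servers' = 'broker:9092',
        'properties.group.id' = 'my-group',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'avro-confluent',
        'avro-confluent.schema-registry.url' = 'http://schema-registry:8081'
    )
""")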
Following the connector documentation https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/ and https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/, I need to include two jar files as dependencies. But the jarfile property in
"ApplicationConfiguration": {
"EnvironmentProperties": {
"PropertyGroups": [
{
"PropertyGroupId": "kinesis.analytics.flink.run.options",
"PropertyMap": {
"python": "MyApplication/main.py",
"jarfile": "MyApplication/lib/myJarFile.jar",
"pyFiles": "MyApplication/lib/myDependentFile.py",
"pyArchives": "MyApplication/lib/myArchive.zip"
}
},
seems to accept only one jar file as a dependency.
Any idea how to include these two jar files as dependencies?
Thank you
Upvotes: 3
Views: 1669
Reputation: 8055
Documentation on this is quite sparse, especially for those of us coming from a Python background. Here are the important points (after hours of research...):
Packaging Instructions for PyFlink on Kinesis Data Analytics
If you have multiple dependencies, you have to create a fat jar and then include it using the jarfile property as described below. This is a Flink requirement as described here.
You must create a fat/uber jar (a single large .jar file with all dependencies included inside it).
Transform table connector/format resources
Flink uses Java's Service Provider Interfaces (SPI) to load the table connector/format factories by their identifiers. Since the SPI resource file named org.apache.flink.table.factories.Factory for every table connector/format is under the same directory META-INF/services, these resource files will override each other when building the uber-jar of a project which uses more than one table connector/format, which will cause Flink to fail to load the table connector/format factories.
In this situation, the recommended way is to transform these resource files under the directory META-INF/services with the ServicesResourceTransformer of the Maven shade plugin (the Flink docs show this for a project containing the connector flink-sql-connector-hive-3.1.3 and the format flink-parquet; the pom.xml further down works the same way).
Once the ServicesResourceTransformer is configured, the table connector/format resource files under the directory META-INF/services are merged rather than overwriting each other when building the uber-jar of the project.
When you create your uber jar, you must make sure that META-INF/services is merged for all dependencies, not overwritten. Some techniques for combining .jar files out there (e.g. unzipping the .jar files into the same directory and re-zipping it) don't work.
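One quick way to check whether the merge actually happened is to read the services file out of the built jar; a small sketch using Python's standard zipfile module (the jar path is just an example):

import zipfile

# Path to the fat jar produced by `mvn package` (example path).
JAR_PATH = "target/myProject-1.0.jar"

with zipfile.ZipFile(JAR_PATH) as jar:
    # This SPI file should list the factory classes of *all* bundled
    # connectors/formats, one fully-qualified class name per line.
    content = jar.read("META-INF/services/org.apache.flink.table.factories.Factory")
    print(content.decode("utf-8"))

If only one connector's factories show up here, the files were overwritten instead of merged and Flink will complain that it cannot find a factory for the missing connector/format identifier.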
Packaging dependencies with your usercode with Maven
To provide these dependencies not included by Flink we suggest two options with Maven.
- The maven assembly plugin builds a so-called uber-jar (executable jar) containing all your dependencies. The assembly configuration is straight-forward, but the resulting jar might become bulky. See maven-assembly-plugin for further information.
- The maven unpack plugin unpacks the relevant parts of the dependencies and then packages them with your code.
In this portion of the docs they recommend using maven-assembly-plugin to do this, but in the previous portion ("Transform table connector/format resources") they provide a better solution. See below for the answer.
So, in your root directory, add a pom.xml file with the following:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>myProject</artifactId> <!-- PROJECT NAME -->
    <version>1.0</version> <!-- PROJECT VERSION -->

    <dependencies>
        <!-- YOUR DEPENDENCIES HERE -->
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>shade</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers combine.children="append">
                                <!-- The service transformer is needed to merge
                                     META-INF/services files -->
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Where it says 'YOUR DEPENDENCIES HERE', you add whatever connectors/formats you are using. These are listed throughout the Flink documentation. In my case, I needed the Confluent Avro format, so I added the Maven dependency given in that section (alongside other things):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro-confluent-registry</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>1.14.4</version>
</dependency>
Once the pom.xml file has all the needed dependencies, running mvn package creates a new target/ folder that will contain a myProject-1.0.jar file. That is the jar file I then use with my Flink application. With the following directory structure:
.
├── README.md
├── compile.sh
├── insights
│ └── main.py
└── pom.xml
Running the following compile.sh (my project name is insights, so the built jar is insights-1.0.jar):
# Clean up any previous build.
rm -f insights.zip
echo "Building Insights..."
sleep 1
# Build the project.
mvn package
# Housekeeping: put the fat jar where the application expects it.
mkdir -p insights/lib
mv target/insights-1.0.jar insights/lib/dependencies.jar
# Zip the project.
cd insights
zip -r ../insights.zip * lib/*
# Clean up build artifacts.
cd ../
rm -rf dependency-reduced-pom.xml insights/lib/ target
I get the following output directory:
.
├── README.md
├── compile.sh
├── insights
│ └── main.py
├── insights.zip
└── pom.xml
Where insights.zip
is the project I then upload to AWS.
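As a side note, to test main.py locally before uploading, you can point PyFlink at the same fat jar via the pipeline.jars option (the path below is just an example; on Kinesis Data Analytics the jarfile property takes care of this instead):

from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
table_env = TableEnvironment.create(env_settings)

# Make the shaded connector/format jar visible to the local PyFlink job.
table_env.get_config().get_configuration().set_string(
    "pipeline.jars", "file:///path/to/insights/lib/dependencies.jar"
)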
Upvotes: 2
Reputation: 21
According to some hidden documentation here: https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/tree/master/python/PythonPackages
"If you have multiple dependencies, you have to create a fat jar and then include it using the jarfile property as described below. This is a Flink requirement as described here."
This restriction seems to come from the Kinesis property PropertyMap.jarfile being passed through as the Flink command-line argument --jarfile, which accepts only a single jar.
For me, this worked after building a new Java project/jar that combined the Kinesis, JDBC and Postgres connectors.
Upvotes: 2