Adrian Chang Alcover

Reputation: 31

Deploy a Python Flink application on AWS Kinesis

I am trying to deploy a Python Flink application on AWS Kinesis Data Analytics. I followed the official documentation at https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-python-creating.html

I want to create a source table using the Table API that reads from Confluent Kafka and deserializes the messages with the avro-confluent format.
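
For reference, this is roughly the kind of table definition I have in mind (a minimal sketch; the columns, topic, broker and Schema Registry addresses are just placeholders):

# Minimal sketch of the source table I want to define (placeholder values throughout).
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
table_env = TableEnvironment.create(env_settings)

# Kafka source table deserialized with the avro-confluent format,
# following the Flink 1.13 connector/format docs linked below.
table_env.execute_sql("""
    CREATE TABLE kafka_source (
        user_id STRING,
        amount DOUBLE
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'my-topic',
        'properties.bootstrap.servers' = 'my-broker:9092',
        'properties.group.id' = 'my-consumer-group',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'avro-confluent',
        'avro-confluent.schema-registry.url' = 'http://my-schema-registry:8081'
    )
""")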

Following the connector documentation at https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/ and https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/, I need to include two jar files as dependencies. But the jarfile property in

"ApplicationConfiguration": {
    "EnvironmentProperties": {
      "PropertyGroups": [
        {
          "PropertyGroupId": "kinesis.analytics.flink.run.options",
          "PropertyMap": {
            "python": "MyApplication/main.py",
            "jarfile": "MyApplication/lib/myJarFile.jar",
            "pyFiles": "MyApplication/lib/myDependentFile.py",
            "pyArchives": "MyApplication/lib/myArchive.zip"
          }
        },

seems to accept only a single jar file as a dependency.

Any idea how to include both jar files as dependencies?

Thank you

Upvotes: 3

Views: 1669

Answers (2)

felipe

Reputation: 8055

Documentation on this is quite sparse, especially for those of us coming from a Python background. Here are the important points (after hours of research...):

Packaging Instructions for PyFlink on Kinesis Data Analytics

If you have multiple dependencies, you have to create a fat jar and then include it using the jarfile property as described below. This is a Flink requirement as described here.

You must create a fat/uber/large jar (a single .jar file with all dependencies included inside of it).

Transform table connector/format resources

Flink uses Java's Service Provider Interfaces (SPI) to load the table connector/format factories by their identifiers. Because the SPI resource file named org.apache.flink.table.factories.Factory sits under the same directory META-INF/services for every table connector/format, these resource files override each other when building the uber-jar of a project that uses more than one table connector/format, which causes Flink to fail to load the factories.

In this situation, the recommended way is to transform these resource files under META-INF/services with the ServicesResourceTransformer of the Maven Shade plugin. The Flink docs illustrate this with a pom.xml for a project containing the connector flink-sql-connector-hive-3.1.3 and the format flink-parquet; the pom.xml further below uses the same transformer.


Once the ServicesResourceTransformer is configured, the table connector/format resource files under META-INF/services are merged rather than overwriting each other when building the uber-jar.

When you create your uber jar, you must make sure that META-INF/services is merged for all dependencies, not overwritten. Some techniques for combining .jar files out there (e.g. unzip the .jar files into the same directory, then re-zip that directory) therefore don't work.
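
A quick way to sanity-check the merge is to read the SPI file back out of the finished uber jar. This is a small sketch of my own; the jar path assumes the dependencies.jar produced by the build script further below:

# Sanity check (my own sketch): confirm META-INF/services was merged, i.e. the single
# SPI file in the uber jar lists factories from *all* bundled connectors/formats.
import zipfile

SPI_FILE = "META-INF/services/org.apache.flink.table.factories.Factory"

with zipfile.ZipFile("insights/lib/dependencies.jar") as jar:  # path from the build script below
    factories = jar.read(SPI_FILE).decode("utf-8").splitlines()

print("\n".join(factories))
# If the merge worked, entries from every dependency show up here (e.g. both the Kafka
# connector factory and the avro-confluent format factory), not just one of them.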

Packaging dependencies with your usercode with Maven

To provide dependencies that are not included with Flink, the docs suggest two options with Maven:

  1. The maven assembly plugin builds a so-called uber-jar (executable jar) containing all your dependencies. The assembly configuration is straightforward, but the resulting jar might become bulky. See maven-assembly-plugin for further information.
  2. The maven unpack plugin unpacks the relevant parts of the dependencies and then packages them with your code.

In this portion of the docs they recommend using maven-assembly-plugin for this, but in the previous portion ("Transform table connector/format resources") they provide a better solution: the Maven Shade plugin with the ServicesResourceTransformer. See the pom.xml below.


So, in your root directory, add a pom.xml file with the following content:

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>myProject</artifactId> <!-- PROJECT NAME -->
    <version>1.0</version> <!-- PROJECT VERSION -->

    <dependencies>
        
        <!-- YOUR DEPENDENCIES HERE -->

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>shade</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers combine.children="append">
                                <!-- The service transformer is needed to merge META-INF/services
                                files -->
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Where it says 'YOUR DEPENDENCIES HERE', you add what you are using; the Maven coordinates are found throughout the Flink documentation. In my case, I needed the Confluent Avro format, so I added the Maven dependencies given in that section (alongside other things):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro-confluent-registry</artifactId>
  <version>1.14.4</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>1.14.4</version>
</dependency>

Once the pom.xml file has all needed dependencies, running

mvn package

creates a target/ folder containing a myProject-1.0.jar file (artifactId-version.jar). That is the jar file I then use with my Flink application. My project has the following directory structure:

.
├── README.md
├── compile.sh
├── insights
│   └── main.py
└── pom.xml

I then run the following compile.sh (my project name is insights):

#!/usr/bin/env bash

# Clean up the previous build (-f so the first run doesn't fail).
rm -f insights.zip

echo "Building Insights..."
sleep 1

# Build the project.
mvn package

# Housekeeping.
mkdir -p insights/lib
mv target/insights-1.0.jar insights/lib/dependencies.jar

# Zip the project.
cd insights
zip -r ../insights.zip * lib/*

# Clean-up build
cd ../
rm -rf dependency-reduced-pom.xml insights/lib/ target

I get the following output directory:

.
├── README.md
├── compile.sh
├── insights
│   └── main.py
├── insights.zip
└── pom.xml

Where insights.zip is the project I then upload to AWS.
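
One closing note, as a sketch of my own rather than part of the Kinesis deployment: on Kinesis Data Analytics the jarfile run option loads the fat jar for you, but when running main.py locally the same jar can be registered through the pipeline.jars table configuration (the lib/dependencies.jar path assumes the zip layout above):

# Local-run sketch only (assumption): register the fat jar via pipeline.jars when running
# main.py outside Kinesis Data Analytics, where the jarfile run option is not applied.
import os
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
table_env = TableEnvironment.create(env_settings)

# Absolute file:// URL to the uber jar produced by compile.sh (path assumed from the layout above).
jar_path = os.path.abspath("lib/dependencies.jar")
table_env.get_config().get_configuration().set_string("pipeline.jars", "file://" + jar_path)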

Upvotes: 2

dolfno

Reputation: 21

According to some hidden documentation here: https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/tree/master/python/PythonPackages

"If you have multiple dependencies, you have to create a fat jar and then include it using the jarfile property as described below. This is a Flink requirement as described here."

This restriction seems to stem from the Kinesis property PropertyMap.jarfile mapping to the Flink command-line argument --jarfile, which accepts only a single jar.

For me this worked after building a new Java project/jar file that combines a Kinesis, JDBC and Postgres connector.

Upvotes: 2
