Zareman

Reputation: 607

Using the Beam Python SDK and PortableRunner to connect to Kafka with SSL

I have the code below for connecting to Kafka using the Python Beam SDK. I know that the ReadFromKafka transform is run in a Java SDK harness (Docker container), but I have not been able to figure out how to make ssl.truststore.location and ssl.keystore.location accessible inside the SDK harness' Docker environment. The job_endpoint argument points to java -jar beam-runners-flink-1.10-job-server-2.27.0.jar --flink-master localhost:8081.

pipeline_args.extend([
    '--job_name=paul_test',
    '--runner=PortableRunner',
    '--sdk_location=container',
    '--job_endpoint=localhost:8099',
    '--streaming',
    "--environment_type=DOCKER",
    f"--sdk_harness_container_image_overrides=.*java.*,{my_beam_sdk_docker_image}:{my_beam_docker_tag}",
])

with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline:
    kafka = pipeline | ReadFromKafka(
        consumer_config={
            "bootstrap.servers": "bootstrap-server:17032",
            "security.protocol": "SSL",
            "ssl.truststore.location": "/opt/keys/client.truststore.jks", # how do I make this available to the Java SDK harness 
            "ssl.truststore.password": "password",
            "ssl.keystore.type": "PKCS12",
            "ssl.keystore.location": "/opt/keys/client.keystore.p12", # how do I make this available to the Java SDK harness 
            "ssl.keystore.password": "password",
            "group.id": "group",
            "basic.auth.credentials.source": "USER_INFO",
            "schema.registry.basic.auth.user.info": "user:password"
        },
        topics=["topic"],
        max_num_records=2,
        # expansion_service="localhost:56938"
    )

    kafka | beam.Map(lambda x: print(x))

I tried specifying the image override option as --sdk_harness_container_image_overrides='.*java.*,beam_java_sdk:latest', where beam_java_sdk:latest is a Docker image I based on apache/beam_java11_sdk:2.27.0 and that pulls the credentials in its entrypoint.sh. But Beam does not appear to use it; I see

INFO  org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory  - Still waiting for startup of environment apache/beam_java11_sdk:2.27.0 for worker id 1-1

in the logs, which is soon and inevitably followed by

Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /opt/keys/client.keystore.p12 of type PKCS12
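
For reference, the custom image is essentially the sketch below; entrypoint.sh is my script that fetches the keys into /opt/keys/ and then execs the stock harness entrypoint, /opt/apache/beam/boot:

FROM apache/beam_java11_sdk:2.27.0

# entrypoint.sh pulls the truststore/keystore into /opt/keys/
# and then execs the stock harness boot binary, /opt/apache/beam/boot.
COPY entrypoint.sh /opt/entrypoint.sh
RUN chmod +x /opt/entrypoint.sh
ENTRYPOINT ["/opt/entrypoint.sh"]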

In conclusion, my question is this: in Apache Beam, is it possible to make files available inside the Java SDK harness Docker container from the Python Beam SDK? If so, how might it be done?

Many thanks.

Upvotes: 1

Views: 1642

Answers (2)

venom13k

Reputation: 63

Actually, you can override the default expansion service when you use the Kafka connector. To do that, you need to define the expansion service like this:

from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service

ReadFromKafka(
    consumer_config={...},
    expansion_service=default_io_expansion_service(
        append_args=[
            "--defaultEnvironmentType=DOCKER",
            "--defaultEnvironmentConfig=beam_java_sdk:latest",
        ]
    ),
)
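
With this override in place, the Kafka read is expanded and executed in your beam_java_sdk:latest container, so any keystore files baked into that image (e.g. under /opt/keys/) are visible to the Kafka consumer.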

Upvotes: 1

Jan Lukavsky

Reputation: 131

Currently, there is no straightforward way to achieve this. There is ongoing discussion, and there are tracking issues (BEAM-12538 and BEAM-12539) to provide support for this kind of expansion service customization. That is the short answer.

The long answer is yes, you can do that. You would have to copy-paste ExpansionService.java into your codebase and build your own custom expansion service, in which you specify the default environment (DOCKER) and the default environment config (your image). You then have to run this expansion service manually and pass its address via the expansion_service parameter of ReadFromKafka.
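
For illustration, once such a custom expansion service is running, the Python side points at it via the expansion_service address (a sketch; localhost:8097 is a placeholder port, and the consumer config is abbreviated to the bootstrap servers from the question):

from apache_beam.io.kafka import ReadFromKafka

kafka = pipeline | ReadFromKafka(
    consumer_config={"bootstrap.servers": "bootstrap-server:17032"},  # plus the SSL settings above
    topics=["topic"],
    expansion_service="localhost:8097",  # address of the manually started expansion service
)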

Upvotes: 1
