Sean Hagen

Apache Beam: Use S3 bucket for Snowflake CSV output when using apache_beam.io.snowflake.ReadFromSnowflake in Python

I'm trying to figure out how to work around an issue I've encountered with the Snowflake connector in Python.

Basically, the issue is that using the Snowflake connector from Python seems to require Google Cloud Storage for the temporary CSV files it generates. My requirements, however, are that I use AWS S3 for this; GCS is out of the question.

Right now I'm just running my pipeline with the DirectRunner, but eventually I'll need to run Beam on Spark or Flink, so a solution that works for all three runners would be fantastic.
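For context, switching runners in the Python SDK is normally a matter of pipeline options. The helper below is a hypothetical sketch (its name and defaults are assumptions); only the flag names `--runner`, `--flink_master`, `--spark_master_url` and `--s3_region_name` come from Beam's Python pipeline options. The S3 flag is read by the Python SDK's own S3 filesystem; nothing here implies it configures the Java side of the Snowflake transform.

```python
from typing import List, Optional


def build_pipeline_args(runner: str, s3_region: Optional[str] = None) -> List[str]:
    """Hypothetical helper: builds argv-style flags for PipelineOptions."""
    args = [f"--runner={runner}"]
    if runner == "FlinkRunner":
        # "[auto]" is the Beam default for the Flink master address.
        args.append("--flink_master=[auto]")
    elif runner == "SparkRunner":
        # Local multi-threaded Spark master, the Beam default.
        args.append("--spark_master_url=local[4]")
    if s3_region is not None:
        # S3Options flag, consumed by the Python SDK's S3 filesystem.
        args.append(f"--s3_region_name={s3_region}")
    return args
```

The resulting list could be turned into a value for the `beam_options` parameter with `PipelineOptions(build_pipeline_args("FlinkRunner", "us-east-1")).view_as(S3Options)`, so the pipeline code itself stays the same across runners.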

I've got everything else set up properly, using something like this:

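```python
from typing import Callable, Optional

import apache_beam as beam
from apache_beam.io.snowflake import ReadFromSnowflake
from apache_beam.options.pipeline_options import S3Options

# snowflake_csv_mapper (defined elsewhere) converts each CSV line, already
# split into a list of strings, into the element type used downstream.


def learning(
    beam_options: Optional[S3Options] = None,
    test: Callable[[beam.PCollection], None] = lambda _: None,
) -> None:
    with beam.Pipeline(options=beam_options) as pipeline:
        snowflake_rows = pipeline | "Query snowflake" >> ReadFromSnowflake(
            server_name="<server>",
            database="production_data",
            schema="public",
            # S3 staging location for the temporary CSV files.
            staging_bucket_name="s3://my-bucket/",
            storage_integration_name="s3_integration_data_prod",
            query="select * from beam_poc_2025",
            csv_mapper=snowflake_csv_mapper,
            username="<work-email>",
            password="<password>",
        )

        snowflake_rows | "Printing Snowflake" >> beam.Map(print)
```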
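The snippet references `snowflake_csv_mapper` without defining it. According to the Beam Snowflake connector docs, the connector unloads the query result to the staging bucket as CSV files, reads them back line by line, splits each line into a list of strings, and passes that list to `csv_mapper`. A hypothetical mapper for a two-column result might look like this; `BeamPocRow` and the column layout are assumptions, since the schema of `beam_poc_2025` isn't shown.

```python
from typing import List, NamedTuple


class BeamPocRow(NamedTuple):
    # Hypothetical row type; the real columns of beam_poc_2025 are unknown.
    id: int
    name: str


def snowflake_csv_mapper(fields: List[str]) -> BeamPocRow:
    # Receives one CSV line already split into strings and converts each
    # field to the type the rest of the pipeline expects.
    return BeamPocRow(id=int(fields[0]), name=fields[1])
```

Because the mapper is a plain function of a list of strings, it can be unit-tested without Snowflake or a runner.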

I know that this successfully authenticates to Snowflake because I get a Duo push notification to approve the login. It just errors out later on with this stack trace:

```
Traceback (most recent call last):
  File "/home/sean/Code/beam-poc/main.py", line 43, in <module>
    app.learning(beam_options=beam_options)
  File "/home/sean/Code/beam-poc/example/app.py", line 205, in learning
    with beam.Pipeline(options=beam_options) as pipeline:
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sean/.local/share/mise/installs/python/3.12.9/lib/python3.12/site-packages/apache_beam/pipeline.py", line 620, in __exit__
    self.result = self.run()
                  ^^^^^^^^^^
  File "/home/sean/.local/share/mise/installs/python/3.12.9/lib/python3.12/site-packages/apache_beam/pipeline.py", line 594, in run
    return self.runner.run_pipeline(self, self._options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sean/.local/share/mise/installs/python/3.12.9/lib/python3.12/site-packages/apache_beam/runners/direct/direct_runner.py", line 184, in run_pipeline
    return runner.run_pipeline(pipeline, options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sean/.local/share/mise/installs/python/3.12.9/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 195, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
                              ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sean/.local/share/mise/installs/python/3.12.9/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 221, in run_via_runner_api
    return self.run_stages(stage_context, stages)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sean/.local/share/mise/installs/python/3.12.9/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 468, in run_stages
    bundle_results = self._execute_bundle(
                     ^^^^^^^^^^^^^^^^^^^^^
  File "/home/sean/.local/share/mise/installs/python/3.12.9/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 793, in _execute_bundle
    self._run_bundle(
  File "/home/sean/.local/share/mise/installs/python/3.12.9/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1032, in _run_bundle
    result, splits = bundle_manager.process_bundle(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sean/.local/share/mise/installs/python/3.12.9/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1398, in process_bundle
    raise RuntimeError(result.error)
RuntimeError: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: No filesystem found for scheme s3
        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
        at org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:810)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1837)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3100(FnApiDoFnRunner.java:145)
        at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2695)
        at org.apache.beam.sdk.transforms.MapElements$2.processElement(MapElements.java:151)
        at org.apache.beam.sdk.transforms.MapElements$2$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:810)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1837)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3100(FnApiDoFnRunner.java:145)
        at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWindowedValue(FnApiDoFnRunner.java:2725)
        at org.apache.beam.sdk.transforms.Reshuffle$RestoreMetadata$1.processElement(Reshuffle.java:187)
        at org.apache.beam.sdk.transforms.Reshuffle$RestoreMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:810)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1837)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3100(FnApiDoFnRunner.java:145)
        at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2695)
        at org.apache.beam.sdk.transforms.Reshuffle$1.processElement(Reshuffle.java:123)
        at org.apache.beam.sdk.transforms.Reshuffle$1$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:810)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
        at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
        at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:172)
        at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:136)
        at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:552)
        at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
        at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
        at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme s3
        at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:557)
        at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:129)
        at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:150)
        at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:162)
        at org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn.process(FileIO.java:753)
```

This seems to say that the Java side of the pipeline, i.e. the Snowflake transform provided by the expansion service, doesn't have s3 registered as a valid filesystem scheme.
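To illustrate what the error means: Beam resolves a path to a filesystem by its URI scheme (the part before `://`) and looks that scheme up among the filesystems registered in the SDK process doing the work. The code below is a simplified, hypothetical model of that lookup in plain Python, not Beam's actual implementation; the registry contents are assumptions chosen to show why `s3://my-bucket/` fails where only, say, local files and `gs` are registered.

```python
import re
from typing import Dict

# Simplified stand-in for a scheme-to-filesystem registry; the real ones live
# inside each Beam SDK (e.g. Java's org.apache.beam.sdk.io.FileSystems).
_SCHEME_RE = re.compile(r"(?P<scheme>[a-zA-Z][-a-zA-Z0-9+.]*)://")


def get_scheme(path: str) -> str:
    match = _SCHEME_RE.match(path)
    # Paths without "scheme://" are treated as local files in this model.
    return match.group("scheme") if match else "file"


def get_filesystem(path: str, registry: Dict[str, str]) -> str:
    scheme = get_scheme(path)
    if scheme not in registry:
        # Same wording as the Java exception in the trace above.
        raise ValueError(f"No filesystem found for scheme {scheme}")
    return registry[scheme]
```

With `registry = {"file": "LocalFileSystem", "gs": "GcsFileSystem"}`, a `gs://` staging path resolves while `s3://my-bucket/` raises. In this model the fix is to register an `s3` entry in the process that performs the match, which in the trace above is the Java SDK harness running the Snowflake read, not the Python process.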

Is there a way to register s3 as a filesystem scheme for the expansion service from within a Python pipeline? Or will this require writing my own custom expansion service?

My preferred solution would be to add some code to the Python pipeline that registers s3 as a scheme, mostly because I'm working on a proof of concept right now, and writing a custom expansion service seems like it would take longer than I want to spend on a PoC.