I'm trying to figure out how to work around an issue I've encountered with the Snowflake connector in Python.
Basically, the issue boils down to the fact that to use Snowflake in Python it seems you're required to use Google Cloud Storage to store the temporary CSV files that get generated. However, my requirements are that I need to use AWS S3 for this -- GCS is out of the question.
Right now I'm just running my pipeline using the DirectRunner, but eventually I'll need to be able to run Beam on Spark or Flink, so a solution that works for all three runners would be ideal.
I've got everything else set up properly, using something like this:
from typing import Callable, Optional

import apache_beam as beam
from apache_beam.io.snowflake import ReadFromSnowflake
from apache_beam.options.pipeline_options import S3Options


def learning(
    beam_options: Optional[S3Options] = None,
    test: Callable[[beam.PCollection], None] = lambda _: None,
) -> None:
    with beam.Pipeline(options=beam_options) as pipeline:
        snowflakeRows = pipeline | "Query snowflake" >> ReadFromSnowflake(
            server_name="<server>",
            database="production_data",
            schema="public",
            staging_bucket_name="s3://my-bucket/",
            storage_integration_name="s3_integration_data_prod",
            query="select * from beam_poc_2025",
            csv_mapper=snowflake_csv_mapper,
            username="<work-email>",
            password="<password>",
        )
        snowflakeRows | "Printing Snowflake" >> beam.Map(print)
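For context, the S3Options for the Python side of the pipeline can be built roughly like this (placeholder values, not my real config; this assumes the apache_beam[aws] extra is installed so the Python SDK itself registers the s3:// scheme):

```python
from apache_beam.options.pipeline_options import S3Options

# Placeholder credentials -- the real values come from the environment.
beam_options = S3Options([
    "--s3_region_name=us-east-1",
    "--s3_access_key_id=<access-key>",
    "--s3_secret_access_key=<secret-key>",
])
```

Note that these options only affect the Python SDK's filesystem registry, which is why the error below still comes from the Java side.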
I know that this successfully authenticates to Snowflake because I get a Duo push notification to approve the login. It just errors out later on with this stack trace:
Traceback (most recent call last):
File "/home/sean/Code/beam-poc/main.py", line 43, in <module>
app.learning(beam_options=beam_options)
File "/home/sean/Code/beam-poc/example/app.py", line 205, in learning
with beam.Pipeline(options=beam_options) as pipeline:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/sean/.local/share/mise/installs/python/3.12.9/lib/python3.12/site-packages/apache_beam/pipeline.py", line 620, in __exit__
self.result = self.run()
^^^^^^^^^^
File "/home/sean/.local/share/mise/installs/python/3.12.9/lib/python3.12/site-packages/apache_beam/pipeline.py", line 594, in run
return self.runner.run_pipeline(self, self._options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/sean/.local/share/mise/installs/python/3.12.9/lib/python3.12/site-packages/apache_beam/runners/direct/direct_runner.py", line 184, in run_pipeline
return runner.run_pipeline(pipeline, options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/sean/.local/share/mise/installs/python/3.12.9/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 195, in run_pipeline
self._latest_run_result = self.run_via_runner_api(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/sean/.local/share/mise/installs/python/3.12.9/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 221, in run_via_runner_api
return self.run_stages(stage_context, stages)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/sean/.local/share/mise/installs/python/3.12.9/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 468, in run_stages
bundle_results = self._execute_bundle(
^^^^^^^^^^^^^^^^^^^^^
File "/home/sean/.local/share/mise/installs/python/3.12.9/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 793, in _execute_bundle
self._run_bundle(
File "/home/sean/.local/share/mise/installs/python/3.12.9/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1032, in _run_bundle
result, splits = bundle_manager.process_bundle(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/sean/.local/share/mise/installs/python/3.12.9/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1398, in process_bundle
raise RuntimeError(result.error)
RuntimeError: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: No filesystem found for scheme s3
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:810)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1837)
at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3100(FnApiDoFnRunner.java:145)
at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2695)
at org.apache.beam.sdk.transforms.MapElements$2.processElement(MapElements.java:151)
at org.apache.beam.sdk.transforms.MapElements$2$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:810)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1837)
at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3100(FnApiDoFnRunner.java:145)
at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWindowedValue(FnApiDoFnRunner.java:2725)
at org.apache.beam.sdk.transforms.Reshuffle$RestoreMetadata$1.processElement(Reshuffle.java:187)
at org.apache.beam.sdk.transforms.Reshuffle$RestoreMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:810)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1837)
at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3100(FnApiDoFnRunner.java:145)
at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2695)
at org.apache.beam.sdk.transforms.Reshuffle$1.processElement(Reshuffle.java:123)
at org.apache.beam.sdk.transforms.Reshuffle$1$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:810)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:172)
at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:136)
at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:552)
at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme s3
at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:557)
at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:129)
at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:150)
at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:162)
at org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn.process(FileIO.java:753)
Which seems to be saying that the expansion service doesn't have s3 registered as a valid filesystem scheme.
Is there a way to register s3 as a filesystem scheme for the expansion service from within a Python pipeline, or will this require writing my own custom expansion service?
My preferred solution would be to add some code to the Python pipeline so that it registers s3 as a scheme, mostly because I'm working on a proof of concept right now, and writing a custom expansion service seems like it'd take longer than I want to spend on a PoC.
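One thing I haven't tried yet: ReadFromSnowflake accepts an expansion_service argument, so in principle I could point it at a custom service. The jar path below is hypothetical -- I'd have to build a jar myself that bundles the Snowflake expansion service together with Beam's AWS filesystem (beam-sdks-java-io-amazon-web-services2) so that the Java side can resolve s3:// paths:

```python
from apache_beam.io.snowflake import ReadFromSnowflake
from apache_beam.transforms.external import JavaJarExpansionService

# Hypothetical jar: the stock Snowflake expansion service shaded together
# with beam-sdks-java-io-amazon-web-services2 so it registers s3://.
expansion_service = JavaJarExpansionService(
    "/path/to/snowflake-expansion-service-with-s3.jar"
)

snowflakeRows = pipeline | "Query snowflake" >> ReadFromSnowflake(
    server_name="<server>",
    database="production_data",
    schema="public",
    staging_bucket_name="s3://my-bucket/",
    storage_integration_name="s3_integration_data_prod",
    query="select * from beam_poc_2025",
    csv_mapper=snowflake_csv_mapper,
    username="<work-email>",
    password="<password>",
    # Point the cross-language transform at the custom service:
    expansion_service=expansion_service,
)
```

If that works it would avoid writing a custom expansion service in Java from scratch, but building and maintaining the shaded jar still feels like more than I want for a PoC, so a pure-Python registration hook would be preferable if one exists.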