Reputation: 23
I have this code that I'm using to execute queries in Dremio using PyArrow Flight:
class DremioConnector:
    """Run SQL queries against a Dremio environment over Arrow Flight.

    The connector builds a TLS Flight client for the configured environment
    and authenticates with username/password to obtain a bearer token that
    is attached to the query calls.
    """

    env: str
    auth_token: str

    def __init__(self, env: str, auth_token: str):
        """Store the environment name and auth token for later use."""
        self.env = env
        self.auth_token = auth_token

    def get_dremio_client(self):
        """Create a Flight client for the Dremio host of ``self.env``.

        The host is resolved via ``get_url_from_template(DREMIO_URL, env)``;
        the client is created with the certificates returned by ``get_cert()``
        and the Dremio auth and cookie middleware factories.
        """
        dremio_url = get_url_from_template(DREMIO_URL, self.env)
        # NOTE(review): the port appears redacted ("*****") in the posted
        # snippet; the literal is kept exactly as given.
        return FlightClient(
            f"grpc+tls://{dremio_url}:*****",
            tls_root_certs=get_cert(),
            middleware=[DremioClientAuthMiddlewareFactory(), CookieMiddlewareFactory()],
        )

    def get_dremio_credentials(self):
        """Return the ``(username, password)`` pair used for authentication."""
        # The body was elided in the original snippet; the placeholders are
        # kept as-is, so ``username``/``password`` are not defined here.
        ...
        ...
        return username, password

    def __create_flight_call_options(self, username: str, password: str, client: FlightClient) -> FlightCallOptions:
        """Authenticate against the server and return call options carrying the bearer token.

        ``authenticate_basic_token`` is called with empty-header call options;
        the value it returns is used as the only header entry of the returned
        ``FlightCallOptions``.
        """
        bearer_token = client.authenticate_basic_token(username, password, FlightCallOptions(headers=[]))
        return FlightCallOptions(headers=[bearer_token])

    def run_query(self, query: str, username: str, password: str, client: FlightClient) -> FlightStreamReader:
        """Execute ``query`` and return a reader over the result stream.

        Authentication is performed once per query and the resulting call
        options are reused for both the ``get_flight_info`` and ``do_get``
        requests, instead of authenticating separately for each request.

        Only the ticket of the first endpoint in the flight info is read.
        """
        flight_desc = FlightDescriptor.for_command(query)
        call_options = self.__create_flight_call_options(username, password, client)
        flight_info = client.get_flight_info(flight_desc, call_options)
        return client.do_get(flight_info.endpoints[0].ticket, call_options)
# Example usage: build a connector for the target environment, then open
# the Flight client it is configured for.
connector = DremioConnector(env='env', auth_token="auth_token")
dremio_client = connector.get_dremio_client()
Recently I've started getting this error: `E pyarrow._flight.FlightInternalError: Could not finish writing before closing`
when calling
bearer_token = client.authenticate_basic_token(username, password, FlightCallOptions(headers=headers))
Does anyone know what might be the issue? I have no idea why this error started popping up, since it was working before. Am I doing something wrong here?
Upvotes: 0
Views: 959
Reputation: 51
What value is your support key `flight.client.readiness.timeout.millis` set to?
You can try setting this key to a higher value.
Upvotes: 0
Reputation: 150
Here is the code I use for connecting to Dremio using PyArrow. It works pretty well for Dremio Cloud; hopefully this helps.
Simple PyArrow Client for Dremio
#----------------------------------
# IMPORTS
#----------------------------------
## Import Pyarrow
from pyarrow import flight
from pyarrow.flight import FlightClient
import duckdb
import pyarrow.dataset as ds
import polars as pl
import pandas as pd
class DremioConnection:
    """Minimal Arrow Flight client for Dremio using bearer-token authentication.

    ``query`` runs a SQL string and returns the raw Flight stream reader; the
    ``to*`` helpers run a query with this connection's client and headers and
    convert the result into a specific container.
    """

    def __init__(self, token, location):
        """Store the token and location and open a Flight client.

        Args:
            token: Token sent as ``authorization: bearer <token>`` on each call.
            location: Location passed to ``FlightClient``.
        """
        self.token = token
        self.location = location
        self.headers = [
            (b"authorization", f"bearer {token}".encode("utf-8"))
        ]
        self.client = FlightClient(location=location)

    def query(self, query, client, headers):
        """Run ``query`` on ``client`` with ``headers`` and return the result reader.

        Only the ticket of the first endpoint in the flight info is fetched.
        """
        # Call options carrying the headers for both requests below
        options = flight.FlightCallOptions(headers=headers)

        # Get the flight info for the query; its ticket is used to fetch results
        flight_info = client.get_flight_info(flight.FlightDescriptor.for_command(query), options)

        # Fetch the results (a FlightStreamReader)
        return client.do_get(flight_info.endpoints[0].ticket, options)

    def toArrow(self, query):
        """Run ``query`` and return the FlightStreamReader over its results."""
        return self.query(query, self.client, self.headers)

    def toDuckDB(self, querystring):
        """Run ``querystring`` and return the result as a DuckDB relation."""
        streamReader = self.query(querystring, self.client, self.headers)
        table = streamReader.read_all()
        my_ds = ds.dataset(source=[table])
        return duckdb.arrow(my_ds)

    def toPolars(self, querystring):
        """Run ``querystring`` and return the result as a Polars DataFrame."""
        streamReader = self.query(querystring, self.client, self.headers)
        table = streamReader.read_all()
        return pl.from_arrow(table)

    def toPandas(self, querystring):
        """Run ``querystring`` and return the result as a pandas DataFrame.

        This was previously a second definition of ``toPolars``, which
        replaced the Polars version above; it now has its own name.
        """
        streamReader = self.query(querystring, self.client, self.headers)
        return streamReader.read_pandas()
Upvotes: 0