Reputation: 413
I'm new to Databricks, so I hope my question is not too far off. I'm trying to run the following SQL pushdown query in a Databricks notebook to get data from an on-premises SQL Server, using the following Python code:
pushdown_query1 = """(select * from dbo.myTabel X
INNER JOIN
(select Interval_Time, max(Run_Time) as Run_Time
from dbo.myTabel
group by Interval_Time) Y
on X.Interval_Time = y.Interval_Time
and X.Run_Time = y.Run_Time
WHERE RUNTYPE='TYPE_A') df"""
pushdown_query2 = """(select * from dbo.myTabel where RUNTYPE='TYPE_A') df"""
# connection information
jdbc_hostname = '<...>'
database = '<..>'
port = '1433'
username = '<...>'
password = '<...>'
connection_details = {
"user": username,
"password": password,
"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}
jdbc_url = "jdbc:sqlserver://{0}:{1};databaseName={2}".format(jdbc_hostname, port, database)
# load the STPAS region solution data
df = spark.read.format("jdbc") \
.option("url",jdbc_url) \
.option("dbtable", pushdown_query1 )\
.option("user", username) \
.option("password", password) \
.load()
df.show(100, truncate=False)
I don't have any issue when I choose pushdown_query2 and I get what I want, but pushdown_query1 returns an error. I'm not sure whether that's because I have JOINs and sub-queries in the statement or there is some other issue.

Error after choosing pushdown_query1:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<command-2825826808916915> in <module>
18 .option("dbtable", query)\
19 .option("user", username) \
---> 20 .option("password", password) \
21 .load()
22
/databricks/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
182 return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
183 else:
--> 184 return self._df(self._jreader.load())
185
186 @since(1.4)
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
125 def deco(*a, **kw):
126 try:
--> 127 return f(*a, **kw)
128 except py4j.protocol.Py4JJavaError as e:
129 converted = convert_exception(e.java_exception)
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o586.load.
: com.microsoft.sqlserver.jdbc.SQLServerException: The column 'Interval_Time' was specified multiple times for 'df'.
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1632)
at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:600)
at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:522)
at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7225)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:3053)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:247)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:222)
at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeQuery(SQLServerPreparedStatement.java:444)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:61)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:226)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:384)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:373)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:373)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:258)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Upvotes: 1
Views: 2207
Reputation: 3008
You are getting the error because you are joining the table to itself and using '*' in the select statement. When Spark resolves the schema, it wraps your dbtable string in a query like select * from (<your query>) df where 1=0 (see JDBCRDD$.resolveTable in your stack trace), so SQL Server requires the derived table df to have unique column names. If you specify the columns explicitly, qualified by the alias you gave each side of the join, you won't see this error.
In your case the column Interval_Time is duplicated because you select it in both queries used in the join. Specify the columns explicitly and it should work.
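For example, here is a minimal sketch of the fix, reusing jdbc_url, username, and password from your question and assuming the same dbo.myTabel schema. The simplest explicit selection is to qualify with the outer alias (X.*), so Y's Interval_Time and Run_Time are not returned a second time:

# Only X's columns are returned, so no column name in the derived table df repeats.
pushdown_query1 = """(select X.*
from dbo.myTabel X
inner join
    (select Interval_Time, max(Run_Time) as Run_Time
     from dbo.myTabel
     group by Interval_Time) Y
    on X.Interval_Time = Y.Interval_Time
    and X.Run_Time = Y.Run_Time
where X.RUNTYPE = 'TYPE_A') df"""

df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", pushdown_query1) \
    .option("user", username) \
    .option("password", password) \
    .load()

If you do need columns from both sides of the join, list them individually and alias any duplicates, e.g. select X.Interval_Time, X.Run_Time, Y.Run_Time as Max_Run_Time.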
Upvotes: 1