Reputation: 1209
I want to read filtered data from a MySQL instance using an AWS Glue job. Since a Glue JDBC connection doesn't allow me to push down predicates, I am trying to explicitly create a JDBC connection in my code.
I want to run a select query with a where clause against a MySQL database using a JDBC connection, as shown below:
import com.amazonaws.services.glue.GlueContext
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
object TryMe {
  def main(args: Array[String]): Unit = {
    val sc: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(sc)
    val spark: SparkSession = glueContext.getSparkSession
    // Read data into a DataFrame over a plain JDBC connection (no Data Catalog involved)
    val t = glueContext.read.format("jdbc")
      .option("url", "jdbc:mysql://serverIP:port/database")
      .option("user", "username")
      .option("password", "password")
      .option("dbtable", "select * from table1 where 1=1")
      .option("driver", "com.mysql.jdbc.Driver")
      .load()
  }
}
It fails with the following error:
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'select * from table1 where 1=1 WHERE 1=0' at line 1
Shouldn't this work? How do I retrieve filtered data using a JDBC connection without reading the whole table into a DataFrame?
Upvotes: 5
Views: 14936
Reputation: 4829
These are five different code snippets that I tried for a performance comparison. When checked with a profiler, only two of them actually filtered the data at the server level. It seems that, at the moment, without creating a custom connector or buying one from the Marketplace, the only way to get this to work is to use glueContext.read.
You can convert DataFrames to and from DynamicFrames (see example):
# DataFrame -> DynamicFrame
rds_datasink_temp = DynamicFrame.fromDF(rds_dataframe, glueContext, "nested")
# DynamicFrame -> DataFrame
rds_dataframe = rds_datasink_temp.toDF()
You should also verify this yourself by running SQL Server Profiler with all events from the OLEDB, Stored Procedures, TSQL and Transactions categories.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.context import DynamicFrame
# job parameters are passed with two leading hyphens, e.g. --param_server_url
args = getResolvedOptions(sys.argv,['JOB_NAME'])
print("JOB_NAME: ", args['JOB_NAME'])
job_server_url="SERVER URL"
job_db_name="DB NAME"
job_db_user="DB USER"
job_db_password="DB PASSWORD"
job_table_name="TABLE NAME"
job_glue_db_name="GLUE DATA CATALOG DATABASE NAME"
job_glue_conn_name="GLUE DATA CATALOG CONNECTION NAME"
job_glue_table_name="GLUE DATA CATALOG TABLE NAME"
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
region = "us-east-1"
#### aws glue data catalog table info (from ) ####
# Name job_glue_table_name
# Database job_glue_db_name
# Classification sqlserver
# Location job_db_name.dbo.job_table_name
# Connection job_glue_conn_name
#### GlueContext Class ####
# https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html
#### DynamicFrame Class ####
# https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html
#### Connection Api ####
# https://docs.aws.amazon.com/glue/latest/webapi/API_Connection.html
#### Using connectors and connections with AWS Glue Studio ####
# Link : https://docs.aws.amazon.com/glue/latest/ug/connectors-chapter.html
# Use AWS Secrets Manager for storing credentials
# Filtering the source data with row predicates and column projections
#### Connection options for type custom.jdbc or marketplace.jdbc ####
# Link : https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-connect.html#aws-glue-programming-etl-connect-jdbc
# className – String, required, driver class name.
# connectionName – String, required, name of the connection that is associated with the connector.
# secretId or user/password – String, required, used to retrieve credentials for the URL.
# dbTable or query – String, required, the table or SQL query to get the data from. You can specify either dbTable or query, but not both.
# filterPredicate – String, optional, extra condition clause to filter data from source. For example:
# a trailing \ continues the chained call on the next line
# query="recordid<=5", -- filtering !
print("0001 - df_read_query")
df_read_query = glueContext.read \
.format("jdbc") \
.option("url","jdbc:sqlserver://"+job_server_url+":1433;databaseName="+job_db_name+";") \
.option("query","select recordid from "+job_table_name+" where recordid <= 5") \
.option("user",job_db_user) \
.option("password",job_db_password) \
.load()
print("df_read_query count: ", df_read_query.count())
df_read_query.show(10)
df_read_query.printSchema()
# query="recordid<=5", -- not filtering
print("0002 - df_from_catalog_query")
df_from_catalog_query = glueContext.create_dynamic_frame.from_catalog(
database = job_glue_db_name,
table_name = job_glue_table_name,
additional_options={
"query":"select recordid from "+job_table_name+" where recordid <= 5;",
},
transformation_ctx = "df_from_catalog_query",
)
print("df_from_catalog_query count: ", df_from_catalog_query.count())
df_from_catalog_query.show(10)
# push_down_predicate="recordid<=5", -- not filtering
print("0003 - df_from_catalog_push_down_predicate")
df_from_catalog_push_down_predicate = glueContext.create_dynamic_frame.from_catalog(
database = job_glue_db_name,
table_name = job_db_name+'_dbo_'+job_table_name,
push_down_predicate = "recordid<=5",
transformation_ctx = "df_from_catalog_push_down_predicate",
)
print("df_from_catalog_push_down_predicate count: ", df_from_catalog_push_down_predicate.count())
df_from_catalog_push_down_predicate.show(10)
# filterPredicate="recordid<=5", -- not filtering
print("0004 - df_from_options_sqlserver")
df_from_options_sqlserver = glueContext.create_dynamic_frame.from_options(
connection_type = "sqlserver",
connection_options = {
"url":"jdbc:sqlserver://"+job_server_url+":1433;databaseName="+job_db_name+";",
"username":job_db_user,
"password":job_db_password,
"location":job_db_name+".dbo."+job_table_name,
"filterPredicate":"recordid<=5",
},
transformation_ctx = "df_from_options_sqlserver",
)
print("df_from_options_sqlserver count: ", df_from_options_sqlserver.count())
df_from_options_sqlserver.show(10)
# dbtable="recordid<=5", -- filtering !
print("0005 - df_read_dbtable")
df_read_dbtable = glueContext.read \
.format("jdbc") \
.option("url","jdbc:sqlserver://"+job_server_url+":1433;databaseName="+job_db_name+";") \
.option("user",job_db_user) \
.option("password",job_db_password) \
.option("dbtable","(select recordid from "+job_table_name+" where recordid<=5) as t1") \
.option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver") \
.load()
print("df_read_dbtable count: ", df_read_dbtable.count())
df_read_dbtable.show(10)
job.commit()
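Snippets 0001 and 0005, the two that filtered at the server, differ only in whether the SQL goes into `query` or into an aliased `dbtable` subquery. Below is a minimal sketch of a helper that builds that shared option set once. `sqlserver_jdbc_options` is a hypothetical name, not part of the Glue or Spark API, and it assumes the default SQL Server port 1433 used above. Spark's JDBC documentation states that `query` and `dbtable` cannot be specified at the same time, so the helper requires exactly one of them.

```python
# Hypothetical helper (not part of awsglue or pyspark): builds the option
# dict for a Spark JDBC read from SQL Server, as in snippets 0001 and 0005.
def sqlserver_jdbc_options(server, database, user, password,
                           query=None, dbtable=None, port=1433):
    # Spark rejects a read that sets both "query" and "dbtable".
    if (query is None) == (dbtable is None):
        raise ValueError("set exactly one of query or dbtable")
    options = {
        "url": f"jdbc:sqlserver://{server}:{port};databaseName={database};",
        "user": user,
        "password": password,
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    }
    if query is not None:
        # Spark parenthesizes and aliases a "query" value itself.
        options["query"] = query
    else:
        # A "dbtable" value must already be a table name or "(select ...) as alias".
        options["dbtable"] = dbtable
    return options
```

Usage inside the job would then be along the lines of `glueContext.read.format("jdbc").options(**sqlserver_jdbc_options(job_server_url, job_db_name, job_db_user, job_db_password, query="select recordid from " + job_table_name + " where recordid <= 5")).load()`, relying on PySpark's `DataFrameReader.options(**kwargs)`.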
Upvotes: 1
Reputation: 844
For anyone still searching for further answers/examples, I can confirm that the push_down_predicate option works with JDBC data sources. Here's how I read from SQL Server (in Python):
df = (glueContext.read.format("jdbc")
    .option("url", "jdbc:sqlserver://server-ip:port;databaseName=db;")
    .option("user", "username")
    .option("password", "password")
    .option("dbtable", "(select t1.*, t2.name from dbo.table1 t1 join dbo.table2 t2 on t1.id = t2.id) as users")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .load())
The following also works, but NOT as I expected: the predicate is not pushed down to the data source.
df = glueContext.create_dynamic_frame.from_catalog(database = "db", table_name = "db_dbo_table1", push_down_predicate = "(id >= 2850700 AND statusCode = 'ACT')")
The Spark JDBC documentation on the pushDownPredicate option states: "The option to enable or disable predicate push-down into the JDBC data source. The default value is true, in which case Spark will push down filters to the JDBC data source as much as possible."
Upvotes: 2
Reputation: 1939
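I think the problem occurred because you didn't put the query in parentheses and give it an alias. Spark substitutes the dbtable value directly into a FROM clause (to resolve the schema it runs SELECT * FROM <dbtable> WHERE 1=0), which is why the statement in the error message ends in "where 1=1 WHERE 1=0". In my opinion it should look like the following example: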
val t = glueContext.read.format("jdbc")
  .option("url", "jdbc:mysql://serverIP:port/database")
  .option("user", "username")
  .option("password", "password")
  .option("dbtable", "(select * from table1 where 1=1) as t1")
  .option("driver", "com.mysql.jdbc.Driver")
  .load()
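To see why the parentheses and alias matter, the sketch below rebuilds, with plain string formatting, the schema-probe statement Spark's JDBC source sends (`SELECT * FROM <dbtable> WHERE 1=0`). It only reproduces the SQL text; `schema_probe` and `as_subquery` are hypothetical helpers for illustration, not Spark APIs.

```python
def schema_probe(dbtable):
    # Spark's JDBC source substitutes the dbtable value into this template
    # when it resolves the schema of the result.
    return f"SELECT * FROM {dbtable} WHERE 1=0"


def as_subquery(sql, alias="t1"):
    # Turn a SELECT into a derived table. A trailing ";" inside the
    # parentheses would be a syntax error, so drop it.
    return f"({sql.strip().rstrip(';').strip()}) as {alias}"


raw = "select * from table1 where 1=1"
print(schema_probe(raw))
# SELECT * FROM select * from table1 where 1=1 WHERE 1=0   (invalid SQL)
print(schema_probe(as_subquery(raw)))
# SELECT * FROM (select * from table1 where 1=1) as t1 WHERE 1=0
```

On Spark 2.4 and later there is also a `query` option, which Spark parenthesizes and aliases itself, so the raw SELECT can be passed there instead (it cannot be combined with `dbtable`).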
More information about parameters in SQL data sources:
https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
As for Glue and the framework it provides, there is also the "push_down_predicate" option, but I have only used it with S3-based data sources. I think it doesn't work on sources other than S3, or on non-partitioned data.
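Conceptually, push_down_predicate is evaluated against the partition columns of a cataloged, partitioned S3 table (Hive-style prefixes such as `year=.../month=...`), so partitions that fail the predicate are never read. The pure-Python sketch below only simulates that idea: the S3 paths are made up, a Python lambda stands in for Glue's SQL-style predicate string, and `parse_partition`/`prune` are hypothetical helpers, not how Glue is implemented.

```python
def parse_partition(path):
    # "s3://bucket/events/year=2021/month=03/" -> {"year": "2021", "month": "03"}
    parts = {}
    for segment in path.strip("/").split("/"):
        if "=" in segment:
            key, value = segment.split("=", 1)
            parts[key] = value
    return parts


def prune(partition_paths, predicate):
    # Keep only partitions whose key values satisfy the predicate;
    # files under the other prefixes would never be listed or read.
    return [p for p in partition_paths if predicate(parse_partition(p))]


paths = [
    "s3://bucket/events/year=2020/month=12/",
    "s3://bucket/events/year=2021/month=01/",
    "s3://bucket/events/year=2021/month=02/",
]
# Roughly what push_down_predicate="year == '2021'" expresses
selected = prune(paths, lambda p: p["year"] == "2021")
print(selected)
```

A JDBC table has no such partition prefixes to prune, which is consistent with the predicate not reaching the database in the answers above.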
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-partitions.html
Upvotes: 9