Reputation: 21
I have a filename and a FILESTREAM column saved in a SQL Server database. I want to upload each filestream as a file to an AWS S3 bucket using Apache NiFi.
Currently I am using the following processors, in this sequence:
The problem is that this approach doesn't convert the filestream to a file on the S3 bucket; it just uploads a file containing the raw filestream column data. It should work like this: if the filestream is of "png" image type, it should upload a PNG image to the S3 bucket, and if the filestream is of "xlsx" type, it should upload an xlsx file to the S3 bucket.
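For context, a common NiFi flow for this kind of task looks roughly like the sketch below. The processor list from the original post is missing, so these processor choices are an assumption, not the poster's actual flow:

```text
ExecuteSQL       -> runs SELECT FileName, FileStream FROM table; output is Avro-serialized
UpdateAttribute  -> sets the "filename" attribute from the FileName column
PutS3Object      -> uploads the flow file content to the bucket
```

A likely cause of the symptom described above: ExecuteSQL serializes its result set as Avro, so PutS3Object ends up uploading the Avro record wrapper rather than the raw binary bytes, and the object on S3 is not a valid png/xlsx file. Writing the binary column directly to the flow file content (as in the script answers below it) avoids that wrapping.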
Upvotes: 0
Views: 590
Reputation: 21
This script worked!
import groovy.sql.Sql

def ff = session.create()
// open a JDBC connection to SQL Server
def sqlIns = Sql.newInstance(
    'jdbc:sqlserver://servername:port;databaseName=dbname;encrypt=true;trustServerCertificate=true',
    'username', 'password', 'com.microsoft.sqlserver.jdbc.SQLServerDriver')
try {
    // query the database to fetch the data
    def query = 'SELECT FileName, FileStream FROM table'
    def outFiles = [] // list of flow files for output
    sqlIns.eachRow(query) { row ->
        log.info "${row.FileName}"
        def binFile = ff.clone(false) // clone attributes but not content
        binFile.filename = row.FileName
        binFile.write { stream -> stream << row.FileStream }
        outFiles << binFile
    }
    REL_SUCCESS << outFiles // transfer the new flow files to success
} finally {
    sqlIns.close() // always release the JDBC connection
}
session.remove(ff) // drop the seed flow file
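For reference, the kind of SQL Server table this script reads from would be declared roughly as below. The table and column names are assumptions to match the query above; only the FILESTREAM requirements (a VARBINARY(MAX) FILESTREAM column plus a ROWGUIDCOL unique identifier) are fixed by SQL Server:

```sql
-- hypothetical table matching SELECT FileName, FileStream FROM table;
-- FILESTREAM columns must be VARBINARY(MAX) FILESTREAM, and the table
-- must have a UNIQUEIDENTIFIER column marked ROWGUIDCOL
CREATE TABLE dbo.Documents (
    Id         UNIQUEIDENTIFIER ROWGUIDCOL NOT NULL UNIQUE DEFAULT NEWID(),
    FileName   NVARCHAR(260) NOT NULL,
    FileStream VARBINARY(MAX) FILESTREAM NULL
);
```

Because the column comes back through JDBC as binary data, `stream << row.FileStream` writes the original file bytes, so the object uploaded to S3 keeps its real format (png, xlsx, etc.).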
Upvotes: 0
Reputation: 28564
Maybe there is a NiFi-native way to read a blob column; however, you could use the ExecuteGroovyScript processor instead.
Add a SQL.mydb parameter at the processor level and link it to the required DBCP pool.
Then use the following script body (I had no chance to test it):
def ff = session.get()
if (!ff) return

// just an assumption - I don't know your table structure...
def query = '''
    select file_name, bin_content from myTable where update_time > :p_timestamp
'''
def params = [
    p_timestamp: ff.last_timestamp
]
def outFiles = [] // list of files for output
// SQL.mydb is a reference to a groovy.sql.Sql instance
SQL.mydb.eachRow(query, params) { row ->
    def binFile = ff.clone(false) // clone the incoming file but without content
    binFile.filename = row.file_name
    binFile.write { stream -> stream << row.bin_content.getBinaryStream() }
    outFiles << binFile
}
REL_SUCCESS << outFiles // transfer the list of new files to success
session.remove(ff) // drop the incoming file
The script above will execute the SQL select and, for each received record, will produce a new flow file with the name and content received from the DB.
Details about ExecuteGroovyScript processor features:
Upvotes: 0