Reputation: 509
I'm trying to read data from a CSV file in GCS and save it into a BigQuery table.
This is my CSV file:
1,Marc,B12,2017-03-24
2,Marc,B12,2018-01-31
3,Marc,B21,2017-03-17
4,Jeam,B12,2017-12-30
5,Jeam,B12,2017-09-02
6,Jeam,B11,2018-06-30
7,Jeam,B21,2018-03-02
8,Olivier,B20,2017-12-30
And this is my code:
import com.google.cloud.hadoop.io.bigquery.{BigQueryConfiguration, BigQueryFileFormat}
import com.google.cloud.hadoop.io.bigquery.output.{BigQueryOutputConfiguration, IndirectBigQueryOutputFormat}
import com.google.gson.Gson
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Hyp-session-bq")
  .config("spark.master", "local")
  .getOrCreate()
val sc: SparkContext = spark.sparkContext
val conf = sc.hadoopConfiguration
//Input Parameters
val projectId = conf.get("fs.gs.project.id")
val bucket = conf.get("fs.gs.system.bucket")
val inputTable = s"$projectId:rpc.testBig"
//Input Configuration
conf.set(BigQueryConfiguration.PROJECT_ID_KEY,projectId)
conf.set(BigQueryConfiguration.GCS_BUCKET_KEY,bucket)
BigQueryConfiguration.configureBigQueryInput(conf,inputTable)
//Output Parameters
val outPutTable = s"$projectId:rpc.outTestBig"
// Temp output bucket that is deleted upon completion of job
val outPutGcsPath = ("gs://"+bucket+"/hadoop/tmp/outTestBig")
BigQueryOutputConfiguration.configure(conf,
  outPutTable,
  null,
  outPutGcsPath,
  BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
  classOf[TextOutputFormat[_,_]])
conf.set("mapreduce.job.outputformat.class", classOf[IndirectBigQueryOutputFormat[_,_]].getName)
// Truncate the table before writing output to allow multiple runs.
conf.set(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY,"WRITE_TRUNCATE")
val text_file = sc.textFile("gs://test_files/csvfiles/test.csv")
val lignes = text_file.flatMap(x=>x.split(" "))
case class schemaFile(id: Int, name: String, symbole: String, date: String)
def parseStringWithCaseClass(str: String): schemaFile = schemaFile(
  id = str.split(",")(0).toInt,
  name = str.split(",")(1),
  symbole = str.split(",")(2),
  date = str.split(",")(3)
)
val result1 = lignes.map(x => parseStringWithCaseClass(x))
val x = result1.map(elem => (null, new Gson().toJsonTree(elem)))
val y = x.saveAsNewAPIHadoopDataset(conf)
When I run the code, I get this error:
ERROR org.apache.spark.internal.io.SparkHadoopMapReduceWriter: Aborting job job_20180226083501_0008.
com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
{
"code" : 400,
"errors" : [ {
"domain" : "global",
"message" : "Load configuration must specify at least one source URI",
"reason" : "invalid"
} ],
"message" : "Load configuration must specify at least one source URI"
}
at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:145)
at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1056)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
at com.google.cloud.hadoop.io.bigquery.BigQueryHelper.insertJobOrFetchDuplicate(BigQueryHelper.java:306)
at com.google.cloud.hadoop.io.bigquery.BigQueryHelper.importFromGcs(BigQueryHelper.java:160)
at com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputCommitter.commitJob(IndirectBigQueryOutputCommitter.java:57)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:128)
at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:101)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
at jeam.BigQueryIO$.main(BigQueryIO.scala:115)
at jeam.BigQueryIO.main(BigQueryIO.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
I think the problem is with the case class and parseStringWithCaseClass, but I don't know how to resolve it. I don't think the problem is in the configuration, because I get the expected result when I try the wordcount example: https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
Upvotes: 1
Views: 2112
Reputation: 8178
I have been performing some tests running your code with my own BigQuery tables and CSV files, and it has worked for me without needing any additional modification.
I see that when you changed CaseClass to Tuple4, as suggested by @jean-marc, your code started working, which is strange behavior, even more so considering that for both him and me your code works without further modification. The error Load configuration must specify at least one source URI usually appears when the BigQuery load job is not properly configured and does not receive a correct Cloud Storage object URL. However, if the exact same code works when only changing to Tuple4, and the CSV file you are using is the same and has not changed (i.e. the URL is valid), it may have been a transient issue, possibly related to Cloud Storage or BigQuery rather than to the Dataproc job itself.
Finally, given that this issue seems specific to you (the same code has worked for at least two other users), once you have checked that there is no issue with the Cloud Storage object itself (permissions, wrong location, etc.), you may want to file a bug in the Public Issue Tracker.
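In case it helps you check that side, here is a minimal debugging sketch (assuming the conf and outPutGcsPath values from your code are in scope, along with the BigQueryConfiguration import): it prints the output settings you configured and lists whatever the job wrote under the temporary GCS path, since an empty path would leave the load job with no source URIs, which is exactly what the 400 error complains about.
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// Print the output-related settings that were configured on conf.
println("Project id: " + conf.get(BigQueryConfiguration.PROJECT_ID_KEY))
println("System bucket: " + conf.get(BigQueryConfiguration.GCS_BUCKET_KEY))
println("Output format class: " + conf.get("mapreduce.job.outputformat.class"))

// List what actually landed under the temporary output path after the write:
// if nothing is there, the BigQuery load job has no source URIs to import.
val fs = FileSystem.get(new URI(outPutGcsPath), conf)
val statuses = fs.globStatus(new Path(outPutGcsPath + "/*"))
if (statuses == null || statuses.isEmpty) {
  println(s"No files found under $outPutGcsPath")
} else {
  statuses.foreach(s => println(s.getPath + " (" + s.getLen + " bytes)"))
}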
Upvotes: 1
Reputation: 68
Try working with Tuple4:
def parseStringWithTuple(str: String): Tuple4[Int, String, String, String] = {
  val id = str.split(",")(0).toInt
  val name = str.split(",")(1)
  val symbole = str.split(",")(2)
  val date = str.split(",")(3)
  (id, name, symbole, date)
}
val result1 = lignes.map(x => parseStringWithTuple(x))
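The rest of the pipeline can stay exactly as in your question; for reference, here is a minimal sketch of the remaining steps (assuming conf and the Gson import from your code are in scope). Note that Gson serializes a Scala Tuple4 with JSON keys _1 to _4, which is worth keeping in mind when you check the resulting table.
// Serialize each Tuple4 with Gson and write it through the BigQuery output
// format already configured on conf, as in the original code.
val jsonPairs = result1.map(t => (null, new Gson().toJsonTree(t)))
jsonPairs.saveAsNewAPIHadoopDataset(conf)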
That said, I also tested your original code and it works fine for me.
Upvotes: 1