Reputation: 27
I have been searching various sources, but it is not clear to this newbie: how do I load data (a CSV file) from Cloud Storage into Cloud Datastore from an App Engine PHP application? I do have an existing method which downloads the file and then loads each row as a transaction. It takes a few hours for a few million rows, so this does not seem to be the best method, and I have been searching for a more efficient one. I appreciate any guidance.
Editing this as I have switched to trying to load the JSON data into Datastore from a remote URL on GAE. The code is not working, though I do not know why (yet):
<?php
require 'vendor/autoload.php';

use Google\Cloud\Datastore\DatastoreClient;
use Google\Cloud\Datastore\Transaction;

/**
 * Create a new product with a given SKU and stage it in the given transaction.
 *
 * @param Transaction $transaction
 * @param DatastoreClient $datastore
 * @param string $sku
 * @param string $product
 * @return Google\Cloud\Datastore\Entity
 */
function add_product(Transaction $transaction, DatastoreClient $datastore, $sku, $product)
{
    $productKey = $datastore->key('SKU', $sku);
    $entity = $datastore->entity(
        $productKey,
        [
            'created' => new DateTime(),
            'name' => strtolower($product)
        ]);
    // Stage the write inside the transaction; it is sent when commit() is called.
    $transaction->upsert($entity);
    return $entity;
}

/**
 * Load a Cloud Datastore kind from a remote URL.
 *
 * @param string $projectId
 * @param string $url
 */
function load_datastore($projectId, $url)
{
    // Create Datastore client
    $datastore = new DatastoreClient(['projectId' => $projectId]);

    // Note: `allow_url_fopen` cannot be changed with ini_set() at runtime;
    // it must be enabled in php.ini for file_get_contents() to read a URL.

    // Read the products listing and load it into Cloud Datastore,
    // committing a transaction for every batch of 20 rows.
    $json = json_decode(file_get_contents($url), true);

    $count = 0;
    $transaction = null;
    foreach ($json as $sku_key => $product_val) {
        if ($count == 0) {
            $transaction = $datastore->transaction();
        }
        add_product($transaction, $datastore, $sku_key, $product_val);
        $count++;
        if ($count == 20) {
            try {
                $transaction->commit();
            } catch (Exception $err) {
                echo 'Caught exception: ', $err->getMessage(), "\n";
                $transaction->rollback();
            }
            $count = 0;
        }
    }
    // Commit any remaining rows in the final, partial batch.
    if ($count > 0) {
        $transaction->commit();
    }
}

try {
    $projectId = 'development';
    $url = 'https://raw.githubusercontent.com/BestBuyAPIs/open-data-set/master/products.json';
    load_datastore($projectId, $url);
} catch (Exception $err) {
    echo 'Caught exception: ', $err->getMessage(), "\n";
}
?>
Upvotes: 1
Views: 970
Reputation: 51
Google provides pre-written Dataflow templates. You can use the GCS Text to Datastore Dataflow template to read in the CSV, convert each CSV row into Datastore Entity JSON, and write the results to Datastore.
Let's assume you have a CSV of the following:
username, first, last, age, location.zip, location.city, location.state
samsmith, Sam, Smith, 33, 94040, Mountain View, California
johndoe, John, Doe, 50, 30075, Roswell, Georgia
dannyboy, Danny, Mac, 94040, Mountain View, California
You could use the following UDF to transform each CSV row into a Datastore entity of kind People, following the schema given by the CSV header above.
The UDF outputs a JSON-encoded entity. This is the same JSON payload as used by the Cloud Datastore REST API, and property values can use the REST API value types such as stringValue, integerValue, and entityValue.
function myTransform(csvString) {
  // Split the CSV line and trim surrounding whitespace from each field.
  var row = csvString.split(",").map(function (field) { return field.trim(); });
  // Skip rows that do not have all 7 columns (e.g. the "dannyboy" row above).
  if (row.length != 7) { return; }
  return JSON.stringify({
    "key": {
      "partitionId": {
        // default namespace is an empty string
        "namespaceId": ""
      },
      // path is a list of path elements; here a single element of kind People,
      // keyed by the username
      "path": [{
        "kind": "People",
        "name": row[0]
      }]
    },
    "properties": {
      "username": { "stringValue": row[0] },
      "first": { "stringValue": row[1] },
      "last": { "stringValue": row[2] },
      "age": { "integerValue": row[3] },
      "location": {
        "entityValue": {
          "properties": {
            "zip": { "integerValue": row[4] },
            "city": { "stringValue": row[5] },
            "state": { "stringValue": row[6] }
          }
        }
      }
    }
  });
}
To run the Dataflow template, first save that UDF to a GCS bucket using gsutil:
gsutil cp my_csv_udf.js gs://mybucket/my_csv_udf.js
Now head to the Dataflow page in the Google Cloud Platform Console, click on Create Job From Template, and select "GCS Text to Datastore". You can also refer to this doc.
Your job parameters would point to the CSV file in Cloud Storage, the UDF file you just uploaded (gs://mybucket/my_csv_udf.js), the name of the UDF function (myTransform), and the project whose Datastore you want to write to.
Note: The UDF transform only supports JavaScript ECMAScript 5.1, so only basic JavaScript - no arrow functions, promises, etc.
Upvotes: 2
Reputation: 2887
This question is similar to Import CSV into google cloud datastore and Google Cloud Datastore: Bulk Importing w Node.js.
The quick answer is that you can use Apache Beam or Cloud Dataflow to import CSV data into Cloud Datastore.
Upvotes: 0
Reputation: 39824
Sorry for not being more specific, but I'm a python standard env GAE user, rather unfamiliar with the PHP environment(s).
In general your current approach is serialized and synchronous - you're processing the rows one at a time (or, at best, in batches of 20 if all the upsert calls inside a transaction actually go to the datastore in a single batch), blocking on every datastore interaction and advancing to the next row only after that interaction completes.
I'm unsure if the PHP environment supports async datastore operations and/or true batch operations (the Python ndb library can batch up to 500 writes into one datastore call) - those could help speed things up.
Another thing to consider, if your rows are entirely independent: do you actually need transactions for writing them? If PHP supports plain writes you could use those instead, since transactions take longer to complete.
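For illustration, here is a minimal sketch of what plain, non-transactional batched writes could look like with the google/cloud-datastore PHP library's upsertBatch method. The row shape, kind name, and entity properties just mirror the question's code, and the batch size of 500 reflects Datastore's usual per-call write limit - adjust both to your real data:

<?php
// Minimal sketch: plain (non-transactional) batched upserts with the
// google/cloud-datastore library, instead of one transaction per row.
require 'vendor/autoload.php';

use Google\Cloud\Datastore\DatastoreClient;

function load_datastore_batched($projectId, array $rows, $batchSize = 500)
{
    $datastore = new DatastoreClient(['projectId' => $projectId]);
    $entities = [];
    foreach ($rows as $sku => $name) {
        // Build the entity exactly as in the question's code.
        $entities[] = $datastore->entity(
            $datastore->key('SKU', $sku),
            [
                'created' => new DateTime(),
                'name' => strtolower($name)
            ]
        );
        // Send a whole batch to Datastore in a single call.
        if (count($entities) >= $batchSize) {
            $datastore->upsertBatch($entities);
            $entities = [];
        }
    }
    // Send the final, partial batch.
    if (!empty($entities)) {
        $datastore->upsertBatch($entities);
    }
}

Here $rows would be the decoded JSON from the question's code, i.e. json_decode(file_get_contents($url), true).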
Even without the above-mentioned support, you can still speed things up considerably by decoupling the row reading from the waiting for completion of datastore ops:
in the current request handler you keep just the reading of rows and the creation of batches of 20 rows, which you somehow pass along for processing elsewhere (task queue, pub/sub, separate threads - whatever you can get in PHP); a sketch of this split follows below
on a separate request handler (or task queue or pub/sub handler, depending on how you choose to pass your batch data) you receive those batches and make the actual datastore calls. This way you can have multiple batches processed in parallel, and the time each batch spends blocked waiting for the datastore replies becomes irrelevant from the overall processing time perspective.
With such an approach your performance would be limited only by the speed at which you can read the rows and enqueue those batches. If you want to go even faster, you could also split the single CSV file into multiple smaller ones, so that multiple row readers can work in parallel as well, feeding those batch-processing workers.
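To make that split concrete, here is a rough sketch assuming the first-generation App Engine PHP runtime's push task queue API (PushTask). The /worker URL, the batch size of 20, and the JSON payload encoding are arbitrary choices for the example; on newer runtimes you would pass the batches through Cloud Tasks or Pub/Sub instead:

<?php
// enqueue.php - reads the rows and fans batches out to a push task queue.
// PushTask is provided by the first-generation App Engine PHP runtime.
use google\appengine\api\taskqueue\PushTask;

$url = 'https://raw.githubusercontent.com/BestBuyAPIs/open-data-set/master/products.json';
$rows = json_decode(file_get_contents($url), true);

$batch = [];
foreach ($rows as $sku => $name) {
    $batch[$sku] = $name;
    if (count($batch) == 20) {
        // Hand the batch off; the datastore writes happen in /worker,
        // so many batches can be processed in parallel.
        (new PushTask('/worker', ['batch' => json_encode($batch)]))->add();
        $batch = [];
    }
}
if (!empty($batch)) {
    (new PushTask('/worker', ['batch' => json_encode($batch)]))->add();
}

<?php
// worker.php - handles the task queue POSTs and writes one batch per request.
require 'vendor/autoload.php';

use Google\Cloud\Datastore\DatastoreClient;

$datastore = new DatastoreClient(['projectId' => 'development']);

$batch = json_decode($_POST['batch'], true);
$entities = [];
foreach ($batch as $sku => $name) {
    $entities[] = $datastore->entity(
        $datastore->key('SKU', $sku),
        ['created' => new DateTime(), 'name' => strtolower($name)]
    );
}
// One non-transactional call per batch (see the upsertBatch sketch above).
$datastore->upsertBatch($entities);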
Side note: you may want to retry the failed/rolled-back transactions or save those entities for a later retry; currently it appears you're losing them.
Upvotes: 0