Ewan Walker
Ewan Walker

Reputation: 184

BigQuery query failure handling

I am trying to make sure that all of my streaming inserts succeed, but it seems that some of them are not being inserted correctly.

No errors are being thrown; however, I am not using the data returned by the insert request, as I do not know how errors are structured.

Here is my insertion code:

    // Wrap the payload in a single streaming-insert row object.
    $insertRow = new Google_Service_Bigquery_TableDataInsertAllRequestRows();
    $insertRow->setJson($data);
    // Build the insertAll request that carries exactly that one row.
    $insertRequest = new Google_Service_Bigquery_TableDataInsertAllRequest();
    $insertRequest->setKind('bigquery#tableDataInsertAllRequest');
    $insertRequest->setRows(array($insertRow));
    // The raw response is handed back to the caller without being inspected here.
    return $this->service->tabledata->insertAll($project, $dataset, $tableid, $insertRequest);

The above code omits the authentication and the setup of the client and service.

Upvotes: 1

Views: 310

Answers (1)

Pentium10
Pentium10

Reputation: 207830

You can use this method, which inspects the insertAll response for per-row insert errors:

/**
 * Streams rows into a BigQuery table and reports per-row insert errors.
 *
 * The target table ID is taken from static::tableId(). When the response
 * carries insert errors, a diagnostic message (request/response headers and
 * bodies plus one line per row error) is stored via $this->setErrorMessage().
 * Only rows whose error reason is "timeout" are collected in 'failed_lines';
 * rows reported as "stopped" are skipped silently and every other reason is
 * only written to the error message.
 *
 * @param Google_Client $client     Client used to build the BigQuery service;
 *                                  its ->request object is read for diagnostics.
 * @param string        $project_id Project ID passed to tabledata->insertAll().
 * @param string        $dataset_id Dataset ID passed to tabledata->insertAll().
 * @param array         $rows       Rows handed to the request's setRows()
 *                                  (presumably Google_Service_Bigquery_TableDataInsertAllRequestRows
 *                                  instances; confirm against the caller).
 * @return array Array with keys 'success' (bool, false if any insert error was
 *               reported), 'last_reason' (string, reason of the last timed-out
 *               row or ''), and 'failed_lines' (row indexes that timed out).
 * @throws Google_Service_Exception Rethrown after its errors and message are
 *                                  recorded on $this.
 */
public function BQ_Tabledata_InsertAll($client, $project_id, $dataset_id, $rows) {
    $success = true;
    $failed_lines = array();
    $last_reason = '';

    $bq = new Google_Service_Bigquery($client);
    $request = new Google_Service_Bigquery_TableDataInsertAllRequest();
    $request->setRows($rows);

    try {
        $resp = $bq->tabledata->insertAll($project_id, $dataset_id, static::tableId(), $request);
        // No "@" here: suppressing diagnostics would hide real problems with the response.
        $errors = $resp->getInsertErrors();

        if (!empty($errors)) {
            // Capture the raw HTTP exchange so the failure can be diagnosed later.
            $error_msg = "\r\nRequest Headers: \r\n" . json_encode($client->request->getRequestHeaders())
                . "\r\nResponse Headers: \r\n" . json_encode($client->request->getResponseHeaders())
                . "\r\nRequest Body:\r\n" . $client->request->getPostBody()
                . "\r\nResponse Body:\r\n" . $client->request->getResponseBody() . "\r\n";

            if (is_array($errors)) {
                foreach ($errors as $eP) {
                    $arr = $eP->getErrors();
                    // Index of the offending row within the submitted $rows.
                    $line = $eP->getIndex();

                    if (!is_array($arr)) {
                        $error_msg .= json_encode($arr) . "\r\n";
                        continue;
                    }

                    foreach ($arr as $e) {
                        $reason = $e->getReason();
                        switch ($reason) {
                            case "stopped":
                                // Deliberately not reported or collected.
                                break;
                            case "timeout":
                                // Timed-out rows are returned to the caller in 'failed_lines'.
                                $failed_lines[] = $line;
                                $last_reason = $reason;
                                $error_msg .= sprintf("Timeout on line %s, reason: %s, msg: %s\r\n", $line, $reason, $e->getMessage());
                                break;
                            default:
                                $error_msg .= sprintf("Error on line %s, reason: %s, msg: %s\r\n", $line, $reason, $e->getMessage());
                                break;
                        }
                    }
                }
                $this->setErrorMessage($error_msg);
            } else {
                $this->setErrorMessage($errors);
            }

            $success = false;
        }
    } catch (Google_Service_Exception $e) {
        // Record the failure details on this object, then let the caller handle the exception.
        $this->setErrors($e->getErrors())->setErrorMessage($e->getMessage());
        throw $e;
    }

    // Build the result from final values instead of binding references to locals.
    return array(
        'success' => $success,
        'last_reason' => $last_reason,
        'failed_lines' => $failed_lines,
    );
}

Upvotes: 2

Related Questions