Reputation: 3
I can ingest data successfully using the code below:
var kcsbDM = new KustoConnectionStringBuilder(
        "https://test123.southeastasia.kusto.windows.net",
        "testdb")
    .WithAadApplicationTokenAuthentication(acquireTokenTask.AccessToken);
using (var ingestClient = KustoIngestFactory.CreateDirectIngestClient(kcsbDM))
{
    var ingestProps = new KustoQueuedIngestionProperties("testdb", "TraceLog");
    ingestProps.ReportLevel = IngestionReportLevel.FailuresOnly;
    ingestProps.ReportMethod = IngestionReportMethod.Queue;
    ingestProps.Format = DataSourceFormat.json;
    //generate datastream and columnmapping
    ingestProps.IngestionMapping = new IngestionMapping() {
        IngestionMappings = columnMappings };
    var ingestionResult = ingestClient.IngestFromStream(memStream, ingestProps);
}
However, when I try to use the QueuedClient and IngestFromStreamAsync, the code executes successfully but no data is ingested into the database, even after 30 minutes:
var kcsbDM = new KustoConnectionStringBuilder(
        "https://ingest-test123.southeastasia.kusto.windows.net",
        "testdb")
    .WithAadApplicationTokenAuthentication(acquireTokenTask.AccessToken);
using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(kcsbDM))
{
    var ingestProps = new KustoQueuedIngestionProperties("testdb", "TraceLog");
    ingestProps.ReportLevel = IngestionReportLevel.FailuresOnly;
    ingestProps.ReportMethod = IngestionReportMethod.Queue;
    ingestProps.Format = DataSourceFormat.json;
    //generate datastream and columnmapping
    ingestProps.IngestionMapping = new IngestionMapping() {
        IngestionMappings = columnMappings };
    var ingestionResult = ingestClient.IngestFromStreamAsync(memStream, ingestProps);
}
Upvotes: 0
Views: 2447
Reputation: 1175
Could you please change the line to:
var ingestionResult = await ingestClient.IngestFromStreamAsync(memStream, ingestProps);
Also, please note that queued ingestion has a batching stage of up to 5 minutes (by default) before the data is actually ingested. This is controlled by the IngestionBatching policy, which you can inspect with:
.show table TraceLog policy ingestionbatching
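To illustrate why the missing await can matter, here is a minimal sketch that does not use the Kusto SDK at all. FakeIngestClient and AwaitDemo are hypothetical names made up for this example, and the assumption that a disposed client fails its in-flight work is only one plausible explanation of the symptom, not confirmed behavior of the real ingest client.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for an ingest client; NOT part of the Kusto SDK.
public sealed class FakeIngestClient : IDisposable
{
    private bool _disposed;

    public async Task<string> IngestAsync()
    {
        await Task.Delay(100); // simulate upload latency
        if (_disposed)
            throw new ObjectDisposedException(nameof(FakeIngestClient));
        return "Queued";
    }

    public void Dispose() => _disposed = true;
}

public static class AwaitDemo
{
    // Not awaited: the using block ends (and disposes the client) while the
    // operation is still in flight; any failure stays hidden inside the task.
    public static Task<string> WithoutAwait()
    {
        using (var client = new FakeIngestClient())
        {
            return client.IngestAsync();
        }
    }

    // Awaited: the client stays alive until the operation has finished.
    public static async Task<string> WithAwait()
    {
        using (var client = new FakeIngestClient())
        {
            return await client.IngestAsync();
        }
    }

    public static async Task Main()
    {
        Task<string> pending = WithoutAwait();
        Console.WriteLine($"Completed right after the call: {pending.IsCompleted}");
        try { await pending; }
        catch (ObjectDisposedException) { Console.WriteLine("Un-awaited call failed after dispose"); }

        Console.WriteLine($"Awaited call result: {await WithAwait()}");
    }
}
```

In the un-awaited variant nothing is thrown at the call site, which matches the "executes successfully but nothing arrives" symptom in the question.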
Upvotes: 1
Reputation: 1088
Enabling the streaming ingestion policy is only needed for streaming ingestion, for example when you use
CreateManagedStreamingIngestClient
The ManagedStreamingIngestClient first tries to stream-ingest the data; if that fails a few times, it falls back to the QueuedClient.
If the data being ingested is small (under 4 MB), this client is recommended.
If you are using the QueuedClient, you can try
.show commands-and-queries | where StartedOn > ago(20m) and Text contains "{YourTableName}" and CommandType == "DataIngestPull"
This shows the ingestion command that was executed; note, however, that it can lag by more than 5 minutes.
Finally, you can check the status with whichever client you use. First create a StreamDescription with an explicit source id:
StreamDescription description = new StreamDescription
{
    SourceId = Guid.NewGuid(),
    Stream = dataStream
};
Now you know the source id. Ingest by calling:
var checker = await client.IngestFromStreamAsync(description, ingestProps);
After that, call
var statusCheck = checker.GetIngestionStatusBySourceId(description.SourceId.Value);
to find out the status of this ingestion job. It is better to wrap this in a separate thread so that you can keep checking every few seconds, for example.
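The "keep checking every few seconds" idea can be sketched as a small polling helper. This is only an illustration: StatusPoller and PollUntilAsync are hypothetical names, the status is modeled as a plain string, and the delegate stands in for a call such as checker.GetIngestionStatusBySourceId(...). It uses a background task with Task.Delay instead of a dedicated thread, which is a design choice of this sketch.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class StatusPoller
{
    // Calls getStatus, then waits `interval` and retries, until isFinal
    // returns true or the token is cancelled (hypothetical helper).
    public static async Task<T> PollUntilAsync<T>(
        Func<T> getStatus,
        Func<T, bool> isFinal,
        TimeSpan interval,
        CancellationToken ct = default)
    {
        while (true)
        {
            T status = getStatus();
            if (isFinal(status)) return status;
            await Task.Delay(interval, ct); // throws if cancelled
        }
    }

    public static async Task Main()
    {
        int calls = 0;
        // Stand-in for the real status lookup: reports "Pending" twice,
        // then "Succeeded". The string values are assumptions for the demo.
        Func<string> fakeStatus = () => ++calls < 3 ? "Pending" : "Succeeded";

        // Give up after 30 seconds so the loop cannot run forever.
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        string final = await PollUntilAsync(
            fakeStatus, s => s != "Pending", TimeSpan.FromMilliseconds(50), cts.Token);

        Console.WriteLine($"{final} after {calls} checks");
    }
}
```

With the real client, the first delegate would wrap the status lookup and the second would decide which statuses count as final; both depend on what the SDK's status object actually exposes.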
Upvotes: 0
Reputation: 3
I finally found the reason: streaming ingestion needs to be enabled on the table:
.alter table TraceLog policy streamingingestion enable
See the Azure documentation for details.
Upvotes: 0
Reputation: 36
Try running .show ingestion failures on the "https://test123.southeastasia.kusto.windows.net" endpoint to see whether there are any ingestion errors. Also, since you set the Queue reporting method, you can get the detailed result by reading from the queue.
ingestProps.ReportLevel = IngestionReportLevel.FailuresOnly;
ingestProps.ReportMethod = IngestionReportMethod.Queue;
(In the first example you used KustoQueuedIngestionProperties; you should use KustoIngestionProperties. KustoQueuedIngestionProperties has additional properties that will be ignored by the ingest client, for example ReportLevel and ReportMethod.)
Upvotes: 1