LES

Reputation: 3

IngestFromStreamAsync method does not work

I can ingest data successfully using the code below:

var kcsbDM = new KustoConnectionStringBuilder(
    "https://test123.southeastasia.kusto.windows.net", 
    "testdb")
  .WithAadApplicationTokenAuthentication(acquireTokenTask.AccessToken);

using (var ingestClient = KustoIngestFactory.CreateDirectIngestClient(kcsbDM))            
{

    var ingestProps = new KustoQueuedIngestionProperties("testdb", "TraceLog");
    ingestProps.ReportLevel = IngestionReportLevel.FailuresOnly;
    ingestProps.ReportMethod = IngestionReportMethod.Queue;
    ingestProps.Format = DataSourceFormat.json;
 
    //generate datastream and columnmapping

    ingestProps.IngestionMapping = new IngestionMapping() { 
      IngestionMappings = columnMappings };
    var ingestionResult = ingestClient.IngestFromStream(memStream, ingestProps);
}

But when I switch to the queued client and IngestFromStreamAsync, the code runs without errors, yet no data is ingested into the database, even after 30 minutes:

var kcsbDM = new KustoConnectionStringBuilder(
    "https://ingest-test123.southeastasia.kusto.windows.net", 
    "testdb")
  .WithAadApplicationTokenAuthentication(acquireTokenTask.AccessToken);

using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(kcsbDM))          
{

    var ingestProps = new KustoQueuedIngestionProperties("testdb", "TraceLog");
    ingestProps.ReportLevel = IngestionReportLevel.FailuresOnly;
    ingestProps.ReportMethod = IngestionReportMethod.Queue;
    ingestProps.Format = DataSourceFormat.json;
 
    //generate datastream and columnmapping

    ingestProps.IngestionMapping = new IngestionMapping() { 
      IngestionMappings = columnMappings };
    var ingestionResult = ingestClient.IngestFromStreamAsync(memStream, ingestProps);
}

Upvotes: 0

Views: 2447

Answers (4)

Vladik Branevich

Reputation: 1175

Could you please change the line to:

var ingestionResult = await ingestClient.IngestFromStreamAsync(memStream, ingestProps);

Also, please note that queued ingestion has a batching stage of up to 5 minutes before the data is actually ingested. See the documentation for the IngestionBatching policy and the ".show table ingestion batching policy" command.
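To illustrate why the missing await can matter: one plausible explanation is that the using block disposes the client before the asynchronous call has finished, and since the returned task is never observed, no exception surfaces. The sketch below reproduces that pattern with a hypothetical FakeIngestClient (a stand-in written for this example, not part of Kusto.Ingest); the real client may behave differently in detail.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for an ingest client; NOT part of Kusto.Ingest.
sealed class FakeIngestClient : IDisposable
{
    private bool _disposed;

    public async Task<string> IngestAsync()
    {
        await Task.Delay(100); // simulates the upload taking some time
        if (_disposed)
            throw new ObjectDisposedException(nameof(FakeIngestClient));
        return "queued";
    }

    public void Dispose() => _disposed = true;
}

static class Program
{
    static async Task Main()
    {
        Task<string> pending;
        using (var client = new FakeIngestClient())
        {
            // Not awaited: the using block exits (and disposes) right away.
            pending = client.IngestAsync();
        }

        try
        {
            await pending; // observed here only to make the failure visible
        }
        catch (ObjectDisposedException)
        {
            Console.WriteLine("not awaited: client was disposed first");
        }

        using (var client = new FakeIngestClient())
        {
            // Awaited: the client stays alive until the call completes.
            var result = await client.IngestAsync();
            Console.WriteLine($"awaited: {result}");
        }
    }
}
```

In the question's code the task is never awaited at all, so a failure like this would go unnoticed and the method would appear to succeed.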

Upvotes: 1

sowen

Reputation: 1088

Enabling the streamingingestion policy is actually only needed if:

  • streaming ingestion is turned on in the cluster (Azure portal)
  • the code is using CreateManagedStreamingIngestClient

The ManagedStreamingIngestClient first tries to stream-ingest the data; if that fails a few times, it falls back to the QueuedClient.

If the data being ingested is small (under 4 MB), this is the recommended client.
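The fallback behavior described above can be pictured roughly as follows. This is only a sketch of the retry-then-fall-back idea, not the library's implementation: IngestWithFallbackAsync, the two delegates, and the attempt count of 3 are all hypothetical names and values chosen for the example.

```csharp
using System;
using System.Threading.Tasks;

static class FallbackSketch
{
    // Tries the streaming path up to maxAttempts times, then uses the queued path.
    // maxAttempts = 3 is an assumed value, not taken from Kusto.Ingest.
    public static async Task<string> IngestWithFallbackAsync(
        Func<Task> streamIngest, Func<Task> queuedIngest, int maxAttempts = 3)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                await streamIngest();
                return "streaming";
            }
            catch (Exception)
            {
                // Swallowed for brevity; a real client would retry only transient errors.
            }
        }

        await queuedIngest();
        return "queued";
    }

    static async Task Main()
    {
        // Simulated streaming path that always fails, so the queued path is used.
        Func<Task> failingStream = () => Task.FromException(new InvalidOperationException("streaming failed"));
        Func<Task> queued = () => Task.CompletedTask;

        Console.WriteLine(await IngestWithFallbackAsync(failingStream, queued));
    }
}
```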

If you are using the QueuedClient, you can try:

.show commands-and-queries | where StartedOn > ago(20m) and Text contains "{YourTableName}" and CommandType == "DataIngestPull"

This shows the ingestion command that was executed; however, it can lag by more than 5 minutes.

Finally, you can check the status with whichever client you use. First create a StreamDescription:

StreamDescription description = new StreamDescription
{
    SourceId = Guid.NewGuid(),
    Stream = dataStream
};

Now you have the source ID. Ingest by calling:

var checker = await client.IngestFromStreamAsync(description, ingestProps);

After that, call:

var statusCheck = checker.GetIngestionStatusBySourceId(description.SourceId.Value);

This tells you the status of that ingestion job. It is best to run the check in a separate thread or task, so you can keep polling every few seconds, for example.
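The polling mentioned above could look something like the sketch below. To keep it self-contained it takes the status lookup as a delegate returning a string; WaitForFinalStatusAsync and the "Pending" string are assumptions made for this example, and the Main method uses a simulated status source rather than a real Kusto client.

```csharp
using System;
using System.Threading.Tasks;

static class StatusPolling
{
    // Calls getStatus every `interval` until it returns something other than
    // "Pending" or the timeout elapses; returns the last status seen.
    public static async Task<string> WaitForFinalStatusAsync(
        Func<string> getStatus, TimeSpan interval, TimeSpan timeout)
    {
        var deadline = DateTime.UtcNow + timeout;
        var status = getStatus();
        while (status == "Pending" && DateTime.UtcNow < deadline)
        {
            await Task.Delay(interval);
            status = getStatus();
        }
        return status;
    }

    static async Task Main()
    {
        // Simulated status source: "Pending" twice, then "Succeeded".
        int calls = 0;
        Func<string> fakeStatus = () => ++calls < 3 ? "Pending" : "Succeeded";

        var final = await WaitForFinalStatusAsync(
            fakeStatus, TimeSpan.FromMilliseconds(50), TimeSpan.FromSeconds(5));
        Console.WriteLine(final);
    }
}
```

With the real client, the delegate would presumably wrap the call from above, something like () => checker.GetIngestionStatusBySourceId(description.SourceId.Value).Status.ToString(), assuming the returned status object exposes a Status value. I believe status tracking of this kind also depends on the ReportMethod and ReportLevel set on the ingestion properties, so check the documentation for the values it requires.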

Upvotes: 0

LES

Reputation: 3

I finally found the reason: streaming ingestion needs to be enabled on the table:

.alter table TraceLog policy streamingingestion enable

See the Azure documentation for details.

Upvotes: 0

Keren

Reputation: 36

Try running .show ingestion failures against the "https://test123.southeastasia.kusto.windows.net" endpoint and see whether there are any ingestion errors. Also, since you set the Queue reporting method, you can get the detailed result by reading from the queue.

ingestProps.ReportLevel = IngestionReportLevel.FailuresOnly;
ingestProps.ReportMethod = IngestionReportMethod.Queue;

(In the first example you used KustoQueuedIngestionProperties; you should use KustoIngestionProperties there. KustoQueuedIngestionProperties has additional properties, such as ReportLevel and ReportMethod, that will be ignored by that ingest client.)

Upvotes: 1
