Casey Crookston

Reputation: 13965

How to make an async event run continuously?

I am creating a small console app that is still at the proof-of-concept stage. The idea is that it will query third-party APIs for events on their end. (That part is working fine.)

When an event arrives, we need to store the JSON string for that event in a SQLite database on our side. At the same time, I need a process running in the same app that continuously looks for records in the SQLite table and handles each one it finds.

Here's what I have so far... (again, this is just a proof of concept while I learn the basics of how to set this up)...

static void Main(string[] args)
{
    var t = CheckEventsAsync();
    while (true)
    {
        System.Threading.Thread.Sleep(5000);
        DataAccess.InsertData("INSERT INTO events(value) VALUES ('this is an event');");
    }
}

static private async Task CheckEventsAsync()
{
    DataTable dt = await DataAccess.LoadDataAsync("SELECT * FROM events");
    foreach (var dr in dt.Rows)
    {
        Console.WriteLine("got an event!");
    }
}

The problem is that CheckEventsAsync() only ever fires once. When the 5-second timer goes off and a record is added to the table, nothing happens.

Am I going to need to use an EventHandler (with a delegate, etc.) to make this work? Or is it possible without doing it that way?

Thanks!

Oh, almost forgot... (this works in that it returns a data table if there is data, and null if not. But I'm not sure I'm doing it right for the overall goal)...

    public static async Task<DataTable> LoadDataAsync(string txtQuery)
    {
        var dataTable = new DataTable();
        var connection = new SQLiteConnection(_source);
        using (connection)
        {
            connection.Open();
            var command = connection.CreateCommand();
            command.CommandText = txtQuery;
            var task = await command.ExecuteReaderAsync();
            dataTable = new DataTable();
            dataTable.Load(task);
            connection.Close();
        }
        // && short-circuits, so Rows is only read when dataTable is not null
        if (dataTable != null && dataTable.Rows.Count > 0)
            return dataTable;
        else
            return null;
    }

EDIT

I see from the answers so far that I did a bad job of asking the question. I need to have two processes running:

  1. The first process will listen for third-party events and add them to the SQLite table. This is simulated with the 5-second timer, but in reality there will not be a timer inside a while (true) loop. It will be a WebSocket listener.
  2. The second process will watch for records in the database and handle them. So, CheckEventsAsync() needs to be a listener that triggers without the method being called.

The two need to operate within the same app, but independent of each other.

I hope I am making sense?

Upvotes: 2

Views: 1391

Answers (3)

Enigmativity

Reputation: 117175

Use Microsoft's Reactive Framework for this:

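// Requires the System.Reactive NuGet package.
using System;
using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// ItemToProcess is a simple class with a string Value property.
static readonly BlockingCollection<ItemToProcess> _queue = new BlockingCollection<ItemToProcess>(new ConcurrentQueue<ItemToProcess>());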

static void Main(string[] args)
{
    IDisposable inserts =
        Observable
            .Interval(TimeSpan.FromSeconds(5.0))
            .Subscribe(_ =>
            {
                _queue.Add(new ItemToProcess() { Value = "value" });
            });

    IDisposable checks =
        new CompositeDisposable(
            Disposable.Create(() => _queue.CompleteAdding()),
            _queue
                .GetConsumingEnumerable()
                .ToObservable(Scheduler.Default)
                .Subscribe(item =>
                {
                    Console.WriteLine(item.Value);
                }));
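
    // Keep the process alive; both subscriptions run on background threads.
    Console.ReadLine();

    // Stop producing, then complete the queue and stop consuming.
    inserts.Dispose();
    checks.Dispose();
}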

This handles all of the threading and makes it easy to close the app. Just call inserts.Dispose() and checks.Dispose() just before exiting.

Install the "System.Reactive" NuGet package to get the library.

Upvotes: 4

Evk

Reputation: 101633

Since everything runs in the same OS process, you don't need to poll the database constantly for new events. Instead, use an in-memory queue: all producers push items onto it and the consumer takes them off. You can still insert the data into the SQLite table to cover the case where the process crashes and some items are left unprocessed. Here is some code (for illustration only; your real application will of course not have Thread.Sleep and the like):

using System;
using System.Collections.Concurrent;
using System.Threading;

class Program {
    static readonly BlockingCollection<ItemToProcess> _queue = new BlockingCollection<ItemToProcess>(new ConcurrentQueue<ItemToProcess>());
    static readonly CancellationTokenSource _cts = new CancellationTokenSource();

    static void Main(string[] args) {
        // get all unprocessed items from database here and insert into queue
        var producerThread = StartProducer(_cts.Token);
        var consumerThread = StartConsumer();
        Console.ReadKey();
        // cancel
        _cts.Cancel();
        // wait for producer thread to finish
        producerThread.Join();
        // notify there will be no more items
        _queue.CompleteAdding();
        // wait for consumer to finish
        consumerThread.Join();
    }

    static Thread StartProducer(CancellationToken ct) {
        var thread = new Thread(() => {
            while (!ct.IsCancellationRequested) {                    
                // also insert into database here
                _queue.Add(new ItemToProcess() {
                    Value = "value"
                });
                // imitate delay
                Thread.Sleep(5000);
            }
        }) {
            IsBackground = true
        };
        thread.Start();
        return thread;
    }

    static Thread StartConsumer() {
        var thread = new Thread(() => {
            foreach (var item in _queue.GetConsumingEnumerable()) {
                try {
                    // process
                    Console.WriteLine(item.Value);
                    // delete from database here
                }
                catch (Exception ex) {
                    // handle
                }
            }
        }) {
            IsBackground = true
        };
        thread.Start();
        return thread;
    }
}

class ItemToProcess
{
    public string Value { get; set; }
}
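
The "get all unprocessed items from database here" step in Main could look roughly like the sketch below. It is only an illustration: QueueRecovery, RequeueUnprocessed and the loadUnprocessed delegate are hypothetical names, and the delegate stands in for whatever query returns the rows that were stored but never handled (for example, something built on the question's DataAccess class).

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

static class QueueRecovery
{
    // Pushes every stored-but-unhandled value back onto the in-memory queue
    // and returns how many were re-queued.
    // loadUnprocessed is a hypothetical stand-in for a database query.
    public static int RequeueUnprocessed(
        BlockingCollection<string> queue,
        Func<IEnumerable<string>> loadUnprocessed)
    {
        int count = 0;
        foreach (var value in loadUnprocessed())
        {
            queue.Add(value);
            count++;
        }
        return count;
    }
}
```

Call it once at startup, before the producer starts adding new items. For this to work, the consumer has to delete (or mark) each row after handling it, as the "delete from database here" comment suggests; otherwise every restart would re-queue rows that were already processed.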

Upvotes: 3

Aaron

Reputation: 662

Your task is outside the five second loop, so of course it only fires once.
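
A minimal sketch of what putting the check inside its own loop might look like, assuming simple polling is acceptable. EventPoller, PollAsync and checkOnceAsync are hypothetical names; checkOnceAsync stands in for one pass of CheckEventsAsync().

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class EventPoller
{
    // Runs checkOnceAsync repeatedly, waiting `interval` between passes,
    // until the token is cancelled.
    public static async Task PollAsync(
        Func<Task> checkOnceAsync,
        TimeSpan interval,
        CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            await checkOnceAsync();
            try
            {
                await Task.Delay(interval, ct);
            }
            catch (OperationCanceledException)
            {
                break; // cancellation was requested during the delay
            }
        }
    }
}
```

In Main, something like var t = EventPoller.PollAsync(CheckEventsAsync, TimeSpan.FromSeconds(1), cts.Token); would replace the single CheckEventsAsync() call, with cts being a CancellationTokenSource you create yourself. Note that "SELECT * FROM events" returns every row on every pass, so handled rows would need to be deleted or marked to avoid processing them again.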

Upvotes: 0
