Reputation: 13965
I am creating a little console app that is just in the proof-of-concept stage right now. The idea is that it will query third-party APIs for events on their end. (That part is all working just fine.)
When an event arrives, we need to stick the JSON string for that event into a SQLite db on our side. At the same time, I need a process running in the same app that is continuously looking for records in the SQLite table, and when it finds one, it handles it.
Here's what I have so far... (again, this is just a proof of concept while I learn the basics of how to set this up)...
static void Main(string[] args)
{
    var t = CheckEventsAsync();
    while (true)
    {
        System.Threading.Thread.Sleep(5000);
        DataAccess.InsertData("INSERT INTO events(value) VALUES ('this is an event');");
    }
}

static private async Task CheckEventsAsync()
{
    DataTable dt = await DataAccess.LoadDataAsync("SELECT * FROM events");
    foreach (var dr in dt.Rows)
    {
        Console.WriteLine("got an event!");
    }
}
The problem is that CheckEventsAsync() only ever fires once, and then when the 5 second timer goes off and a record is added to the table, nothing happens.
Am I going to need to use an EventHandler (with a delegate, etc) to make this work? Or is it possible w/o doing it that way?
Thanks!
Oh, almost forgot... (this works in that it returns a data table if there is data, and null if not. But I'm not sure I'm doing it right for the overall goal)...
public static async Task<DataTable> LoadDataAsync(string txtQuery)
{
    var dataTable = new DataTable();
    var connection = new SQLiteConnection(_source);
    using (connection)
    {
        connection.Open();
        var command = connection.CreateCommand();
        command.CommandText = txtQuery;
        var task = await command.ExecuteReaderAsync();
        dataTable = new DataTable();
        dataTable.Load(task);
        connection.Close();
    }
    if (dataTable != null & dataTable.Rows.Count > 0)
        return dataTable;
    else
        return null;
}
EDIT
I see from the answers so far that I did a bad job of asking the question. I need to have two processes running:
While(true) loop. It will be a WebSocket Listener. The two need to operate within the same app, but independently of each other.
I hope I am making sense?
Upvotes: 2
Views: 1391
Reputation: 117175
Use Microsoft's Reactive Framework for this:
static readonly BlockingCollection<ItemToProcess> _queue = new BlockingCollection<ItemToProcess>(new ConcurrentQueue<ItemToProcess>());

static void Main(string[] args)
{
    // Producer: add an item to the queue every five seconds.
    IDisposable inserts =
        Observable
            .Interval(TimeSpan.FromSeconds(5.0))
            .Subscribe(_ =>
            {
                _queue.Add(new ItemToProcess() { Value = "value" });
            });

    // Consumer: handle items on the default scheduler as they arrive.
    IDisposable checks =
        new CompositeDisposable(
            Disposable.Create(() => _queue.CompleteAdding()),
            _queue
                .GetConsumingEnumerable()
                .ToObservable(Scheduler.Default)
                .Subscribe(item =>
                {
                    Console.WriteLine(item.Value);
                }));

    // Neither Subscribe call blocks, so wait here until the user presses Enter.
    Console.ReadLine();
    inserts.Dispose();
    checks.Dispose();
}
This handles all of the threading and makes it easy to close the app. Just call inserts.Dispose() and checks.Dispose() just before exiting.
NuGet "System.Reactive" to get the bits.
Upvotes: 4
Reputation: 101633
Since all of this runs in the same OS process, you don't need to constantly poll the database for new events. Instead, use an in-memory queue: all producers push data to it and the consumer, well, consumes. You can still insert the data into the SQLite table to handle the case where the process crashes and some items are left unprocessed. Here is some code (for illustration purposes; in your real application you will not have Thread.Sleep and such, of course):
class Program {
    static readonly BlockingCollection<ItemToProcess> _queue = new BlockingCollection<ItemToProcess>(new ConcurrentQueue<ItemToProcess>());
    static readonly CancellationTokenSource _cts = new CancellationTokenSource();

    static void Main(string[] args) {
        // get all unprocessed items from database here and insert into queue
        var producerThread = StartProducer(_cts.Token);
        var consumerThread = StartConsumer();
        Console.ReadKey();
        // cancel
        _cts.Cancel();
        // wait for producer thread to finish
        producerThread.Join();
        // notify there will be no more items
        _queue.CompleteAdding();
        // wait for consumer to finish
        consumerThread.Join();
    }

    static Thread StartProducer(CancellationToken ct) {
        var thread = new Thread(() => {
            while (!ct.IsCancellationRequested) {
                // also insert into database here
                _queue.Add(new ItemToProcess() {
                    Value = "value"
                });
                // imitate delay
                Thread.Sleep(5000);
            }
        }) {
            IsBackground = true
        };
        thread.Start();
        return thread;
    }

    static Thread StartConsumer() {
        var thread = new Thread(() => {
            foreach (var item in _queue.GetConsumingEnumerable()) {
                try {
                    // process
                    Console.WriteLine(item.Value);
                    // delete from database here
                }
                catch (Exception ex) {
                    // handle
                }
            }
        }) {
            IsBackground = true
        };
        thread.Start();
        return thread;
    }
}

class ItemToProcess
{
    public string Value { get; set; }
}
Upvotes: 3
Reputation: 662
Your task is outside the five second loop, so of course it only fires once.
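To make that concrete, here is a rough sketch of what moving the check into its own loop could look like. It is only an illustration under some assumptions: the Poller class, the PollAsync method and the loadAsync delegate are hypothetical names (loadAsync stands in for something like the question's DataAccess.LoadDataAsync, but returning a list of strings), and the poll interval and cancellation token are my additions.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class Poller
{
    // Calls loadAsync repeatedly until cancellation is requested and passes
    // every returned row to handle. loadAsync is a hypothetical stand-in for
    // a data-access call such as DataAccess.LoadDataAsync.
    public static async Task PollAsync(
        Func<Task<IReadOnlyList<string>>> loadAsync,
        Action<string> handle,
        TimeSpan interval,
        CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            var rows = await loadAsync();
            if (rows != null) // the question's loader returns null when there is no data
            {
                foreach (var row in rows)
                {
                    handle(row);
                }
            }

            try
            {
                // Wait before the next check; yields instead of blocking a thread.
                await Task.Delay(interval, ct);
            }
            catch (OperationCanceledException)
            {
                break; // cancellation was requested during the delay
            }
        }
    }
}
```

In Main you would start it once, before the insert loop, with something like var poller = Poller.PollAsync(...), and keep the returned task around. Because the loop awaits between checks, it keeps running alongside the while (true) loop instead of finishing after the first query. You would still need to mark or delete rows once handled, otherwise each pass sees the same records again.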
Upvotes: 0