Reputation: 17367
In short, I'm attempting to use the Reactive Library to implement a simple tail utility to actively monitor new lines as they're appended to a file. Here is what I got so far:
static void Main(string[] args)
{
var filePath = @"C:\Users\wbrian\Documents\";
var fileName = "TestFile.txt";
var fullFilePath = filePath + fileName;
var fs = new FileStream(fullFilePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
var sr = new StreamReader(fs, true);
sr.ReadToEnd();
var lastPos = fs.Position;
var watcher = new FileSystemWatcher(filePath, fileName);
watcher.NotifyFilter = NotifyFilters.Size;
watcher.EnableRaisingEvents = true;
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
action => watcher.Changed += action,
action => watcher.Changed -= action)
.Throttle(TimeSpan.FromSeconds(1))
.Select(e =>
{
var curSize = new FileInfo(fullFilePath).Length;
if (curSize < lastPos)
{
//we assume the file has been cleared,
//reset the position of the stream to the beginning.
fs.Seek(0, SeekOrigin.Begin);
}
var lines = new List<string>();
string line;
while((line = sr.ReadLine()) != null)
{
if(!string.IsNullOrWhiteSpace(line))
{
lines.Add(line);
}
}
lastPos = fs.Position;
return lines;
}).Subscribe(Observer.Create<List<string>>(lines =>
{
foreach (var line in lines)
{
Console.WriteLine("new line = {0}", line);
}
}));
Console.ReadLine();
sr.Close();
fs.Close();
}
As you can see, I create an Observable from a FileWatcher event, an event that fires when the size of the file is changed. From there, I determine which lines are new and the observable returns a list of new lines. Ideally, the observable sequence would just be a string that represents each new line. The only reason the observable returns a List is because I simply don't know how to massage it to do as such. Any help would be greatly appreciated.
Upvotes: 3
Views: 83
Reputation: 144126
You can use SelectMany:
SelectMany(lines => lines)
.Subscribe(Observer.Create<string>(line => { Console.WriteLine("new line = {0}", line); });
Upvotes: 1