Reputation: 2672
Minimal reproducible example:
Interop.cs
public static class Interop
{
[DllImport("kernel32.dll")]
public static extern IntPtr CreateIoCompletionPort(
[In] IntPtr fileHandle,
[In] IntPtr existingCompletionPort,
[In] UInt32 completionKey,
[In] UInt32 numberOfConcurrentThreads);
[DllImport("kernel32.dll")]
public static extern UInt32 GetLastError();
[DllImport("kernel32.dll")]
public static unsafe extern bool GetQueuedCompletionStatus(
[In] IntPtr completionPort,
[Out] out UInt32 ptrBytesTransferred,
[Out] out UInt32 ptrCompletionKey,
[Out] NativeOverlapped** lpOverlapped,
[In] UInt32 dwMilliseconds);
[DllImport("kernel32.dll")]
public static extern IntPtr CreateFile(
[In] string fileName,
[In] UInt32 dwDesiredAccess,
[In] UInt32 dwShareMode,
[In] IntPtr lpSecurityAttributes,
[In] UInt32 dwCreationDisposition,
[In] UInt32 dwFlagsAndAttributes,
[In] IntPtr hTemplateFile);
[DllImport("kernel32.dll")]
public static unsafe extern bool ReadFile(
[In] IntPtr hFile,
[Out] byte[] lpBuffer,
[In] uint maxBytesToRead,
[Out] out UInt32 bytesActuallyRead,
[In] NativeOverlapped* lpOverlapped);
[DllImport("kernel32.dll")]
public static extern bool PostQueuedCompletionStatus(
[In] IntPtr completionPort,
[In] UInt32 bytesTrasferred,
[In] UInt32 completionKey,
[In] IntPtr lpOverlapped);
}
Program.cs
class Program
{
static unsafe void Main(string[] args)
{
// create completion port
var completionPortHandle = Interop.CreateIoCompletionPort(new IntPtr(-1), IntPtr.Zero, 0, 0);
ThreadLogger.Log("Completion port handle: {0}", completionPortHandle);
var completionPortThread = new Thread(() => new IOCompletionWorker().Start(completionPortHandle))
{
IsBackground = true
};
completionPortThread.Start();
const uint Flags = 128 | (uint)1 << 30;
var fileHandle = Interop.CreateFile("test.txt", (uint)1 << 31, 0, IntPtr.Zero, 3,
/*FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED */ Flags,
IntPtr.Zero);
ThreadLogger.Log("File handle: {0}", fileHandle);
Interop.CreateIoCompletionPort(
fileHandle,
completionPortHandle,
(uint)fileHandle.ToInt64(),
0);
ThreadLogger.Log("Associated file handle with completion port");
var readBuffer = new byte[1024];
uint bytesRead;
var overlapped = new Overlapped
{
AsyncResult = new FileReadAsyncResult()
{
ReadCallback = (bytesCount, buffer) =>
{
var contentRead = Encoding.UTF8.GetString(buffer, 0, (int)bytesCount);
ThreadLogger.Log(contentRead);
},
Buffer = readBuffer
}
};
NativeOverlapped* nativeOverlapped = overlapped.UnsafePack((uint errorCode, uint numBytes, NativeOverlapped* pOVERLAP) =>
{
ThreadLogger.Log("Why am I not getting printed?");
}, readBuffer);
ThreadLogger.Log("Before read in main thread");
Interop.ReadFile(fileHandle, readBuffer, (uint)readBuffer.Length, out bytesRead, nativeOverlapped);
ThreadLogger.Log("After read in main thread");
Console.ReadLine();
}
}
FileReadAsyncResult.cs
class FileReadAsyncResult : IAsyncResult
{
public bool IsCompleted { get; private set; }
public WaitHandle AsyncWaitHandle { get; private set; }
public object AsyncState { get; private set; }
public bool CompletedSynchronously { get; private set; }
public Action<uint, byte[]> ReadCallback { get; set; }
public byte[] Buffer { get; set; }
}
IOCompletionWorker.cs
public class IOCompletionWorker
{
public unsafe void Start(IntPtr completionPort)
{
while (true)
{
uint bytesRead;
uint completionKey;
NativeOverlapped* nativeOverlapped;
ThreadLogger.Log("About to get queued completion status on {0}", completionPort);
var result = Interop.GetQueuedCompletionStatus(
completionPort,
out bytesRead,
out completionKey,
&nativeOverlapped,
uint.MaxValue);
var overlapped = Overlapped.Unpack(nativeOverlapped);
if (result)
{
var asyncResult = ((FileReadAsyncResult)overlapped.AsyncResult);
asyncResult.ReadCallback(bytesRead, asyncResult.Buffer);
}
else
{
ThreadLogger.Log(Interop.GetLastError().ToString());
}
Overlapped.Free(nativeOverlapped);
}
}
}
I'm aware that if I use Threadpool.BindHandle
with the corresponding file handle, my callback will be ran - but I'm trying to learn why it's not being executed when it is being registered on my own IOCP which a thread is waiting for completion packages. (Furthermore, the Thread Pool won't know how to handle my custom AsyncResult - the callback there won't get executed.)
Upvotes: 5
Views: 581
Reputation: 13535
IO Completion Ports work by creating a completion port, binding it to a file handle and then spin up n IO completion port threads which wait for GetQueuedCompletionStatus to return in this blocking call.
The callback you did mention in your code is not part of the IO Completion port infrastructure. It is a service of the .NET Threadpool where you can call ThreadPool.BindHandle(SafeHandle osHandle) which calls you back when for this handle an IO completion has completed.
But if you do everything on your own then the .NET Threadpool service will not be available to you. The calling call stack is
IOCompletionPort.Sample!IOCompletionPort.Sample.Program+<>c.<Main>b__0_0(UInt32, UInt32, System.Threading.NativeOverlapped*)+0x45[c:\Source\async-io-talk\src\IOCompletionPorts\IOCompletionPort.Sample\Program.cs @ 63]
mscorlib!System.Threading._IOCompletionCallback.PerformIOCompletionCallback(UInt32, UInt32, System.Threading.NativeOverlapped*)+0x84
clr!CallDescrWorkerInternal+0x83
clr!CallDescrWorkerWithHandler+0x4e
clr!DispatchCallSimple+0x67
clr!BindIoCompletionCallBack_Worker+0xee
clr!ManagedThreadBase_DispatchInner+0x40
clr!ManagedThreadBase_DispatchMiddle+0x6c
clr!ManagedThreadBase_DispatchOuter+0x4c
clr!ManagedThreadBase_FullTransitionWithAD+0x2f
clr!BindIoCompletionCallbackStubEx+0xb9
clr!BindIoCompletionCallbackStub+0x9
clr!ThreadpoolMgr::CompletionPortThreadStart+0x604
clr!Thread::intermediateThreadProc+0x8b
KERNEL32!BaseThreadInitThunk+0x14
ntdll!RtlUserThreadStart+0x21
The method BindIoCompletionCallbackStub is called on another thread when the IO completion did complete
FCIMPL1(FC_BOOL_RET, ThreadPoolNative::CorPostQueuedCompletionStatus, LPOVERLAPPED lpOverlapped)
{
FCALL_CONTRACT;
OVERLAPPEDDATAREF overlapped = ObjectToOVERLAPPEDDATAREF(OverlappedDataObject::GetOverlapped(lpOverlapped));
BOOL res = FALSE;
HELPER_METHOD_FRAME_BEGIN_RET_1(overlapped); // Eventually calls BEGIN_SO_INTOLERANT_CODE_NOTHROW
// OS doesn't signal handle, so do it here
overlapped->Internal = 0;
res = ThreadpoolMgr::PostQueuedCompletionStatus(lpOverlapped,
BindIoCompletionCallbackStub);
this is called by
ThreadPoolNative::CorPostQueuedCompletionStatus, LPOVERLAPPED lpOverlapped)
ThreadpoolMgr::CallbackForContinueDrainageOfCompletionPortQueue
to execute the IO completion callbacks some time after one or several IO completions have finished. Since you handle everything on your own there is no one calling ThreadpoolMgr::CallbackForContinueDrainageOfCompletionPortQueue to notify your completed IO completion initiated read request.
Upvotes: 2