Reputation: 3050
I'm stressing a service I'm writing that uses a WebSocket
taken from AcceptWebSocketAsync
. The code I'm using to send messages through the WebSocket is this:
static bool
SendMessage(WebSocket webSocket, WebSocketMessage message, byte[] buffer, CancellationToken cancellationToken)
{
try {
var endOfMessage = false;
do {
using(var timeout = new CancellationTokenSource(webSocketsTimeout))
using(var lcts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeout.Token)) {
var count = message.Content.Read(buffer, 0, buffer.Length);
endOfMessage = count < buffer.Length;
// ReSharper disable once MethodSupportsCancellation
webSocket
.SendAsync(new ArraySegment<byte>(buffer, 0, count), message.Type, endOfMessage, lcts.Token)
.Wait() // SendAsync should be canceled using the Token.
;
}
} while(endOfMessage == false);
return true;
}
catch(Exception e) {
TraceConnectionError(e);
return false;
}
finally {
message.Dispose();
}
}
My problem is that under "stress" (I'm opening and closing 6 connections every 30 seconds until the system fails), I'm getting:
Unhandled Exception: System.AggregateException: A Task's exception(s) were not observed either by Waiting on the Task or accessing its Exception property. As a result, the unobserved exception was rethrown by the finalizer thread. ---> System.Net.HttpListenerException: An operation was attempted on a nonexistent network connection
at System.Net.WebSockets.WebSocketHttpListenerDuplexStream.WriteAsyncFast(HttpListenerAsyncEventArgs eventArgs)
at System.Net.WebSockets.WebSocketHttpListenerDuplexStream.<MultipleWriteAsyncCore>d__38.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Net.WebSockets.WebSocketBase.<SendFrameAsync>d__48.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
at System.Net.WebSockets.WebSocketBase.WebSocketOperation.<Process>d__19.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Net.WebSockets.WebSocketBase.<SendAsyncCore>d__47.MoveNext()
--- End of inner exception stack trace ---
at System.Threading.Tasks.TaskExceptionHolder.Finalize()
Shouldn't the Wait()
I'm using be enough to "observe" the Task exception?
Upvotes: 0
Views: 619
Reputation: 3050
The problem was a race condition in .NET framework code.
I've reported the bug here.
As a workaround, I keep a list of used WebSockets which I regularly check for State != Open
and then call this code:
public static class WebSocketXs
{
readonly static Assembly assembly = typeof(WebSocket).Assembly;
readonly static FieldInfo m_InnerStream = assembly.GetType("System.Net.WebSockets.WebSocketBase").GetField(nameof(m_InnerStream), BindingFlags.NonPublic | BindingFlags.Instance);
readonly static FieldInfo m_ReadTaskCompletionSource = assembly.GetType("System.Net.WebSockets.WebSocketHttpListenerDuplexStream").GetField(nameof(m_ReadTaskCompletionSource), BindingFlags.NonPublic | BindingFlags.Instance);
readonly static FieldInfo m_WriteTaskCompletionSource = assembly.GetType("System.Net.WebSockets.WebSocketHttpListenerDuplexStream").GetField(nameof(m_WriteTaskCompletionSource), BindingFlags.NonPublic | BindingFlags.Instance);
readonly static FieldInfo[] completionSourceFields = {m_ReadTaskCompletionSource, m_WriteTaskCompletionSource };
/// <summary>
/// This fixes a race that happens when a <see cref="WebSocket"/> fails and aborts after failure.
/// The <see cref="completionSourceFields"/> have an Exception that is not observed as the <see cref="WebSocket.Abort()"/>
/// done to WebSocketBase <see cref="m_InnerStream"/> is just <see cref="TaskCompletionSource{TResult}.TrySetCanceled()"/> which
/// does nothing with the completion source <see cref="Task.Exception"/>.
/// That in turn raises a <see cref="TaskScheduler.UnobservedTaskException"/>.
/// </summary>
public static void
CleanUpAndDispose(this WebSocket ws)
{
foreach(var completionSourceField in completionSourceFields) {
m_InnerStream
.GetValue(ws)
.Maybe(completionSourceField.GetValue)
.Maybe(s => s as TaskCompletionSource<object>)?
.Task
.Exception
.Maybe(_ => {}) // We just need to observe any exception.
;
}
ws.Dispose();
}
}
Upvotes: 2