Reputation: 2357
offer:
async def offer(self, request):
try:
params = await request.json()
peer_connection = {
"name": params["name"],
"surname": params["surname"],
"pc": None,
"is_closed": False,
"dc": None,
"uid": uuid.uuid4(),
"audio_track": None,
"audio_track_for_local_use": None,
"audio_blackhole": None,
"video_track": None,
"video_track_for_local_use": None,
"video_blackhole": None,
"offer_in_progress": True,
"call_answered": False,
"manage_call_end_thread": None,
"stop_in_progress": False,
"call_number": None
}
if len(list(self.pcs.keys())) == 3:
return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""}))
reserved_call_numbers = list(self.pcs.keys())
if 1 not in reserved_call_numbers:
call_number = 1
elif 2 not in reserved_call_numbers:
call_number = 2
else:
call_number = 3
peer_connection["call_number"] = call_number
self.pcs[call_number] = peer_connection
self.to_emitter.send({"type": "call_"+str(call_number)+"_offering", "name": peer_connection["name"], "surname": peer_connection["surname"],'uid':peer_connection['uid']})
timer = 0
while (timer < self.configuration["aiortc_time_window_for_answer_ms"]/1000 and self.queue.qsize() == 0 and self.pcs[call_number]["call_answered"] == False):
if request.transport is None or request.transport.is_closing():
self.to_emitter.send({"type":"transport-error","call-number":call_number})
try:
request.transport.close()
except:
pass
del self.pcs[call_number]
return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""}))
timer += 0.1
await asyncio.sleep(0.1)
self.pcs[call_number]["call_answered"] = True
if self.queue.qsize() == 0:
return self.reject_offer(call_number,peer_connection)
else:
data = self.queue.get()
correct_types = ["call-1","call-2","call-3"]
if data["type"] in correct_types:
if (data["call"] == "reject"):
return self.reject_offer(call_number, peer_connection)
elif (data["call"] == "answer"):
while not self.queue.empty():
self.queue.get()
self.pcs[call_number]["pc"] = RTCPeerConnection(configuration=RTCConfiguration([RTCIceServer("stun:stun.l.google:19302"),]))
@self.pcs[call_number]["pc"].on("iceconnectionstatechange")
async def on_ice_connection_state_change():
pc = self.pcs[call_number]["pc"]
print(f"ICE connection state: {pc.iceConnectionState}")
if pc.iceConnectionState == "failed":
print("ICE connection failed. Attempting to restart ICE.")
await pc.restartIce()
await asyncio.sleep(5) # Adjust as needed
if pc.iceConnectionState == "disconnected":
print("Peer connection lost. Stopping connection.")
await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])
if pc.iceConnectionState == "disconnected":
# Wait before handling disconnection
await asyncio.sleep(5) # Adjust as needed
if pc.iceConnectionState == "disconnected":
print("Peer connection lost. Stopping connection.")
await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])
@self.pcs[call_number]["pc"].on("connectionstatechange")
async def on_connection_state_change():
pc = self.pcs[call_number]["pc"]
print(f"Connection state: {pc.connectionState}")
if pc.connectionState in ["failed", "disconnected", "closed"]:
print("Connection failed, disconnected, or closed. Stopping connection.")
await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])
@self.pcs[call_number]["pc"].on("datachannel")
async def on_datachannel(channel):
self.pcs[call_number]["dc"] = channel
# Send UID to the connecting peer
try:
channel.send(json.dumps({"type": "uid", "uid": str(peer_connection["uid"])}))
except Exception as e:
print(f"Error sending UID: {e}")
# Inform about other peers
try:
for uid, pc in self.pcs.items():
if pc['uid'] != peer_connection['uid']:
try:
pc['dc'].send(json.dumps({
"type": "other-uid",
"uid": str(peer_connection['uid']),
"name": peer_connection["name"],
"surname": peer_connection["surname"]
}))
except Exception as e:
print(f"Error sending other-uid message: {e}")
except Exception as e:
print(f"Error informing about other peers: {e}")
@channel.on("message")
async def on_message(message):
try:
message = json.loads(message)
msg_type = message.get("type")
if msg_type == "disconnected":
print('disconnected message from client')
await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])
elif msg_type in ["offer", "answer", "ice-candidate"]:
target_uid = message["to_uid"]
for uid, pc in self.pcs.items():
if str(pc['uid']) == target_uid:
try:
pc["dc"].send(json.dumps(message))
except Exception as e:
print(f"Error relaying {msg_type}: {e}")
else:
print(f"Unhandled message type: {msg_type}")
except Exception as e:
print(f"Error handling message: {e}")
@channel.on("close")
async def on_close():
await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])
# audio from server to client
if self.server_audio_stream_offer == None:
self.server_audio_stream_offer = Server_Audio_Stream_Offer(self.speackers_deck_queue,self.to_emitter)
#self.server_audio_stream_offer = AudioStreamTrack()
self.pcs[call_number]["pc"].addTrack(self.server_audio_stream_offer)
# video from server to client
if self.server_video_stream_offer is None:
self.server_video_stream_offer = self.create_local_tracks()
self.pcs[call_number]["pc"].addTrack(self.server_video_stream_offer)
# Attach video from server to QLabel
if self.server_video_track is None:
self.server_video_track = WebCamera(self.server_video_stream_offer,self.to_emitter)
if self.server_video_blackholde is None:
self.server_video_blackholde = MediaBlackhole()
self.server_video_blackholde.addTrack(self.server_video_track)
await self.server_video_blackholde.start()
@self.pcs[call_number]["pc"].on("track")
async def on_track(track):
if track.kind == "audio":
self.pcs[call_number]["audio_track"] = track
# audio from client (server use)
if call_number == 1:
correct_queue = self.ip_call_1_packet_queue
elif call_number == 2:
correct_queue = self.ip_call_2_packet_queue
else:
correct_queue = self.ip_call_3_packet_queue
self.put_to_q = True
self.pcs[call_number]["audio_track_for_local_use"] = ClientTrack(track, self, self.to_emitter,call_number,self.put_to_q,correct_queue)
self.pcs[call_number]["audio_blackhole"] = MediaBlackhole()
self.pcs[call_number]["audio_blackhole"].addTrack(
self.pcs[call_number]["audio_track_for_local_use"])
await self.pcs[call_number]["audio_blackhole"].start()
else:
self.pcs[call_number]["video_track"] = track
# video from client (server use)
self.pcs[call_number]["video_track_for_local_use"] = ClientWebCamera(track, self.to_emitter,
call_number, self)
self.pcs[call_number]["video_blackhole"] = MediaBlackhole()
self.pcs[call_number]["video_blackhole"].addTrack(
self.pcs[call_number]["video_track_for_local_use"])
await self.pcs[call_number]["video_blackhole"].start()
offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
# handle offer
await self.pcs[call_number]["pc"].setRemoteDescription(offer)
# send answer
answer = await self.pcs[call_number]["pc"].createAnswer()
await self.pcs[call_number]["pc"].setLocalDescription(answer)
return web.Response(content_type="application/json", text=json.dumps(
{"sdp": self.pcs[call_number]["pc"].localDescription.sdp,
"type": self.pcs[call_number]["pc"].localDescription.type}))
else:
return self.reject_offer(call_number, peer_connection)
else:
return self.reject_offer(call_number, peer_connection)
except:
error = traceback.format_exc()
self.to_emitter.send({"type": "error", "error_message": error})
stop_peer_connection:
async def stop_peer_connection(self, call_number, uid):
try:
if call_number not in self.pcs:
# Peer connection does not exist
return None
if self.pcs[call_number]["stop_in_progress"]:
# Stop process is already in progress
return None
self.pcs[call_number]["stop_in_progress"] = True
# 1. Notify PyQt5 about the stop
self.to_emitter.send({"type": "stop-peer-connection", "call_number": call_number})
# 2. Empty the correct queue
queue = self.call_queues[call_number - 1]
while not queue.empty():
queue.get()
# 4. Close data channel
try:
self.pcs[call_number]["dc"].close()
except Exception:
pass
# 5. Stop client audio track
try:
await self.pcs[call_number]["audio_blackhole"].stop()
except Exception:
pass
# 6. Notify PyQt5 about call status
self.to_emitter.send({"type": f"call-{call_number}-status", "status": "closed-by-client"})
# 7. Stop client video track
try:
self.pcs[call_number]["video_track_for_local_use"].stop()
await self.pcs[call_number]["video_blackhole"].stop()
except Exception:
pass
# 9. Release server resources if there are no remaining connections
if len(self.pcs.keys()) == 1: # No active peer connections
# Stop video relay and related blackholes
try:
if self.server_video_blackholde:
await self.server_video_blackholde.stop()
self.server_video_blackholde = None
if self.server_video_track:
await self.server_video_track.stop()
self.server_video_track = None
except Exception:
pass
# Stop server audio stream
try:
if self.server_audio_stream_offer:
self.server_audio_stream_offer.stop()
self.server_audio_stream_offer = None
except Exception:
pass
# Stop and release the webcam
try:
if self.webcam:
if self.webcam.video:
self.webcam.video.stop()
await asyncio.sleep(0.1) # Give time for cleanup
self.webcam = None
except Exception as e:
self.to_emitter.send({"type": "error", "error_message": str(e)})
# Attempt final resource cleanup
gc.collect() # Force garbage collection to release resources
self.to_emitter.send({"type": "log", "message": "Garbage collection triggered for cleanup."})
# 3. Close peer connection
try:
await self.pcs[call_number]["pc"].close()
print('Peer connection closed correctly!!!')
except Exception:
print(traceback.format_exc())
# 8. Cleanup
try:
del self.pcs[call_number]
except Exception:
pass
except Exception:
error = traceback.format_exc()
print(error)
self.to_emitter.send({"type": "error", "error_message": error})
The problem with this code is that when client is disconnected, so it's send disconnect message from dc, then stop_peer_connection async method is runned, then this message:
print('Peer connection closed correctly!!!')
never printed because await self.pcs[call_number]["pc"].close()
never returns
Upvotes: 0
Views: 46