Chris P
Chris P

Reputation: 2357

Cannot close correctly aiortc RTCPeerConnection after client disconnect

offer:


    async def offer(self, request):
        try:
            params = await request.json()

            peer_connection = {
                "name": params["name"],
                "surname": params["surname"],
                "pc": None,
                "is_closed": False,
                "dc": None,
                "uid": uuid.uuid4(),
                "audio_track": None,
                "audio_track_for_local_use": None,
                "audio_blackhole": None,
                "video_track": None,
                "video_track_for_local_use": None,
                "video_blackhole": None,
                "offer_in_progress": True,
                "call_answered": False,
                "manage_call_end_thread": None,
                "stop_in_progress": False,
                "call_number": None
            }

            if len(list(self.pcs.keys())) == 3:
                return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""}))

            reserved_call_numbers = list(self.pcs.keys())

            if 1 not in reserved_call_numbers:
                call_number = 1
            elif 2 not in reserved_call_numbers:
                call_number = 2
            else:
                call_number = 3

            peer_connection["call_number"] = call_number
            self.pcs[call_number] = peer_connection

            self.to_emitter.send({"type": "call_"+str(call_number)+"_offering", "name": peer_connection["name"], "surname": peer_connection["surname"],'uid':peer_connection['uid']})

            timer = 0
            while (timer < self.configuration["aiortc_time_window_for_answer_ms"]/1000 and self.queue.qsize() == 0 and self.pcs[call_number]["call_answered"] == False):
                if request.transport is None or request.transport.is_closing():
                    self.to_emitter.send({"type":"transport-error","call-number":call_number})
                    try:
                        request.transport.close()
                    except:
                        pass
                    del self.pcs[call_number]
                    return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""}))
                timer += 0.1
                await asyncio.sleep(0.1)

            self.pcs[call_number]["call_answered"] = True
            if self.queue.qsize() == 0:
                return self.reject_offer(call_number,peer_connection)
            else:
                data = self.queue.get()
                correct_types = ["call-1","call-2","call-3"]
                if data["type"] in correct_types:
                    if (data["call"] == "reject"):
                        return self.reject_offer(call_number, peer_connection)
                    elif (data["call"] == "answer"):
                        while not self.queue.empty():
                            self.queue.get()

                        self.pcs[call_number]["pc"] = RTCPeerConnection(configuration=RTCConfiguration([RTCIceServer("stun:stun.l.google:19302"),]))

                        @self.pcs[call_number]["pc"].on("iceconnectionstatechange")
                        async def on_ice_connection_state_change():
                            pc = self.pcs[call_number]["pc"]
                            print(f"ICE connection state: {pc.iceConnectionState}")

                            if pc.iceConnectionState == "failed":
                                print("ICE connection failed. Attempting to restart ICE.")
                                await pc.restartIce()
                                await asyncio.sleep(5)  # Adjust as needed
                                if pc.iceConnectionState == "disconnected":
                                    print("Peer connection lost. Stopping connection.")
                                    await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])


                            if pc.iceConnectionState == "disconnected":
                                # Wait before handling disconnection
                                await asyncio.sleep(5)  # Adjust as needed
                                if pc.iceConnectionState == "disconnected":
                                    print("Peer connection lost. Stopping connection.")
                                    await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])

                        @self.pcs[call_number]["pc"].on("connectionstatechange")
                        async def on_connection_state_change():
                            pc = self.pcs[call_number]["pc"]
                            print(f"Connection state: {pc.connectionState}")
                            if pc.connectionState in ["failed", "disconnected", "closed"]:
                                print("Connection failed, disconnected, or closed. Stopping connection.")
                                await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])

                        @self.pcs[call_number]["pc"].on("datachannel")
                        async def on_datachannel(channel):
                            self.pcs[call_number]["dc"] = channel

                            # Send UID to the connecting peer
                            try:
                                channel.send(json.dumps({"type": "uid", "uid": str(peer_connection["uid"])}))
                            except Exception as e:
                                print(f"Error sending UID: {e}")

                            # Inform about other peers
                            try:
                                for uid, pc in self.pcs.items():
                                    if pc['uid'] != peer_connection['uid']:
                                        try:
                                            pc['dc'].send(json.dumps({
                                                "type": "other-uid",
                                                "uid": str(peer_connection['uid']),
                                                "name": peer_connection["name"],
                                                "surname": peer_connection["surname"]
                                            }))
                                        except Exception as e:
                                            print(f"Error sending other-uid message: {e}")
                            except Exception as e:
                                print(f"Error informing about other peers: {e}")

                            @channel.on("message")
                            async def on_message(message):
                                try:
                                    message = json.loads(message)
                                    msg_type = message.get("type")
                                    if msg_type == "disconnected":
                                        print('disconnected message from client')
                                        await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])
                                    elif msg_type in ["offer", "answer", "ice-candidate"]:
                                        target_uid = message["to_uid"]
                                        for uid, pc in self.pcs.items():
                                            if str(pc['uid']) == target_uid:
                                                try:
                                                    pc["dc"].send(json.dumps(message))
                                                except Exception as e:
                                                    print(f"Error relaying {msg_type}: {e}")
                                    else:
                                        print(f"Unhandled message type: {msg_type}")
                                except Exception as e:
                                    print(f"Error handling message: {e}")

                            @channel.on("close")
                            async def on_close():
                                await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])

                        # audio from server to client
                        if self.server_audio_stream_offer == None:
                            self.server_audio_stream_offer = Server_Audio_Stream_Offer(self.speackers_deck_queue,self.to_emitter)
                            #self.server_audio_stream_offer = AudioStreamTrack()

                        self.pcs[call_number]["pc"].addTrack(self.server_audio_stream_offer)

                        # video from server to client
                        if self.server_video_stream_offer is None:
                            self.server_video_stream_offer = self.create_local_tracks()
                        self.pcs[call_number]["pc"].addTrack(self.server_video_stream_offer)

                        # Attach video from server to QLabel
                        if self.server_video_track is None:
                            self.server_video_track = WebCamera(self.server_video_stream_offer,self.to_emitter)
                        if self.server_video_blackholde is None:
                            self.server_video_blackholde = MediaBlackhole()
                            self.server_video_blackholde.addTrack(self.server_video_track)
                            await self.server_video_blackholde.start()

                        @self.pcs[call_number]["pc"].on("track")
                        async def on_track(track):
                            if track.kind == "audio":
                                self.pcs[call_number]["audio_track"] = track
                                # audio from client (server use)
                                if call_number == 1:
                                    correct_queue = self.ip_call_1_packet_queue
                                elif call_number == 2:
                                    correct_queue = self.ip_call_2_packet_queue
                                else:
                                    correct_queue = self.ip_call_3_packet_queue
                                self.put_to_q = True
                                self.pcs[call_number]["audio_track_for_local_use"] = ClientTrack(track, self, self.to_emitter,call_number,self.put_to_q,correct_queue)
                                self.pcs[call_number]["audio_blackhole"] = MediaBlackhole()
                                self.pcs[call_number]["audio_blackhole"].addTrack(
                                    self.pcs[call_number]["audio_track_for_local_use"])
                                await self.pcs[call_number]["audio_blackhole"].start()
                            else:
                                self.pcs[call_number]["video_track"] = track
                                # video from client (server use)
                                self.pcs[call_number]["video_track_for_local_use"] = ClientWebCamera(track, self.to_emitter,
                                                                                                     call_number, self)
                                self.pcs[call_number]["video_blackhole"] = MediaBlackhole()
                                self.pcs[call_number]["video_blackhole"].addTrack(
                                    self.pcs[call_number]["video_track_for_local_use"])
                                await self.pcs[call_number]["video_blackhole"].start()

                        offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])

                        # handle offer
                        await self.pcs[call_number]["pc"].setRemoteDescription(offer)

                        # send answer
                        answer = await self.pcs[call_number]["pc"].createAnswer()
                        await self.pcs[call_number]["pc"].setLocalDescription(answer)

                        return web.Response(content_type="application/json", text=json.dumps(
                            {"sdp": self.pcs[call_number]["pc"].localDescription.sdp,
                             "type": self.pcs[call_number]["pc"].localDescription.type}))
                    else:
                        return self.reject_offer(call_number, peer_connection)
                else:
                    return self.reject_offer(call_number, peer_connection)
        except:
            error = traceback.format_exc()
            self.to_emitter.send({"type": "error", "error_message": error})

stop_peer_connection:

    async def stop_peer_connection(self, call_number, uid):
        try:
            if call_number not in self.pcs:
                # Peer connection does not exist
                return None
            if self.pcs[call_number]["stop_in_progress"]:
                # Stop process is already in progress
                return None
            self.pcs[call_number]["stop_in_progress"] = True
            # 1. Notify PyQt5 about the stop
            self.to_emitter.send({"type": "stop-peer-connection", "call_number": call_number})
            # 2. Empty the correct queue
            queue = self.call_queues[call_number - 1]
            while not queue.empty():
                queue.get()
            # 4. Close data channel
            try:
                self.pcs[call_number]["dc"].close()
            except Exception:
                pass
            # 5. Stop client audio track
            try:
                await self.pcs[call_number]["audio_blackhole"].stop()
            except Exception:
                pass
            # 6. Notify PyQt5 about call status
            self.to_emitter.send({"type": f"call-{call_number}-status", "status": "closed-by-client"})
            # 7. Stop client video track
            try:
                self.pcs[call_number]["video_track_for_local_use"].stop()
                await self.pcs[call_number]["video_blackhole"].stop()
            except Exception:
                pass
            # 9. Release server resources if there are no remaining connections
            if len(self.pcs.keys()) == 1:  # No active peer connections
                # Stop video relay and related blackholes
                try:
                    if self.server_video_blackholde:
                        await self.server_video_blackholde.stop()
                        self.server_video_blackholde = None
                    if self.server_video_track:
                        await self.server_video_track.stop()
                        self.server_video_track = None
                except Exception:
                    pass

                # Stop server audio stream
                try:
                    if self.server_audio_stream_offer:
                        self.server_audio_stream_offer.stop()
                        self.server_audio_stream_offer = None
                except Exception:
                    pass

                # Stop and release the webcam
                try:
                    if self.webcam:
                        if self.webcam.video:
                            self.webcam.video.stop()
                        await asyncio.sleep(0.1)  # Give time for cleanup
                        self.webcam = None
                except Exception as e:
                    self.to_emitter.send({"type": "error", "error_message": str(e)})

                # Attempt final resource cleanup
                gc.collect()  # Force garbage collection to release resources
                self.to_emitter.send({"type": "log", "message": "Garbage collection triggered for cleanup."})

            # 3. Close peer connection
            try:
                await self.pcs[call_number]["pc"].close()
                print('Peer connection closed correctly!!!')
            except Exception:
                print(traceback.format_exc())
            # 8. Cleanup
            try:
                del self.pcs[call_number]
            except Exception:
                pass
        except Exception:
            error = traceback.format_exc()
            print(error)
            self.to_emitter.send({"type": "error", "error_message": error})

The problem with this code is that when client is disconnected, so it's send disconnect message from dc, then stop_peer_connection async method is runned, then this message:

print('Peer connection closed correctly!!!')

never printed because await self.pcs[call_number]["pc"].close() never returns

Upvotes: 0

Views: 46

Answers (0)

Related Questions