Reputation: 11
my application requires me to detect multicast streams and react to them. Defining the sockets and igmp is fine but once the stream(s) ends , the select seems to stay active for 60 seconds.
readable, _, _ = select.select(sockets, [], [], 0)
I've tried to zero out the timeouts I know about but to no affect.
===============================
import socket
import struct
import time
import select
multicast_addresses = {'224.0.1.10': [50002, 50003,50010,50024]}
sockets = []
active_ports=[]
active_MC ={
"50002":[False],"50003":[False],
"50010":[False],"50024":[False]
}
for multicast_address, ports in multicast_addresses.items():
for port in ports:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((multicast_address, port))
group = socket.inet_aton(multicast_address)
mreq = struct.pack('4sL', group, socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
#sock.setblocking(False)
sockets.append(sock)
#print (sockets)
while True:
# List of sockets to monitor for read events
readable, _, _ = select.select(sockets, [], [], 0) # Timeout set to 1 second
print("just after readable", readable)
# If any of the multicast sockets are in the readable list, it means there's incoming data
#print(f'the lenght of readable : {len(readable)} {readable}')
#print(readable,end='/r')
#print (f"streams dected = {len(readable)}")
#print(active_MC)
port_check = '50003'
length = len(readable)
pass
for sock in readable: # Create List of active ports
port = sock.getsockname()[1] # get the name of one of the ports
active_ports.append(port)
print(f"active_ports array = {active_ports} type={type(active_ports)} {type(active_ports[0])}")
#note active_ports get stored as integers
#print("After For loop acive Ports:",active_ports)
#for port_check in active_MC.keys(): # loop through all ports defined
#print ("Ports For loop read ",port_check,type(port_check))
#print(f"Array Port_check = {port_check} ")
# port_check is a str
if int(port_check) in active_ports : # stream is active if true
#print(f"Vars in first test port_check= {port_check} active_MC = {active_MC[str(port_check)][0]}")
if active_MC[port_check][0] == False :# First time detected
print(f"stream started on port: {port_check}\n\n")
active_MC[port_check][0] = True
print("MAIN: ",active_MC)
else:
# True Events may be ignored or Not
# thinking I may need to time duration of stream
#print("Failed First Test")
pass
else:
print("main if statement was false\n",port_check,active_MC[port_check][0])
# here Port is NOT an active stream so set to false and log
if active_MC[port_check][0] == True :# Stream was live but no longer
print(f"stream STOPPED on port: {port_check}")
active_MC[port_check][0] = False
print("POST: ",active_MC)
else:
print("POST if statement was false\n")
# True Events may be ignored or Not
# thinking I may need to time duration of stream
pass
active_ports.clear() # clear the array for the next text
time.sleep(1)
In a previous generation of this code I setup 10 different threads each listening to one unique socket and using the recvfrom to determine if I have traffic or not. I'm pretty sure that caused a memory leak because I had crashes every 38 days from a reboot (almost exactly) once that was implemented. I figured a more efficient way is required.
Upvotes: 0
Views: 13