Reputation: 1
I have a simple Python script that uses Scapy to detect SYN flood attacks in a virtual network environment called Mininet.
from scapy.all import *
from collections import defaultdict
import threading
import time
import sys

# Data structures
syn_to_synack = defaultdict(lambda: [0, 0])  # SYN to SYN-ACK ratio tracker
blocked_ips = set()  # Blocked IPs

# Thresholds
SYN_SYNACK_RATIO_THRESHOLD = 3  # Lower ratio for easier detection during testing
CHECK_INTERVAL = 1  # Interval in seconds to check thresholds

def monitor_packets(packet):
    """
    Function to process sniffed packets and update tracking data.
    """
    global blocked_ips
    if packet.haslayer(TCP):
        tcp = packet[TCP]
        ip = packet[IP]
        # If the source IP is blocked, drop the packet
        if ip.src in blocked_ips:
            print(f"[BLOCKED] Dropping packet from {ip.src}", flush=True)
            return
        # Track SYN packets
        if tcp.flags == "S":
            syn_to_synack[(ip.src, tcp.dport)][0] += 1  # Increment SYN count
            time.sleep(0.01)
            print(f"[info] SYN packet tracked: {ip.src} -> {ip.dst}:{tcp.dport}", flush=True)
            print(f"[info] Updated syn_to_synack: {dict(syn_to_synack)}", file=sys.stderr)
        # Track SYN-ACK packets
        if tcp.flags == "SA":
            syn_to_synack[(ip.src, tcp.sport)][1] += 1  # Increment SYN-ACK count
            time.sleep(0.01)  # Artificial delay for debugging
            print(f"[info] SYN-ACK packet tracked: {ip.src} -> {ip.dst}:{tcp.dport}", flush=True)
            print(f"[info] Updated syn_to_synack: {dict(syn_to_synack)}", file=sys.stderr)

def sniff_packets():
    """
    Function to run sniffing in a separate thread.
    """
    print("[*] Starting packet sniffing...", flush=True)
    sniff(filter="tcp", prn=monitor_packets, store=False, timeout=0.1)

def check_thresholds():
    """
    Function to check thresholds for SYN flood detection and block offending IPs.
    """
    global blocked_ips
    print(f"[info] Checking thresholds...", flush=True)
    print(f"[info] Current syn_to_synack state: {dict(syn_to_synack)}", file=sys.stderr)
    print(f"[info] Current blocked IPs: {blocked_ips}", flush=True)
    # Check SYN to SYN-ACK ratio
    for key, counts in syn_to_synack.items():
        syn_count, synack_count = counts
        print(f"[info] Evaluating key: {key}, SYN count: {syn_count}, SYN-ACK count: {synack_count}", flush=True)
        if synack_count == 0 or (syn_count / synack_count) > SYN_SYNACK_RATIO_THRESHOLD:
            print(f"[ALERT] High SYN/SYN-ACK ratio for {key}: {syn_count}/{synack_count}", flush=True)
            src_ip = key[0]  # Extract the source IP from the key
            print(f"[info] src_ip extracted: {src_ip}", flush=True)
            print(f"[info] Checking if {src_ip} is already blocked...", flush=True)
            if src_ip not in blocked_ips:  # Block the source IP based on the ratio
                print(f"[ACTION] Blocking IP: {src_ip}", flush=True)
                blocked_ips.add(src_ip)  # Block the IP
                time.sleep(0.1)
            else:
                print(f"[info] {src_ip} is already blocked.", flush=True)

if __name__ == "__main__":
    print("[*] Starting SYN flood detection with enhanced debugging...", flush=True)
    try:
        # Start sniffing in a separate thread
        sniff_thread = threading.Thread(target=sniff_packets, daemon=True)
        sniff_thread.start()
        # Run the threshold checking loop
        while True:
            time.sleep(CHECK_INTERVAL)
            print("[info] running threshold loop...", flush=True)
            check_thresholds()
    except KeyboardInterrupt:
        print("\n[!] Stopping SYN flood detection.", flush=True)
This script is supposed to run on the router, sniffing TCP packets and monitoring the SYN/SYN-ACK ratio to flag those attacks, then blocking the offending source IPs. I have tried:
[*] Starting SYN flood detection with dual-flag logic...
[INFO] SYN from 192.168.1.2 to 192.168.2.2:12345
...Message keeps showing, until this message starts to show
[ALERT] High SYN rate detected for 192.168.1.2 (29 SYNs/sec)
...This message keeps showing until I terminate the program
[!] Stopping SYN flood detection.
This is a fairly simple script, but I cannot understand why some messages get printed and some don't. I understand it might be a logic error, but then why are other messages that come before those not printed either?
Upvotes: 0
Views: 20
Reputation: 5421
The issue is that you have two threads that are both writing to the same output (stdout). This leads to race conditions where the messages mix together.
To fix this, have only one thread write to stdout. You could move your while True: loop into a separate thread and replace the calls to print() with a function that adds the message to a queue. Then have your main thread read from the queue and print each message.
import queue

OUTPUT = queue.Queue()  # thread-safe queue shared by all threads

def log(x):
    OUTPUT.put(x)

[... Your code replacing print() by log()]

if __name__ == "__main__":
    [Start thread 1]
    [Start thread 2]
    while True:
        msg = OUTPUT.get()  # blocks until a message is available
        print(msg)
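For reference, here is a minimal runnable sketch of the same idea. It is only an illustration: the names LOG_QUEUE, worker, drain and run_demo are made up for this example, and the two worker threads simply stand in for the sniffing and threshold-checking threads from the question. A sentinel object is used so the demo can terminate; a long-running script would not need it.

```python
import queue
import threading

LOG_QUEUE = queue.Queue()  # hypothetical name; queue.Queue is thread-safe
_STOP = object()           # sentinel telling the printer loop to exit


def log(message):
    """Hand a message to the printer instead of calling print() directly."""
    LOG_QUEUE.put(message)


def worker(name, count):
    """Stand-in for the sniffing / threshold threads from the question."""
    for i in range(count):
        log(f"[{name}] message {i}")


def drain(out=print):
    """Single consumer: the only place that writes to the output."""
    while True:
        message = LOG_QUEUE.get()  # blocks until something is queued
        if message is _STOP:
            break
        out(message)


def run_demo(out=print):
    threads = [
        threading.Thread(target=worker, args=("sniffer", 3)),
        threading.Thread(target=worker, args=("checker", 3)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    log(_STOP)  # queued only after both workers have finished
    drain(out)


if __name__ == "__main__":
    run_demo()
```

In the script from the question, the main thread would call drain() while the daemon threads keep running, so every line reaches stdout through a single writer. Messages from the two threads may still interleave with each other, but each thread's own messages stay in order and no line is split.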
Another option (dirtier) would be to use:
os.write(1, b"The data you want to print")
In my experience this works better in multi-threaded environments.
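A small sketch of how that could be wrapped, assuming you want to pass ordinary strings rather than bytes. write_line is a hypothetical helper, not part of the os module; it bypasses Python's buffered sys.stdout and writes straight to the file descriptor (1 is stdout).

```python
import os


def write_line(text, fd=1):
    """Write one line to a file descriptor with os.write (fd 1 is stdout)."""
    data = (text + "\n").encode("utf-8")
    # os.write returns the number of bytes actually written, which can be
    # fewer than requested, so keep writing until nothing is left.
    while data:
        written = os.write(fd, data)
        data = data[written:]


if __name__ == "__main__":
    write_line("[info] running threshold loop...")
```

Because this skips the buffering layer, do not mix it with print() on the same stream unless you flush first, or the output may appear out of order.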
Upvotes: 1