Reputation: 1
I am currently working on securing IPv6 packet fragments using a custom security header that contains cryptographic information, such as a MAC and nonce. Here is the overall process I am following:
Fragmentation Process: 1. Break the Packet into Fragments: The packet is split based on the network's MTU. Each fragment includes:
a) Standard IPv6 header Standard Fragment Header
b) A custom Security Header containing: - MAC: Ensures payload integrity. - Nonce: Unique identifier for each packet. - Security Type: Flag indicating encryption or authentication.
2. Attach the Headers and Send Fragments:
Each fragment includes the custom Security Header (serialized as JSON) and the payload.
```
Reassembly Process: 1.Fragments are received in no specific order. 2. Each fragment's custom Security Header is parsed and verified: - MAC verification using HMAC-SHA256. - Nonce consistency check to ensure fragments belong to the same packet. 3.Once all fragments are verified, they are buffered, sorted, and reassembled.
```
The Issue: During reassembly, I am unable to decode the custom Security Header properly. The reassembly logic fails when parsing the header from the fragment payload.
Fragmentation logic:
from scapy.all import *
import hashlib
import hmac
import json
import os
# Function to generate a MAC for fragment integrity
def generate_mac(data, key):
"""Generate HMAC-SHA256 for integrity."""
return hmac.new(key.encode(), data, hashlib.sha256).hexdigest()
# Function to create a custom IPv6 Security Extension Header (serialized as JSON)
def create_security_header(nonce, mac, encryption_flag):
"""Create a custom security header and serialize it."""
header = {
"Next Header": 44, # Fragment Header
"Header Length": 4, # Example fixed size (4 = 32 bytes)
"Security Type": 1, # 1 = MAC-based integrity
"Encryption Flag": encryption_flag,
"Nonce": nonce,
"MAC": mac
}
return header # Return the header dictionary
# Function to fragment an IPv6 packet and save the fragments to a PCAP file
def fragment_packet(packet, max_size, filename, key="secretkey"):
"""
Fragment the packet, attach security headers, and save to PCAP.
:param packet: The IPv6 packet to fragment.
:param max_size: The MTU size for fragmentation.
:param filename: The name of the .pcap file to save fragments.
:param key: The key used for MAC generation.
:return: List of fragments.
"""
fragments = []
fragment_size = max_size - 40 # IPv6 header size is 40 bytes
payload = bytes(packet[Raw]) if Raw in packet else b""
total_payload_len = len(payload)
nonce = os.urandom(4).hex() # Generate a 32-bit random nonce
for i in range(0, total_payload_len, fragment_size):
fragment_payload = payload[i:i + fragment_size]
# Generate MAC for this fragment
mac = generate_mac(fragment_payload, key)
# Create the custom security header and serialize it to JSON
sec_header = create_security_header(nonce, mac, encryption_flag=0)
# Serialize the security header as JSON and ensure proper encoding
security_header_json = json.dumps(sec_header)
# Create the fragment header
frag_header = IPv6(dst=packet[IPv6].dst) / \
IPv6ExtHdrFragment(id=12345, offset=i // 8,
m=1 if i + fragment_size < total_payload_len else 0)
# Embed the serialized JSON security header and the fragment payload together
frag_with_sec = frag_header / Raw(load=security_header_json.encode() + fragment_payload)
fragments.append(frag_with_sec)
# Save all fragments to a PCAP file
wrpcap(filename, fragments)
print(f"Fragments saved to {filename}")
return fragments
# Example usage
if __name__ == "__main__":
# Create a large IPv6 packet
large_packet = IPv6(dst="2001:db8::1") / UDP(sport=1234, dport=80) / Raw(load="A" * 3000)
# Fragment the packet and save to PCAP
max_mtu = 1280 # IPv6 MTU
fragments = fragment_packet(large_packet, max_mtu, "fragments.pcap")
# Optionally, send the fragments
print("Sending fragments...")
for frag in fragments:
frag.show()
send(frag)
Reassembly logic:
from scapy.all import *
import hashlib
import hmac
import json
# Function to verify the MAC of a fragment
def verify_mac(fragment_payload, key, received_mac):
"""Verify the MAC of a fragment's payload."""
calculated_mac = hmac.new(key.encode(), fragment_payload, hashlib.sha256).hexdigest()
return calculated_mac == received_mac
def parse_security_header(sec_header_raw):
"""Parse the security header, which is expected to be in JSON byte format"""
if not sec_header_raw:
print("Error: Security header is empty or malformed.")
return None
try:
# Inspecting the raw security header data before parsing
print(f"Raw security header data: {sec_header_raw[:256]}") # Inspect the first 256 bytes
return json.loads(sec_header_raw.decode()) # Convert the byte string back to a dictionary
except json.JSONDecodeError as e:
print(f"Error: Failed to decode security header. {e}")
return None
# Function to reassemble fragments
def reassemble_fragments(fragments, key="secretkey"):
"""Reassemble fragments into the original payload."""
fragments.sort(key=lambda frag: frag[IPv6ExtHdrFragment].offset) # Sort by offset
payload = b"" # Placeholder for reassembled payload
nonce = None # Store the nonce for consistency check
for frag in fragments:
# Extract the raw payload and security header (the first part of the fragment)
raw_payload = bytes(frag[Raw]) # Extract the raw payload from the fragment
# Extract the security header (first part of the raw payload)
header_len = len(json.dumps({"Next Header": 44, "Header Length": 4, "Security Type": 1, "Encryption Flag": 0}).encode())
sec_header = parse_security_header(raw_payload[:header_len]) # Adjusted to handle proper header length
fragment_payload = raw_payload[header_len:] # The actual fragment payload starts after the security header
# Ensure security header is valid
if sec_header is None:
print(f"Invalid security header for fragment with offset {frag[IPv6ExtHdrFragment].offset}")
return None
# Verify MAC
if not verify_mac(fragment_payload, key, sec_header["MAC"]):
print(f"MAC verification failed for fragment with offset {frag[IPv6ExtHdrFragment].offset}")
return None
# Check nonce (ensure all fragments belong to the same packet)
if nonce is None:
nonce = sec_header["Nonce"] # Set nonce from the first fragment
elif nonce != sec_header["Nonce"]:
print(f"Nonce mismatch for fragment with offset {frag[IPv6ExtHdrFragment].offset}")
return None
# Append fragment payload to the full payload
payload += fragment_payload
return payload
# Example usage
if __name__ == "__main__":
# Simulate receiving fragments
fragments = rdpcap("fragments.pcap") # Read the fragments from the PCAP file
# Reassemble fragments
print("Reassembling fragments...")
reassembled_payload = reassemble_fragments(fragments, key="secretkey")
if reassembled_payload:
print("Packet reassembled successfully:")
print(reassembled_payload.decode()) # Decode and print the reassembled packet payload
else:
print("Failed to reassemble packet.")
Things I have tried:
Inspecting the Raw Security Header: I used print(sec_header_raw[:256]) to examine the data. It seems there is extra padding or misalignment.
Header Length Calculations: I set the expected header size explicitly to match the serialized JSON size.
JSON Decoding: The error occurs during json.loads() when parsing the security header. It appears the raw payload contains unexpected bytes.
Questions
Upvotes: 0
Views: 28