Source code for idstools.packet

# Copyright (c) 2014 Jason Ish
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED
# WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
# IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

"""Provides basic packet decoding."""

import struct
import socket

from idstools import util

ETHERTYPE_IP = 0x0800
ETHERTYPE_IP6 = 0x86dd

IPPROTO_ICMP = 1
IPPROTO_TCP = 6
IPPROTO_UDP = 17
IPPROTO_ICMPV6 = 58

ETHER_HDR_LEN = 14
IP_HDR_LEN = 20
IP6_HDR_LEN = 40
ICMP4_HDR_LEN = 4
ICMP6_HDR_LEN = 4
UDP_HDR_LEN = 8
TCP_HDR_LEN = 20

# IPv6 Extension Headers
IP6_EXT_HOP_BY_HOP = 0
IP6_EXT_DEST_OPTS = 60
IP6_EXT_ROUTING = 43
IP6_EXT_FRAGMENT = 44
IP6_EXT_AH = 51
IP6_EXT_ESP = 50
IP6_EXT_MOBILITY = 135

IP6_EXT_HEADER_TYPES = [
    IP6_EXT_HOP_BY_HOP,
    IP6_EXT_DEST_OPTS,
    IP6_EXT_ROUTING,
    IP6_EXT_FRAGMENT,
    IP6_EXT_AH,
    IP6_EXT_ESP,
    IP6_EXT_MOBILITY,
]

[docs]def printable_ethernet_addr(addr): """Return a formatted ethernet address from its raw form.""" return ":".join(["%02x" % (x) for x in struct.unpack("BBBBBB", addr)])
[docs]def decode_icmp(pkt): """ Decode an ICMP packet. """ icmp = {} (icmp["icmp_type"], icmp["icmp_code"], icmp["icmp_chksum"]) = struct.unpack(">BBH", pkt[0:ICMP4_HDR_LEN]) icmp["icmp_payload"] = pkt[ICMP4_HDR_LEN:] return icmp
[docs]def decode_icmp6(pkt): """ Decode an ICMPv6 packet. """ icmp = {} (icmp["icmp_type"], icmp["icmp_code"], icmp["icmp_chksum"]) = struct.unpack(">BBH", pkt[0:ICMP4_HDR_LEN]) icmp["icmp_payload"] = pkt[ICMP6_HDR_LEN:] return icmp
[docs]def decode_udp(pkt): """Decode a UDP packet.""" udp = {} (udp["udp_sport"], udp["udp_dport"], udp["udp_length"], udp["udp_chksum"]) = struct.unpack(">HHHH", pkt[0:UDP_HDR_LEN]) udp["udp_payload"] = pkt[UDP_HDR_LEN:] return udp
[docs]def decode_tcp(pkt): """Decode a TCP packet.""" tcp = {} (tcp["tcp_sport"], tcp["tcp_dport"], tcp["tcp_seq"], tcp["tcp_ack"], tcp["tcp_flags"], tcp["tcp_window"], tcp["tcp_chksum"], tcp["tcp_urgptr"]) = struct.unpack(">HHLLHHHH", pkt[0:TCP_HDR_LEN]) tcp["tcp_offset"] = tcp["tcp_flags"] >> 12 tcp["tcp_flags"] = tcp["tcp_flags"] & 0x1f data_offset = tcp["tcp_offset"] * 4 if data_offset > TCP_HDR_LEN: tcp["tcp_options_raw"] = pkt[TCP_HDR_LEN:data_offset] tcp["tcp_payload"] = pkt[data_offset:] return tcp
[docs]def decode_ip(pkt): """Decode an IP packet.""" ip = {} (ip["ip_version"], ip["ip_dscp"], ip["ip_length"], ip["ip_id"], ip["ip_offset"], ip["ip_ttl"], ip["ip_protocol"], ip["ip_chksum"], ip["ip_source"], ip["ip_destination"]) = struct.unpack(">BBHHHBBH4s4s", pkt[0:IP_HDR_LEN]) ip["ip_ihl"] = ip["ip_version"] & 0xf ip["ip_version"] = ip["ip_version"] >> 4 ip["ip_flags"] = ip["ip_offset"] >> 13 ip["ip_offset"] = ip["ip_offset"] & 0x1fff ip["ip_source"] = socket.inet_ntoa(ip["ip_source"]) ip["ip_destination"] = socket.inet_ntoa(ip["ip_destination"]) ihl = ip["ip_ihl"] * 4 if ihl > IP_HDR_LEN: ip["ip_options_raw"] = pkt[IP_HDR_LEN:ihl] if ip["ip_protocol"] == IPPROTO_ICMP: icmp = decode_icmp(pkt[ihl:]) ip.update(icmp) elif ip["ip_protocol"] == IPPROTO_UDP: udp = decode_udp(pkt[ihl:]) ip.update(udp) elif ip["ip_protocol"] == IPPROTO_TCP: tcp = decode_tcp(pkt[ihl:]) ip.update(tcp) return ip
[docs]def decode_ip6(pkt): """Decode an IPv6 packet.""" ip6 = {} (ip6["ip6_label"], ip6["ip6_length"], ip6["ip6_nh"], ip6["ip6_hop_limit"], ip6["ip6_source_raw"], ip6["ip6_destination_raw"]) = struct.unpack( ">LHBB16s16s", pkt[0:IP6_HDR_LEN]) ip6["ip6_version"] = ip6["ip6_label"] >> 28 ip6["ip6_class"] = (ip6["ip6_label"] >> 20) & 0xff ip6["ip6_label"] = ip6["ip6_label"] & 0xfffff ip6["ip6_source"] = util.decode_inet_addr(ip6["ip6_source_raw"]) ip6["ip6_destination"] = util.decode_inet_addr(ip6["ip6_destination_raw"]) offset = IP6_HDR_LEN # Skip over known extension headers. while True: if ip6["ip6_nh"] in IP6_EXT_HEADER_TYPES: ip6["ip6_nh"], ext_len = struct.unpack(">BB", pkt[offset:offset+2]) offset += 8 + (ext_len * 8) else: break if ip6["ip6_nh"] == IPPROTO_UDP: ip6.update(decode_udp(pkt[offset:])) elif ip6["ip6_nh"] == IPPROTO_TCP: ip6.update(decode_tcp(pkt[offset:])) elif ip6["ip6_nh"] == IPPROTO_ICMPV6: ip6.update(decode_icmp6(pkt[offset:])) return ip6
[docs]def decode_ethernet(pkt): """Decode an ethernet packet.""" ether = {} ether["ether_dst"], ether["ether_src"], ether["ether_type"] = struct.unpack( ">6s6sH", pkt[0:ETHER_HDR_LEN]) ether["ether_dst"] = printable_ethernet_addr(ether["ether_dst"]) ether["ether_src"] = printable_ethernet_addr(ether["ether_src"]) if ether["ether_type"] == ETHERTYPE_IP: ether.update(decode_ip(pkt[ETHER_HDR_LEN:])) elif ether["ether_type"] == ETHERTYPE_IP6: ether.update(decode_ip6(pkt[ETHER_HDR_LEN:])) return ether