#!/usr/bin/env python ################################################################################ # # # IPv6 OS Detection Test Result Analysis Tool # # # # # # Luis MartinGarcia # # {luis.mgarc@gmail.com} # # # ################################################################################ import getopt import sys from scapy.all import * import string import os import glob #################### # GLOBAL VARIABLES # #################### path_to_dir_g=None results_g=None # List of HostResults objects result_file_extension_g=None # Extension of result files debug_g=False # Debug output enabled? ############################### # CONSTANTS AND DEFAULT VALUE # ############################### DEFAULT_RESULT_FILE_EXTENSION='6fp' DEFAULT_SEARCH_PATH="./" ##################### # CLASS DEFINITIONS # ##################### class HostResults: hostaddr6='' # Host IPv6 Addr hostaddr4='' # Host IPv4 Addr timeout=0 # Response Timeout retries=0 # Test Retransmissions interface='' # Network Interface delay=0 # Inter-test Delay debug=False # Test results have debug info? ostype='' # Target OS type ossubtype='' # Target OS sub-type osversion='' # Target OS version testdate='' # Date when the results where gathered test6_ids=() # IPv6 test identifiers test4_ids=() # IPv4 test identifiers result_vector6=[] # Vector that indicates which tests received response result_vector4=[] # Vector that indicates which tests received response probe_sent_pkts6=() # Tuple of test probes (as IPv6 objects) probe_sent_raw6=() # Tuple IPv6 test probes (as a raw hex buff) probe_sent_pkts4=() # Tuple of test probes (as IPv6 objects) probe_sent_raw4=() # Tuple IPv6 test probes (as a raw hex buff) probe_result_pkts6=() # Tuple of test responses (as IPv6 objects) probe_result_pkts6_more=() # Stores rest of packets when more than one packet was received for a given test probe_result_pkts4=() # Tuple of test responses (as IPv4 objects) probe_result_raw6=() # Tuple IPv6 test responses (as a raw hex buff) probe_result_raw6_more=() # Stores rest of packets when more than one packet was received for a given test probe_result_raw4=() # Tuple IPv4 test responses (as a raw hex buff) timed_results_pkts6=() # Results for special, time dependent tests (as IPv6 objects) timed_results_raw6=() # Results for special, IPv6 time dependent tests (as a raw hex buff) timed_results_pkts4=() # Results for special, time dependent tests (as IPv4 objects) timed_results_raw4=() # Results for special, IPv4 time dependent tests (as a raw hex buff) # This function processes the supplied results file and stores the # information in the object's attributes. def parse_results_file(self, results_file): # Open results file try: o = open(results_file,"r") except: print "ERROR: File \"" + results_file+ "\" does not exist" sys.exit(1) # Extract relevant information for line in o : if string.find(line, "#PARSE#")!=-1 : # Line cleanup line=line.replace("#PARSE# ", '') line=line.replace("\r",'') line=line.replace("\n",'') # Target Host IPv6 Address param="hostaddr6=" if string.find(line, param )!=-1 : self.hostaddr6=line[len(param):] print_debug("[+] Parsed " + param + self.hostaddr6) continue # Target Host IPv4 Address param="hostaddr4=" if string.find(line, param )!=-1 : self.hostaddr4=line[len(param):] print_debug("[+] Parsed " + param + self.hostaddr4) continue # Response Timeout param="timeout=" if string.find(line, param )!=-1 : self.timeout=int(line[len(param):]) print_debug("[+] Parsed " + param + str(self.timeout)) continue # Test Retransmissions param="retries=" if string.find(line, param )!=-1 : self.retries=int(line[len(param):]) print_debug("[+] Parsed " + param + str(self.retries)) continue # Network Interface param="interface=" if string.find(line, param )!=-1 : self.interface=line[len(param):] print_debug("[+] Parsed " + param + str(self.interface)) continue # Inter-test Delay param="delay=" if string.find(line, param )!=-1 : self.delay=int(line[len(param):]) print_debug("[+] Parsed " + param + str(self.delay)) continue # Test results have debug info? param="debug=" if string.find(line, param )!=-1 : if line[len(param):]=='False' : self.debug=False else: self.debug=True print_debug("[+] Parsed " + param + str(self.debug)) continue # Test result for IPv6 param="result6=" if string.find(line, param)!=-1: line=line[len(param):] # Extract test number test_no=int(line[1:line.find(',')]) # Extract packet number myline=line[line.find(',')+1:-1] pktno=int(myline[:myline.find(',')]) # Extract byte vector byte_vector=myline[myline.find(',')+1:] if len(byte_vector) > 0 : # We got resposne print_debug("[+] Parsed result6={test="+str(test_no)+ ", pkt=" + str(pktno) + " vector=" + byte_vector[:5]+" [...] "+byte_vector[-5:]+"}") mypkt=result_to_packet(byte_vector, 6) # Only store the first response in the probe_results tuple if pktno==0 : self.probe_result_pkts6 = self.probe_result_pkts6 + (mypkt,) myrawhex=result_to_hex(byte_vector) self.probe_result_raw6= self.probe_result_raw6 + (myrawhex,) self.probe_result_pkts6_more = self.probe_result_pkts6_more + ((test_no,mypkt),) myrawhex=result_to_hex(byte_vector) self.probe_result_raw6_more = self.probe_result_raw6_more + ((test_no, myrawhex),) else: print_debug("[+] Parsed result6={test="+str(test_no)+ ", 0, vector=None}") self.probe_result_pkts6 = self.probe_result_pkts6 + (None,) self.probe_result_raw6= self.probe_result_raw6 + (None,) self.probe_result_pkts6_more = self.probe_result_pkts6_more + ((test_no,None),) self.probe_result_raw6_more= self.probe_result_raw6_more + ((test_no, None),) continue # Test result for IPv4 param="result4=" if string.find(line, param)!=-1: line=line[len(param):] # Extract test number test_no=int(line[1:line.find(',')]) # Extract packet number myline=line[line.find(',')+1:-1] pktno=int(myline[:myline.find(',')]) # Extract byte vector byte_vector=myline[myline.find(',')+1:] if len(byte_vector) > 0 : # We got resposne print_debug("[+] Parsed result4={test="+str(test_no)+ ", pkt=" + str(int(pktno)) + " vector=" + byte_vector[:5]+" [...] "+byte_vector[-5:]+"}") mypkt=result_to_packet(byte_vector, 4) # Only store the first response in the probe_results tuple if pktno==0 : self.probe_result_pkts4=self.probe_result_pkts4 + (mypkt,) myrawhex=result_to_hex(byte_vector) self.probe_result_raw4=self.probe_result_raw4+ (myrawhex,) else: print_debug("[+] Parsed result4={test="+str(test_no)+ ", vector=None}") self.probe_result_pkts4=self.probe_result_pkts4 + (None,) self.probe_result_raw4=self.probe_result_raw4+ (None,) continue # Timed test results for IPv6 param="timed6_result=" if string.find(line, param)!=-1: line=line[len(param):] # Extract test number test_no=int(line[1:line.find(',')]) # Extract byte vector byte_vector=line[line.find(',')+1:-1] if len(byte_vector) > 0 : # We got response print_debug("[+] Parsed timed6_result={test="+str(test_no)+ ", vector=" + byte_vector[:5]+" [...] "+byte_vector[-5:]+"}") mypkt=result_to_packet(byte_vector, 6) self.timed_results_pkts6 = self.timed_results_pkts6 + (mypkt,) myrawhex=result_to_hex(byte_vector) self.timed_results_raw6= self.timed_results_raw6 + (myrawhex,) else: print_debug("[+] Parsed timed6_result={test="+str(test_no)+ ", vector=None}") self.timed_results_pkts6 = self.timed_results_pkts6 + (None,) self.timed_results_raw6= self.timed_results_raw6 + (None,) continue # Timed test results for IPv4 param="timed4_result=" if string.find(line, param)!=-1: line=line[len(param):] # Extract test number test_no=int(line[1:line.find(',')]) # Extract byte vector byte_vector=line[line.find(',')+1:-1] if len(byte_vector) > 0 : # We got response print_debug("[+] Parsed timed4_result={test="+str(test_no)+ ", vector=" + byte_vector[:5]+" [...] "+byte_vector[-5:]+"}") mypkt=result_to_packet(byte_vector, 4) self.timed_results_pkts4 = self.timed_results_pkts4 + (mypkt,) myrawhex=result_to_hex(byte_vector) self.timed_results_raw4= self.timed_results_raw4 + (myrawhex,) else: print_debug("[+] Parsed timed4_result={test="+str(test_no)+ ", vector=None}") self.timed_results_pkts4 = self.timed_results_pkts4 + (None,) self.timed_results_raw4= self.timed_results_raw4 + (None,) continue # Test probe for IPv6 param="sent6=" if string.find(line, param)!=-1: line=line[len(param):] # Extract test number test_no=int(line[1:line.find(',')]) # Extract packet number myline=line[line.find(',')+1:-1] pktno=int(myline[:myline.find(',')]) # Extract byte vector byte_vector=myline[myline.find(',')+1:] if len(byte_vector) > 0 : # We got resposne print_debug("[+] Parsed sent6={test="+str(test_no)+ ", pkt=" + str(pktno) + " vector=" + byte_vector[:5]+" [...] "+byte_vector[-5:]+"}") mypkt=result_to_packet(byte_vector, 6) # Only store the first response in the probe_results tuple if pktno==0 : self.probe_sent_pkts6 = self.probe_sent_pkts6 + (mypkt,) myrawhex=result_to_hex(byte_vector) self.probe_sent_raw6= self.probe_sent_raw6 + (myrawhex,) else: print_debug("[+] Parsed sent6={test="+str(test_no)+ ", 0, vector=None}") self.probe_sent_pkts6 = self.probe_sent_pkts6 + (None,) self.probe_sent_raw6= self.probe_sent_raw6 + (None,) continue # Test probes for IPv4 param="sent4=" if string.find(line, param)!=-1: line=line[len(param):] # Extract test number test_no=int(line[1:line.find(',')]) # Extract packet number myline=line[line.find(',')+1:-1] pktno=int(myline[:myline.find(',')]) # Extract byte vector byte_vector=myline[myline.find(',')+1:] if len(byte_vector) > 0 : # We got resposne print_debug("[+] Parsed sent4={test="+str(test_no)+ ", pkt=" + str(pktno) + " vector=" + byte_vector[:5]+" [...] "+byte_vector[-5:]+"}") mypkt=result_to_packet(byte_vector, 4) # Only store the first response in the probe_results tuple if pktno==0 : self.probe_sent_pkts4 = self.probe_sent_pkts4 + (mypkt,) myrawhex=result_to_hex(byte_vector) self.probe_sent_raw4= self.probe_sent_raw4 + (myrawhex,) else: print_debug("[+] Parsed sent4={test="+str(test_no)+ ", 0, vector=None}") self.probe_sent_pkts4 = self.probe_sent_pkts4 + (None,) self.probe_sent_raw4= self.probe_sent_raw4 + (None,) continue # Result vector IPv6 param="rvector6=" if string.find(line, param )!=-1 : vectorstr= line[len(param):] self.result_vector6=parse_resultvector_str(vectorstr) print_debug("[+] Parsed rvecto6r=" + str(self.result_vector6)) continue # Result vector IPv4 param="rvector4=" if string.find(line, param )!=-1 : vectorstr= line[len(param):] self.result_vector4=parse_resultvector_str(vectorstr) print_debug("[+] Parsed rvector4=" + str(self.result_vector4)) continue # Test ID param="test6_id=" if string.find(line, param )!=-1 : id=line[len(param):] self.test6_ids=self.test6_ids+(id,) print_debug("[+] Parsed test6_id=" + str(id)) continue # Test ID (IPv4) param="test4_id=" if string.find(line, param )!=-1 : id=line[len(param):] self.test4_ids=self.test6_ids+(id,) print_debug("[+] Parsed test4_id=" + str(id)) continue # OS Type param="ostype=" if string.find(line, param )!=-1 : self.ostype=line[len(param):] print_debug("[+] Parsed ostype=" + self.ostype) continue # OS Sub-Type param="ossubtype=" if string.find(line, param )!=-1 : self.ossubtype=line[len(param):] print_debug("[+] Parsed ossubtype=" + self.ossubtype) continue # OS Type param="osversion=" if string.find(line, param )!=-1 : self.osversion=line[len(param):] print_debug("[+] Parsed osversion=" + self.osversion) continue # Test date param="currtime=" if string.find(line, param )!=-1 : self.testdate=line[len(param):] print_debug("[+] Parsed currtime=" + self.testdate) continue o.close() ######################################### # DATA PARSING AND CONVERSION FUNCTIONS # ######################################### # This function takes a string like '60 00 00 00 00 28 3a 40 00 [...]" and # returns an equivalent hex buffer. def result_to_hex(rawhex_str): # Remove whitespace mystr=rawhex_str.replace(' ', '') # Remove \n or \r mystr=mystr.replace('\n', '') mystr=mystr.replace('\r', '') # Convert from string to a hex buffer myhex=mystr.decode('hex') return myhex # This function takes a string like '60 00 00 00 00 28 3a 40 00 [...]" which # represents the raw hex output of an IPv6 datagram and returns an IPv6 object # that is usable in Scapy. Note that the returned object will probably contain # more headers after the IPv6 one (extension headers, transport layer header, # payload, etc. def result_to_packet(rawhex_str, ip_version): # Convert from string to a hex buffer myhex=result_to_hex(rawhex_str) # Initialize an IP packet with the raw hex if ip_version==4: mypkt=IP(myhex) else: mypkt=IPv6(myhex) return mypkt def test_result_to_packet(): a='60 00 00 00 00 28 3a 40 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 '+\ '01 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 01 81 00 35 1f ab cd '+\ '00 01 30 31 32 33 34 35 36 37 38 39 41 42 43 44 45 46 30 31 32 33 34 '+\ '35 36 37 38 39 41 42 43 44 45 46' mypkt=result_to_packet(a, 6) mypkt.show2() # This function takes a string like [0, 0, 0, 0, 1, 1, 1, 0, 1, 1, 0, 1]" which # represents a result vector, and returns the equivalent python list object. def parse_resultvector_str(result_vector_str): result_vector=[] # Remove whitespace mystr=result_vector_str.replace(' ', '') # Remove commas mystr=mystr.replace(',', '') # Remove "[" and "]" mystr=mystr.replace('[', '') mystr=mystr.replace(']', '') # Remove \n or \r mystr=mystr.replace('\n', '') mystr=mystr.replace('\r', '') for i in mystr: if i=='0': result_vector.append(0) else : result_vector.append(1) return result_vector def test_parse_resultvector_str(): vectorstr='[0, 0, 0, 0, 1, 1, 1, 0, 1, 1, 0, 1]' b=parse_resultvector_str(vectorstr) print vectorstr print b # This function returns the list of file names in the "path" directory, that # have the "ext" extension. If no parameters are passed to it, returns the # list of .txt files in the current directory. def get_file_names_from_dir(path, ext): myfiles=list() for myfile in glob.glob( os.path.join(path, '*.'+ext) ): myfiles.append(myfile) return myfiles # This function parses all result files of a given directory. The parsed results # are returned as a list of HostResults objects. If there was no result file # in the directory, None is returned. def parse_dir_files(path, ext): myfiles=get_file_names_from_dir(path, ext) myresults=list() print "[+] Parsing " + str( len(myfiles) ) + " result", if len(myfiles) > 1 : print "files: ", else : print "file: ", sys.stdout.flush() for i in myfiles: sys.stdout.write("#") sys.stdout.flush() a=HostResults() a.parse_results_file(i) myresults.append(a) print "" if len(myresults)>0 : return myresults else: return None ############################# # STANDARD OUTPUT FUNCTIONS # ############################# def print_usage(f = sys.stdout): print >> f, """\ Usage: %(progname)s [Options] OPTIONS: -h, --help Show this help. -d, --debug Display debug output --path=DIR Process result files in the DIR directory. --ext=ABC Select result files with the ABC file extension """ % { "progname": sys.argv[0] } def print_debug(debug_msg): if( debug_g==True and debug_msg!=None): print debug_msg def print_fatal(fatal_msg): print fatal_msg exit(1) def print_welcome_banner(): print "=================================================================" print "== NMAP IPv6 OS DETECTION RESULT ANALYSIS TOOL ==" print "=================================================================" def print_analysis_intro(test_text): print "=================================================================" print "[+] " + str(test_text) ############################# # RESULT ANALYSIS FUNCTIONS # ############################# # Tests the correctness and consistency of all result files. Returns None on # success and an error string in case of failure def verify_correctness(results): if results==None or len(results)<1 : return "Result vector is empty" elif len(results)==1 : return "Only one result found" # Ensure all rvector6's have the same size rvectlen6=len(results[0].result_vector6) rvectlen4=len(results[0].result_vector4) for host in results: if len(host.result_vector6)!=rvectlen6 : return "IPv6 result vectors have different sizes" if len(host.result_vector4)!=rvectlen4 and len(host.result_vector4)!=0: return "IPv4 result vectors have different sizes" if len(host.probe_result_pkts6)!=rvectlen6 : return "IPv6 result probe vectors have different sizes" if len(host.probe_result_pkts4)!=rvectlen4 and len(host.probe_result_pkts4)!=0: return "IPv4 result probe vectors have different sizes" if len(host.probe_result_raw6)!=rvectlen6 : return "IPv6 result probe vectors raw have different sizes" if len(host.probe_result_raw4)!=rvectlen4 and len(host.probe_result_raw4)!=0: return "IPv4 result probe vectors raw have different sizes" return None def list_to_unique_tuple(mylist): mytuple=tuple() for element in mylist : if mytuple.count(element)<=0 : mytuple=mytuple+(element,) return mytuple def count_unique(mylist): mytuple=tuple() for element in mylist : if mytuple.count(element)<=0 : mytuple=mytuple+(element,) return len(mytuple) # For a given list of IPv6 packets, it determines in which fields they differ. # It returns a tuple that contains the names of the fields that differ. # E.g: For two packets IPv6(hlim=64, tc=0, fl=25) and IPv6(hlim=128, tc=0, fl=0), # it returs this tuple ("Hop Limit", "Flow Label"). # If the packets are identical the () empty tuple is returned. # NOTE: IPv6 source and destination address are not included in the comparison, # as they are expected to differ in most cases. def ipv6_packet_diff(packet_vector): if packet_vector==None or len(packet_vector)<=1 : print_fatal("ERROR: Wrong parameter") # Check IP version temp_vector=[] result=() for packet in packet_vector: temp_vector.append(packet.version) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("IP Version", list_to_unique_tuple(temp_vector)), ) # Check Traffic Class temp_vector=[] for packet in packet_vector: temp_vector.append(packet.tc) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Traffic Class", list_to_unique_tuple(temp_vector)), ) # Check Flow Label temp_vector=[] for packet in packet_vector: temp_vector.append(packet.fl) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Flow Label", list_to_unique_tuple(temp_vector)), ) # Check Payload Length temp_vector=[] for packet in packet_vector: temp_vector.append(packet.plen) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Payload Length", list_to_unique_tuple(temp_vector)), ) # Check Next Header temp_vector=[] for packet in packet_vector: temp_vector.append(packet.nh) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Next Header", list_to_unique_tuple(temp_vector)), ) # Check Hop Limit temp_vector=[] for packet in packet_vector: temp_vector.append(packet.hlim) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Hop Limit", list_to_unique_tuple(temp_vector)), ) return ("IPv6", result) # For a given list of IPv4 packets, it determines in which fields they differ. # It returns a tuple that contains the names of the fields that differ. # E.g: For two packets IPv4(ttl=64, id=0, MF=1) and IPv4(ttl=128, if=0, MF=0), # it returs this tuple ("TTL", "Flags"). # If the packets are identical the () empty tuple is returned. # NOTE: IPv4 source and destination address are not included in the comparison, # as they are expected to differ in most cases. def ipv4_packet_diff(packet_vector): if packet_vector==None or len(packet_vector)<=1 : print_fatal("ERROR: Wrong parameter") # Check IP version temp_vector=[] result=() for packet in packet_vector: temp_vector.append(packet.version) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("IP Version", list_to_unique_tuple(temp_vector)), ) # Internet Header Length temp_vector=[] for packet in packet_vector: temp_vector.append(packet.ihl) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Header Length", list_to_unique_tuple(temp_vector)), ) # Check TOS temp_vector=[] for packet in packet_vector: temp_vector.append(packet.tos) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("TOS", list_to_unique_tuple(temp_vector)), ) # Check Total Length temp_vector=[] for packet in packet_vector: temp_vector.append(packet.len) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Total Length", list_to_unique_tuple(temp_vector)), ) # Check Identifier temp_vector=[] for packet in packet_vector: temp_vector.append(packet.id) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Identifier", list_to_unique_tuple(temp_vector)), ) # Check Flags temp_vector=[] for packet in packet_vector: temp_vector.append(packet.flags) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Flags", list_to_unique_tuple(temp_vector)), ) # Check Fragment Offset temp_vector=[] for packet in packet_vector: temp_vector.append(packet.frag) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Fragment Offset", list_to_unique_tuple(temp_vector)), ) # Check TTL temp_vector=[] for packet in packet_vector: temp_vector.append(packet.ttl) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("TTL", list_to_unique_tuple(temp_vector)), ) # Check Next header temp_vector=[] for packet in packet_vector: temp_vector.append(packet.proto) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Next Header", list_to_unique_tuple(temp_vector)), ) # Check Next header temp_vector=[] for packet in packet_vector: temp_vector.append(packet.options) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Options", list_to_unique_tuple(temp_vector)), ) return ("IPv4", result) def icmp4_packet_diff(packet_vector): if packet_vector==None or len(packet_vector)<=1 : print_fatal("ERROR: Wrong parameter") # Check ICMP Type temp_vector=[] result=() for packet in packet_vector: temp_vector.append(packet.type) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Type", list_to_unique_tuple(temp_vector)), ) return result # If types are different, just return (shouldn't happen) # Check ICMP Code temp_vector=[] for packet in packet_vector: temp_vector.append(packet.code) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Code", list_to_unique_tuple(temp_vector)), ) return ("ICMP", result) def icmp6_packet_diff(packet_vector): if packet_vector==None or len(packet_vector)<=1 : print_fatal("ERROR: Wrong parameter()") # Check ICMP Type temp_vector=[] result=() for packet in packet_vector: temp_vector.append(packet.type) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Type", list_to_unique_tuple(temp_vector)), ) # Check ICMP Code temp_vector=[] for packet in packet_vector: temp_vector.append(packet.code) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Code", list_to_unique_tuple(temp_vector)), ) # Check Pointer in parameter problem messages temp_vector=[] for packet in packet_vector: if type(packet)==scapy.layers.inet6.ICMPv6ParamProblem : temp_vector.append(packet.ptr) if len(temp_vector)>0 : if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("ParamPointer", list_to_unique_tuple(temp_vector)), ) temp_vector=[] for packet in packet_vector: if type(packet)==scapy.layers.inet6.ICMPv6EchoReply : temp_vector.append(packet.data) if len(temp_vector)>0 : if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Data", list_to_unique_tuple(temp_vector)), ) return ("ICMPv6", result) def tcp_packet_diff(packet_vector): if packet_vector==None or len(packet_vector)<=1 : print_fatal("ERROR: Wrong parameter") # Check Sequence temp_vector=[] result=() for packet in packet_vector: temp_vector.append(packet.seq) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Sequence", list_to_unique_tuple(temp_vector)), ) # Check Ack temp_vector=[] for packet in packet_vector: temp_vector.append(packet.ack) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Acknowledgement", list_to_unique_tuple(temp_vector)), ) # Check data offset temp_vector=[] for packet in packet_vector: temp_vector.append(packet.dataofs) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Offset", list_to_unique_tuple(temp_vector)), ) # Check reserved temp_vector=[] for packet in packet_vector: temp_vector.append(packet.reserved) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Reserved", list_to_unique_tuple(temp_vector)), ) # Check flags temp_vector=[] for packet in packet_vector: temp_vector.append(packet.flags) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Flags", list_to_unique_tuple(temp_vector)), ) # Check window temp_vector=[] for packet in packet_vector: temp_vector.append(packet.window) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Window", list_to_unique_tuple(temp_vector)), ) # Check urgent pointer temp_vector=[] for packet in packet_vector: temp_vector.append(packet.urgptr) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Urg.Ptr", list_to_unique_tuple(temp_vector)), ) # Check options temp_vector=[] for packet in packet_vector: temp_vector.append(packet.options) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Options", list_to_unique_tuple(temp_vector)), ) return ("TCP", result) def udp_packet_diff(packet_vector): if packet_vector==None or len(packet_vector)<=1 : print_fatal("ERROR: Wrong parameter") # Check Length temp_vector=[] result=() for packet in packet_vector: temp_vector.append(packet.len) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Length", list_to_unique_tuple(temp_vector)), ) # Check if some packets have a zero checksum and others use proper sums. temp_vector=[] for packet in packet_vector: temp_vector.append(packet.chksum) has_zero=False has_nonzero=False for i in temp_vector: if i==0 : has_zero=True elif i!=0 : has_nonzero=True if has_zero==True and has_nonzero==True : result=result+( ("Checksum", list_to_unique_tuple(temp_vector)), ) break return ("UDP", result) def IPv6ExtHdrDestOpt_packet_diff(packet_vector): if packet_vector==None or len(packet_vector)<=1 : print_fatal("ERROR: Wrong parameter") # Check Next header temp_vector=[] result=() for packet in packet_vector: temp_vector.append(packet.nh) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Next Header", list_to_unique_tuple(temp_vector)), ) # Check length temp_vector=[] for packet in packet_vector: temp_vector.append(packet.len) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Length", list_to_unique_tuple(temp_vector)), ) # Check options temp_vector=[] for packet in packet_vector: temp_vector.append(packet.options) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Options", list_to_unique_tuple(temp_vector)), ) return ("DestOpts", result) def IPv6ExtHdrFragment_packet_diff(packet_vector): if packet_vector==None or len(packet_vector)<=1 : print_fatal("ERROR: Wrong parameter") # Check Next header temp_vector=[] result=() for packet in packet_vector: temp_vector.append(packet.nh) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Next Header", list_to_unique_tuple(temp_vector)), ) # Check reserved1 temp_vector=[] for packet in packet_vector: temp_vector.append(packet.res1) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Reserved1", list_to_unique_tuple(temp_vector)), ) # Check reserved2 temp_vector=[] for packet in packet_vector: temp_vector.append(packet.res2) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Reserved2", list_to_unique_tuple(temp_vector)), ) # Check offset temp_vector=[] for packet in packet_vector: temp_vector.append(packet.offset) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Offset", list_to_unique_tuple(temp_vector)), ) # Check identifier temp_vector=[] for packet in packet_vector: temp_vector.append(packet.id) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Identifier", list_to_unique_tuple(temp_vector)), ) return ("FragHdr", result) def IPv6ExtHdrHopByHop_packet_diff(packet_vector): if packet_vector==None or len(packet_vector)<=1 : print_fatal("ERROR: Wrong parameter") # Check Next header temp_vector=[] result=() for packet in packet_vector: temp_vector.append(packet.nh) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Next Header", list_to_unique_tuple(temp_vector)), ) # Check length temp_vector=[] for packet in packet_vector: temp_vector.append(packet.len) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Length", list_to_unique_tuple(temp_vector)), ) # Check options temp_vector=[] for packet in packet_vector: temp_vector.append(packet.options) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Options", list_to_unique_tuple(temp_vector)), ) return ("Hop-by-Hop", result) def IPv6ExtHdrRouting_packet_diff(packet_vector): if packet_vector==None or len(packet_vector)<=1 : print_fatal("ERROR: Wrong parameter") # Check Next header temp_vector=[] result=() for packet in packet_vector: temp_vector.append(packet.nh) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Next Header", list_to_unique_tuple(temp_vector)), ) # Check length temp_vector=[] for packet in packet_vector: temp_vector.append(packet.len) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Length", list_to_unique_tuple(temp_vector)), ) # Check type temp_vector=[] for packet in packet_vector: temp_vector.append(packet.type) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Type", list_to_unique_tuple(temp_vector)), ) # Check segments left temp_vector=[] for packet in packet_vector: temp_vector.append(packet.segleft) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Segments Left", list_to_unique_tuple(temp_vector)), ) # Check reserved field temp_vector=[] for packet in packet_vector: temp_vector.append(packet.reserved) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Reserved", list_to_unique_tuple(temp_vector)), ) # Check reserved field temp_vector=[] for packet in packet_vector: temp_vector.append(packet.addresses) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Addresses", list_to_unique_tuple(temp_vector)), ) return ("Routing", result) def raw_packet_diff(packet_vector) : if packet_vector==None or len(packet_vector)<=1 : print_fatal("ERROR: Wrong parameter") # Check Next header temp_vector=[] result=() for packet in packet_vector: temp_vector.append(packet.load) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("Payload", list_to_unique_tuple(temp_vector)), ) return ("Raw", result) def padding_packet_diff(packet_vector) : if packet_vector==None or len(packet_vector)<=1 : print_fatal("ERROR: Wrong parameter") # Check Next header temp_vector=[] result=() for packet in packet_vector: temp_vector.append(packet.load) if temp_vector!=[temp_vector[0]]*len(temp_vector) : result=result+( ("load", list_to_unique_tuple(temp_vector)), ) return ("Padding", result) def get_layer_diff(packet_vector): if packet_vector==None or len(packet_vector)<=1 : print_fatal("ERROR: Wrong parameter") # Check all packets are of the same type layers_found=(packet_vector[0].__class__.__name__, ) for layer in packet_vector : if layers_found.count(layer.__class__.__name__)<=0 : layers_found=layers_found+(layer.__class__.__name__, ) if len(layers_found)>1 : for layer in layers_found: # Except if they are all ICMPv6 if layer.find("ICMPv6")<0 : return ("Different Layers", layers_found ) if type(packet_vector[0]) == scapy.layers.inet.IP : return ipv4_packet_diff(packet_vector) if type(packet_vector[0]) == scapy.layers.inet.IPerror : return ipv4_packet_diff(packet_vector) elif type(packet_vector[0]) == scapy.layers.inet6.IPv6 : return ipv6_packet_diff(packet_vector) elif type(packet_vector[0]) == scapy.layers.inet6.IPerror6 : return ipv6_packet_diff(packet_vector) elif type(packet_vector[0]) == scapy.layers.inet.TCP : return tcp_packet_diff(packet_vector) elif type(packet_vector[0]) == scapy.layers.inet.UDP : return udp_packet_diff(packet_vector) elif type(packet_vector[0]) == scapy.layers.inet.UDPerror : return udp_packet_diff(packet_vector) elif type(packet_vector[0]) == scapy.layers.inet.ICMP : return icmp4_packet_diff(packet_vector) elif type(packet_vector[0]) == scapy.layers.inet6.ICMPv6DestUnreach : return icmp6_packet_diff(packet_vector) elif type(packet_vector[0]) == scapy.layers.inet6.ICMPv6EchoRequest : return icmp6_packet_diff(packet_vector) elif type(packet_vector[0]) == scapy.layers.inet6.ICMPv6EchoReply : return icmp6_packet_diff(packet_vector) elif type(packet_vector[0]) == scapy.layers.inet6.ICMPv6ParamProblem : return icmp6_packet_diff(packet_vector) elif type(packet_vector[0]) == scapy.layers.inet6.IPv6ExtHdrDestOpt : return IPv6ExtHdrDestOpt_packet_diff(packet_vector) elif type(packet_vector[0]) == scapy.layers.inet6.IPv6ExtHdrFragment : return IPv6ExtHdrFragment_packet_diff(packet_vector) elif type(packet_vector[0]) == scapy.layers.inet6.IPv6ExtHdrHopByHop : return IPv6ExtHdrHopByHop_packet_diff(packet_vector) elif type(packet_vector[0]) == scapy.layers.inet6.IPv6ExtHdrRouting : return IPv6ExtHdrRouting_packet_diff(packet_vector) elif type(packet_vector[0]) == scapy.packet.Raw : return raw_packet_diff(packet_vector) elif type(packet_vector[0]) == scapy.packet.Padding : return padding_packet_diff(packet_vector) elif type(packet_vector[0]) == scapy.packet.NoPayload : return None else : print_debug("Unimplemented: " + str(type(packet_vector[0]))) return None def packet_diff(packet_vector): # Diff the network layer result=(get_layer_diff(packet_vector),) # Iterate over the next layers and diff them too current_layer=packet_vector while True : next_layer=[] # Stop when we don't have any more layers if type(current_layer[0].payload)==scapy.packet.NoPayload: break else : for packet in current_layer : next_layer.append(packet.payload) result=result+(get_layer_diff(next_layer),) current_layer=next_layer return result ########################### # MISCELLANEOUS FUNCTIONS # ########################### # Command line argument parsing def argparser(): global path_to_dir_g, result_file_extension_g, debug_g opts, args = getopt.gnu_getopt(sys.argv[1:], "hd", ["help", "debug", "path=", "ext="]) for o, a in opts: if o == "-h" or o == "--help": print_usage() sys.exit() if o == "-d" or o == "--debug": debug_g=True elif o == "--path": path_to_dir_g=str(a) elif o == "--ext": result_file_extension_g=str(a) else : exit(1) # PARAMETER VALIDATION # Check we have enough args #if len(sys.argv)<2 : # print_usage(sys.stderr) # exit(1) #else : # something # If user did not supply a path, use current directory if path_to_dir_g==None: path_to_dir_g=DEFAULT_SEARCH_PATH # If user did not supply a file extension, use default if result_file_extension_g==None : result_file_extension_g=DEFAULT_RESULT_FILE_EXTENSION return None def get_test_index(test_id, ip_version): if ip_version==4 : for i in range(0, len(results_g[0].test4_ids)) : if results_g[0].test4_ids[i]==test_id : return i if ip_version==6 : for i in range(0, len(results_g[0].test6_ids)) : if results_g[0].test6_ids[i]==test_id : return i return -1 def get_test_packets(test_index, ip_version): packet_set=[] for hostresults in results_g : if ip_version==4 : packet_set.append(hostresults.probe_sent_pkts4[test_index]) elif ip_version==6 : packet_set.append(hostresults.probe_sent_pkts6[test_index]) return packet_set def get_original_values(test_id, header_str, field_str, ip_version): test_index=get_test_index(test_id, ip_version) if test_id<0 : return None packet_set=get_test_packets(test_index, ip_version) field_values=[] if header_str=="IPv6" : if field_str=="Traffic Class" : for pkt in packet_set : field_values.append(pkt.tc) elif field_str=="Flow Label" : for pkt in packet_set : field_values.append(pkt.fl) elif field_str=="Payload Length" : for pkt in packet_set : field_values.append(pkt.plen) elif field_str=="Next Header" : for pkt in packet_set : field_values.append(pkt.nh) elif field_str=="Hop Limit" : for pkt in packet_set : field_values.append(pkt.hlim) if header_str=="IPv4" : if field_str=="Header Length" : for pkt in packet_set : field_values.append(pkt.ihl) elif field_str=="TOS" : for pkt in packet_set : field_values.append(pkt.tos) elif field_str=="Total Length" : for pkt in packet_set : field_values.append(pkt.len) elif field_str=="Identifier" : for pkt in packet_set : field_values.append(pkt.id) elif field_str=="Flags" : for pkt in packet_set : field_values.append(pkt.flags) elif field_str=="Flagment Offset" : for pkt in packet_set : field_values.append(pkt.frag) elif field_str=="TTL" : for pkt in packet_set : field_values.append(pkt.ttl) elif field_str=="Next Header" : for pkt in packet_set : field_values.append(pkt.proto) elif field_str=="Options" : for pkt in packet_set : field_values.append(pkt.options) if header_str=="TCP" : if field_str=="Sequence" : for pkt in packet_set : if TCP in pkt : field_values.append(pkt[TCP].seq) elif field_str=="Acknowledgement" : for pkt in packet_set : if TCP in pkt : field_values.append(pkt[TCP].ack) elif field_str=="Offset" : for pkt in packet_set : if TCP in pkt : field_values.append(pkt[TCP].dataofs) elif field_str=="Reserved" : for pkt in packet_set : if TCP in pkt : field_values.append(pkt[TCP].reserved) elif field_str=="Flags" : for pkt in packet_set : if TCP in pkt : field_values.append(pkt[TCP].flags) elif field_str=="Window" : for pkt in packet_set : if TCP in pkt : field_values.append(pkt[TCP].window) elif field_str=="Urg.Ptr" : for pkt in packet_set : if TCP in pkt : field_values.append(pkt[TCP].urgptr) elif field_str=="Options" : for pkt in packet_set : if TCP in pkt : field_values.append(pkt[TCP].options) if header_str=="UDP" : if field_str=="Length" : for pkt in packet_set : if UDP in pkt : field_values.append(pkt[UDP].len) elif field_str=="Checksum" : for pkt in packet_set : if UDP in pkt : field_values.append(pkt[UDP].cksum) if header_str=="ICMPv6" : if field_str=="Data" : for pkt in packet_set : if ICMPv6EchoReply in pkt : field_values.append(pkt[ICMPv6EchoReply].data) return field_values def get_original_values_unique(test_id, header_str, field_str, ip_version): result=get_original_values(test_id, header_str, field_str, ip_version) result2=() for item in result : if result2.count(item)<=0 : result2=result2+(item ,) return result2 def print_field_variation_summary(diff_results, header_str, field_str): msg="==== " + header_str + ": " + field_str + " " print msg + "="*(80 -len(msg)) count=0 for result in diff_results : for layer in result[1] : if layer!=None : if layer[0]==header_str : for field in layer[1] : if field[0]==field_str: count=count+1 orig=get_original_values_unique(result[0], header_str, field_str, 6) print "#" + result[0].ljust(25) + header_str.ljust(10) + str(field), if len(orig)>0 : print "; OrigValues: " + str(orig) else : print "" if count==0 : print "No differences found" print "\n" def octet_to_flags(myoctet): myflags="" if myoctet & 0x01 : myflags=myflags+"F" if myoctet & 0x02 : myflags=myflags+"S" if myoctet & 0x04 : myflags=myflags+"R" if myoctet & 0x08 : myflags=myflags+"P" if myoctet & 0x10 : myflags=myflags+"A" if myoctet & 0x20 : myflags=myflags+"U" if myoctet & 0x40 : myflags=myflags+"E" if myoctet & 0x80 : myflags=myflags+"C" return myflags def main(): global results_g # Parse command line parameters res=argparser() if res != None : print res exit(1) # Avoid "maximum recursion depth exceeded" errors sys.setrecursionlimit(10000) # Print Welcome banner print_welcome_banner() # Build a result vector from the files in the supplied directory results_g=parse_dir_files(path_to_dir_g, result_file_extension_g) # Sort the list so similar OSes are placed closed to each otherr. results_g.sort(key = lambda x: x.ostype+x.ossubtype+x.osversion) # Ensure our result files are consistent a=verify_correctness(results_g) if a!= None: print "ERROR: " + str(a) exit(1) print "+------------------------------------------------------------------------------+" print "|" + "GENERAL INFORMATION".center(78) + "|" print "+------------------------------------------------------------------------------+" # Print the list of operating systems that were analyzed print "========== OPERATING SYSTEMS ==========" i=0 for host in results_g: print ("["+str(i)+"]").ljust(5) + str([host.ostype, host.ossubtype, host.osversion]) i=i+1 print "" # Print observed header types print "========== OBSERVED PROTOCOLS ==========" layers=[] for test_no in range(0, len(results_g[0].probe_result_pkts6)) : for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: currlayer=hostpkt while True: if currlayer==None : break; if type(currlayer)==scapy.packet.NoPayload: break; # Do not go into ICMP error messages if type(currlayer)==scapy.layers.inet6.IPerror6: break; if type(currlayer)==scapy.layers.inet.IPerror: break # Add layer if we haven't listed before if layers.count(currlayer.__class__.__name__)<=0 : layers.append(currlayer.__class__.__name__) currlayer=currlayer.payload if len(layers)>0 : layers.sort() print "[+] Observed header types:" for item in layers : print " |_" + str(item) print "========== TEST STATISTICS ==========" # Determine the following: # a) which tests are never replied # b) which tests are always replied # c) which tests are sometimes replied never_replied=[] always_replied=[] sometimes_replied=[] times_replied=[] print "[+] Determine the amount of replies for each test" first=results_g[0].result_vector6 for i in range(0, len(first)): temp_vector=[] for host in results_g: temp_vector.append(host.result_vector6[i]) if temp_vector==[1]*len(temp_vector) : # All ones (always replied) always_replied.append(1) never_replied.append(0) sometimes_replied.append(0) times_replied.append( len(results_g) ) elif temp_vector==[0]*len(temp_vector) : # All zeroes (never replied) always_replied.append(0) never_replied.append(1) sometimes_replied.append(0) times_replied.append(0) else : # Mixed (sometimes replied) always_replied.append(0) never_replied.append(0) sometimes_replied.append(1) replies=0 for x in temp_vector: if x==1 : replies=replies+1 times_replied.append( replies ) # Ensure the results make sense: the union of the three sets should be all 1's for i in range(0, len(always_replied)) : a=always_replied[i]+never_replied[i]+sometimes_replied[i] if a!=1: print_fatal("ERROR: Reply sets are wrong") # Print results print "==NEVER REPLIED v6==" print " " + str(never_replied[0:49]).replace(' ', '') print " " + str(never_replied[50:99]).replace(' ', '') print " " + str(never_replied[100:149]).replace(' ', '') print " " + str(never_replied[150:]).replace(' ', '') print "" print "==ALWAYS REPLIED v6==" print " " + str(always_replied[0:49]).replace(' ', '') print " " + str(always_replied[50:99]).replace(' ', '') print " " + str(always_replied[100:149]).replace(' ', '') print " " + str(always_replied[150:]).replace(' ', '') print "" print "==SOMETIMES REPLIED v6==" print " " + str(sometimes_replied[0:49]).replace(' ', '') print " " + str(sometimes_replied[50:99]).replace(' ', '') print " " + str(sometimes_replied[100:149]).replace(' ', '') print " " + str(sometimes_replied[150:]).replace(' ', '') print "" print "==TEST No==+===Test Description====+===================Times Replied===================+==Percentage==+" for i in range(0, len(times_replied)) : test_id_str=results_g[0].test6_ids[i] percent=round( ((times_replied[i]*100.0)/len(results_g)), 2) print "#"+str(i).ljust(11) +test_id_str.ljust(24)+ ("#"* int(percent/2)).ljust(55) + "("+str(percent)+"%)" print "[+] IPv6 Tests that were always replied: " + str(round(100*float(always_replied.count(1))/len(always_replied),2)) + "%" print "[+] IPv6 Tests that were never replied: " + str(round(100*float(never_replied.count(1))/len(never_replied),2)) + "%" print "[+] IPv6 Tests that were sometimes replied: " + str(round(100*float(sometimes_replied.count(1))/len(sometimes_replied),2)) + "%" # Now the same for IPv4 tests never_replied4=[] always_replied4=[] sometimes_replied4=[] times_replied4=[] # Select only those hosts for which we found IPv4 tests results_4=[] for host in results_g : if len(host.result_vector4)>0 : results_4.append(host) if len(results_4)>0 : first=results_4[0].result_vector4 for i in range(0, len(first)): temp_vector=[] for host in results_4: temp_vector.append(host.result_vector4[i]) if temp_vector==[1]*len(temp_vector) : # All ones (always replied) always_replied4.append(1) never_replied4.append(0) sometimes_replied4.append(0) times_replied4.append( len(results_4) ) elif temp_vector==[0]*len(temp_vector) : # All zeroes (never replied) always_replied4.append(0) never_replied4.append(1) sometimes_replied4.append(0) times_replied4.append(0) else : # Mixed (sometimes replied) always_replied4.append(0) never_replied4.append(0) sometimes_replied4.append(1) replies=0 for x in temp_vector: if x==1 : replies=replies+1 times_replied4.append( replies ) # Ensure the results make sense: the union of the three sets should be all 1's for i in range(0, len(always_replied4)) : a=always_replied4[i]+never_replied4[i]+sometimes_replied4[i] if a!=1: print_fatal("ERROR: Reply sets are wrong") # Print results print "" print "==NEVER REPLIED v4==" print " " + str(never_replied4).replace(' ', '') print "" print "==ALWAYS REPLIED v4==" print " " + str(always_replied4).replace(' ', '') print "" print "==SOMETIMES REPLIED v4==" print " " + str(sometimes_replied4).replace(' ', '') print "" print "==TEST No==+===Test Description====+===================Times Replied===================+==Percentage==+" for i in range(0, len(times_replied4)) : test_id_str=results_g[0].test4_ids[i] percent=round( ((times_replied4[i]*100.0)/len(results_4)), 2) print "#"+str(i).ljust(11) +test_id_str.ljust(24)+ ("#"* int(percent/2)).ljust(55) + "("+str(percent)+"%)" print "[+] IPv4 Tests that were always replied: " + str(round(100*float(always_replied4.count(1))/len(always_replied4),2)) + "%" print "[+] IPv4 Tests that were never replied: " + str(round(100*float(never_replied4.count(1))/len(never_replied4),2)) + "%" print "[+] IPv4 Tests that were sometimes replied: " + str(round(100*float(sometimes_replied4.count(1))/len(sometimes_replied4),2)) + "%" # Create a detailed report for each test print "+------------------------------------------------------------------------------+" print "|" + "PER-TEST INFORMATION".center(78) + "|" print "+------------------------------------------------------------------------------+" for test_no in range(0, len(results_g[0].probe_result_pkts6)) : answered=[] print "\n=======================TEST " + str(test_no) + " " + results_g[0].test6_ids[test_no] + "=======================" # Print answer vector and the number of times answered for host in results_g: if host.probe_result_pkts6[test_no]==None: answered.append(0); else: answered.append(1); print "[+] Answered: " + str(answered); print "[+] Answers: " + str(answered.count(1)) + "/" + str(len(answered)), if answered.count(1) != 0 : percentage=100 * (float(answered.count(1)) / float(len(answered)) ) print "("+ str(round(percentage, 2)) +"%)" else: print "(0.00%)" # Print variations in traffic class auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: auxvect.append(hostpkt.tc); if count_unique(auxvect)>1 : print "[+] Traffic Class: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in flow label auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: auxvect.append(hostpkt.fl); if count_unique(auxvect)>1 : print "[+] Flow Label: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in payload length auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: auxvect.append(hostpkt.plen); if count_unique(auxvect)>1 : print "[+] Payload Length: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variation in next header type auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: auxvect.append(hostpkt.nh); for i in range(0, len(auxvect)): if auxvect[i]==0 : auxvect[i]="HOPT" elif auxvect[i]==6: auxvect[i]="TCP " elif auxvect[i]==17: auxvect[i]="UDP " elif auxvect[i]==43: auxvect[i]="ROUT" elif auxvect[i]==44: auxvect[i]="FRAG" elif auxvect[i]==58: auxvect[i]="ICMP" elif auxvect[i]==59: auxvect[i]="NNXT" elif auxvect[i]==60: auxvect[i]="DOPT" else: auxvect[i]="????" if count_unique(auxvect)>1 : print "[+] Next Header: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in hop limit auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: auxvect.append(hostpkt.hlim); if count_unique(auxvect)>1 : print "[+] Hop Limit: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in layer list layers=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: layerlist=tuple() currlayer=hostpkt while True: if currlayer==None : break; if type(currlayer)==scapy.packet.NoPayload: break; layerlist=layerlist+(currlayer.__class__.__name__,) currlayer=currlayer.payload if layers.count(layerlist)<=0 : layers.append(layerlist) if len(layers)>0 : print "[+] Layers:" for item in layers : print " |_" + str(item) # Print variations in TCP sequence auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if TCP in hostpkt and not (IPerror6 in hostpkt): auxvect.append(hostpkt[TCP].seq); if count_unique(auxvect)>1 : print "[+] TCP Sequence: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in TCP acknowledgement auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if TCP in hostpkt and not (IPerror6 in hostpkt): auxvect.append(hostpkt[TCP].ack); if count_unique(auxvect)>1 : print "[+] TCP Ack: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in data offset auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if TCP in hostpkt and not (IPerror6 in hostpkt): auxvect.append(hostpkt[TCP].dataofs * 4); if count_unique(auxvect)>1 : print "[+] TCP HeaderLen: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in the reserved field auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if TCP in hostpkt and not (IPerror6 in hostpkt): auxvect.append(hostpkt[TCP].reserved); if count_unique(auxvect)>1 : print "[+] TCP Reserved: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in flags auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if TCP in hostpkt and not (IPerror6 in hostpkt): auxvect.append(hostpkt[TCP].flags); if count_unique(auxvect)>1 : print "[+] TCP Flags: ".ljust(25), myflags=list_to_unique_tuple(auxvect) print "[", for flg in myflags: print octet_to_flags(flg), print "]" # Print variations in TCP Window auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if TCP in hostpkt and not (IPerror6 in hostpkt): auxvect.append(hostpkt[TCP].window); if count_unique(auxvect)>1 : print "[+] TCP Window: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in TCP urgent pointer auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if TCP in hostpkt and not (IPerror6 in hostpkt): auxvect.append(hostpkt[TCP].urgptr); if count_unique(auxvect)>1 : print "[+] TCP UrgPtr: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in the number of TCP options included auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if TCP in hostpkt and not (IPerror6 in hostpkt): auxvect.append(len(hostpkt[TCP].options)); if count_unique(auxvect)>1 : print "[+] TCP Num Opts: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in TCP options order auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if TCP in hostpkt and not (IPerror6 in hostpkt): auxvect.append(hostpkt[TCP].options); auxopts=[] for tcpopts in auxvect: optlist=[] if tcpopts==None : continue; for tcpopt in tcpopts: if tcpopt==None: break; else : optlist.append(tcpopt[0]) if auxopts.count(optlist)<=0 : auxopts.append(optlist) if len(auxopts)>0 : print "[+] TCP Options:" for item in auxopts : print " |_" + str(item) # Print variations in TCP options auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if TCP in hostpkt and not (IPerror6 in hostpkt): auxvect.append(hostpkt[TCP].options); auxopts=[] for tcpopts in auxvect: for tcpopt in tcpopts: if tcpopt==None: break; if tcpopt[0]=='MSS': auxopts.append(tcpopt[1]) if count_unique(auxopts)>1 : print "[+] TCP MSS: ".ljust(25), print list_to_unique_tuple(auxopts) auxopts=[] for tcpopts in auxvect: for tcpopt in tcpopts: if tcpopt==None: break; if tcpopt[0]=='WScale': auxopts.append(tcpopt[1]) if count_unique(auxopts)>1 : print "[+] TCP Wscale: ".ljust(25), print list_to_unique_tuple(auxopts) # Print variations in ICMP Parameter problem pointer auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if ICMPv6ParamProblem in hostpkt: auxvect.append(hostpkt[ICMPv6ParamProblem].ptr); if count_unique(auxvect)>1 : print "[+] ICMP ParamPointer: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in ICMP Parameter problem code auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if ICMPv6ParamProblem in hostpkt: auxvect.append(hostpkt[ICMPv6ParamProblem].code); if count_unique(auxvect)>1 : print "[+] ICMP Param Code: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in ICMP Echo Reply identifier auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if ICMPv6EchoReply in hostpkt: auxvect.append(hostpkt[ICMPv6EchoReply].id); if count_unique(auxvect)>1 : print "[+] ICMP EchoReply ID: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in ICMP Echo Reply sequence auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if ICMPv6EchoReply in hostpkt: auxvect.append(hostpkt[ICMPv6EchoReply].seq); if count_unique(auxvect)>1 : print "[+] ICMP EchoReply Seq: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in ICMP Echo Reply code auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if ICMPv6EchoReply in hostpkt: auxvect.append(hostpkt[ICMPv6EchoReply].code); if count_unique(auxvect)>1 : print "[+] ICMP EchoReply Code: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in ICMP Destination Unreachable code auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if ICMPv6DestUnreach in hostpkt: auxvect.append(hostpkt[ICMPv6DestUnreach].code); if count_unique(auxvect)>1 : print "[+] ICMP DestUnreach Code: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in ICMP Destination Unreachable unused auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if ICMPv6DestUnreach in hostpkt: auxvect.append(hostpkt[ICMPv6DestUnreach].unused); if count_unique(auxvect)>1 : print "[+] ICMP DestUnreach Code: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in ICMP Packet Too Big MTU auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if ICMPv6PacketTooBig in hostpkt: auxvect.append(hostpkt[ICMPv6PacketTooBig].mtu); if count_unique(auxvect)>1 : print "[+] ICMP PktTooBig MTU: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in ICMP Time Exceeded code auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if ICMPv6TimeExceeded in hostpkt: auxvect.append(hostpkt[ICMPv6TimeExceeded].code); if count_unique(auxvect)>1 : print "[+] ICMP DestUnreach Code: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in ICMP Time Exceeded unused auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if ICMPv6TimeExceeded in hostpkt: auxvect.append(hostpkt[ICMPv6TimeExceeded].unused); if count_unique(auxvect)>1 : print "[+] ICMP DestUnreach Code: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in ICMP Neighbor Advertisement Code auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if ICMPv6ND_NA in hostpkt: auxvect.append(hostpkt[ICMPv6ND_NA].code); if count_unique(auxvect)>1 : print "[+] ICMP NeighAdv Code: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in ICMP Neighbor Advertisement Flags auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if ICMPv6ND_NA in hostpkt: flagstr="" if hostpkt[ICMPv6ND_NA].R==1: flagstr=flagstr+"R" if hostpkt[ICMPv6ND_NA].S==1: flagstr=flagstr+"S" if hostpkt[ICMPv6ND_NA].O==1: flagstr=flagstr+"O" auxvect.append(flagstr); if count_unique(auxvect)>1 : print "[+] ICMP NeighAdv Flags: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in ICMP Neighbor Advertisement reserved auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if ICMPv6ND_NA in hostpkt: auxvect.append(hostpkt[ICMPv6ND_NA].res); if count_unique(auxvect)>1 : print "[+] ICMP NeighAdv Resvd: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in ICMP Node Information Qtype auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if ICMPv6NIReplyIPv4 in hostpkt: auxvect.append(hostpkt[ICMPv6NIReplyIPv4].qtype) elif ICMPv6NIReplyNOOP in hostpkt: auxvect.append(hostpkt[ICMPv6NIReplyNOOP].qtype) elif ICMPv6NIReplyRefuse in hostpkt: auxvect.append(hostpkt[ICMPv6NIReplyRefuse].qtype) elif ICMPv6NIReplyIPv6 in hostpkt: auxvect.append(hostpkt[ICMPv6NIReplyIPv6].qtype) elif ICMPv6NIReplyName in hostpkt: auxvect.append(hostpkt[ICMPv6NIReplyName].qtype) elif ICMPv6NIReplyUnknown in hostpkt: auxvect.append(hostpkt[ICMPv6NIReplyUnknown].qtype) if count_unique(auxvect)>1 : print "[+] ICMP NodeInfo Qtype: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in ICMP Node Information Code auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if ICMPv6NIReplyIPv4 in hostpkt: auxvect.append(hostpkt[ICMPv6NIReplyIPv4].code) elif ICMPv6NIReplyNOOP in hostpkt: auxvect.append(hostpkt[ICMPv6NIReplyNOOP].code) elif ICMPv6NIReplyRefuse in hostpkt: auxvect.append(hostpkt[ICMPv6NIReplyRefuse].code) elif ICMPv6NIReplyIPv6 in hostpkt: auxvect.append(hostpkt[ICMPv6NIReplyIPv6].code) elif ICMPv6NIReplyName in hostpkt: auxvect.append(hostpkt[ICMPv6NIReplyName].code) elif ICMPv6NIReplyUnknown in hostpkt: auxvect.append(hostpkt[ICMPv6NIReplyUnknown].code) if count_unique(auxvect)>1 : print "[+] ICMP NodeInfo Code: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in ICMP Node Information Flags auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if ICMPv6NIReplyIPv4 in hostpkt: auxvect.append(hostpkt[ICMPv6NIReplyIPv4].flags) elif ICMPv6NIReplyNOOP in hostpkt: auxvect.append(hostpkt[ICMPv6NIReplyNOOP].flags) elif ICMPv6NIReplyRefuse in hostpkt: auxvect.append(hostpkt[ICMPv6NIReplyRefuse].flags) elif ICMPv6NIReplyIPv6 in hostpkt: auxvect.append(hostpkt[ICMPv6NIReplyIPv6].flags) elif ICMPv6NIReplyName in hostpkt: auxvect.append(hostpkt[ICMPv6NIReplyName].flags) elif ICMPv6NIReplyUnknown in hostpkt: auxvect.append(hostpkt[ICMPv6NIReplyUnknown].flags) if count_unique(auxvect)>1 : print "[+] ICMP NodeInfo Flags: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in ICMP Node Information Unused auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if ICMPv6NIReplyIPv4 in hostpkt: auxvect.append(hostpkt[ICMPv6NIReplyIPv4].unused) elif ICMPv6NIReplyNOOP in hostpkt: auxvect.append(hostpkt[ICMPv6NIReplyNOOP].unused) elif ICMPv6NIReplyRefuse in hostpkt: auxvect.append(hostpkt[ICMPv6NIReplyRefuse].unused) elif ICMPv6NIReplyIPv6 in hostpkt: auxvect.append(hostpkt[ICMPv6NIReplyIPv6].unused) elif ICMPv6NIReplyName in hostpkt: auxvect.append(hostpkt[ICMPv6NIReplyName].unused) elif ICMPv6NIReplyUnknown in hostpkt: auxvect.append(hostpkt[ICMPv6NIReplyUnknown].unused) if count_unique(auxvect)>1 : print "[+] ICMP NodeInfo Unused: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in fragment identifiers auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if IPv6ExtHdrFragment in hostpkt and not (IPerror6 in hostpkt): auxvect.append(hostpkt[IPv6ExtHdrFragment].id); if count_unique(auxvect)>1 : print "[+] Fragment ID: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in Fragment header "reserved" fields auxvect=[] for host in results_g: hostpkt=host.probe_result_pkts6[test_no] if hostpkt!=None: if IPv6ExtHdrFragment in hostpkt and not (IPerror6 in hostpkt): auxvect.append(str(hostpkt[IPv6ExtHdrFragment].res1)+str(hostpkt[IPv6ExtHdrFragment].res2)); if count_unique(auxvect)>1 : print "[+] Fragment Reserved: ".ljust(25), print list_to_unique_tuple(auxvect) # Print variations in Fragment offset auxvect=[] for host in results_g: # Get all received fragments for the current test pktfrags=[] for pkttuple in host.probe_result_pkts6_more : if pkttuple[1]==None : continue if pkttuple[0]==test_no: pktfrags.append(pkttuple[1]) # Extract fragment offsets offsetlist=[] for packet in pktfrags: if packet!=None: if IPv6ExtHdrFragment in packet and not (IPerror6 in packet): offsetlist.append(packet[IPv6ExtHdrFragment].offset); # Sort them, so they always appear in the right order (frag offset 0 first). offsetlist.sort(); if auxvect.count(offsetlist)<=0: auxvect.append(offsetlist) if len(auxvect)>1 : print "[+] Fragment Offset: " for item in auxvect : print " |_x" + str(item) print "+------------------------------------------------------------------------------+" print "|" + "PER-FIELD INFORMATION".center(78) + "|" print "+------------------------------------------------------------------------------+" diff_results=[] for i in range(0, len(always_replied)) : temp_vector=[] if always_replied[i]==1: for host in results_g: temp_vector.append(host.probe_result_pkts6[i]) result=packet_diff(temp_vector) test_id_str=results_g[0].test6_ids[i] diff_results.append( (test_id_str, result) ) # print "#"+str(i).ljust(11) +test_id_str.ljust(24), # for item in result : # if item != None : # if item[1]!= tuple() : # print item, # print "" elif sometimes_replied[i]==1: for host in results_g: if host.probe_result_pkts6[i]!=None : temp_vector.append(host.probe_result_pkts6[i]) if len(temp_vector)>1 : result=packet_diff(temp_vector) test_id_str=results_g[0].test6_ids[i] diff_results.append( (test_id_str, result) ) else : test_id_srt="N/A" # print "$"+str(i).ljust(11) +test_id_str.ljust(24), # for item in result : # if item != None : # if item[1]!= tuple() : # print item, # print "" # # print "[+] Description: determine which fields differ in IPv4 responses" # print "==TEST No==+===Test Description====+=== Fields that differ=====================+" # for i in range(0, len(always_replied4)) : # temp_vector=[] # if always_replied4[i]==1: # for host in results_4: # temp_vector.append(host.probe_result_pkts4[i]) # result=packet_diff(temp_vector) # test_id_str=results_4[0].test4_ids[i] # print "#"+str(i).ljust(11) +test_id_str.ljust(24)+str(result) # elif sometimes_replied4[i]==1: # for host in results_4: # if host.probe_result_pkts4[i]!=None : # temp_vector.append(host.probe_result_pkts4[i]) # if len(temp_vector)>1 : # result=packet_diff(temp_vector) # test_id_str=results_4[0].test4_ids[i] # else : # test_id_srt="N/A" # print "$"+str(i).ljust(11) +test_id_str.ljust(24)+str(result) print "\n\n[+] Summary of the differences for each field" print "==TEST No==+===Test Description====+=== Fields that differ=====================+" fields2show=(("IPv6", ("IP Version", "Traffic Class", "Flow Label", "Payload Length", "Next Header", "Hop Limit")), ("ICMP", ("Type", "Code")), ("ICMPv6", ("Type", "Code", "ParamPointer")), ("TCP", ("Sequence", "Acknowledgement", "Offset", "Reserved", "Flags", "Window", "Urg.Ptr", "Options")), ("UDP", ("Length", "Checksum")), ("DestOpts", ("Next Header", "Length", "Options")), ("FragHdr", ("Next Header", "Reserved1", "Reserved2", "Offset", "Identifier")), ("Hop-by-Hop", ("Next Header", "Length", "Options")), ("Routing", ("Next Header", "Length", "Type", "Segments Left", "Reserved", "Addresses")), ("Raw", ("Payload",)) ) for layer in fields2show : for field in layer[1] : print_field_variation_summary(diff_results, layer[0], field) print "+------------------------------------------------------------------------------+" print "|" + "PER-SYSTEM INFORMATION".center(78) + "|" print "+------------------------------------------------------------------------------+" print_analysis_intro("IPv4 versus IPv6 results") # Now for the timed tests, see if we get the same response (at the transport layer) for both IPv4 and IPv6 for host in results_g : # Skip hosts for which we don't have IPv4 results if len(host.result_vector4)<=0 : continue # Build a list of tests responses_ipv6 = host.probe_result_pkts6[0:16]+host.timed_results_pkts6 responses_ipv4 = host.probe_result_pkts4[0:16]+host.timed_results_pkts4 # Check both probe lists have the same size if len(responses_ipv6)!=len(responses_ipv4) : continue print " [+] " + str([host.ostype, host.ossubtype, host.osversion]) for i in range(0, len(responses_ipv6) ) : # Skip those for which we didnt get a response if responses_ipv6[i]==None or responses_ipv4[i]==None : continue ipv6str=" #TCP Header IPv6-Probe-"+str(i) ipv4str=" #TCP Header IPv4-Probe-"+str(i) if TCP in responses_ipv6[i] and TCP in responses_ipv4[i] : if responses_ipv6[i][TCP].seq != responses_ipv4[i][TCP].seq : ipv6str=ipv6str+" Seq="+str(responses_ipv6[i][TCP].seq).ljust(10) ipv4str=ipv4str+" Seq="+str(responses_ipv4[i][TCP].seq).ljust(10) if responses_ipv6[i][TCP].ack != responses_ipv4[i][TCP].ack : ipv6str=ipv6str+" Ack="+str(responses_ipv6[i][TCP].ack).ljust(10) ipv4str=ipv4str+" Ack="+str(responses_ipv4[i][TCP].ack).ljust(10) if responses_ipv6[i][TCP].dataofs!= responses_ipv4[i][TCP].dataofs : ipv6str=ipv6str+" Off="+str(responses_ipv6[i][TCP].dataofs).ljust(10) ipv4str=ipv4str+" Off="+str(responses_ipv4[i][TCP].dataofs).ljust(10) if responses_ipv6[i][TCP].flags!= responses_ipv4[i][TCP].flags : ipv6str=ipv6str+" Flg="+str(octet_to_flags(responses_ipv6[i][TCP].flags)).ljust(10) ipv4str=ipv4str+" Flg="+str(octet_to_flags(responses_ipv4[i][TCP].flags)).ljust(10) if responses_ipv6[i][TCP].window!= responses_ipv4[i][TCP].window : ipv6str=ipv6str+" Win="+str(responses_ipv6[i][TCP].window).ljust(10) ipv4str=ipv4str+" Win="+str(responses_ipv4[i][TCP].window).ljust(10) if responses_ipv6[i][TCP].urgptr!= responses_ipv4[i][TCP].urgptr : ipv6str=ipv6str+" Win="+str(responses_ipv6[i][TCP].urgptr).ljust(10) ipv4str=ipv4str+" Win="+str(responses_ipv4[i][TCP].urgptr).ljust(10) if not (TCP in responses_ipv6[i]): ipv6str=ipv6str+" No TCP data" if not (TCP in responses_ipv4[i]): ipv4str=ipv4str+" No TCP data" # Print the results print ipv6str print ipv4str print " #TCP Options IPv6-Probe-"+str(i), if TCP in responses_ipv6[i]: print str(responses_ipv6[i][TCP].options) else: print "" print " #TCP Options IPv4-Probe-"+str(i), if TCP in responses_ipv4[i]: print str(responses_ipv4[i][TCP].options) else : print "" print "" # Print variations of some IPv6 fields for each OS print_analysis_intro("Variations of the Traffic Class field") for host in results_g : print " [+] " + (host.ostype + " " + host.ossubtype + " " + host.osversion + ", TC:").ljust(35), vect=[] for pkt in host.probe_result_pkts6 : if pkt==None : continue if vect.count(pkt.tc)<=0 : vect.append(pkt.tc) print str(vect) print_analysis_intro("Variations of the Flow Label field") for host in results_g : print " [+] " + (host.ostype + " " + host.ossubtype + " " + host.osversion + ", FL:").ljust(35), vect=[] for pkt in host.probe_result_pkts6 : if pkt==None : continue if vect.count(pkt.fl)<=0 : vect.append(pkt.fl) print str(vect) print_analysis_intro("Variations of the Hop Limit") for host in results_g : print " [+] " + (host.ostype + " " + host.ossubtype + " " + host.osversion + ", HL:").ljust(35), vect=[] for pkt in host.probe_result_pkts6 : if pkt==None : continue if vect.count(pkt.hlim)<=0 : vect.append(pkt.hlim) print str(vect) # Print differences in parameter problem pointers print_analysis_intro("Variations in Parameter Problem pointers") for i in range(0, len(results_g[0].probe_result_pkts6) ) : resps=[] for host in results_g: resps.append(host.probe_result_pkts6[i]) pntrs=[] for pkt in resps: if pkt!=None: if ICMPv6ParamProblem in pkt : ptr=pkt[ICMPv6ParamProblem].ptr if pntrs.count(ptr)<=0 : pntrs.append(ptr) if len(pntrs)>1 : print " #"+str(i)+" " + results_g[0].test6_ids[i] + " " + str(pntrs) # Print differences in parameter problem ICMP code print_analysis_intro("Variations in Parameter Problem ICMP Codes") for i in range(0, len(results_g[0].probe_result_pkts6) ) : resps=[] for host in results_g: resps.append(host.probe_result_pkts6[i]) pntrs=[] for pkt in resps: if pkt!=None: if ICMPv6ParamProblem in pkt : code=pkt[ICMPv6ParamProblem].code if pntrs.count(code)<=0 : pntrs.append(code) if len(pntrs)>1 : print " #"+str(i)+" " + results_g[0].test6_ids[i] + " " + str(pntrs) # Print differences in the sequence of layers print_analysis_intro("Variations in layer stacking (sequence of layers)") for i in range(0, len(results_g[0].probe_result_pkts6) ) : resps=[] for host in results_g: resps.append(host.probe_result_pkts6[i]) layers=[] for pkt in resps: if pkt!=None: layerlist=tuple() currlayer=pkt while True: if currlayer==None : break; if type(currlayer)==scapy.packet.NoPayload: break; layerlist=layerlist+(currlayer.__class__.__name__,) currlayer=currlayer.payload if layers.count(layerlist)<=0 : layers.append(layerlist) if len(layers) > 1: print " #"+str(i).ljust(3)+" " + results_g[0].test6_ids[i].ljust(20) for i in range(0, len(layers)) : print " ->" + str(layers[i]) # Print fragment identifiers print_analysis_intro("Fragment Identifiers in responses") for host in results_g: fragids=[] for pkt in host.probe_result_pkts6 : if pkt==None : continue if type(pkt.payload)==scapy.layers.inet6.IPv6ExtHdrFragment : fragids.append(pkt.payload.id) print " [+] " + (host.ostype + " " + host.ossubtype + " " + host.osversion + ", IDs:").ljust(35), if len(fragids)>0 : print fragids else : print "No fragments found" print "\n" # Print fragment M flag print_analysis_intro("Fragment M flag in responses") for host in results_g: fragids=[] for pkt in host.probe_result_pkts6 : if pkt==None : continue if type(pkt.payload)==scapy.layers.inet6.IPv6ExtHdrFragment : fragids.append(pkt.payload.m) print " [+] " + (host.ostype + " " + host.ossubtype + " " + host.osversion + ", M:").ljust(35), if len(fragids)>0 : print fragids else : print "No fragments found" print "\n" # Print fragment reserved fields print_analysis_intro("Fragment 'Reserved' fields in responses") for host in results_g: fragids=[] for pkt in host.probe_result_pkts6 : if pkt==None : continue if type(pkt.payload)==scapy.layers.inet6.IPv6ExtHdrFragment : fragids.append((pkt.payload.res1,pkt.payload.res2)) print " [+] " + (host.ostype + " " + host.ossubtype + " " + host.osversion + ", Res:").ljust(35), if len(fragids)>0 : print fragids else : print "No fragments found" print "\n" # Print fragment identifiers, including responses that are formed by more than one packet print_analysis_intro("Fragment Identifiers in all received packets (includes tests with more than one response)") for host in results_g: fragids=[] for pkttuple in host.probe_result_pkts6_more : if pkttuple[1]==None : continue if type(pkttuple[1].payload)==scapy.layers.inet6.IPv6ExtHdrFragment : fragids.append((pkttuple[0], pkttuple[1].payload.id)) print " [+] " + (host.ostype + " " + host.ossubtype + " " + host.osversion + ", IDs:").ljust(35), if len(fragids)>0 : print fragids else : print "No fragments found" print "\n" # Print fragment offsets, including responses that are formed by more than one packet print_analysis_intro("Fragment offsets in all received packets (includes tests with more than one response)") for host in results_g: fragids=[] for pkttuple in host.probe_result_pkts6_more : if pkttuple[1]==None : continue if type(pkttuple[1].payload)==scapy.layers.inet6.IPv6ExtHdrFragment : fragids.append((pkttuple[0], pkttuple[1].payload.offset)) print " [+] " + (host.ostype + " " + host.ossubtype + " " + host.osversion + ", 64b-Offs:").ljust(35), if len(fragids)>0 : print fragids else : print "No fragments found" print "\n" # Same as previous one but expressed in bytes, not 64-bit words. print_analysis_intro("Fragment offsets (in bytes) in all received packets (includes tests with more than one response)") for host in results_g: fragids=[] for pkttuple in host.probe_result_pkts6_more : if pkttuple[1]==None : continue if type(pkttuple[1].payload)==scapy.layers.inet6.IPv6ExtHdrFragment : fragids.append((pkttuple[0], pkttuple[1].payload.offset * 8)) print " [+] " + (host.ostype + " " + host.ossubtype + " " + host.osversion + ", OFFs:").ljust(35), if len(fragids)>0 : print fragids else : print "No fragments found" print "\n" # Print fragment identifiers in requests print_analysis_intro("Fragment Identifiers in requests") for host in results_g: fragids=[] for pkt in host.probe_sent_pkts6 : if pkt==None : continue if type(pkt.payload)==scapy.layers.inet6.IPv6ExtHdrFragment : fragids.append(pkt.payload.id) print " [+] " + (host.ostype + " " + host.ossubtype + " " + host.osversion + ", IDs:").ljust(35), if len(fragids)>0 : print fragids else : print "No fragments found" print "\n" # Print destination options length print_analysis_intro("Destination Options lengths") for host in results_g: fragids=[] for pkt in host.probe_result_pkts6 : if pkt==None : continue if IPv6ExtHdrDestOpt in pkt and not (IPerror6 in pkt): fragids.append(pkt[IPv6ExtHdrDestOpt].len) print " [+] " + (host.ostype + " " + host.ossubtype + " " + host.osversion + ", DOL:").ljust(35), if len(fragids)>0 : print fragids else : print "No DestOpts found" print "\n" # Print list of options for the DestinationOpts extension header print_analysis_intro("DestOpts: list of options") for host in results_g: fragids=[] for pkt in host.probe_result_pkts6 : if pkt==None : continue if IPv6ExtHdrDestOpt in pkt and not (IPerror6 in pkt): fragids.append(pkt[IPv6ExtHdrDestOpt].options) print " [+] " + (host.ostype + " " + host.ossubtype + " " + host.osversion + ", DOPT:").ljust(35), if len(fragids)>0 : print fragids else : print "No DestOpts found" print "\n" # Print length of Hop-By-Hop extension headers print_analysis_intro("Hop-by-hop lengths") for host in results_g: fragids=[] for pkt in host.probe_result_pkts6 : if pkt==None : continue if IPv6ExtHdrHopByHop in pkt and not (IPerror6 in pkt): fragids.append(pkt[IPv6ExtHdrHopByHop].len) print " [+] " + (host.ostype + " " + host.ossubtype + " " + host.osversion + ", HOL:").ljust(35), if len(fragids)>0 : print fragids else : print "No Hop-by-hop found" print "\n" # Print Hop-by-hop options print_analysis_intro("Hop-by-hop options") for host in results_g: fragids=[] for pkt in host.probe_result_pkts6 : if pkt==None : continue if IPv6ExtHdrHopByHop in pkt and not (IPerror6 in pkt): fragids.append(pkt[IPv6ExtHdrHopByHop].options) print " [+] " + (host.ostype + " " + host.ossubtype + " " + host.osversion + ", HOPT:").ljust(35), fragids=list_to_unique_tuple(fragids) if len(fragids)>0 : print fragids else : print "No Hop-by-hop found" print "\n" # Print routing header lengths print_analysis_intro("Routing Header lengths") for host in results_g: fragids=[] for pkt in host.probe_result_pkts6 : if pkt==None : continue if IPv6ExtHdrRouting in pkt and not (IPerror6 in pkt): fragids.append(pkt[IPv6ExtHdrRouting].len) print " [+] " + (host.ostype + " " + host.ossubtype + " " + host.osversion + ", RLen:").ljust(35), if len(fragids)>0 : print fragids else : print "No Routing found" print "\n" # Print info about the Ack number used in packets with flags RA, SA and R print_analysis_intro("Differences in the Acknowledgment field: RA vs SA") for host in results_g: print " [+] " + (host.ostype + " " + host.ossubtype + " " + host.osversion + ", Len:").ljust(35), flgs=[] acks=[] for i in [8, 9, 11, 12]: if type(host.probe_result_pkts6[i])==scapy.layers.inet6.IPv6 : flgs.append(octet_to_flags(host.probe_result_pkts6[i][TCP].flags)) ackvalue= host.probe_result_pkts6[i][TCP].ack - host.probe_sent_pkts6[i][TCP].seq acks.append(ackvalue) else: flgs.append('') acks.append("") for i in range(0, len(flgs)) : myitem="[" + flgs[i].ljust(3) if acks[i]==0: myitem=myitem+ ", S]," elif acks[i]>0: myitem=myitem+", S+"+str(acks[i])+"]," else : myitem=myitem+", S"+str(acks[i])+"]," print myitem.ljust(22), print "" print "\n" #This prints all packets. (Huge output!) # for i in range(0, len(results_g[0].probe_result_pkts6)) : # print "\n=======================TEST " + str(i) + " " + host.test6_ids[i] + "=======================\n" # for host in results_g : # print "[+] TEST " +str(i) + " " + host.test6_ids[i] + " " + (host.ostype + " " + host.ossubtype + " " + host.osversion) # if host.probe_result_pkts6[i]!=None : # host.probe_result_pkts6[i].show() # else : # print " This OS did not respond to the test" # ENTRY EXECUTION POINT main()