#!/usr/bin/env python # -*- coding: utf-8 -*- # ***********************IMPORTANT NMAP LICENSE TERMS************************ # * * # * The Nmap Security Scanner is (C) 1996-2009 Insecure.Com LLC. Nmap is * # * also a registered trademark of Insecure.Com LLC. This program is free * # * software; you may redistribute and/or modify it under the terms of the * # * GNU General Public License as published by the Free Software * # * Foundation; Version 2 with the clarifications and exceptions described * # * below. This guarantees your right to use, modify, and redistribute * # * this software under certain conditions. If you wish to embed Nmap * # * technology into proprietary software, we sell alternative licenses * # * (contact sales@insecure.com). Dozens of software vendors already * # * license Nmap technology such as host discovery, port scanning, OS * # * detection, and version detection. * # * * # * Note that the GPL places important restrictions on "derived works", yet * # * it does not provide a detailed definition of that term. To avoid * # * misunderstandings, we consider an application to constitute a * # * "derivative work" for the purpose of this license if it does any of the * # * following: * # * o Integrates source code from Nmap * # * o Reads or includes Nmap copyrighted data files, such as * # * nmap-os-db or nmap-service-probes. * # * o Executes Nmap and parses the results (as opposed to typical shell or * # * execution-menu apps, which simply display raw Nmap output and so are * # * not derivative works.) * # * o Integrates/includes/aggregates Nmap into a proprietary executable * # * installer, such as those produced by InstallShield. * # * o Links to a library or executes a program that does any of the above * # * * # * The term "Nmap" should be taken to also include any portions or derived * # * works of Nmap. This list is not exclusive, but is meant to clarify our * # * interpretation of derived works with some common examples. Our * # * interpretation applies only to Nmap--we don't speak for other people's * # * GPL works. * # * * # * If you have any questions about the GPL licensing restrictions on using * # * Nmap in non-GPL works, we would be happy to help. As mentioned above, * # * we also offer alternative license to integrate Nmap into proprietary * # * applications and appliances. These contracts have been sold to dozens * # * of software vendors, and generally include a perpetual license as well * # * as providing for priority support and updates as well as helping to * # * fund the continued development of Nmap technology. Please email * # * sales@insecure.com for further information. * # * * # * As a special exception to the GPL terms, Insecure.Com LLC grants * # * permission to link the code of this program with any version of the * # * OpenSSL library which is distributed under a license identical to that * # * listed in the included COPYING.OpenSSL file, and distribute linked * # * combinations including the two. You must obey the GNU GPL in all * # * respects for all of the code used other than OpenSSL. If you modify * # * this file, you may extend this exception to your version of the file, * # * but you are not obligated to do so. * # * * # * If you received these files with a written license agreement or * # * contract stating terms other than the terms above, then that * # * alternative license agreement takes precedence over these comments. * # * * # * Source is provided to this software because we believe users have a * # * right to know exactly what a program is going to do before they run it. * # * This also allows you to audit the software for security holes (none * # * have been found so far). * # * * # * Source code also allows you to port Nmap to new platforms, fix bugs, * # * and add new features. You are highly encouraged to send your changes * # * to nmap-dev@insecure.org for possible incorporation into the main * # * distribution. By sending these changes to Fyodor or one of the * # * Insecure.Org development mailing lists, it is assumed that you are * # * offering the Nmap Project (Insecure.Com LLC) the unlimited, * # * non-exclusive right to reuse, modify, and relicense the code. Nmap * # * will always be available Open Source, but this is important because the * # * inability to relicense code has caused devastating problems for other * # * Free Software projects (such as KDE and NASM). We also occasionally * # * relicense the code to third parties as discussed above. If you wish to * # * specify special license conditions of your contributions, just say so * # * when you send them. * # * * # * This program is distributed in the hope that it will be useful, but * # * WITHOUT ANY WARRANTY; without even the implied warranty of * # * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * # * General Public License v2.0 for more details at * # * http://www.gnu.org/licenses/gpl-2.0.html , or in the COPYING file * # * included with Nmap. * # * * # ***************************************************************************/ import re import locale import os import os.path import time import StringIO import copy from types import StringTypes from xml.sax import make_parser from xml.sax import SAXException from xml.sax.handler import ContentHandler from xml.sax.saxutils import XMLGenerator from xml.sax.xmlreader import AttributesImpl as Attributes import zenmapCore.I18N from zenmapCore.UmitLogging import log from zenmapCore.NmapCommand import split_quoted from zenmapCore.NmapOptions import NmapOptions # The version of the Nmap DTD this file understands and emits. XML_OUTPUT_VERSION = "1.03" class HostInfo(object): def __init__(self): self.comment = None; self._tcpsequence = {} self._osclasses = [] self._osmatches = [] self._ports = [] self._ports_used = [] self._extraports = [] self._uptime = {} self._hostnames = [] self._tcptssequence = {} self._ipidsequence = {} self._ip = None self._ipv6 = None self._mac = None self._state = '' self._comment = '' self._trace = {} def make_clone(self): clone = HostInfo() clone.comment = self.comment clone._tcpsequence = copy.deepcopy(self._tcpsequence) clone._osclasses = map(lambda d: copy.deepcopy(d), self._osclasses) clone._osmatches = self._osmatches clone._ports = copy.deepcopy(self._ports) clone._ports_used = self._ports_used clone._extraports = self._extraports clone._uptime = copy.deepcopy(self._uptime) clone._hostnames = copy.deepcopy(self._hostnames) clone._tcptssequence = copy.deepcopy(self._tcptssequence) clone._ipidsequence = copy.deepcopy(self._ipidsequence) clone._ip = copy.deepcopy(self._ip) clone._ipv6 = copy.deepcopy(self._ipv6) clone._mac = copy.deepcopy(self._mac) clone._state = self._state clone._comment = self._comment clone._trace = copy.deepcopy(self._trace) return clone # tcpsequence is a dict of the form # {'index': u'203', # 'values': u'3637785D,35B440D1,35E9FC3B,3640DB42,355F5931,3601AE14', # 'difficulty': u'Good luck!'} def set_tcpsequence(self, sequence): self._tcpsequence = sequence def get_tcpsequence(self): if self._tcpsequence: return self._tcpsequence return {} # tcptssequence is a dict of the form # {'values': u'71D0483C,71D048A3,71D0490C,71D04973,71D049DB,71D04A45', # 'class': u'1000HZ'} def set_tcptssequence(self, sequence): self._tcptssequence = sequence def get_tcptssequence(self): if self._tcptssequence: return self._tcptssequence return {} # ipidsequence is a dict of the form # {'values': u'0,0,0,0,0,0', 'class': u'All zeros'} def set_ipidsequence(self, sequence): self._ipidsequence = sequence def get_ipidsequence(self): if self._ipidsequence: return self._ipidsequence return {} # osclasses is a list containing dicts of the form # {'vendor': u'Linux', 'osfamily': u'Linux', 'type': u'general purpose', # 'osgen': u'2.6.X', 'accuracy': u'98'} def set_osclasses(self, classes): self._osclasses = classes def get_osclasses(self): return self._osclasses # osmatches is a list of dicts of the form # {'name': u'Linux 2.6.24', 'accuracy': u'98', 'line': u'1000'} def set_osmatches(self, matches): self._osmatches = matches def get_osmatches(self): return self._osmatches def get_best_osmatch(self): """Return the OS match with the highest accuracy. If there is a tie, one of the best matches will be returned.""" if not self._osmatches: return None def osmatch_key(osmatch): # Sort first by accuracy, then by name so it's deterministic. try: accuracy = float(osmatch.get("accuracy", "")) except ValueError: accuracy = 0 return (accuracy, osmatch.get("name")) osmatches = self.osmatches[:] osmatches.sort(cmp = lambda a, b: cmp(osmatch_key(a), osmatch_key(b))) return osmatches[-1] def get_best_osclass(self): """Return the OS class with the highest accuracy. If there is a tie, one of the best matches will be returned.""" if not self._osclasses: return None def osclass_key(osclass): # Sort first by accuracy, then by name so it's deterministic. try: accuracy = float(osclass.get("accuracy", "")) except ValueError: accuracy = 0 return (accuracy, osclass.get("name")) osclasses = self.osclasses[:] osclasses.sort(cmp = lambda a, b: cmp(osclass_key(a), osclass_key(b))) return osclasses[-1] # ports_used is a list like # [{'state': u'open', 'portid': u'22', 'proto': u'tcp'}, # {'state': u'closed', 'portid': u'25', 'proto': u'tcp'}, # {'state': u'closed', 'portid': u'44054', 'proto': u'udp'}] # but not all three elements are necessarily present. def set_ports_used(self, ports): self._ports_used = ports def get_ports_used(self): return self._ports_used # uptime is a dict of the form # {'seconds': u'1909493', 'lastboot': u'Wed Jul 2 06:48:31 2008'} def set_uptime(self, uptime): self._uptime = uptime def get_uptime(self): if self._uptime: return self._uptime # Avoid empty dict return return {"seconds":"", "lastboot":""} # ports is an array containing dicts of the form # {'port_state': u'open', 'portid': u'22', 'protocol': u'tcp', # 'service_conf': u'10', 'service_extrainfo': u'protocol 2.0', # 'service_method': u'probed', 'service_name': u'ssh', # 'service_product': u'OpenSSH', 'service_version': u'4.3'} def set_ports(self, ports): self._ports = ports def get_ports(self): return self._ports # extraports is an array of dicts of the form # {'count': u'1709', 'state': u'filtered'} def set_extraports(self, port_list): self._extraports = port_list def get_extraports(self): return self._extraports # hostnames is a list containing dicts of the form # [{'hostname': u'scanme.nmap.org', 'hostname_type': u'PTR'}] def set_hostnames(self, hostname_list): self._hostnames = hostname_list def get_hostnames(self): return self._hostnames # ip, ipv6, and mac are either None or dicts of the form # {'vendor': u'', 'type': u'ipv4', 'addr': u'64.13.134.52'} def set_ip(self, addr): self._ip = addr def get_ip(self): return self._ip def set_mac(self, addr): self._mac = addr def get_mac(self): return self._mac def set_ipv6(self, addr): self._ipv6 = addr def get_ipv6(self): return self._ipv6 # comment is a string. def get_comment(self): return self._comment def set_comment(self, comment): self._comment = comment # state is a string like u'up' or u'down'. def set_state(self, status): self._state = status def get_state(self): return self._state def get_hostname(self): hostname = None if len(self._hostnames) > 0: hostname = self._hostnames[0]["hostname"] address = self.ip or self.ipv6 or self.mac if address is not None: address = address["addr"] if hostname is not None: if address is not None: return "%s (%s)" % (hostname, address) else: return hostname else: if address is not None: return address else: return _("Unknown Host") def get_port_count_by_states(self, states): count = 0 for p in self.ports: state = p.get('port_state') if state in states: count += 1 for extra in self.get_extraports(): if extra['state'] in states: count += int(extra['count']) return count def get_open_ports(self): return self.get_port_count_by_states(('open', 'open|filtered')) def get_filtered_ports(self): return self.get_port_count_by_states(('filtered', 'open|filtered', 'closed|filtered')) def get_closed_ports(self): return self.get_port_count_by_states(('closed', 'closed|filtered')) def get_scanned_ports(self): scanned = 0 for p in self.ports: scanned+=1 for extra in self.get_extraports(): scanned += int(extra["count"]) return scanned def get_services(self): services = [] for p in self.ports: services.append({"service_name":p.get("service_name", _("unknown")), "portid":p.get("portid", ""), "service_version":p.get("service_version", _("Unknown version")), "service_product":p.get("service_product", ""), "service_extrainfo":p.get("service_extrainfo", ""), "port_state":p.get("port_state", _("Unknown")), "protocol":p.get("protocol", "")}) return services def get_trace(self): return self._trace def set_trace(self, trace): self._trace = trace def append_trace_hop(self, hop): if "hops" in self._trace: self._trace["hops"].append(hop) else: self._trace["hops"] = [hop] def set_trace_error(self, errorstr): self._trace["error"] = errorstr # Properties tcpsequence = property(get_tcpsequence, set_tcpsequence) osclasses = property(get_osclasses, set_osclasses) osmatches = property(get_osmatches, set_osmatches) ports = property(get_ports, set_ports) ports_used = property(get_ports_used, set_ports_used) extraports = property(get_extraports, set_extraports) uptime = property(get_uptime, set_uptime) hostnames = property(get_hostnames, set_hostnames) tcptssequence = property(get_tcptssequence, set_tcptssequence) ipidsequence = property(get_ipidsequence, set_ipidsequence) ip = property(get_ip, set_ip) ipv6 = property(get_ipv6, set_ipv6) mac = property(get_mac, set_mac) state = property(get_state, set_state) comment = property(get_comment, set_comment) services = property(get_services) trace = property(get_trace, set_trace) class ParserBasics(object): def __init__ (self): # This flag informs us whether the XML output file is temporary (True), # or user specified (False). If any of them is user-specified, it # doesn't get stripped out of the command string in # _verify_output_options(). self.xml_is_temp = True self.nmap = {'nmaprun':{},\ 'scaninfo':[],\ 'verbose':'',\ 'debugging':'',\ 'hosts':[],\ 'runstats':{}} self.ops = NmapOptions() self._nmap_output = None def set_xml_is_temp(self, xml_is_temp): # This flag is False if a user has specified his own -oX option - in # which case we not should remove the -oX option from the command # string. A value of True means that we're using a temporary file which # should be removed from the command string (see # _verify_output_options()). self.xml_is_temp = xml_is_temp def get_profile_name(self): return self.nmap['nmaprun'].get('profile_name', '') def set_profile_name(self, name): self.nmap['nmaprun']['profile_name'] = name def get_target(self): return " ".join(self.ops.target_specs) def set_target(self, target): if target is None: self.ops.target_specs = [] else: self.ops.target_specs = target.split() def get_nmap_output(self): return self._nmap_output def set_nmap_output(self, nmap_output): self._nmap_output = nmap_output def get_debugging_level (self): return self.nmap.get('debugging', '') def set_debugging_level(self, level): self.nmap['debugging'] = level def get_verbose_level (self): return self.nmap.get('verbose', '') def set_verbose_level(self, level): self.nmap['verbose'] = level def get_scaninfo(self): return self.nmap.get('scaninfo', '') def set_scaninfo(self, info): self.nmap['scaninfo'] = info def get_services_scanned (self): if self._services_scanned == None: return self._services_scanned services = [] for scan in self.nmap.get('scaninfo', []): services.append(scan['services']) self._services_scanned = ','.join(services) return self._services_scanned def set_services_scanned (self, services_scanned): self._services_scanned = services_scanned def get_nmap_command (self): return self._verify_output_options(self.nmap['nmaprun'].get('args', '')) def set_nmap_command(self, command): self.nmap['nmaprun']['args'] = self._verify_output_options(command) def get_scan_type (self): types = [] for t in self.nmap.get('scaninfo', []): types.append(t['type']) return types def get_protocol (self): protocols = [] for proto in self.nmap.get('scaninfo', []): protocols.append(proto['protocol']) return protocols def get_num_services (self): if self._num_services == None: return self._num_services num = 0 for n in self.nmap.get('scaninfo', []): num += int(n['numservices']) self._num_services = num return self._num_services def set_num_services (self, num_services): self._num_services = num_services def get_date (self): epoch = int(self.nmap['nmaprun'].get('start', '0')) return time.localtime (epoch) def get_start(self): return self.nmap['nmaprun'].get('start', '0') def set_start(self, start): self.nmap['nmaprun']['start'] = start def set_date(self, date): if type(date) == type(int): self.nmap['nmaprun']['start'] = date else: raise Exception("Wrong date format. Date should be saved \ in epoch format!") def get_open_ports(self): ports = 0 for h in self.nmap.get('hosts', []): ports += h.get_open_ports() return ports def get_filtered_ports(self): ports = 0 for h in self.nmap.get('hosts', []): ports += h.get_filtered_ports() return ports def get_closed_ports(self): ports = 0 for h in self.nmap['hosts']: ports += h.get_closed_ports() return ports def get_formatted_date(self): return time.strftime("%B %d, %Y - %H:%M", self.get_date()) def get_scanner (self): return self.nmap['nmaprun'].get('scanner', '') def set_scanner(self, scanner): self.nmap['nmaprun']['scanner'] = scanner def get_scanner_version (self): return self.nmap['nmaprun'].get('version', '') def set_scanner_version(self, version): self.nmap['nmaprun']['version'] = version # IPv4 def get_ipv4(self): hosts = self.nmap.get('hosts') if hosts is None: return [] return [host.ip for host in hosts if host.ip is not None] # MAC def get_mac(self): hosts = self.nmap.get('hosts') if hosts is None: return [] return [host.mac for host in hosts if host.mac is not None] # IPv6 def get_ipv6(self): hosts = self.nmap.get('hosts') if hosts is None: return [] return [host.ipv6 for host in hosts if host.ipv6 is not None] def get_hostnames (self): hostnames = [] for host in self.nmap.get('hosts', []): hostnames += host.get_hostnames() return hostnames def get_hosts(self): return self.nmap.get('hosts', None) def get_runstats(self): return self.nmap.get('runstats', None) def set_runstats(self, stats): self.nmap['runstats'] = stats def get_hosts_down(self): return int(self.nmap['runstats'].get('hosts_down', '0')) def set_hosts_down(self, down): self.nmap['runstats']['hosts_down'] = int(down) def get_hosts_up(self): return int(self.nmap['runstats'].get('hosts_up', '0')) def set_hosts_up(self, up): self.nmap['runstats']['hosts_up'] = int(up) def get_hosts_scanned(self): return int(self.nmap['runstats'].get('hosts_scanned', '0')) def set_hosts_scanned(self, scanned): self.nmap['runstats']['hosts_scanned'] = int(scanned) def get_finish_time (self): return time.localtime(int(self.nmap['runstats'].get('finished_time', '0'))) def set_finish_time(self, finish): self.nmap['runstats']['finished_time'] = int(finish) def get_finish_epoc_time(self): return int(self.nmap['runstats'].get('finished_time', '0')) def set_finish_epoc_time(self, time): self.nmap['runstats']['finished_time'] = time def get_scan_name(self): """Get a human-readable string representing this scan.""" scan_name = self.nmap.get("scan_name") if scan_name: return scan_name if self.profile_name and self.target: return _("%s on %s") % (self.profile_name, self.target) return self.get_nmap_command() def set_scan_name(self, scan_name): self.nmap["scan_name"] = scan_name def get_formatted_finish_date(self): return time.strftime("%B %d, %Y - %H:%M", self.get_finish_time()) def _verify_output_options(self, command): """Remove and -oX options from the command stored in the XML file, unless they were put there by the user.""" command_list = split_quoted(command) i = 0 while i < len(command_list): if command_list[i] == "-oX" and self.xml_is_temp: del command_list[i:i + 2] else: i += 1 return " ".join (command_list) profile_name = property(get_profile_name, set_profile_name) target = property(get_target, set_target) nmap_output = property(get_nmap_output, set_nmap_output) debugging_level = property(get_debugging_level, set_debugging_level) verbose_level = property(get_verbose_level, set_verbose_level) scaninfo = property(get_scaninfo, set_scaninfo) services_scanned = property(get_services_scanned, set_services_scanned) nmap_command = property(get_nmap_command, set_nmap_command) scan_type = property(get_scan_type) protocol = property(get_protocol) num_services = property(get_num_services, set_num_services) date = property(get_date, set_date) open_ports = property(get_open_ports) filtered_ports = property(get_filtered_ports) closed_ports = property(get_closed_ports) formatted_date = property(get_formatted_date) scanner = property(get_scanner, set_scanner) scanner_version = property(get_scanner_version, set_scanner_version) ipv4 = property(get_ipv4) mac = property(get_mac) ipv6 = property(get_ipv6) hostnames = property(get_hostnames) hosts = property(get_hosts) runstats = property(get_runstats, set_runstats) hosts_down = property(get_hosts_down, set_hosts_down) hosts_up = property(get_hosts_up, set_hosts_up) hosts_scanned = property(get_hosts_scanned, set_hosts_scanned) finish_time = property(get_finish_time, set_finish_time) finish_epoc_time = property(get_finish_epoc_time, set_finish_epoc_time) formatted_finish_date = property(get_formatted_finish_date) start = property(get_start, set_start) scan_name = property(get_scan_name, set_scan_name) _num_services = None _services_scanned = None class NmapParserSAX(ParserBasics, ContentHandler): def __init__(self): ParserBasics.__init__(self) # The text inside an xml-stylesheet processing instruction, like # 'href="file:///usr/share/nmap/nmap.xsl" type="text/xsl"'. self.xml_stylesheet_data = None self.in_interactive_output = False self.in_run_stats = False self.in_host = False self.in_hostnames = False self.in_ports = False self.in_port = False self.in_os = False self.in_trace = False self.list_extraports = [] self.filename = None self.unsaved = False def set_parser(self, parser): self.parser = parser def parse(self, f): """Parse an Nmap XML file from the file-like object f.""" self.parser.parse(f) def parse_file(self, filename): """Parse an Nmap XML file from the named file.""" f = open(filename, "r") try: self.parse(f) self.filename = filename finally: f.close() def _parse_nmaprun(self, attrs): run_tag = "nmaprun" if self._nmap_output is None and attrs.has_key("nmap_output"): self._nmap_output = attrs["nmap_output"] self.nmap[run_tag]["profile_name"] = attrs.get("profile_name", "") self.nmap[run_tag]["start"] = attrs.get("start", "") self.nmap[run_tag]["args"] = attrs.get("args", "") self.nmap[run_tag]["scanner"] = attrs.get("scanner", "") self.nmap[run_tag]["version"] = attrs.get("version", "") self.nmap[run_tag]["xmloutputversion"] = attrs.get("xmloutputversion", "") command_string = self.get_nmap_command() command_list = command_string.split() self.ops.parse(command_list[1:]) def _parse_output(self, attrs): if attrs.get("type") != "interactive": return if self.in_interactive_output: raise SAXException("Unexpected nested \"output\" element.") self.in_interactive_output = True self.nmap_output = "" def _parse_scaninfo(self, attrs): dic = {} dic["type"] = attrs.get("type", "") dic["protocol"] = attrs.get("protocol", "") dic["numservices"] = attrs.get("numservices", "") dic["services"] = attrs.get("services", "") self.nmap["scaninfo"].append(dic) def _parse_verbose(self, attrs): self.nmap["verbose"] = attrs.get("level", "") def _parse_debugging(self, attrs): self.nmap["debugging"] = attrs.get("level", "") def _parse_runstats_finished(self, attrs): self.nmap["runstats"]["finished_time"] = attrs.get("time", "") def _parse_runstats_hosts(self, attrs): self.nmap["runstats"]["hosts_up"] = attrs.get("up", "") self.nmap["runstats"]["hosts_down"] = attrs.get("down", "") self.nmap["runstats"]["hosts_scanned"] = attrs.get("total", "") def _parse_host(self, attrs): self.host_info = HostInfo() self.host_info.comment = attrs.get("comment", "") def _parse_host_status(self, attrs): self.host_info.set_state(attrs.get("state", "")) def _parse_host_address(self, attrs): address_attributes = {"type":attrs.get("addrtype", ""), "vendor":attrs.get("vendor", ""), "addr":attrs.get("addr", "")} if address_attributes["type"] == "ipv4": self.host_info.set_ip(address_attributes) elif address_attributes["type"] == "ipv6": self.host_info.set_ipv6(address_attributes) elif address_attributes["type"] == "mac": self.host_info.set_mac(address_attributes) def _parse_host_hostname(self, attrs): self.list_hostnames.append({"hostname":attrs.get("name", ""), "hostname_type":attrs.get("type", "")}) def _parse_host_extraports(self, attrs): self.list_extraports.append({"state":attrs.get("state", ""), "count":attrs.get("count", "")}) def _parse_host_port(self, attrs): self.dic_port = {"protocol":attrs.get("protocol", ""), "portid":attrs.get("portid", "")} def _parse_host_port_state(self, attrs): self.dic_port["port_state"] = attrs.get("state", "") self.dic_port["reason"] = attrs.get("reason", "") self.dic_port["reason_ttl"] = attrs.get("reason_ttl", "") def _parse_host_port_service(self, attrs): self.dic_port["service_name"] = attrs.get("name", "") self.dic_port["service_method"] = attrs.get("method", "") self.dic_port["service_conf"] = attrs.get("conf", "") self.dic_port["service_product"] = attrs.get("product", "") self.dic_port["service_version"] = attrs.get("version", "") self.dic_port["service_extrainfo"] = attrs.get("extrainfo", "") def _parse_host_osmatch(self, attrs): self.list_osmatch.append(self._parsing(attrs, ['name', 'accuracy', 'line'])) def _parse_host_portused(self, attrs): self.list_portused.append(self._parsing(attrs, ['state','proto','portid'])) def _parse_host_osclass(self, attrs): self.list_osclass.append(self._parsing(attrs, ['type', 'vendor', 'osfamily', 'osgen', 'accuracy'])) def _parsing(self, attrs, attrs_list): # Returns a dict with the attributes of a given tag with the # atributes names as keys and their respective values dic = {} for at in attrs_list: dic[at] = attrs.get(at, "") return dic def _parse_host_uptime(self, attrs): self.host_info.set_uptime(self._parsing(attrs, ["seconds", "lastboot"])) def _parse_host_tcpsequence(self, attrs): self.host_info.set_tcpsequence(self._parsing(attrs, ['index', 'difficulty', 'values'])) def _parse_host_tcptssequence(self, attrs): self.host_info.set_tcptssequence(self._parsing(attrs, ['class', 'values'])) def _parse_host_ipidsequence(self, attrs): self.host_info.set_ipidsequence(self._parsing(attrs, ['class', 'values'])) def _parse_host_trace(self, attrs): trace = {} for attr in ["proto", "port"]: trace[attr] = attrs.get(attr, "") self.host_info.set_trace(trace) def _parse_host_trace_hop(self, attrs): hop = self._parsing(attrs, ["ttl", "rtt", "ipaddr", "host"]) self.host_info.append_trace_hop(hop) def _parse_host_trace_error(self, attrs): self.host_info.set_trace_error(attrs.get("errorstr", "")) def processingInstruction(self, target, data): if target == "xml-stylesheet": self.xml_stylesheet_data = data def startElement(self, name, attrs): if name == "nmaprun": self._parse_nmaprun(attrs) if name == "output": self._parse_output(attrs) elif name == "scaninfo": self._parse_scaninfo(attrs) elif name == "verbose": self._parse_verbose(attrs) elif name == "debugging": self._parse_debugging(attrs) elif name == "runstats": self.in_run_stats = True elif self.in_run_stats and name == "finished": self._parse_runstats_finished(attrs) elif self.in_run_stats and name == "hosts": self._parse_runstats_hosts(attrs) elif name == "host": self.in_host = True self._parse_host(attrs) self.list_ports = [] self.list_extraports = [] elif self.in_host and name == "status": self._parse_host_status(attrs) elif self.in_host and name == "address": self._parse_host_address(attrs) elif self.in_host and name == "hostnames": self.in_hostnames = True self.list_hostnames = [] elif self.in_host and self.in_hostnames and name == "hostname": self._parse_host_hostname(attrs) elif self.in_host and name == "ports": self.in_ports = True elif self.in_host and self.in_ports and name == "extraports": self._parse_host_extraports(attrs) elif self.in_host and self.in_ports and name == "port": self.in_port = True self._parse_host_port(attrs) elif self.in_host and self.in_ports and \ self.in_port and name == "state": self._parse_host_port_state(attrs) elif self.in_host and self.in_ports and \ self.in_port and name == "service": self._parse_host_port_service(attrs) elif self.in_host and name == "os": self.in_os = True self.list_portused = [] self.list_osmatch = [] self.list_osclass = [] elif self.in_host and self.in_os and name == "osmatch": self._parse_host_osmatch(attrs) elif self.in_host and self.in_os and name == "portused": self._parse_host_portused(attrs) elif self.in_host and self.in_os and name == "osclass": self._parse_host_osclass(attrs) elif self.in_host and name == "uptime": self._parse_host_uptime(attrs) elif self.in_host and name == "tcpsequence": self._parse_host_tcpsequence(attrs) elif self.in_host and name == "tcptssequence": self._parse_host_tcptssequence(attrs) elif self.in_host and name == "ipidsequence": self._parse_host_ipidsequence(attrs) elif self.in_host and name == "trace": self.in_trace = True self._parse_host_trace(attrs) elif self.in_host and self.in_trace and name == "hop": self._parse_host_trace_hop(attrs) elif self.in_host and self.in_trace and name == "error": self._parse_host_trace_error(attrs) def endElement(self, name): if name == "runstats": self.in_interactive_output = False elif name == "runstats": self.in_run_stats = False elif name == "host": self.in_host = False self.host_info.set_extraports(self.list_extraports) self.host_info.set_ports(self.list_ports) self.nmap["hosts"].append(self.host_info) elif self.in_host and name == "hostnames": self.in_hostnames = False self.host_info.set_hostnames(self.list_hostnames) elif self.in_host and name == "ports": self.in_ports = False elif self.in_host and self.in_ports and name == "port": self.in_port = False self.list_ports.append(self.dic_port) del(self.dic_port) elif self.in_host and self.in_os and name == "os": self.in_os = False self.host_info.set_ports_used(self.list_portused) self.host_info.set_osmatches(self.list_osmatch) self.host_info.set_osclasses(self.list_osclass) del(self.list_portused) del(self.list_osmatch) del(self.list_osclass) elif self.in_host and self.in_trace and name == "trace": self.in_trace = False def characters(self, content): if self.in_interactive_output: self.nmap_output += content def write_xml(self, f): """Write the XML representation of this object to the file-like object f.""" writer = XMLGenerator(f) writer.startDocument() if self.xml_stylesheet_data is not None: writer.processingInstruction("xml-stylesheet", self.xml_stylesheet_data) self._write_nmaprun(writer) self._write_scaninfo(writer) self._write_verbose(writer) self._write_debugging(writer) self._write_output(writer) self._write_hosts(writer) self._write_runstats(writer) writer.endElement("nmaprun") writer.endDocument() def get_xml(self): """Return a string containing the XML representation of this scan.""" buffer = StringIO.StringIO() self.write_xml(buffer) string = buffer.getvalue() buffer.close() return string def write_xml_to_file(self, filename): """Write the XML representation of this scan to the file whose name is given.""" fd = open(filename, "wb") self.write_xml(fd) fd.close() def _write_output(self, writer): if self._nmap_output is None: return writer.startElement("output", Attributes({"type": "interactive"})) writer.characters(self.nmap_output) writer.endElement("output") def _write_runstats(self, writer): ################## # Runstats element writer.startElement("runstats", Attributes(dict())) ## Finished element writer.startElement("finished", Attributes(dict(time = str(self.finish_epoc_time), timestr = time.ctime(time.mktime(self.get_finish_time()))))) writer.endElement("finished") ## Hosts element writer.startElement("hosts", Attributes(dict(up = str(self.hosts_up), down = str(self.hosts_down), total = str(self.hosts_scanned)))) writer.endElement("hosts") writer.endElement("runstats") # End of Runstats element ######################### def _write_hosts(self, writer): for host in self.hosts: # Start host element writer.startElement("host", Attributes(dict(comment=host.comment))) # Status element writer.startElement("status", Attributes(dict(state=host.state))) writer.endElement("status") ################## # Address elements ## IPv4 if host.ip is not None: writer.startElement("address", Attributes(dict(addr=host.ip.get("addr", ""), vendor=host.ip.get("vendor", ""), addrtype=host.ip.get("type", "")))) writer.endElement("address") ## IPv6 if host.ipv6 is not None: writer.startElement("address", Attributes(dict(addr=host.ipv6.get("addr", ""), vendor=host.ipv6.get("vendor", ""), addrtype=host.ipv6.get("type", "")))) writer.endElement("address") ## MAC if host.mac is not None: writer.startElement("address", Attributes(dict(addr=host.mac.get("addr", ""), vendor=host.mac.get("vendor", ""), addrtype=host.mac.get("type", "")))) writer.endElement("address") # End of Address elements ######################### ################### # Hostnames element writer.startElement("hostnames", Attributes({})) for hname in host.hostnames: if type(hname) == type({}): writer.startElement("hostname", Attributes(dict(name = hname.get("hostname", ""), type = hname.get("hostname_type", "")))) writer.endElement("hostname") writer.endElement("hostnames") # End of Hostnames element ########################## ############### # Ports element writer.startElement("ports", Attributes({})) ## Extraports elements for ext in host.get_extraports(): if type(ext) == type({}): writer.startElement("extraports", Attributes(dict(count = ext.get("count", ""), state = ext.get("state", "")))) writer.endElement("extraports") ## Port elements for p in host.ports: if type(p) == type({}): writer.startElement("port", Attributes(dict(portid = p.get("portid", ""), protocol = p.get("protocol", "")))) ### Port state writer.startElement("state", Attributes(dict(state=p.get("port_state", ""), reason=p.get("reason", ""), reason_ttl=p.get("reason_ttl", "")))) writer.endElement("state") ### Port service info d = {} for xml_attr, member in (("conf", "service_conf"), ("method", "service_method"), ("name", "service_name"), ("product", "service_product"), ("version", "service_version"), ("extrainfo", "service_extrainfo")): if p.get(member): d[xml_attr] = p.get(member) writer.startElement("service", Attributes(d)) writer.endElement("service") writer.endElement("port") writer.endElement("ports") # End of Ports element ###################### ############ # OS element writer.startElement("os", Attributes({})) ## Ports used elements for pu in host.ports_used: if type(pu) == type({}): writer.startElement("portused", Attributes(dict(state = pu.get("state", ""), proto = pu.get("proto", ""), portid = pu.get("portid", "")))) writer.endElement("portused") ## Osclass elements for oc in host.osclasses: if type(oc) == type({}): writer.startElement("osclass", Attributes(dict(vendor = oc.get("vendor", ""), osfamily = oc.get("osfamily", ""), type = oc.get("type", ""), osgen = oc.get("osgen", ""), accuracy = oc.get("accuracy", "")))) writer.endElement("osclass") ## Osmatch elements for om in host.osmatches: if type(om) == type({}): writer.startElement("osmatch", Attributes(dict(name = om.get("name", ""), accuracy = om.get("accuracy", ""), line = om.get("line", "")))) writer.endElement("osmatch") writer.endElement("os") # End of OS element ################### # Uptime element if type(host.uptime) == type({}): writer.startElement("uptime", Attributes(dict(seconds = host.uptime.get("seconds", ""), lastboot = host.uptime.get("lastboot", "")))) writer.endElement("uptime") ##################### # Sequences elementes ## TCP Sequence element # Cannot use dict() here, because of the 'class' attribute. if type(host.tcpsequence) == type({}): writer.startElement("tcpsequence", Attributes({"index":host.tcpsequence.get("index", ""), "difficulty":host.tcpsequence.get("difficulty", ""), "values":host.tcpsequence.get("values", "")})) writer.endElement("tcpsequence") ## IP ID Sequence element if type(host.ipidsequence) == type({}): writer.startElement("ipidsequence", Attributes({"class":host.ipidsequence.get("class", ""), "values":host.ipidsequence.get("values", "")})) writer.endElement("ipidsequence") ## TCP TS Sequence element if type(host.tcptssequence) == type({}): writer.startElement("tcptssequence", Attributes({"class":host.tcptssequence.get("class", ""), "values":host.tcptssequence.get("values", "")})) writer.endElement("tcptssequence") # End of sequences elements ########################### ## Trace element if len(host.trace) > 0: writer.startElement("trace", Attributes({"proto":host.trace.get("proto", ""), "port":host.trace.get("port", "")})) if "hops" in host.trace: for hop in host.trace["hops"]: writer.startElement("hop", Attributes({"ttl":hop["ttl"], "rtt":hop["rtt"], "ipaddr":hop["ipaddr"], "host":hop["host"]})) writer.endElement("hop") if "error" in host.trace: writer.startElement("error", Attributes({"errorstr":host.trace["error"]})) writer.endElement("error") writer.endElement("trace") # End of trace element ########################### # End host element writer.endElement("host") def _write_debugging(self, writer): writer.startElement("debugging", Attributes(dict( level=str(self.debugging_level)))) writer.endElement("debugging") def _write_verbose(self, writer): writer.startElement("verbose", Attributes(dict( level=str(self.verbose_level)))) writer.endElement("verbose") def _write_scaninfo(self, writer): for scan in self.scaninfo: if type(scan) == type({}): writer.startElement("scaninfo", Attributes(dict(type = scan.get("type", ""), protocol = scan.get("protocol", ""), numservices = scan.get("numservices", ""), services = scan.get("services", "")))) writer.endElement("scaninfo") def _write_nmaprun(self, writer): writer.startElement("nmaprun", Attributes(dict(args = str(self.nmap_command), profile_name = str(self.profile_name), scanner = str(self.scanner), start = str(self.start), startstr = time.ctime(time.mktime(self.get_date())), version = str(self.scanner_version), xmloutputversion = str(XML_OUTPUT_VERSION)))) def set_unsaved(self): self.unsaved = True def is_unsaved(self): return self.unsaved def nmap_parser_sax(): parser = make_parser() nmap_parser = NmapParserSAX() parser.setContentHandler(nmap_parser) nmap_parser.set_parser(parser) return nmap_parser NmapParser = nmap_parser_sax if __name__ == '__main__': import sys file_to_parse = sys.argv[1] np = NmapParser() np.parse_file(file_to_parse) for host in np.hosts: print "%s:" % host.ip["addr"] print " Comment:", repr(host.comment) print " TCP sequence:", repr(host.tcpsequence) print " TCP TS sequence:", repr(host.tcptssequence) print " IP ID sequence:", repr(host.ipidsequence) print " Uptime:", repr(host.uptime) print " OS Match:", repr(host.osmatches) print " Ports:" for p in host.ports: print "\t%s" % repr(p) print " Ports used:", repr(host.ports_used) print " OS Class:", repr(host.osclasses) print " Hostnames:", repr(host.hostnames) print " IP:", repr(host.ip) print " IPv6:", repr(host.ipv6) print " MAC:", repr(host.mac) print " State:", repr(host.state) if "hops" in host.trace: print " Trace:" for hop in host.trace["hops"]: print " ", repr(hop) print