#!/usr/bin/env python

"""Parsers for IPv6 OS fingerprint data: "6fp" capture files, nmap-format
fingerprints, feature-set definitions, classifier model files, and group
files."""

import re
import tempfile
import sys
sys.path.append("liblinear-1.8/python")
import liblinearutil as ll

from scapy.all import *

__all__ = ["parse_6fp", "parse_6fp_file", "parse_nmapfp", "parse_nmapfp_file",
           "parse_nmapfp_raw", "parse_fp_file_magic", "parse_feature_set",
           "parse_feature_set_file", "parse_model", "parse_model_file",
           "parse_groups", "cmp_probename"]


class OSClass (object):
    def __init__(self, l):
        self.nmapclass = l
        self.cpe = []

    @staticmethod
    def parse(s):
        return OSClass(strip_pipes(s))


class OSDescription (object):
    def __init__(self):
        self.nmapname = None
        self.nmapclasses = []


class Group (object):
    def __init__(self):
        self.desc = OSDescription()
        self.rs_list = []

    def add_rs(self, rs):
        self.rs_list.append(rs)


def wrap_filename(fn):
    """Wrap a function that takes a file argument with one that takes a
    filename."""
    def wrapped(filename, *args, **kwargs):
        f = open(filename)
        try:
            return fn(f, *args, **kwargs)
        finally:
            f.close()
    return wrapped


def preprocess_6fp(f):
    """Yield the payload of each "#PARSE#"-prefixed line."""
    for line in f:
        m = re.match(r'^#PARSE#\s*(.*?)\s*$', line)
        if m:
            yield m.group(1)


def nameval_pairs(f):
    """Split "name=value" lines into (name, value) pairs."""
    for line in f:
        m = re.match(r'^(\w+)=(.*)', line)
        assert(m is not None)
        yield m.group(1), m.group(2)


def strip_pipes(s):
    return tuple(x.strip() or None for x in s.split("|"))


def grep_nmapfp(f):
    """Extract and concatenate the "OS:" lines of an nmap fingerprint."""
    result = []
    for line in f:
        line = line.strip()
        if line.startswith("OS:"):
            result.append(line[3:])
    return "".join(result)


def run_length_decode(s):
    """Expand the run-length encoding used in fingerprint test values: a
    two-character token followed by {n} is repeated n times, so
    "AB{3}CD" becomes "ABABABCD"."""
    def f(m):
        c, n = m.groups()
        n = int(n)
        return c * n
    return re.sub(r'(..){(\d+)}', f, s)


def parse_nmapfp_raw(nmapfp):
    """Parse an nmap fingerprint string into a list of
    (probe_name, [(test_name, test_value), ...]) tuples."""
    fp = []
    for m in re.finditer(r'([\w.]+)\((.*?)\)', nmapfp):
        probe_name, tests_str = m.groups()
        tests = []
        indiv_tests = tests_str.split("%")
        for indiv_test in indiv_tests:
            m = re.match(r'(\w+)=(.*)', indiv_test)
            test_name, test_value = m.groups()
            tests.append((test_name, run_length_decode(test_value)))
        fp.append((probe_name, tests))
    return fp


PROBE_ORDER = [
    "S1", "S2", "S3", "S4", "S5", "S6",
    "TECN",
    "T2", "T3", "T4", "T5", "T6", "T7",
    "IE1", "IE2",
    "NS",
    "NI",
    "U1",
]


def cmp_probename(a, b):
    return cmp(PROBE_ORDER.index(a), PROBE_ORDER.index(b))


PROBE_NAME_MAP = {
    "NMAP_OS_PROBE_TCP_0": "S1",
    "NMAP_OS_PROBE_TCP_1": "S2",
    "NMAP_OS_PROBE_TCP_2": "S3",
    "NMAP_OS_PROBE_TCP_3": "S4",
    "NMAP_OS_PROBE_TCP_4": "S5",
    "NMAP_OS_PROBE_TCP_5": "S6",
    "NMAP_OS_PROBE_TCP_6": "TECN",
    "NMAP_OS_PROBE_TCP_7": "T2",
    "NMAP_OS_PROBE_TCP_8": "T3",
    "NMAP_OS_PROBE_TCP_9": "T4",
    "NMAP_OS_PROBE_TCP_10": "T5",
    "NMAP_OS_PROBE_TCP_11": "T6",
    "NMAP_OS_PROBE_TCP_12": "T7",
    "ICMP_ExtHdrs_17": "IE1",
    "ICMP_ExtHdrs_32": "IE2",
    "ICMPNSol_0": "NS",
    "ICMP_NI_Query_15": "NI",
    "NMAP_OS_PROBE_UDP": "U1",
}


class Response (object):
    def __init__(self):
        self.p = None
        self.send_time = None
        self.recv_time = None


def parse_6fp_response_string(r):
    """Parse a response string of the form "{N, [M,] hex bytes}" into
    (test_no, packet). packet is None when there are no hex bytes."""
    m = re.match(r'^{\s*(\d+)\s*,(?:\s*\d+\s*,)?\s*([0-9a-fA-F ]*)}\s*$', r)
    assert(m is not None)
    test_no, contents_hex = m.groups()
    test_no = int(test_no)
    contents_hex = re.sub(r'\s+', "", contents_hex)
    contents = contents_hex.decode("hex")
    if contents:
        packet = IPv6(contents)
    else:
        packet = None
    return test_no, packet


SEQ_PROBE_NAMES = ("S1", "S2", "S3", "S4", "S5", "S6")

TIMED6_DPORT_MAP = {
    48621: (1, "S1"),
    48622: (2, "S2"),
    48623: (3, "S3"),
    48624: (4, "S4"),
    48625: (5, "S5"),
    48626: (6, "S6"),
}


class ResponseSet (object):
    def __init__(self):
        self.desc = OSDescription()
        self.flow_label = None
        self.responses = {}
        self.timed_responses = {}

    def get(self, key):
        return self.responses.get(key)

    def __getitem__(self, key):
        return self.responses[key]
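    # A sketch of the 6fp input that parse_6fp (below) consumes, inferred from
    # the regexes in preprocess_6fp, nameval_pairs, and
    # parse_6fp_response_string. The values here are hypothetical and the hex
    # payloads are abbreviated:
    #
    #   #PARSE# nmapname=Example OS 1.0
    #   #PARSE# test6_id=NMAP_OS_PROBE_TCP_0
    #   #PARSE# sent6={ 1 , 6000... }
    #   #PARSE# result6={ 1 , 6000... }
    #   #PARSE# elapsed=0.1
    #
    # The probe id is mapped through PROBE_NAME_MAP and each non-empty result6
    # packet is stored as a Response under the short probe name ("S1" here).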
    def parse_6fp(self, f):
        for name, val in nameval_pairs(preprocess_6fp(f)):
            if name == "osclass":
                pass
            elif name == "nmapname":
                assert(self.desc.nmapname is None)
                self.desc.nmapname = val.strip()
            elif name == "nmapclass":
                self.desc.nmapclasses.append(strip_pipes(val))
            elif name == "flow_label":
                self.flow_label = int(val)
            elif name == "test6_id":
                test_id = PROBE_NAME_MAP.get(val, val)
                assert(test_id not in self.responses)
                self.current_resp = Response()
                self.responses[test_id] = [self.current_resp]
            elif name == "test4_id":
                # Ignore.
                self.current_resp = None
            elif name == "sent6":
                test_no, packet = parse_6fp_response_string(val)
                if packet and self.flow_label is None:
                    self.flow_label = packet.fl
            elif name == "result6":
                assert(self.current_resp is not None)
                test_no, packet = parse_6fp_response_string(val)
                if packet:
                    self.current_resp.p = packet
            elif name == "timed6_result":
                # The timed6_result lines, under test id IPv6_NmapProbes_100ms,
                # actually stand for S1 through S6, sent with correct timing. We
                # prefer the timed6_result replies when available (handled at
                # the end of this function).
                test_no, packet = parse_6fp_response_string(val)
                if packet:
                    # Only look up the destination port when there is a reply.
                    index, test_id = TIMED6_DPORT_MAP[packet[TCP].dport]
                    resp = Response()
                    resp.p = packet
                    self.timed_responses.setdefault(test_id, [])
                    self.timed_responses[test_id].append(resp)
            elif name == "elapsed":
                if self.current_resp is not None:
                    self.current_resp.send_time = float(val)
                    # We don't have separate send and receive times in 6fp format.
                    self.current_resp.recv_time = float(val)

        if self.flow_label is None:
            self.flow_label = 0x12345

        untimed_seq_probes = set([x for x in self.responses.keys()
                                  if x in SEQ_PROBE_NAMES])
        timed_seq_probes = set([x for x in self.timed_responses.keys()
                                if x in SEQ_PROBE_NAMES])
        if untimed_seq_probes.issubset(timed_seq_probes):
            for v in SEQ_PROBE_NAMES:
                if v in self.timed_responses:
                    self.responses[v] = self.timed_responses[v]

        # Synthesize timing for seq probes.
        for i, v in enumerate(SEQ_PROBE_NAMES):
            resps = self.responses.get(v)
            if resps is None:
                continue
            for resp in resps:
                if resp.send_time is None:
                    resp.send_time = resp.recv_time = (i + 1) * 0.1

    def parse_nmapfp_string(self, s):
        fp = parse_nmapfp_raw(s)
        for probe_name, tests in fp:
            if probe_name == "SCAN":
                continue
            if probe_name == "EXTRA":
                for test_name, test_value in tests:
                    if test_name == "FL":
                        self.flow_label = int(test_value, 16)
                continue
            # Map for some early fingerprints that used the long names.
            probe_name = PROBE_NAME_MAP.get(probe_name, probe_name)
            resp = Response()
            for test_name, test_value in tests:
                if test_name == "P":
                    contents_hex = test_value.replace("X", "0")
                    contents = contents_hex.decode("hex")
                    resp.p = IPv6(contents)
                elif test_name == "ST":
                    resp.send_time = float(test_value)
                elif test_name == "RT":
                    resp.recv_time = float(test_value)
            self.responses.setdefault(probe_name, [])
            self.responses[probe_name].append(resp)
        if self.flow_label is None:
            # Early versions of nmap-os6 set the flow label to 0. The versions
            # that set the flow label also set META(FL).
            self.flow_label = 0

    def parse_nmapfp(self, f):
        self.parse_nmapfp_string(grep_nmapfp(f))


def parse_6fp(f):
    rs = ResponseSet()
    rs.parse_6fp(f)
    return rs


def parse_nmapfp(f):
    rs = ResponseSet()
    rs.parse_nmapfp(f)
    return rs


parse_6fp_file = wrap_filename(parse_6fp)
parse_nmapfp_file = wrap_filename(parse_nmapfp)


def parse_fp_file_magic(filename):
    """Guess the format of a fingerprint file and parse it."""
    parser = None
    f = open(filename)
    try:
        for line in f:
            if line.startswith("#PARSE#"):
                parser = parse_6fp
                break
            if line.startswith("OS:"):
                parser = parse_nmapfp
                break
        if parser:
            f.seek(0)
            return parser(f)
    finally:
        f.close()
    raise ValueError("Can't guess format of %r" % filename)


def preprocess_generic(f):
    """Strip comments."""
    for line in f:
        line = re.sub(r'#.*', '', line)
        yield line


def is_feature_name(s):
    return re.match(r'\w+(\.\w+)*', s) is not None


def make_tokenizer(f):
    """Return a function that yields one whitespace-separated token per call,
    or None when the input is exhausted."""
    def tok():
        for line in preprocess_generic(f):
            for part in line.split():
                yield part
    gen = tok()
    def fun():
        return next(gen, None)
    return fun


def cartesian_product(a, b):
    p = []
    for x in a:
        for y in b:
            p.append(x + "." + y)
    return p


class SetParser (object):
    def __init__(self, f):
        self.f = f
        self.nt = None
        self.vars = {}
        self.tokens = []
        self.get_token_prim = make_tokenizer(f)

    def get_token(self):
        if self.tokens:
            token = self.tokens.pop(0)
        else:
            token = self.get_token_prim()
        return token

    def unget_token(self, token):
        """Unget the current self.nt; set self.nt to the given value."""
        self.tokens.append(self.nt)
        self.nt = token

    def setvar(self, name, val):
        assert name not in self.vars
        self.vars[name] = val

    def expandvar(self, name):
        return self.vars[name]

    def parse_simple(self):
        cur = []
        if self.nt == "[":
            self.nt = self.get_token()
            while self.nt != "]":
                cur.extend(self.parse_product())
            self.nt = self.get_token()
        elif self.nt.startswith("$"):
            varname = self.nt
            cur.extend(self.expandvar(varname))
            self.nt = self.get_token()
        else:
            assert is_feature_name(self.nt), self.nt
            cur.append(self.nt)
            self.nt = self.get_token()
        return cur

    def parse_product(self):
        accum = self.parse_simple()
        while self.nt == "*":
            self.nt = self.get_token()
            cur = self.parse_simple()
            accum = cartesian_product(accum, cur)
        return accum

    def parse_statement(self):
        if self.nt.startswith("$"):
            varname = self.nt
            self.nt = self.get_token()
            if self.nt == "=":
                self.nt = self.get_token()
                self.setvar(varname, self.parse_product())
                return []
            self.unget_token(varname)
        return self.parse_product()

    def parse(self):
        feature_names = []
        self.nt = self.get_token()
        while self.nt is not None:
            feature_names.extend(self.parse_statement())
        return feature_names


def parse_feature_set(f):
    parser = SetParser(f)
    return parser.parse()


parse_feature_set_file = wrap_filename(parse_feature_set)


class Model (object):
    def __init__(self):
        self.feature_names = []
        self.scale_params = None
        # liblinear model.
        self.model = None
        # Array of descs (list indexes match liblinear labels).
        self.descs = []
        # Array of lists of variances (one per feature).
        self.variances = []
        # Array of lists of means (one per feature).
        self.means = []

    def parse(self, f):
        C = None
        iter = preprocess_generic(f)
        while True:
            line = next(iter, None)
            if line is None:
                break
            if line.strip() == "" or line.strip().startswith("#"):
                continue
            parts = line.split()
            if parts == ["begin", "features"]:
                assert(len(self.feature_names) == 0)
                while True:
                    line = next(iter)
                    if line.split() == ["end", "features"]:
                        break
                    self.feature_names.append(line.strip())
            elif parts == ["begin", "scale"]:
                if self.scale_params is None:
                    self.scale_params = []
                while True:
                    line = next(iter)
                    if line.split() == ["end", "scale"]:
                        break
                    s_min, s_max = line.split(",")
                    s_min = float(s_min)
                    s_max = float(s_max)
                    self.scale_params.append((s_min, s_max))
            elif parts == ["begin", "liblinear"]:
                model_file = tempfile.NamedTemporaryFile()
                while True:
                    line = next(iter)
                    if line.split() == ["end", "liblinear"]:
                        break
                    print >> model_file, line
                model_file.flush()
                model = ll.load_model(model_file.name)
                model_file.close()
                self.model = model
            elif parts[0] == "c":
                assert len(parts) == 2
                C = float(parts[1])
            elif parts[0] == "class":
                self.descs.append(OSDescription())
            elif parts[0] == "nmapname":
                nmapname = " ".join(parts[1:])
                desc = self.descs[-1]
                desc.nmapname = nmapname
            elif parts[0] == "nmapclass":
                nmapclass_str = " ".join(parts[1:])
                desc = self.descs[-1]
                osclass = OSClass.parse(nmapclass_str)
                desc.nmapclasses.append(osclass)
            elif parts[0] == "cpe":
                assert len(parts) == 2
                cpe = parts[1]
                desc = self.descs[-1]
                osclass = desc.nmapclasses[-1]
                osclass.cpe.append(cpe)
            elif parts[0] == "variances":
                variances = [float(x) for x in parts[1:]]
                self.variances.append(variances)
            elif parts[0] == "means":
                means = [float(x) for x in parts[1:]]
                self.means.append(means)
            else:
                raise ValueError(line)
        self.model.param.C = C


def parse_model(f):
    model = Model()
    model.parse(f)
    return model


parse_model_file = wrap_filename(parse_model)


def parse_groups(f):
    groups = []
    group = None
    osclass = None
    fprint = None
    for line in preprocess_generic(f):
        if fprint is not None:
            if re.match(r'^[A-Z0-9]+\(', line):
                fprint.append(line)
                continue
            elif line.strip() == "":
                continue
            else:
                rs = ResponseSet()
                rs.parse_nmapfp_string("".join(fprint))
                group.add_rs(rs)
                fprint = None
        parts = [x.strip() for x in line.split(None, 1)]
        if not parts:
            continue
        try:
            keyword, rest = parts
        except ValueError:
            keyword, rest = parts[0], None
        if keyword == "group":
            group = Group()
            osclass = None
            group.desc.nmapname = rest
            groups.append(group)
        elif keyword == "nmapclass":
            osclass = OSClass.parse(rest)
            group.desc.nmapclasses.append(osclass)
        elif keyword == "cpe":
            try:
                cpe, flags = rest.split()
            except ValueError:
                cpe = rest.strip()
                flags = None
            osclass.cpe.append(cpe)
        elif keyword == "print":
            # Start reading fingerprint lines.
            fprint = []
        elif keyword == "file":
            group.add_rs(parse_6fp_file(rest))
        else:
            assert False, keyword
    if fprint is not None:
        rs = ResponseSet()
        rs.parse_nmapfp_string("".join(fprint))
        group.add_rs(rs)
        fprint = None
    return groups


parse_groups_file = wrap_filename(parse_groups)


def format_nmapclass(nmapclass):
    result = []
    if len(nmapclass) == 0:
        return ""
    if nmapclass[0] is not None:
        result.append(nmapclass[0])
    i = 1
    while i < len(nmapclass):
        sep = "|"
        while i < len(nmapclass) - 1 and nmapclass[i] is None:
            sep += "|"
            i += 1
        result.append(sep)
        if nmapclass[i] is not None:
            result.append(nmapclass[i])
        i += 1
    return " ".join(result)
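

# A minimal usage sketch, not part of the parsing library above. The input
# filename is hypothetical; parse_fp_file_magic() and cmp_probename() are
# defined in this module, and the response keys are assumed to all appear in
# PROBE_ORDER.
if __name__ == "__main__":
    # Guess the fingerprint format (6fp or nmap) and parse it.
    rs = parse_fp_file_magic("fingerprint.txt")  # hypothetical input file
    print "flow label: 0x%x" % rs.flow_label
    # List the probes that have responses, in canonical probe order.
    for probe_name in sorted(rs.responses.keys(), cmp=cmp_probename):
        print probe_name, len(rs.responses[probe_name])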