X-Git-Url: http://www.dolda2000.com/gitweb/?a=blobdiff_plain;f=ldd%2Frec.py;fp=ldd%2Frec.py;h=4145343d824c75021da63dddb11db6372b8e5170;hb=769e7ed964e3720cf25825dd5390af5fb0bf4851;hp=0000000000000000000000000000000000000000;hpb=2e783944bffb349dff8667dab0ba0c48b21c9504;p=ldd.git diff --git a/ldd/rec.py b/ldd/rec.py new file mode 100644 index 0000000..4145343 --- /dev/null +++ b/ldd/rec.py @@ -0,0 +1,342 @@ +# ldd - DNS implementation in Python +# Copyright (C) 2006 Fredrik Tolf +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +import socket +import struct + +import proto +import dn + +rtypes = [] + +def addrtype(id, name, syntax): + rtypes.append((id, name, syntax)) + +def rtypebyid(id): + for rtype in rtypes: + if rtype[0] == id: + return rtype + return None + +def rtypebyname(name): + for rtype in rtypes: + if rtype[1] == name.upper(): + return rtype[0] + return None + +class error(Exception): + def __init__(self, text): + self.text = text + + def __str__(self): + return self.text + +class malformedrr(Exception): + def __init__(self, text): + self.text = text + + def __str__(self): + return self.text + +class rrhead: + def __init__(self, name = None, rtype = None, rclass = None): + if rclass is None: rclass = CLASSIN + if type(name) == str: + self.name = dn.fromstring(name) + else: + self.name = name + if type(rtype) == str: + self.rtype = rtypebyname(rtype) + if self.rtype is None: + raise error("no such rtype " + rtype) + else: + self.rtype = rtype + self.rclass = rclass + + def encode(self, names, offset): + ret, names = proto.encodename(self.name, names, offset) + ret += struct.pack(">HH", self.rtype, self.rclass) + return ret, names + + def __eq__(self, other): + return self.name == other.name and self.rtype == other.rtype + + def __str__(self): + rtype = rtypebyid(self.rtype) + if rtype is None: + return "%02x RRhead %s" % (self.rtype, self.name) + else: + return "%s RRhead %s" % (rtype[1], self.name) + + def istype(self, rtype): + if type(rtype) == str: + rtype = rtypebyname(rtype) + return self.rtype == rtype + + def decode(self, packet, offset): + name, offset = proto.decodename(packet, offset) + rtype, rclass = struct.unpack(">HH", packet[offset:offset + struct.calcsize(">HH")]) + offset += struct.calcsize(">HH") + ret = rrhead(name, rtype, rclass) + return ret, offset + decode = classmethod(decode) + +class rrdata: + def __init__(self, rtype, *args): + if type(rtype) == tuple and type(args[0]) == dict: + self.rtype = rtype + self.rdata = args[0] + return + + if type(rtype) == str: + self.rtype = rtypebyname(rtype) + if self.rtype is None: + raise error("no such rtype " + rtype) + else: + self.rtype = rtype + rtid = self.rtype + self.rtype = rtypebyid(rtid) + if self.rtype is None: + raise error("no such rtype " + rtid) + self.rdata = {} + for i, e in enumerate(self.rtype[2]): + d = self.convdata(e[0], args[i]) + self.rdata[e[1]] = d + + def __eq__(self, other): + return(self.rdata == other.rdata) + + def __str__(self): + ret = "{" + first = True + for e in self.rtype[2]: + if not first: + ret += ", " + first = False + ret += e[1] + ": " + d = self.rdata[e[1]] + if e[0] == "4": + ret += socket.inet_ntop(socket.AF_INET, d) + elif e[0] == "6": + ret += socket.inet_ntop(socket.AF_INET6, d) + elif e[0] == "s": + ret += '"' + d + '"' + else: + ret += str(d) + ret += "}" + return ret + + def istype(self, rtype): + if type(rtype) == str: + rtype = rtypebyname(rtype) + return self.rtype[0] == rtype + + def convdata(self, dtype, data): + if dtype == "4": + if type(data) != str: + raise error("IPv4 address must be a string") + if len(data) == 4: + d = data + else: + d = socket.inet_pton(socket.AF_INET, data) + if dtype == "6": + if type(data) != str: + raise error("IPv6 address must be a string") + if len(data) == 16 and data.find(":") == -1: + d = data + else: + d = socket.inet_pton(socket.AF_INET6, data) + if dtype == "d": + if type(data) == str: + d = dn.fromstring(data) + elif isinstance(data, dn.domainname): + d = data + else: + raise error("Domain name must be either proper or string") + if dtype == "s": + d = str(data) + if dtype == "i": + d = int(data) + return d + + def __iter__(self): + return iter(self.rdata) + + def __getitem__(self, i): + return self.rdata[i] + + def __setitem__(self, i, v): + for e in self.rtype[2]: + if e[1] == i: + break + else: + raise error("No such data for " + self.rtype[1] + " record: " + str(i)) + self.rdata[i] = self.convdata(e[0], v) + + def encode(self, names, offset): + ret = "" + for e in self.rtype[2]: + d = self.rdata[e[1]] + if e[2] == "strc": + ret += d + offset += len(d) + if e[2] == "cmdn": + buf, names = proto.encodename(d, names, offset) + ret += buf + offset += len(buf) + if e[2] == "lstr": + ret += chr(len(d)) + d + offset += 1 + len(d) + if e[2] == "llstr": + ret += struct.pack(">H", len(d)) + d + offset += struct.calcsize(">H") + len(d) + if e[2] == "short": + ret += struct.pack(">H", d) + offset += struct.calcsize(">H") + if e[2] == "long": + ret += struct.pack(">L", d) + offset += struct.calcsize(">L") + if e[2] == "int6": + ret += struct.pack(">Q", d)[-6:] + offset += 6 + return ret, names + + def decode(self, rtid, packet, offset, dlen): + rtype = rtypebyid(rtid) + origoff = offset + rdata = {} + if rtype is None: + rtype = (rtid, "Unknown", [("s", "unknown", "strc", dlen)]) + for e in rtype[2]: + if e[2] == "strc": + d = packet[offset:offset + e[3]] + offset += e[3] + if e[2] == "cmdn": + d, offset = proto.decodename(packet, offset) + if e[2] == "lstr": + dl = ord(packet[offset]) + offset += 1 + d = packet[offset:offset + dl] + offset += dl + if e[2] == "llstr": + (dl,) = struct.unpack(">H", packet[offset:offset + struct.calcsize(">H")]) + offset += struct.calcsize(">H") + d = packet[offset:offset + dl] + offset += dl + if e[2] == "short": + (d,) = struct.unpack(">H", packet[offset:offset + struct.calcsize(">H")]) + offset += struct.calcsize(">H") + if e[2] == "long": + (d,) = struct.unpack(">L", packet[offset:offset + struct.calcsize(">L")]) + offset += struct.calcsize(">L") + if e[2] == "int6": + (d,) = struct.unpack(">Q", ("\0" * (struct.calcsize(">Q") - 6)) + packet[offset:offset + 6]) + offset += 6 + rdata[e[1]] = d + if origoff + dlen != offset: + raise malformedrr(rtype[1] + " RR data length mismatch") + return rrdata(rtype, rdata) + decode = classmethod(decode) + +class rr: + def __init__(self, head, ttl, data): + if type(head) == tuple: + self.head = rrhead(*head) + else: + self.head = head + self.ttl = ttl + self.data = data + self.flags = set() + + def setflags(self, flags): + self.flags |= set(flags) + + def clrflags(self, flags): + self.flags -= set(flags) + + def encode(self, names, offset): + ret, names = self.head.encode(names, offset) + if self.data is None: + data = "" + else: + data, names = self.data.encode(names, offset + len(ret) + struct.calcsize(">LH")) + ret += struct.pack(">LH", self.ttl, len(data)) + ret += data + return ret, names + + def __eq__(self, other): + return self.head == other.head and self.ttl == other.ttl and self.data == other.data + + def __str__(self): + rtype = rtypebyid(self.head.rtype) + if rtype is None: + ret = "%02x" % self.head.rtype + else: + ret = rtype[1] + ret += " RR %s, TTL=%i: %s" % (self.head.name, self.ttl, self.data) + if len(self.flags) > 0: + ret += " (Flags:" + for f in self.flags: + ret += " " + f + ret += ")" + return ret + + def decode(self, packet, offset): + head, offset = rrhead.decode(packet, offset) + ttl, dlen = struct.unpack(">LH", packet[offset:offset + struct.calcsize(">LH")]) + offset += struct.calcsize(">LH") + if dlen == 0: + data = None + else: + data = rrdata.decode(head.rtype, packet, offset, dlen) + offset += dlen + return rr(head, ttl, data), offset + decode = classmethod(decode) + +addrtype(0x01, "A", [("4", "address", "strc", 4)]) +addrtype(0x02, "NS", [("d", "nsname", "cmdn")]) +addrtype(0x05, "CNAME", [("d", "priname", "cmdn")]) +addrtype(0x06, "SOA", [("d", "priserv", "cmdn"), + ("d", "mailbox", "cmdn"), + ("i", "serial", "long"), + ("i", "refresh", "long"), + ("i", "retry", "long"), + ("i", "expire", "long"), + ("i", "minttl", "long")]) +addrtype(0x0c, "PTR", [("d", "target", "cmdn")]) +addrtype(0x0f, "MX", [("i", "prio", "short"), + ("d", "target", "cmdn")]) +addrtype(0x10, "TXT", [("s", "rrtext", "lstr")]) +addrtype(0x1c, "AAAA", [("6", "address", "strc", 16)]) +addrtype(0x21, "SRV", [("i", "prio", "short"), + ("i", "weight", "short"), + ("i", "port", "short"), + ("d", "target", "cmdn")]) +addrtype(0xfa, "TSIG", [("d", "algo", "cmdn"), + ("i", "stime", "int6"), + ("i", "fudge", "short"), + ("s", "mac", "llstr"), + ("i", "orgid", "short"), + ("i", "err", "short"), + ("s", "other", "llstr")]) + +CLASSIN = 1 +CLASSCS = 2 +CLASSCH = 3 +CLASSHS = 4 +CLASSNONE = 254 +CLASSANY = 255