Source code for vstruct.defs.dns
import socket
import vstruct
from vstruct.primitives import *
DNS_FLAG_RESPONSE = 0x8000
DNS_FLAG_AUTHORITATIVE = 0x0400
DNS_TYPE_A = 1
DNS_TYPE_CNAME = 5
DNS_CLASS_IN = 1
[docs]class DnsNamePart(vstruct.VStruct):
def __init__(self):
vstruct.VStruct.__init__(self)
self.length = v_uint8()
self.namepart = v_str()
[docs] def pcb_length(self):
size = self.length
if size == 0xc0: size = 1 # FIXME offsets for name...
self.vsGetField('namepart').vsSetLength(size)
[docs] def isNameTerm(self):
if self.length == 0:
return True
if self.length == 0xc0:
return True
return False
[docs]class DnsName(vstruct.VArray):
def __init__(self):
vstruct.VStruct.__init__(self)
[docs] def getFullName(self, dnspkt):
r = []
for fname,fobj in self.vsGetFields():
if fobj.length == 0xc0:
newn = DnsName()
# FIXME redundant parsing...
newn.vsParse(dnspkt.vsEmit(), ord(fobj.namepart))
r.append( newn.getFullName(dnspkt) )
else:
r.append(fobj.namepart)
return '.'.join(r)
[docs] def vsParse(self, bytes, offset=0):
self.vsClearFields()
while offset < len(bytes):
np = DnsNamePart()
offset = np.vsParse(bytes, offset=offset)
self.vsAddElement(np)
if np.isNameTerm():
break
return offset
[docs]class DnsQuery(vstruct.VStruct):
def __init__(self):
vstruct.VStruct.__init__(self)
self.qname = DnsName()
self.qtype = v_uint16(bigend=True)
self.qclass = v_uint16(bigend=True)
[docs]class DnsQueryArray(vstruct.VArray):
def __init__(self, reccnt):
vstruct.VArray.__init__(self)
for i in xrange(reccnt):
self.vsAddElement( DnsQuery() )
[docs]class DnsAnswer(vstruct.VStruct):
def __init__(self):
vstruct.VStruct.__init__(self)
self.qname = DnsName()
self.qtype = v_uint16(bigend=True)
self.qclass = v_uint16(bigend=True)
self.qttl = v_uint32(bigend=True)
self.dlength = v_uint16(bigend=True)
self.qdata = v_bytes()
[docs] def pcb_dlength(self):
size = self.dlength
self.vsGetField('qdata').vsSetLength(size)
[docs]class DnsAnswerArray(vstruct.VArray):
def __init__(self, reccnt):
vstruct.VArray.__init__(self)
for i in xrange(reccnt):
self.vsAddElement( DnsAnswer() )
[docs]class DnsPacket(vstruct.VStruct):
def __init__(self):
vstruct.VStruct.__init__(self)
#self.length = v_uint16(bigend=True)
self.transid = v_uint16(bigend=True)
self.flags = v_uint16(bigend=True)
self.ques_cnt = v_uint16(bigend=True)
self.answ_cnt = v_uint16(bigend=True)
self.auth_cnt = v_uint16(bigend=True)
self.addt_cnt = v_uint16(bigend=True)
self.records = vstruct.VStruct()
self.records.queries = DnsQueryArray(0)
self.records.answers = DnsAnswerArray(0)
self.records.authns = DnsAnswerArray(0)
self.records.addtl = DnsAnswerArray(0)
[docs] def pcb_ques_cnt(self):
self.records.queries = DnsQueryArray( self.ques_cnt )
[docs] def pcb_answ_cnt(self):
self.records.answers = DnsAnswerArray( self.answ_cnt )
[docs] def pcb_auth_cnt(self):
self.records.authns = DnsAnswerArray( self.auth_cnt )
[docs] def pcb_addt_cnt(self):
self.records.addtl = DnsAnswerArray( self.addt_cnt )
[docs] def getDnsQueries(self):
ret = []
for fname, q in self.records.queries.vsGetFields():
qname = q.qname.getFullName(self)
ret.append( (q.qtype, q.qclass, q.qname.getFullName( self ) ) )
return ret
[docs] def getDnsAnswers(self):
ret = []
for fname, a in self.records.answers.vsGetFields():
adata = a.qdata
if a.qtype == DNS_TYPE_A:
adata = socket.inet_ntop(socket.AF_INET, adata)
ret.append( (a.qtype, a.qclass, a.qttl, a.qname.getFullName(self), adata) )
return ret