Source code for vtrace.tools.win32heap

# Heap Flags
HEAP_NO_SERIALIZE               = 0x00000001
HEAP_GROWABLE                   = 0x00000002
HEAP_GENERATE_EXCEPTIONS        = 0x00000004
HEAP_ZERO_MEMORY                = 0x00000008
HEAP_REALLOC_IN_PLACE_ONLY      = 0x00000010
HEAP_TAIL_CHECKING_ENABLED      = 0x00000020
HEAP_FREE_CHECKING_ENABLED      = 0x00000040
HEAP_DISABLE_COALESCE_ON_FREE   = 0x00000080
HEAP_CREATE_ALIGN_16            = 0x00010000
HEAP_CREATE_ENABLE_TRACING      = 0x00020000
HEAP_CREATE_ENABLE_EXECUTE      = 0x00040000

heap_flag_names = {
HEAP_NO_SERIALIZE:"HEAP_NO_SERIALIZE",
HEAP_GROWABLE:"HEAP_GROWABLE",
HEAP_GENERATE_EXCEPTIONS:"HEAP_GENERATE_EXCEPTIONS",
HEAP_ZERO_MEMORY:"HEAP_ZERO_MEMORY",
HEAP_REALLOC_IN_PLACE_ONLY:"HEAP_REALLOC_IN_PLACE_ONLY",
HEAP_TAIL_CHECKING_ENABLED:"HEAP_TAIL_CHECKING_ENABLED",
HEAP_FREE_CHECKING_ENABLED:"HEAP_FREE_CHECKING_ENABLED",
HEAP_DISABLE_COALESCE_ON_FREE:"HEAP_DISABLE_COALESCE_ON_FREE",
HEAP_CREATE_ALIGN_16:"HEAP_CREATE_ALIGN_16",
HEAP_CREATE_ENABLE_TRACING:"HEAP_CREATE_ENABLE_TRACING",
HEAP_CREATE_ENABLE_EXECUTE:"HEAP_CREATE_ENABLE_EXECUTE",
}

# Heap Chunk Flags
HEAP_ENTRY_BUSY             = 0x01
HEAP_ENTRY_EXTRA_PRESENT    = 0x02
HEAP_ENTRY_FILL_PATTERN     = 0x04
HEAP_ENTRY_VIRTUAL_ALLOC    = 0x08
HEAP_ENTRY_LAST_ENTRY       = 0x10
HEAP_ENTRY_SETTABLE_FLAG1   = 0x20
HEAP_ENTRY_SETTABLE_FLAG2   = 0x40
HEAP_ENTRY_SETTABLE_FLAG3   = 0x80



def reprHeapFlags(flags):
[docs] ret = [] if flags & HEAP_ENTRY_BUSY: ret.append("BUSY") if flags & HEAP_ENTRY_FILL_PATTERN: ret.append("FILL") if flags & HEAP_ENTRY_LAST_ENTRY: ret.append("LAST") if len(ret): return "|".join(ret) return "NONE" class HeapCorruptionException(Exception):
[docs] def __init__(self, heap, segment, prevchunk, badchunk): prevaddr = 0 badaddr = 0 if prevchunk != None: prevaddr = prevchunk.address if badchunk != None: badaddr = badchunk.address Exception.__init__(self, "Prev Chunk: 0x%.8x Bad Chunk: 0x%.8x" % (prevaddr, badaddr)) self.heap = heap self.segment = segment self.prevchunk = prevchunk self.badchunk = badchunk class FreeListCorruption(Exception):
[docs] def __init__(self, heap, index, prevchunk, badchunk): prevaddr = 0 badaddr = 0 if prevchunk != None: prevaddr = prevchunk.address if badchunk != None: badaddr = badchunk.address Exception.__init__(self, "Index: %d Prev Chunk: 0x%.8x Bad Chunk: 0x%.8x" % (index, prevaddr, badaddr)) self.heap = heap self.index = index self.prevchunk = prevchunk self.badchunk = badchunk class ChunkNotFound(Exception):
[docs] pass def getHeapSegChunk(trace, address):
[docs] """ Find and return the heap, segment, and chunk for the given addres (or exception). """ for heap in getHeaps(trace): for seg in heap.getSegments(): segstart = seg.address segend = seg.getSegmentEnd() if address < segstart or address > segend: continue for chunk in seg.getChunks(): a = chunk.address b = chunk.address + len(chunk) if address >= a and address < b: return heap,seg,chunk raise ChunkNotFound("No Chunk Found for 0x%.8x" % address) def getHeaps(trace):
[docs] """ Get the win32 heaps (returns a list of Win32Heap objects) """ ret = [] pebaddr = trace.getMeta("PEB") peb = trace.getStruct("ntdll.PEB", pebaddr) heapcount = int(peb.NumberOfHeaps) hlist = trace.readMemoryFormat(long(peb.ProcessHeaps), "<"+('P'*heapcount)) for haddr in hlist: ret.append(Win32Heap(trace, haddr)) return ret class Win32Heap:
[docs] def __init__(self, trace, address): self.address = address self.trace = trace self.heap = trace.getStruct("ntdll.HEAP", address) self._win7_heap = False self.heapenc = None if self.heap.vsHasField('Encoding'): self.heapenc = self.heap.Encoding self._win7_heap = True self.seglist = None self.ucrdict = None def hasLookAside(self):
[docs] """ Does this heap have a lookaside? """ if not self.heap.Flags & HEAP_GROWABLE: return False if self.heap.Flags & HEAP_NO_SERIALIZE: return False return True def getUCRDict(self):
[docs] ''' Retrieve a dictionary of <ucr_address>:<ucr_size> items. (If this windows version doesn't support UCRs, the dict will be empty) ''' if self.ucrdict == None: self.ucrdict = {} if self.heap.vsHasField('UCRList'): listhead_va = self.address + self.heap.vsGetOffset('UCRList') for lva in self._getListEntries(listhead_va): ucrent = self.trace.getStruct('ntdll.HEAP_UCR_DESCRIPTOR', lva) if ucrent.Size != 0: self.ucrdict[ucrent.Address] = ucrent.Size return self.ucrdict def _win7ParseSegments(self):
# Address doesn't matter for the below call heapseg = self.trace.getStruct('ntdll.HEAP_SEGMENT', self.address) listhead = self.address + self.heap.vsGetOffset('SegmentList') # Negative offset from segment list entry to segment segoff = heapseg.vsGetOffset('SegmentListEntry') entry = self.heap.SegmentList.Flink while entry != listhead: seg = Win32Segment(self.trace, self, entry - segoff) self.seglist.append(seg) entry = self.trace.readMemoryFormat(entry, '<P')[0] def getSegments(self):
[docs] """ Return a list of Win32Segment objects. """ if self.seglist == None: self.seglist = [] # Windows 7 style heap segment list if self.heap.vsHasField('SegmentList'): self._win7ParseSegments() else: for i in range(long(self.heap.LastSegmentIndex)+1): sa = self.heap.Segments[i] self.seglist.append(Win32Segment(self.trace, self, long(sa))) return self.seglist def getLookAsideLists(self):
[docs] """ Return a list of the lookaside list for this heap """ if not self.hasLookAside(): raise Exception("Heap at 0x%.8x has no lookaside!" % (self.address)) # Look aside lists are 128 pointer slots in a chunk pointed # to by the FrontEndHeap field in the heap structure. #fetype = self.heap.FrontEndHeapType laside = self.heap.FrontEndHeap ret = [] for i in range(128): slot = laside + (i * 0x30) bucket = [] base = self.trace.readMemoryFormat(slot, "<I")[0] while base != 0: chunk = Win32Chunk(self.trace, self, base-8) bucket.append(chunk) base,blink = chunk.getFlinkBlink() ret.append(bucket) return ret def _getListEntries(self, addr, listhead=True):
ret = [] if not listhead: ret.append(addr) le = self.trace.getStruct('ntdll.LIST_ENTRY', addr) while le.Flink != addr: ret.append(le.Flink) le = self.trace.getStruct('ntdll.LIST_ENTRY', le.Flink) return ret def _win7FreeLists(self): #print 'Windows 7 Free List Parsing Fixme!' return [] def getFreeLists(self):
[docs] """ Return a list of the free lists in this heap. (Not including look-aside) """ ret = [] foff = self.heap.vsGetOffset("FreeLists") if self._win7_heap: return self._win7FreeLists() for i in range(128): le = self.heap.FreeLists[i] bucket = [] base = self.address + foff + (i*8) addr = le.Flink while addr != base: # If we die here, the guy before us was dorked. try: chunk = Win32Chunk(self.trace, self, addr-8) except Exception, e: chunk = bucket[-1] pchunk = None if len(bucket) >= 2: pchunk = bucket[-2] raise FreeListCorruption(self, i, pchunk, chunk) # Check our back pointer flink,blink = chunk.getFlinkBlink() if len(bucket): pchunk = bucket[-1] if blink != pchunk.getDataAddress(): raise FreeListCorruption(self, i, pchunk, chunk) addr = flink bucket.append(chunk) ret.append(bucket) return ret def getFlagNames(self):
[docs] ret = [] for k,v in heap_flag_names.items(): if self.heap.Flags & k: ret.append(v) return ret def __repr__(self):
fnames = "|".join(self.getFlagNames()) return "heap: 0x%.8x flags:%s" % (self.address, fnames) class Win32Segment:
[docs] def __init__(self, trace, heap, address): self.trace = trace self.heap = heap self.address = address self.seg = trace.getStruct("ntdll.HEAP_SEGMENT", address) #FIXME segments can specify chunk Size granularity self.chunks = None self.segend = self.address + (self.seg.NumberOfPages * 4096) def getSegmentEnd(self):
[docs] return self.segend def getChunks(self):
[docs] if self.chunks == None: self.chunks = [] addr = self.address lastchunk = None ucrdict = self.heap.getUCRDict() while True: # Skip any uncommited ranges... usize = ucrdict.get(addr) if usize != None: lastchunk = None addr += usize continue # Since an un-commited range may put us past the # lastblock (segend) we must double check... if addr >= self.segend: break chunk = Win32Chunk(self.trace, self.heap, addr) self.chunks.append(chunk) #if lastchunk != None: #if lastchunk.chunk.Size != chunk.chunk.PreviousSize: #print 'last size:',lastchunk.chunk.Size,'prev',chunk.chunk.PreviousSize #raise HeapCorruptionException(self.heap, self, lastchunk, chunk) if chunk.isLast(): break lastchunk = chunk addr += len(chunk) return self.chunks def getLastChunk(self):
[docs] va = self.seg.LastEntryInSegment return Win32Chunk(self.trace, self.heap, va) class Win32Chunk:
[docs] def __init__(self, trace, heap, address): self.trace = trace self.heap = heap self.address = address self.chunk = trace.getStruct("ntdll.HEAP_ENTRY", address) # Decode the heap chunk if needed... if self.heap.heapenc: self.chunk ^= self.heap.heapenc def __repr__(self): return "HeapChunk: 0x%.8x (%d) %s" % (self.address, len(self),self.reprFlags()) def __len__(self): return int(self.chunk.Size) * 8 def isLast(self):
[docs] return bool(int(self.chunk.Flags) & HEAP_ENTRY_LAST_ENTRY) def isBusy(self):
[docs] return bool(int(self.chunk.Flags) & HEAP_ENTRY_BUSY) def getDataAddress(self):
[docs] return self.address + len(self.chunk) def getDataSize(self):
[docs] return len(self) - len(self.chunk) def getDataBytes(self, maxsize=None):
[docs] size = self.getDataSize() if maxsize != None: size = min(size, maxsize) return self.trace.readMemory(self.getDataAddress(), size) def getFlinkBlink(self):
[docs] return reprHeapFlags(int(self.chunk.Flags))