Source code for envi.memory

import re
import struct

import envi

"""
A module containing memory utilities and the definition of the
memory access API used by all vtoys trace/emulators/workspaces.
"""

# Memory Map Permission Flags
MM_NONE = 0x0
MM_READ = 0x4
MM_WRITE = 0x2
MM_EXEC = 0x1
MM_SHARED = 0x08

MM_READ_WRITE = MM_READ | MM_WRITE
MM_READ_EXEC  =  MM_READ | MM_EXEC
MM_RWX = MM_READ | MM_WRITE | MM_EXEC

pnames = ['No Access', 'Execute', 'Write', None, 'Read']
[docs]def getPermName(perm): ''' Return the human readable name for a *single* memory perm enumeration value. ''' return pnames[perm]
[docs]def reprPerms(mask): plist = ['-','-','-','-'] if mask & MM_SHARED: plist[0] = 's' if mask & MM_READ: plist[1] = 'r' if mask & MM_WRITE: plist[2] = 'w' if mask & MM_EXEC: plist[3] = 'x' return "".join(plist)
[docs]def parsePerms(pstr): ret = 0 if pstr.find('s') != -1: ret |= MM_SHARED if pstr.find('r') != -1: ret |= MM_READ if pstr.find('w') != -1: ret |= MM_WRITE if pstr.find('x') != -1: ret |= MM_EXEC return ret
[docs]class IMemory: """ This is the interface spec (and a few helper utils) for the unified memory object interface. NOTE: If your actual underlying memory format is such that over-riding anything (like isValidPointer!) can be faster than the default implementation, DO IT! """ def __init__(self, archmod=None): self.imem_psize = struct.calcsize("P") self.imem_arch = archmod # If the specified an arch module, use that! if archmod != None: self.imem_psize = archmod.getPointerSize()
[docs] def getPointerSize(self): return self.imem_arch.getPointerSize()
[docs] def readMemory(self, va, size): """ Read memory from the specified virtual address for size bytes and return it as a python string. Example: mem.readMemory(0x41414141, 20) -> "A..." """ raise Exception("must implement readMemory!")
[docs] def writeMemory(self, va, bytes): """ Write the given bytes to the specified virtual address. Example: mem.writeMemory(0x41414141, "VISI") """ raise Exception("must implement writeMemory!")
[docs] def protectMemory(self, va, size, perms): """ Change the protections for the given memory map. On most platforms the va/size *must* exactly match an existing memory map. """ raise Exception("must implement protectMemory!")
[docs] def probeMemory(self, va, size, perm): """ Check to be sure that the given virtual address and size is contained within one memory map, and check that the perms are contained within the permission bits for the memory map. (MM_READ | MM_WRITE | MM_EXEC | ...) Example probeMemory(0x41414141, 20, envi.memory.MM_WRITE) (check if the memory for 20 bytes at 0x41414141 is writable) """ map = self.getMemoryMap(va) if map == None: return False mapva, mapsize, mapperm, mapfile = map mapend = mapva+mapsize if va+size >= mapend: return False if mapperm & perm != perm: return False return True
[docs] def allocateMemory(self, size, perms=MM_RWX, suggestaddr=0): raise Exception("must implement allocateMemory!")
[docs] def addMemoryMap(self, mapva, perms, fname, bytes): raise Exception("must implement addMemoryMap!")
[docs] def getMemoryMaps(self): raise Exception("must implement getMemoryMaps!") # Mostly helpers from here down...
[docs] def readMemoryFormat(self, va, fmt): # Somehow, pointers are "signed" when they # get chopped up by python's struct package if self.imem_psize == 4: fmt = fmt.replace("P","I") elif self.imem_psize == 8: fmt = fmt.replace("P","Q") size = struct.calcsize(fmt) bytes = self.readMemory(va, size) return struct.unpack(fmt, bytes)
[docs] def getSegmentInfo(self, id): return (0,0xffffffff)
[docs] def readMemValue(self, addr, size): bytes = self.readMemory(addr, size) if bytes == None: return None #FIXME change this (and all uses of it) to passing in format... if len(bytes) != size: raise Exception("Read Gave Wrong Length At 0x%.8x (va: 0x%.8x wanted %d got %d)" % (self.getProgramCounter(),addr, size, len(bytes))) if size == 1: return struct.unpack("B", bytes)[0] elif size == 2: return struct.unpack("<H", bytes)[0] elif size == 4: return struct.unpack("<I", bytes)[0] elif size == 8: return struct.unpack("<Q", bytes)[0]
[docs] def writeMemoryFormat(self, va, fmt, *args): ''' Write a python format sequence of variables out to memory after serializing using struct pack... Example: trace.writeMemoryFormat(va, '<PBB', 10, 30, 99) ''' if self.imem_psize == 4: fmt = fmt.replace("P","I") elif self.imem_psize == 8: fmt = fmt.replace("P","Q") mbytes = struct.pack(fmt, *args) self.writeMemory(va, mbytes)
[docs] def getMemoryMap(self, va): """ Return a tuple of mapva,size,perms,filename for the memory map which contains the specified address (or None). """ for mapva,size,perms,mname in self.getMemoryMaps(): if va >= mapva and va < (mapva+size): return (mapva,size,perms,mname) return None
[docs] def isValidPointer(self, va): return self.getMemoryMap(va) != None
[docs] def isReadable(self, va): maptup = self.getMemoryMap(va) if maptup == None: return False return bool(maptup[2] & MM_READ)
[docs] def isWriteable(self, va): maptup = self.getMemoryMap(va) if maptup == None: return False return bool(maptup[2] & MM_WRITE)
[docs] def isExecutable(self, va): maptup = self.getMemoryMap(va) if maptup == None: return False return bool(maptup[2] & MM_EXEC)
[docs] def isShared(self, va): maptup = self.getMemoryMap(va) if maptup == None: return False return bool(maptup[2] & MM_SHARED)
[docs] def searchMemory(self, needle, regex=False): """ A quick cheater way to searchMemoryRange() for each of the current memory maps. """ results = [] for va,size,perm,fname in self.getMemoryMaps(): try: results.extend(self.searchMemoryRange(needle, va, size, regex=regex)) except: pass # Some platforms dont let debuggers read non-readable mem return results
[docs] def searchMemoryRange(self, needle, address, size, regex=False): """ Search the specified memory range (address -> size) for the string needle. Return a list of addresses where the match occurs. """ results = [] memory = self.readMemory(address, size) if regex: for match in re.finditer(needle, memory): off = match.start() results.append(address+off) else: offset = 0 while offset < size: loc = memory.find(needle, offset) if loc == -1: # No more to be found ;) break results.append(address+loc) offset = loc+len(needle) # Skip one past our matcher return results
[docs] def parseOpcode(self, va): ''' Parse an opcode from the specified virtual address. Example: op = m.parseOpcode(0x7c773803) ''' if self.imem_arch == None: raise Exception('IMemory got no architecture module (%s)' % (self.__class__.__name__)) b = self.readMemory(va, 16) return self.imem_arch.makeOpcode(b, 0, va)
[docs] def findOpcode(self,opcode=None,loc=None,size=1000,verbose=False): ''' Starts at the current EIP (self.getProgramCounter()), and searches each instruction for the instruction requested. Returns the address of the first found opcode, than returns. Example: # Find the first 'popad' opcode In [1]: popad_loc = trace.findOpcode(opcode="popad",verbose=True) [+] 0x1020cd0L pushad [+] 0x1020cd1L mov esi,0x0101a000 [+] 0x1020cd6L lea edi,dword [esi - 102400] [+] 0x1020cdcL push edi [+] 0x1020cddL or ebp,0xffffffff --SNIP-- [+] 0x1020e49L push edi [+] 0x1020e4aL call ebp [+] 0x1020e4cL pop eax [+] 0x1020e4dL popad Location - 0x01020E4D # Using popad's location, search for the next 'jmp' In [2]: jmp = trace.findOpcode(loc=popad_loc.va, opcode="jmp", verbose=True) [+] 0x1020e4dL popad [+] 0x1020e4eL lea eax,dword [esp - 128] [+] 0x1020e52L push 0 [+] 0x1020e54L cmp esp,eax [+] 0x1020e56L jnz 0x01020e52 [+] 0x1020e58L sub esp,0xffffff80 [+] 0x1020e5bL jmp 0x01012475 Location - 0x01020E5B # Or you can do it simply #TODO Extend to take in expressions. ''' # If location is specified use that, otherwise use EIP if loc: offset = loc else: offset = self.getProgramCounter() if not opcode: print "You didn't give me an opcode dude!" return False while offset < (offset + size): cur_opcode = self.parseOpcode(offset) if verbose: print '[+] ' + hex(cur_opcode.va) + ' ' + repr(cur_opcode) if cur_opcode.mnem == opcode: if verbose: print "Location - 0x" + ("%0.8x".upper() % cur_opcode.va) return cur_opcode offset += cur_opcode.size
[docs]class MemoryObject(IMemory): def __init__(self, archmod=None): """ Take a set of memory maps (va, perms, fname, bytes) and put them in a sparse space finder. You may specify your own page-size to optimize the search for an architecture. """ IMemory.__init__(self, archmod=archmod) self._map_defs = [] #FIXME MemoryObject: def allocateMemory(self, size, perms=MM_RWX, suggestaddr=0):
[docs] def addMemoryMap(self, va, perms, fname, bytes): ''' Add a memory map to this object... ''' msize = len(bytes) map = (va, msize, perms, fname) hlpr = [va, va+msize, map, bytes] self._map_defs.append(hlpr) return
[docs] def getMemorySnap(self): ''' Take a memory snapshot which may be restored later. Example: snap = mem.getMemorySnap() ''' return [ list(mdef) for mdef in self._map_defs ]
[docs] def setMemorySnap(self, snap): ''' Restore a previously saved memory snapshot. Example: mem.setMemorySnap(snap) ''' self._map_defs = [list(md) for md in snap]
[docs] def getMemoryMap(self, va): """ Get the va,size,perms,fname tuple for this memory map """ for mva, mmaxva, mmap, mbytes in self._map_defs: if va >= mva and va < mmaxva: return mmap return None
[docs] def getMemoryMaps(self): return [ mmap for mva, mmaxva, mmap, mbytes in self._map_defs ]
[docs] def readMemory(self, va, size): for mva, mmaxva, mmap, mbytes in self._map_defs: if va >= mva and va < mmaxva: mva, msize, mperms, mfname = mmap if not mperms & MM_READ: raise envi.SegmentationViolation(va) offset = va - mva return mbytes[offset:offset+size] raise envi.SegmentationViolation(va)
[docs] def writeMemory(self, va, bytes): for mapdef in self._map_defs: mva, mmaxva, mmap, mbytes = mapdef if va >= mva and va < mmaxva: mva, msize, mperms, mfname = mmap if not mperms & MM_WRITE: raise envi.SegmentationViolation(va) offset = va - mva mapdef[3] = mbytes[:offset] + bytes + mbytes[offset+len(bytes):] return raise envi.SegmentationViolation(va)
[docs] def getByteDef(self, va): """ An optimized routine which returns the existing segment bytes sequence without creating a new string object *AND* an offset of va into the buffer. Used internally for optimized memory handling. Returns (offset, bytes) """ for mapdef in self._map_defs: mva, mmaxva, mmap, mbytes = mapdef if va >= mva and va < mmaxva: offset = va - mva return (offset, mbytes) raise envi.SegmentationViolation(va)
[docs]class MemoryFile: ''' A file like object to wrap around a memory object. ''' def __init__(self, memobj, baseaddr): self.baseaddr = baseaddr self.offset = baseaddr self.memobj = memobj
[docs] def seek(self, offset): self.offset = self.baseaddr + offset
[docs] def read(self, size): ret = self.memobj.readMemory(self.offset, size) self.offset += size return ret
[docs] def write(self, bytes): self.memobj.writeMemory(self.offset, bytes) self.offset += len(bytes)
[docs]def memdiff(bytes1, bytes2): ''' Return a list of (offset, size) tuples showing any memory differences between the given bytes. ''' size = len(bytes1) if size != len(bytes2): raise Exception('memdiff *requires* same size bytes') ret = [] offset = 0 while offset < size: if bytes1[offset] != bytes2[offset]: beginoff = offset # Gather up all the difference bytes. while ( offset < size and bytes1[offset] != bytes2[offset]): offset += 1 ret.append((beginoff, offset-beginoff)) offset += 1 return ret