Package envi :: Module codeflow
[hide private]
[frames] | no frames]

Source Code for Module envi.codeflow

  1  ''' 
  2  A module to contain code flow analysis for envi opcode objects... 
  3  ''' 
  4  import copy 
  5   
  6  import envi 
  7  import envi.memory as e_mem 
  8   
9 -class CodeFlowContext(object):
10 11 ''' 12 A CodeFlowContext is used for code-flow (not linear) based disassembly 13 for an envi MemoryObject (which is responsible for knowing the implementation 14 of parseOpcode(). The CodeFlowContext will optionally notify several callback 15 handlers for different events which occur during disassembly: 16 17 self._cb_opcode(va, op, branches) - called for every newly parsed opcode 18 NOTE: _cb_opcode must return the desired branches for continued flow 19 20 self._cb_function(fva, metadict) - called once for every function 21 22 self._cb_branchtable(tabva, destva) - called for switch tables 23 24 '''
25 - def __init__(self, mem):
26 27 self._funcs = {} 28 self._entries = {} 29 self._mem = mem 30 self._opdone = {}
31
32 - def _cb_opcode(self, va, op, branches):
33 ''' 34 Extend CodeFlowContext and implement this method to recieve 35 a callback for every newly discovered opcode. 36 ''' 37 return branches
38
39 - def _cb_function(self, fva, fmeta):
40 ''' 41 Extend CodeFlowContext and implement this method to recieve 42 a callback for every newly discovered function. Additionally, 43 metadata about the function may be stored in the fmeta dict. 44 ''' 45 pass
46
47 - def _cb_branchtable(self, tabva, destva):
48 pass
49
50 - def getCallsFrom(self, fva):
51 return self._funcs.get(fva)
52
53 - def addFunctionDef(self, fva, calls_from):
54 ''' 55 Add a priori knowledge of a function to the code flow 56 stuff... 57 ''' 58 59 if self._funcs.get(fva) != None: 60 return 61 62 self._funcs[fva] = calls_from
63
64 - def addCodeFlow(self, va, persist=False, exptable=True):
65 ''' 66 Do code flow disassembly from the specified address. Returnes a list 67 of the procedural branch targets discovered during code flow... 68 69 Set persist=True to store 'opdone' and never disassemble the same thing twice 70 Set exptable=True to expand branch tables in this phase 71 ''' 72 73 opdone = self._opdone 74 if not persist: 75 opdone = {} 76 77 calls_from = {} 78 optodo = [va, ] 79 80 while len(optodo): 81 82 va = optodo.pop() 83 84 if opdone.get(va): 85 continue 86 87 opdone[va] = True 88 89 try: 90 op = self._mem.parseOpcode(va) 91 except Exception, e: 92 print 'parseOpcodeError at 0x%.8x: %s' % (va,e) 93 # FIXME code block breakage... 94 continue 95 96 branches = op.getBranches() 97 # The opcode callback may filter branches... 98 branches = self._cb_opcode(va, op, branches) 99 100 while len(branches): 101 102 bva, bflags = branches.pop() 103 104 # Don't worry about unresolved branches now... 105 if bva == None: 106 continue 107 108 # Handle a table branch by adding more branches... 109 if bflags & envi.BR_TABLE: 110 if exptable: 111 ptrbase = bva 112 bdest = self._mem.readMemoryFormat(ptrbase, '<P')[0] 113 tabdone = {} 114 while self._mem.isValidPointer(bdest): 115 116 self._cb_branchtable(ptrbase, bdest) 117 118 if not tabdone.get(bdest): 119 tabdone[bdest] = True 120 branches.append((bdest, envi.BR_COND)) 121 122 ptrbase += self._mem.psize 123 bdest = self._mem.readMemoryFormat(ptrbase, '<P')[0] 124 125 continue 126 127 # FIXME handle conditionals here for block boundary detection! 128 129 if bflags & envi.BR_DEREF: 130 131 if not self._mem.probeMemory(bva, 1, e_mem.MM_READ): 132 continue 133 134 bva = self._mem.readMemoryFormat(bva, '<P')[0] 135 136 if not self._mem.probeMemory(bva, 1, e_mem.MM_EXEC): 137 continue 138 139 if bflags & envi.BR_PROC: 140 141 # Record that the current code flow has a call from it 142 # to the branch target... 143 144 # FIXME intel hack, call 0, pop reg for geteip... 145 if bva != va + len(op): 146 calls_from[bva] = True 147 # We only go up to procedural branches, not across 148 continue 149 150 if not opdone.get(bva): 151 optodo.append(bva) 152 153 return calls_from.keys()
154
155 - def _handleFunc(self, va, pth):
156 157 path = [] 158 159 # Check if this is already a known function. 160 if self._funcs.get(va) != None: 161 return 162 163 self._funcs[va] = True 164 165 # Check if we have disassembled a loop 166 if va in pth: 167 return 168 169 pth.append(va) 170 171 calls_from = self.addCodeFlow(va) 172 173 for callto in calls_from: 174 self._handleFunc(callto, pth=pth) 175 176 pth.pop() 177 178 # Finally, notify the callback of a new function 179 self._cb_function(va, {'CallsFrom':calls_from})
180
181 - def addEntryPoint(self, va):
182 ''' 183 Analyze the given procedure entry point and flow downward 184 to find all subsequent code blocks and procedure edges. 185 ''' 186 self._entries[va] = True 187 return self._handleFunc(va, [])
188