'''
The stalker subsystem is a breakpoint-based coverage tool.
'''
import vtrace
import envi
import envi.memory as e_mem
import envi.codeflow as e_codeflow
class StalkerCodeFlow(e_codeflow.CodeFlowContext):
    '''
    Code flow context which collects the addresses the stalker should
    place breakpoints on.

    _cb_opcode() sorts branch targets into three lists:

        bplist   - targets which start a new code block
        sbreaks  - resolved targets of procedural branches
        scbreaks - addresses of opcodes whose branch target is unknown

    StalkerBreak.notify() reads these lists after calling addCodeFlow().
    '''

    def __init__(self, trace):
        e_codeflow.CodeFlowContext.__init__(self, trace)
        self.trace = trace
        self.setupBreakLists(None)

    def setupBreakLists(self, mmap):
        '''
        Empty the three break lists and remember mmap, the memory map
        (compared against trace.getMemoryMap() results) which branch
        targets must belong to in order to be followed.
        '''
        self.mmap = mmap
        self.bplist = []    # Block Breaks
        self.sbreaks = []   # Stalker Breaks
        self.scbreaks = []  # Callbreaks

    def _cb_opcode(self, va, op, branches):
        '''
        Filter the branches of one opcode and record break locations.

        branches is an iterable of (address, flags) tuples; address may
        be None when the target is not known.  Returns the list of
        (address, flags) tuples which were kept, i.e. those not dropped
        for being unreadable or for landing outside of self.mmap.

        NOTE(review): presumably invoked by CodeFlowContext for each
        opcode it walks; the caller is not visible in this file.
        '''
        ret = []
        for br, bflags in branches:

            if bflags & envi.BR_DEREF and br is not None:
                bflags &= ~envi.BR_DEREF # Mask it back out...
                if not self.trace.probeMemory(br, 1, e_mem.MM_READ):
                    continue
                # The real target is the value stored at br
                br = self.trace.readMemoryFormat(br, '<P')[0]

            # Skip branches to other maps...
            if br is not None and self.trace.getMemoryMap(br) != self.mmap:
                continue

            ret.append((br, bflags))

            # Procedural branches to regs etc must be marked
            # Otherwise, add another breakpoint like us
            if bflags & envi.BR_PROC:
                if br is None:
                    self.scbreaks.append(op.va)
                else:
                    self.sbreaks.append(br)
                continue

            # Unknown (non procedural) target: watch the opcode itself
            if br is None:
                self.scbreaks.append(op.va)
                continue

            # Conditional branches always create new blocks...
            if bflags & envi.BR_COND:
                self.bplist.append(br)
                continue

            # Even non-conditional jmp's will create new blocks for now...
            # (a "branch" to the very next instruction is just fall through)
            if br != op.va + len(op):
                self.bplist.append(br)

        return ret
class StalkerBreak(vtrace.Breakpoint):
    '''
    Stalker breakpoints are added to function entry points
    to trigger code-flow analysis and subsequent block breakpoint
    addition.
    '''

    def __init__(self, address, expression=None):
        vtrace.Breakpoint.__init__(self, address, expression=expression)
        self.fastbreak = True
        self.mymap = None

    def resolvedaddr(self, trace, address):
        '''
        Record the memory map which contains the resolved address so
        that notify() can restrict code flow to that map.
        '''
        vtrace.Breakpoint.resolvedaddr(self, trace, address)
        self.mymap = trace.getMemoryMap(address)

    def notify(self, event, trace):
        '''
        Record the hit, run code flow from this address and register a
        breakpoint for every newly discovered location.

        Reads the 'StalkerBreaks' and 'StalkerHits' trace metadata (see
        initStalker) and creates the shared 'StalkerCodeFlow' object on
        first use.
        '''
        self.trace = trace

        # Get out of the way
        self.enabled = False
        self.deactivate(trace)

        breaks = trace.getMeta('StalkerBreaks')
        hits = trace.getMeta('StalkerHits')
        hits.append(self.address)

        cf = trace.getMeta('StalkerCodeFlow')
        if cf is None:
            cf = StalkerCodeFlow(trace)
            trace.setMeta('StalkerCodeFlow', cf)

        cf.setupBreakLists(self.mymap)
        cf.addCodeFlow(self.address, persist=True)

        # An address is only ever registered once, so when it shows up in
        # more than one list the earliest call below decides its type.
        self._addNewBreaks(trace, breaks, cf.bplist, StalkerBlockBreak)
        self._addNewBreaks(trace, breaks, cf.sbreaks, StalkerBreak)
        self._addNewBreaks(trace, breaks, cf.scbreaks, StalkerDynBreak)

    def _addNewBreaks(self, trace, breaks, addrs, bpclass):
        '''
        Add a bpclass breakpoint for each address in addrs which is not
        yet marked in the breaks dictionary, marking it as we go.
        '''
        for va in addrs:
            if breaks.get(va):
                continue
            breaks[va] = True
            trace.addBreakpoint(bpclass(va))
class StalkerBlockBreak(vtrace.Breakpoint):
    '''
    Breakpoint placed on a code block boundary.  Its only job is to
    note that the block was reached.
    '''

    def __init__(self, address, expression=None):
        vtrace.Breakpoint.__init__(self, address, expression=expression)
        self.fastbreak = True

    def notify(self, event, trace):
        '''
        Append our address to the 'StalkerHits' list, switch ourselves
        off and ask the trace to run again.
        '''
        trace.getMeta('StalkerHits').append(self.address)

        # One hit is all we need from this block
        self.enabled = False
        self.deactivate(trace)

        trace.runAgain()
class StalkerDynBreak(vtrace.Breakpoint):
    '''
    A breakpoint which is placed on dynamic branches to track
    code flow across them.
    '''

    # Once the same destination has repeated more than this many times
    # in a row the breakpoint disables itself.  Subclasses or instances
    # may override the value.
    # FIXME what should this be??!?!
    maxrepeat = 10

    def __init__(self, address, expression=None):
        vtrace.Breakpoint.__init__(self, address, expression=expression)
        self.fastbreak = True
        self.mymap = None
        self.lasthit = None  # destination seen on the previous hit
        self.lastcnt = 0     # consecutive repeats of self.lasthit

    def resolvedaddr(self, trace, address):
        '''
        Record the memory map which contains the resolved address; only
        destinations inside that map become new stalker entries.
        '''
        vtrace.Breakpoint.resolvedaddr(self, trace, address)
        self.mymap = trace.getMemoryMap(address)

    def notify(self, event, trace):
        '''
        Work out where the branch at our address is going, add a stalker
        entry for destinations within our own memory map, then either
        re-arm or (after too many identical destinations) disable.
        '''
        trace.runAgain()
        self.deactivate(trace)

        op = trace.parseOpcode(self.address)

        # Where is the call going?
        dva = op.getOperValue(0, emu=trace)

        if self.lasthit == dva:
            self.lastcnt += 1
        else:
            self.lasthit = dva
            self.lastcnt = 0

        if trace.getMemoryMap(dva) == self.mymap:
            addStalkerEntry(trace, dva)

        if self.lastcnt > self.maxrepeat:
            # Same destination over and over: stop paying for this break
            self.lasthit = None
            self.lastcnt = 0
            self.enabled = False
        else:
            self.activate(trace)
def initStalker(trace):
    '''
    Make sure the stalker metadata exists on the given trace.

    When 'StalkerBreaks' is missing, both 'StalkerBreaks' (an empty
    dict) and 'StalkerHits' (an empty list) are created.  When only
    'StalkerHits' is missing it is created on its own, so code which
    appends to the hit list never finds None there.
    '''
    if trace.getMeta('StalkerBreaks') is None:
        trace.setMeta('StalkerBreaks', {})
        trace.setMeta('StalkerHits', [])
    elif trace.getMeta('StalkerHits') is None:
        trace.setMeta('StalkerHits', [])
def clearStalkerHits(trace):
    '''
    Replace the trace's 'StalkerHits' list with a new empty list.
    '''
    initStalker(trace)
    fresh = []
    trace.setMeta('StalkerHits', fresh)
def getStalkerHits(trace):
    '''
    Return the trace's 'StalkerHits' list: the addresses appended by
    the stalker breakpoints as they were hit.
    '''
    initStalker(trace)
    hits = trace.getMeta('StalkerHits', [])
    return hits
def clearStalkerBreaks(trace):
    '''
    Cleanup all stalker breaks and metadata

    Removes the breakpoint found at every address recorded in the
    'StalkerBreaks' dictionary, empties that dictionary in place and
    resets 'StalkerCodeFlow' to None.  The 'StalkerHits' list is left
    alone (see clearStalkerHits).
    '''
    initStalker(trace)
    breaks = trace.getMeta('StalkerBreaks', {})
    trace.setMeta('StalkerCodeFlow', None)

    # Iterate over a copy of the keys since entries are popped as we go
    for va in list(breaks):
        bp = trace.getBreakpointByAddr(va)
        if bp is not None:
            trace.removeBreakpoint(bp.id)
        breaks.pop(va, None)
def resetStalkerBreaks(trace):
    '''
    Re-enable the breakpoint at every address recorded in the
    'StalkerBreaks' dictionary (addresses with no breakpoint are
    skipped).
    '''
    initStalker(trace)
    breaks = trace.getMeta('StalkerBreaks', {})

    # FIXME HACK
    # NOTE(review): presumably makes the trace redo its fastbreak
    # breakpoint setup -- confirm against the vtrace implementation.
    trace.fb_bp_done = False

    for va in list(breaks):
        bp = trace.getBreakpointByAddr(va)
        if bp is not None:
            trace.setBreakpointEnabled(bp.id, enabled=True)
def addStalkerEntry(trace, va):
    '''
    Begin stalker coverage at the entry point va by adding a
    StalkerBreak there, unless va is already marked in 'StalkerBreaks'.
    '''
    initStalker(trace)
    known = trace.getMeta('StalkerBreaks')
    if not known.get(va):
        entry = StalkerBreak(va)
        trace.addBreakpoint(entry)
        # Only marked once the breakpoint has actually been added
        known[va] = True