"""
Cobra's built-in clustering framework.
"""
import gc
import sys
import time
import Queue
import struct
import socket
import urllib2
import traceback
import threading
import subprocess
import multiprocessing
import cobra
import dcode
# Port at which ClusterServer.addClusterQueen() expects a ClusterQueen's
# cobra daemon to be listening.
queen_port = 32124

# UDP port and multicast group used for work announcements.
cluster_port = 32123
cluster_ip = "224.69.69.69"

# Python source executed by each worker subprocess (ClusterClient.threadForker
# runs `python -c (sub_cmd % (uri, docode))`).  The uri is assembled from the
# fields of an unauthenticated multicast datagram, so both values are inserted
# with %r: repr() always produces a valid, properly escaped Python literal,
# whereas the former '"%s"' let a uri containing a quote inject arbitrary code.
sub_cmd = """
import cobra.cluster
cobra.cluster.getAndDoWork(%r, docode=%r)
"""
class InvalidInProgWorkId(Exception):
    """
    Raised by ClusterServer when a work id is not among the in-progress
    work units (for example because it was canceled or timed out and
    already removed).

    The offending id is kept on the ``workid`` attribute.
    """

    def __init__(self, workid):
        message = "Work ID %d is not valid" % workid
        Exception.__init__(self, message)
        self.workid = workid
class ClusterWork(object):
    """
    Base class for cluster work units.

    Subclass this to create your own work units.  Define the subclass in a
    proper module (not __main__) so that it can be used in conjunction with
    cobra.dcode.
    """

    def __init__(self, timeout=None):
        object.__init__(self)
        self.id = None          # assigned when added to a ClusterServer
        self.server = None      # set by the worker before work() runs
        self.starttime = 0      # starttime / endtime are stamped by the
        self.endtime = 0        # worker just before and after work()
        self.timeout = timeout  # max seconds between touch() calls (None = never)
        self.touchtime = None   # time of the last touch(), None until touched
        self.excinfo = None     # formatted traceback if work() raised

    def touch(self):  # heh...
        """
        Record "now" as the last moment this unit made progress.  The
        timeout check measures from this moment.
        """
        self.touchtime = time.time()

    def isTimedOut(self):
        """
        Check if this work unit is timed out.

        Returns True only when a timeout is configured, the unit has been
        touched at least once, and more than ``timeout`` seconds have
        elapsed since the last touch.
        """
        if self.timeout is None or self.touchtime is None:
            return False
        deadline = self.touchtime + self.timeout
        return deadline < time.time()

    def work(self):
        """
        Actually do the work associated with this work object.

        Subclasses must override this; the default is a placeholder that
        reports progress once a second for ten seconds.
        """
        print("OVERRIDE ME")
        for step in range(10):
            self.setCompletion(step * 10)
            self.setStatus("Sleeping: %d" % step)
            time.sleep(1)

    def done(self):
        """
        Called back on the server once this work unit has been
        completed and returned.  Override to consume the results.
        """
        print("OVERRIDE DONE")

    def setCompletion(self, percent):
        """
        Tell the server how far along (in percent) this unit is.
        Work units may call this whenever they like; it also touch()es.
        """
        self.touch()
        srv = self.server
        srv.setWorkCompletion(self.id, percent)

    def setStatus(self, status):
        """
        Send a status message for this unit to the server.
        Also counts as progress (touch()).
        """
        self.touch()
        srv = self.server
        srv.setWorkStatus(self.id, status)

    def openSharedFile(self, filename):
        '''
        Open a file that lives on the server and return a file-like proxy.

        Example:
            fd = self.openSharedFile('/foo/bar/baz')
            fbytes = fd.read()

        NOTE: The server must use shareFileToWorkers() for this file.
        '''
        return cobra.CobraProxy(self.server.openSharedFile(filename))
class ClusterCallback:
    """
    Place one of these in the ClusterServer (its ``callback`` attribute) to
    get synchronous event information about what's going on in the cluster
    server (mostly for the GUI).

    Every hook here is a no-op; override the ones you care about.
    """

    def workAdded(self, server, work):
        """A work unit was queued by ClusterServer.addWork()."""

    def workGotten(self, server, work):
        """A worker took a work unit via ClusterServer.getWork()."""

    def workStatus(self, server, workid, status):
        """A work unit reported a new status string."""

    def workCompletion(self, server, workid, completion):
        """A work unit reported its completion percentage."""

    def workDone(self, server, work):
        """A work unit finished and was returned to the server."""

    def workFailed(self, server, work):
        """A work unit was reported as failed."""

    def workTimeout(self, server, work):
        """A work unit was timed out by the server."""

    def workCanceled(self, server, work):
        """A queued or in-progress work unit was canceled."""
class VerboseCallback(ClusterCallback):
    """
    A ClusterCallback that prints one line to stdout for every event.
    This is mostly for testing...
    """

    def workAdded(self, server, work):
        print("WORK ADDED: %d" % work.id)

    def workGotten(self, server, work):
        print("WORK GOTTEN: %d" % work.id)

    def workStatus(self, server, workid, status):
        print("WORK STATUS: (%d) %s" % (workid, status))

    def workCompletion(self, server, workid, completion):
        print("WORK COMPLETION: (%d) %d%%" % (workid, completion))

    def workDone(self, server, work):
        print("WORK DONE: %d" % work.id)

    def workFailed(self, server, work):
        print("WORK FAILED: %d" % work.id)

    def workTimeout(self, server, work):
        print("WORK TIMEOUT: %d" % work.id)

    def workCanceled(self, server, work):
        print("WORK CANCELED %d" % work.id)
import collections
class ClusterServer:
    """
    Hand out ClusterWork units to cluster workers and track their life
    cycle: queued -> in progress -> done / failed / timed out / canceled.

    The server shares itself over cobra so that remote workers can call
    getWork(), doneWork(), setWorkStatus() and friends.
    """

    def __init__(self, name, maxsize=None, docode=False, bindsrc="", cobrad=None):
        """
        The cluster server is the core of the code that manages work units.

        Arguments:
            name    - Cluster name included in announcements (clients
                      filter announcements on it)
            maxsize - How big the work queue may get before addWork()
                      blocks (None means unbounded)
            docode  - Should we also be a dcode server?
            bindsrc - Source IP to bind for our multicast announcements
            cobrad  - Existing cobra daemon to share our objects on (a new
                      one on an ephemeral port is created when None)
        """
        import itertools

        self.go = True
        self.name = name
        self.queens = []
        self.nextwid = 0  # NOTE(review): not used by this class; kept for compatibility
        self.inprog = {}
        self.sharedfiles = {}
        self.maxsize = maxsize
        self.queue = collections.deque()
        self.qcond = threading.Condition()
        # Unbounded work id generator (the former xrange(999999999) could be
        # exhausted, making addWork() raise StopIteration).
        self.widiter = itertools.count()

        # Initialize a cobra daemon if needed
        if cobrad is None:
            cobrad = cobra.CobraDaemon(host="", port=0)
        self.cobrad = cobrad
        self.cobraname = self.cobrad.shareObject(self)

        # Setup our transmission socket
        self.sendsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sendsock.bind((bindsrc, 0))

        # Set this to a ClusterCallback extension if you want notifications.
        self.callback = None

        if docode:
            self.cobrad.shareObject(dcode.DcodeFinder(), "DcodeServer")

        # Fire the timeout monitor thread...
        thr = threading.Thread(target=self.timerThread)
        thr.daemon = True
        thr.start()

    def addClusterQueen(self, queenhost):
        '''
        Inform the ClusterServer about the presence of a ClusterQueen instance
        on the given host. When the ClusterServer begins to announce work,
        it will do so in "infrastructure mode" and ask any known queens for help.
        '''
        queen = cobra.CobraProxy('cobra://%s:%d/ClusterQueen' % (queenhost, queen_port))
        self.queens.append(queen)

    def shareFileToWorkers(self, filename):
        '''
        Add a file to the list of files which are "shared" to worker clients.
        This allows workers to access a file from the server.

        Example:
            s.shareFileToWorkers('/path/to/file')

        NOTE: Workers may use the openSharedFile() API to access them.
        '''
        self.sharedfiles[filename] = True

    def openSharedFile(self, filename):
        '''
        Return a cobra URI for a newly opened (binary, read-only) file object
        for the specified filename.

        Raises Exception if the file was not shared with shareFileToWorkers().

        NOTE: use the openSharedFile() method on the work unit to get back a proxy.
        '''
        if not self.sharedfiles.get(filename):
            raise Exception('File %s is not shared!' % (filename,))

        fd = open(filename, 'rb')
        cname = self.cobrad.shareObject(fd, doref=True)
        host, port = cobra.getLocalInfo()
        return 'cobra://%s:%d/%s' % (host, port, cname)

    def __touchWork(self, workid):
        # Used to both validate an inprog workid *and*
        # update its timestamp for the timeout thread.
        work = self.inprog.get(workid)
        if work is None:
            raise InvalidInProgWorkId(workid)
        work.touch()

    def __cleanWork(self, workid):
        # Used by done/timeout/etc to clean up an in-progress work unit.
        # Returns the removed unit, or None if it was not in progress.
        return self.inprog.pop(workid, None)

    def timerThread(self):
        """
        Internal: every 2 seconds, time out in-progress work units whose
        isTimedOut() is True.  Runs until shutdownServer() is called.
        """
        while self.go:
            try:
                # Iterate a snapshot: timeoutWork() and remote calls
                # mutate self.inprog while we walk it.
                for work in list(self.inprog.values()):
                    if work.isTimedOut():
                        self.timeoutWork(work)
            except Exception as e:
                print("ClusterTimer: %s" % e)
            time.sleep(2)

    def shutdownServer(self):
        """Ask the timer thread and runServer() loop to stop."""
        self.go = False

    def announceWork(self):
        """
        Announce to our multicast cluster peers that we have work
        to do! (Or use a queen to proxy to them...)
        """
        if self.queens:
            for q in self.queens:
                try:
                    q.proxyAnnounceWork(self.name, self.cobraname, self.cobrad.port)
                except Exception as e:
                    print('Queen Error: %s' % e)
        else:
            buf = "cobra:%s:%s:%d" % (self.name, self.cobraname, self.cobrad.port)
            self.sendsock.sendto(buf, (cluster_ip, cluster_port))

    def runServer(self, firethread=False):
        """
        Start the cobra daemon thread and announce work every 2 seconds
        while the queue is non-empty, until shutdownServer().

        With firethread=True, do this in a new daemon thread and return
        immediately.
        """
        if firethread:
            thr = threading.Thread(target=self.runServer)
            thr.daemon = True
            thr.start()
            return

        self.cobrad.fireThread()
        while self.go:
            if self.queue:
                self.announceWork()
            time.sleep(2)

    def inQueueCount(self):
        """
        How long is the current work unit queue.
        """
        return len(self.queue)

    def inProgressCount(self):
        """
        How many work units are in progress?
        """
        return len(self.inprog)

    def addWork(self, work):
        """
        Add a work object to the ClusterServer's queue.

        A work unit without an id is assigned the next one.  When maxsize
        is set, this blocks until the queue has room.

        Raises Exception if work is not a ClusterWork instance.
        """
        if not isinstance(work, ClusterWork):
            raise Exception("%s is not a ClusterWork extension!" % (work,))

        # If this work has no ID, give it one
        if work.id is None:
            work.id = next(self.widiter)

        # "with" guarantees the lock is released even if wait() is interrupted.
        with self.qcond:
            if self.maxsize is not None:
                while len(self.queue) >= self.maxsize:
                    self.qcond.wait()
            self.queue.append(work)

        if self.callback:
            self.callback.workAdded(self, work)

    def getWork(self):
        """
        Pop the next queued work unit and mark it in progress.

        Returns the work unit, or None when the queue is empty.
        """
        with self.qcond:
            try:
                ret = self.queue.popleft()
            except IndexError:
                return None
            # Register it as in progress under the same lock cancelAllWork()
            # uses to swap self.inprog, so a cancel cannot miss it.
            self.inprog[ret.id] = ret
            ret.touch()
            # Wake any addWork() callers blocked on a full queue.
            self.qcond.notify_all()

        if self.callback:
            self.callback.workGotten(self, ret)
        return ret

    def doneWork(self, work):
        """
        Used by the clients to report work as done.
        """
        self.__cleanWork(work.id)
        work.done()
        if self.callback:
            self.callback.workDone(self, work)

    def timeoutWork(self, work):
        """
        Remove a work unit that timed out.  This method may be overridden
        to handle work units that time out for whatever reason.
        """
        self.__cleanWork(work.id)
        if self.callback:
            self.callback.workTimeout(self, work)

    def failWork(self, work):
        """
        This is called for a work unit that is in a failed state. This is most
        commonly that the work() method has raised an exception.
        """
        self.__cleanWork(work.id)
        if self.callback:
            self.callback.workFailed(self, work)

    def cancelAllWork(self, inprog=True):
        """
        Cancel all of the currently pending work units. You may
        specify inprog=False to cancel all *queued* work units
        but allow in-progress work units to complete.
        """
        with self.qcond:
            qlist = list(self.queue)
            self.queue.clear()
            if inprog:
                p = self.inprog
                self.inprog = {}
                qlist.extend(p.values())
            self.qcond.notify_all()

        if self.callback:
            for w in qlist:
                self.callback.workCanceled(self, w)

    def cancelWork(self, workid):
        """
        Cancel a work unit by ID, whether it is in progress or still
        queued.  Unknown ids are ignored.
        """
        cwork = self.__cleanWork(workid)

        # Remove it from the work queue (if we didn't find it in inprog)
        if cwork is None:
            with self.qcond:
                qlist = list(self.queue)
                self.queue.clear()
                for work in qlist:
                    if work.id != workid:
                        self.queue.append(work)
                    else:
                        cwork = work
                self.qcond.notify_all()

        if cwork is None:
            return

        if self.callback:
            self.callback.workCanceled(self, cwork)

    def setWorkStatus(self, workid, status):
        """
        Set the human readable status for the given work unit.

        Raises InvalidInProgWorkId if workid is not in progress.
        """
        self.__touchWork(workid)
        if self.callback:
            self.callback.workStatus(self, workid, status)

    def setWorkCompletion(self, workid, percent):
        """
        Set the percentage completion status for this work unit.

        Raises InvalidInProgWorkId if workid is not in progress.
        """
        self.__touchWork(workid)
        if self.callback:
            self.callback.workCompletion(self, workid, percent)
class ClusterClient:
    """
    Listen for our name (or any name if name=="*") on the cobra cluster
    multicast address and if we find a server in need, go help.

    maxwidth is the number of work units to do in parallel
    docode will enable code sharing with the server
    """

    def __init__(self, name, maxwidth=multiprocessing.cpu_count(), docode=False):
        self.go = True
        self.name = name
        self.width = 0  # number of worker subprocesses currently running
        self.maxwidth = maxwidth
        self.verbose = False
        self.docode = docode
        if docode:
            dcode.enableDcodeClient()

        # Join the cluster multicast group on cluster_port.
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(("", cluster_port))
        mreq = struct.pack("4sL", socket.inet_aton(cluster_ip), socket.INADDR_ANY)
        self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

    def processWork(self):
        """
        Runs handing out work up to maxwidth until self.go == False.

        Announcements look like "cobra:<name>:<objname>:<port>" (sent by a
        ClusterServer; the sender address is the server) or
        "cobra:<name>:<objname>:<port>:<host>" (relayed by a ClusterQueen).
        Malformed announcements are ignored.
        """
        while self.go:
            buf, sockaddr = self.sock.recvfrom(4096)
            if self.width >= self.maxwidth:
                continue

            server, svrport = sockaddr
            if not buf.startswith("cobra"):
                continue

            info = buf.split(":")
            if len(info) == 4:
                cc, name, cobject, portstr = info
            elif len(info) == 5:
                cc, name, cobject, portstr, server = info
            else:
                continue

            if (self.name != name) and (self.name != "*"):
                continue

            # Datagrams are unauthenticated network input: a bogus port
            # must not kill this loop.
            try:
                port = int(portstr)
            except ValueError:
                continue

            uri = "%s://%s:%d/%s" % (cc, server, port, cobject)
            # The uri is embedded in Python source run by threadForker();
            # refuse anything that could break out of a string literal.
            if any(ch in "\"'\\" or ch < " " for ch in uri):
                continue

            self.fireRunner(uri)

    def fireRunner(self, uri):
        """Run a worker for the server at uri in a background daemon thread."""
        thr = threading.Thread(target=self.threadForker, args=(uri,))
        thr.daemon = True
        thr.start()

    def threadForker(self, uri):
        """
        Run one worker subprocess (sub_cmd) for uri and wait for it to exit.
        self.width counts running subprocesses for processWork()'s throttle.
        """
        self.width += 1
        try:
            cmd = sub_cmd % (uri, self.docode)
            sub = subprocess.Popen([sys.executable, '-c', cmd], stdin=subprocess.PIPE)
            sub.wait()
        finally:
            self.width -= 1
class ClusterQueen:
    """
    Relays work announcements onto the cluster multicast group on behalf
    of remote ClusterServers.  ClusterServer.addClusterQueen() reaches her
    via cobra at queen_port under the name "ClusterQueen".
    """

    def __init__(self, ifip, recast=True):
        # UDP socket bound to the interface address ifip; relayed
        # announcements are sent from here.
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.bind((ifip, 0))
        self.sendsock = sock
        # FIXME TODO make her optionally a multicast listener that re-broadcasts
        # to her subjects...  (recast is accepted but not used yet)

    def proxyAnnounceWork(self, name, cobraname, port):
        """
        Send out a multicast announcement to our subjects to go help
        a cluster server.

        The calling server's host (taken from the cobra connection) is
        appended as a fifth field so that workers connect to that server
        rather than to the queen that sent the datagram.
        """
        callerhost, _ = cobra.getCallerInfo()
        fields = (name, cobraname, port, callerhost)
        self.sendsock.sendto("cobra:%s:%s:%d:%s" % fields, (cluster_ip, cluster_port))
def getHostPortFromUri(uri):
    """
    Take the server URI and pull out the host and port for use in
    building the dcode uri.

    Example: "cobra://10.0.0.1:32123/obj" -> ("10.0.0.1", 32123)

    Returns (host, port); port is None when the URI has no port.
    Raises ValueError if the port is not an integer.
    """
    try:
        from urlparse import urlsplit  # Python 2
    except ImportError:
        from urllib.parse import urlsplit  # Python 3

    hparts = urlsplit(uri).netloc.split(":")
    host = hparts[0]
    port = None
    # split() always returns at least one element, so the former
    # "if len(hparts):" was always true and a port-less URI raised IndexError.
    if len(hparts) > 1:
        port = int(hparts[1])
    return host, port
def workThread(server, work):
    """
    Run work.work() and report the outcome to server (the ClusterServer,
    typically reached through a cobra proxy).

    On success the unit is returned with server.doneWork().  If
    InvalidInProgWorkId is raised, the unit was canceled and nothing is
    reported.  Any other exception stores the traceback in work.excinfo
    and reports the unit with server.failWork().
    """
    try:
        work.server = server
        work.starttime = time.time()
        work.touch()
        work.work()
        work.endtime = time.time()
        work.server.doneWork(work)
    except InvalidInProgWorkId:
        pass  # the work was canceled; nothing to do, the server already knows
    except Exception:
        work.excinfo = traceback.format_exc()
        # Print first so the original error is not lost if reporting it to
        # the server fails too (e.g. the server went away).
        traceback.print_exc()
        try:
            server.failWork(work)
        except Exception:
            traceback.print_exc()
def runAndWaitWork(server, work):
    """
    Run work in a daemon thread (via workThread) and wait until the
    thread finishes, the work unit times out (work.isTimedOut()), or
    sys.stdin is closed.
    """
    thr = threading.Thread(target=workThread, args=(server, work))
    thr.daemon = True
    thr.start()

    # Wait around for done or timeout
    while True:
        if work.isTimedOut():
            break

        # If the thread is done, lets get out.
        # (is_alive(): isAlive() no longer exists on Python 3.9+.)
        if not thr.is_alive():
            break

        # If our parent, or some thread closes stdin,
        # time to pack up and go.
        if sys.stdin.closed:
            break

        # join() with a timeout returns as soon as the thread ends, so we
        # don't sit out the rest of a fixed 2 second sleep.
        thr.join(2)
def getAndDoWork(uri, docode=False):
    """
    Worker subprocess entry point (see sub_cmd): fetch one work unit from
    the ClusterServer at uri, run it, then terminate the process.

    uri    - cobra URI of the ClusterServer
    docode - if True, enable the dcode client and register the server's
             host/port as a dcode server

    Never returns: the process always exits with status 0.
    """
    import os

    try:
        # If we wanna use dcode, set it up
        if docode:
            dcode.enableDcodeClient()
            host, port = getHostPortFromUri(uri)
            # Use the same dcode module the client was enabled on.
            dcode.addDcodeServer(host, port=port)

        # Use a cobra proxy with timeout/maxretry so we
        # don't hang forever if the server goes away
        proxy = cobra.CobraProxy(uri, timeout=60, retrymax=3)
        work = proxy.getWork()

        # If we got work, do it.
        if work is not None:
            runAndWaitWork(proxy, work)

    except Exception:
        traceback.print_exc()

    # Any way it goes we wanna exit now.  Work units may have spun up
    # non-daemon threads; sys.exit() only raises SystemExit and interpreter
    # shutdown would still wait for those threads, so flush our output and
    # leave immediately with os._exit().
    gc.collect()  # Try to call destructors
    try:
        sys.stdout.flush()
        sys.stderr.flush()
    finally:
        os._exit(0)