# Source code for cobra.cluster

"""
Cobra's built-in clustering framework
"""

import gc
import sys
import time
import Queue
import struct
import socket
import urllib2
import traceback
import threading
import subprocess
import multiprocessing

import cobra
import dcode

# Port used in the cobra:// URI that ClusterServer.addClusterQueen() builds
# to reach the "ClusterQueen" object on a queen host.
queen_port   = 32124

# UDP port and multicast group for "work available" announcements:
# ClusterServer/ClusterQueen sendto() this address, and ClusterClient binds
# the port and joins the group to receive them.
cluster_port = 32123
cluster_ip = "224.69.69.69"

# Source text handed to a child interpreter (``python -c``) by
# ClusterClient.threadForker(), formatted as ``sub_cmd % (uri, docode)``.
#
# Both values are interpolated with %r rather than being wrapped in
# hand-written quotes: the URI is assembled from the contents of a network
# datagram, and with a plain "%s" a quote character inside it would end the
# string literal and let the remainder run as code in the child.  %r always
# produces one correctly escaped Python literal.
sub_cmd = """
import cobra.cluster
cobra.cluster.getAndDoWork(%r, docode=%r)
"""

class InvalidInProgWorkId(Exception):
    """
    Raised when a work ID does not name an in-progress work unit.
    """

    def __init__(self, workid):
        message = "Work ID %d is not valid" % workid
        Exception.__init__(self, message)
        self.workid = workid


class ClusterWork(object):
    """
    Base class for work units; subclass it and override work()/done().

    Define subclasses in a real module (not __main__) so that they can be
    used in conjunction with cobra.dcode.
    """

    def __init__(self, timeout=None):
        object.__init__(self)
        self.id = None          # Assigned when the unit is added to a server
        self.server = None      # Assigned by the worker before work() runs
        # starttime / endtime are stamped by the worker around work()
        self.starttime = 0
        self.endtime = 0
        self.timeout = timeout
        self.touchtime = None
        self.excinfo = None     # Traceback text if the work unit fails

    def touch(self):
        """
        Record "now" as the last time this unit showed progress.  The
        timeout check in isTimedOut() is measured from this moment.
        """
        self.touchtime = time.time()

    def isTimedOut(self):
        """
        Return True once more than ``timeout`` seconds have passed since
        the last touch().  A unit with no timeout, or one that was never
        touched, is never considered timed out.
        """
        if self.timeout is None or self.touchtime is None:
            return False
        deadline = self.touchtime + self.timeout
        return deadline < time.time()

    def work(self):
        """
        Perform the work for this unit.  Subclasses override this; the
        default is a ten step demo that reports progress and sleeps.
        """
        print("OVERRIDE ME")
        for step in range(10):
            self.setCompletion(step * 10)
            self.setStatus("Sleeping: %d" % step)
            time.sleep(1)

    def done(self):
        """
        Server side hook, invoked by the server's doneWork() when a
        finished unit is handed back.
        """
        print("OVERRIDE DONE")

    def setCompletion(self, percent):
        """
        Report percentage progress to the server (also counts as a touch).
        """
        self.touch()
        self.server.setWorkCompletion(self.id, percent)

    def setStatus(self, status):
        """
        Report a status string to the server (also counts as a touch).
        """
        self.touch()
        self.server.setWorkStatus(self.id, status)

    def openSharedFile(self, filename):
        '''
        Open a file that lives on the server and return a proxy to it.

        Example:
            fd = self.openSharedFile('/foo/bar/baz')
            fbytes = fd.read()

        NOTE: The server must have called shareFileToWorkers() for it.
        '''
        return cobra.CobraProxy(self.server.openSharedFile(filename))


class ClusterCallback:
    """
    Event sink for a ClusterServer.  Assign an instance to the server's
    ``callback`` attribute to be told synchronously what the server is
    doing (mostly useful for a GUI).  Every hook is a no-op by default.
    """

    def workAdded(self, server, work):
        """A work unit was appended to the queue."""

    def workGotten(self, server, work):
        """A work unit was handed out by getWork()."""

    def workStatus(self, server, workid, status):
        """A work unit reported a status string."""

    def workCompletion(self, server, workid, completion):
        """A work unit reported a completion percentage."""

    def workDone(self, server, work):
        """A work unit was returned through doneWork()."""

    def workFailed(self, server, work):
        """A work unit was reported through failWork()."""

    def workTimeout(self, server, work):
        """A work unit was expired through timeoutWork()."""

    def workCanceled(self, server, work):
        """A work unit was canceled."""


class VerboseCallback(ClusterCallback):
    """
    Callback that prints one line per event.  Mostly for testing.
    """

    def workAdded(self, server, work):
        print("WORK ADDED: %d" % work.id)

    def workGotten(self, server, work):
        print("WORK GOTTEN: %d" % work.id)

    def workStatus(self, server, workid, status):
        print("WORK STATUS: (%d) %s" % (workid, status))

    def workCompletion(self, server, workid, completion):
        print("WORK COMPLETION: (%d) %d%%" % (workid, completion))

    def workDone(self, server, work):
        print("WORK DONE: %d" % work.id)

    def workFailed(self, server, work):
        print("WORK FAILED: %d" % work.id)

    def workTimeout(self, server, work):
        print("WORK TIMEOUT: %d" % work.id)

    def workCanceled(self, server, work):
        print("WORK CANCELED %d" % work.id)


import collections
class ClusterServer:
    """
    Holds the queue of pending work units, tracks the ones currently being
    worked on, and announces available work to cluster clients.
    """

    def __init__(self, name, maxsize=None, docode=False, bindsrc="", cobrad=None):
        """
        The cluster server is the core of the code that manages work units.

        Arguments:
            maxsize - How big should the work queue be before add blocks
            docode  - Should we also be a dcode server?
            bindsrc - Should we bind a src IP for our multicast announcements?
            cobrad  - Should we use an existing cobra daemon to share our objects?
        """
        self.go = True
        self.name = name
        self.queens = []
        self.nextwid = 0
        self.inprog = {}        # workid -> work unit currently handed out
        self.sharedfiles = {}   # filename -> True for files workers may open
        self.maxsize = maxsize
        self.queue = collections.deque()
        self.qcond = threading.Condition()
        self.widiter = iter(xrange(999999999))

        # Initialize a cobra daemon if needed
        if cobrad is None:
            cobrad = cobra.CobraDaemon(host="", port=0)

        self.cobrad = cobrad
        self.cobraname = self.cobrad.shareObject(self)

        # Setup our transmission socket
        self.sendsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sendsock.bind((bindsrc, 0))

        # Set this to a ClusterCallback extension if
        # you want notifications.
        self.callback = None

        if docode:
            self.cobrad.shareObject(dcode.DcodeFinder(), "DcodeServer")

        # Fire the timeout monitor thread...
        thr = threading.Thread(target=self.timerThread)
        thr.setDaemon(True)
        thr.start()

    def addClusterQueen(self, queenhost):
        '''
        Inform the ClusterServer about the presence of a ClusterQueen instance
        on the given host.  When the ClusterServer begins to announce work,
        he will do so in "infrastructure mode" and ask any set queens for help.
        '''
        queen = cobra.CobraProxy('cobra://%s:%d/ClusterQueen' % (queenhost, queen_port))
        self.queens.append(queen)

    def shareFileToWorkers(self, filename):
        '''
        Add a file to the list of files which are "shared" to worker clients.
        This allows workers to access a file from the server.

        Example:
            s.shareFileToWorkers('/path/to/file')

        NOTE: Workers may use the openSharedFile() API to access them.
        '''
        self.sharedfiles[filename] = True

    def openSharedFile(self, filename):
        '''
        Return a URI for an open file descriptor for the specified filename.

        Raises Exception if the file was never passed to
        shareFileToWorkers().

        NOTE: use openSharedFile() method on work unit to get back a proxy.
        '''
        if not self.sharedfiles.get(filename):
            # The file name is now actually interpolated into the message.
            raise Exception('File %s is not shared!' % (filename,))

        fd = open(filename, 'rb')
        cname = self.cobrad.shareObject(fd, doref=True)
        host, port = cobra.getLocalInfo()
        return 'cobra://%s:%d/%s' % (host, port, cname)

    def __touchWork(self, workid):
        # Used to both validate an inprog workid *and*
        # update its timestamp for the timeout thread
        work = self.inprog.get(workid, None)
        if work is None:
            raise InvalidInProgWorkId(workid)
        work.touch()

    def __cleanWork(self, workid):
        # Used by done/timeout/etc to clean up an in
        # progress work unit.  Returns the unit, or None if unknown.
        return self.inprog.pop(workid, None)

    def timerThread(self):
        """
        Internal loop (run in a daemon thread) that expires timed out
        in-progress work units every two seconds until shutdownServer().
        """
        while self.go:
            try:
                # Iterate over a snapshot: timeoutWork() removes entries.
                for work in list(self.inprog.values()):
                    if work.isTimedOut():
                        self.timeoutWork(work)
            except Exception as e:
                print("ClusterTimer: %s" % e)
            time.sleep(2)

    def shutdownServer(self):
        """
        Ask the timer and announce loops to stop at their next iteration.
        """
        self.go = False

    def announceWork(self):
        """
        Announce to our multicast cluster peers that we have work
        to do! (Or use a queen to proxy to them...)
        """
        if self.queens:
            for q in self.queens:
                try:
                    q.proxyAnnounceWork(self.name, self.cobraname, self.cobrad.port)
                except Exception as e:
                    # Best effort: one unreachable queen must not stop the rest.
                    print('Queen Error: %s' % e)
        else:
            buf = "cobra:%s:%s:%d" % (self.name, self.cobraname, self.cobrad.port)
            self.sendsock.sendto(buf, (cluster_ip, cluster_port))

    def runServer(self, firethread=False):
        """
        Start the cobra daemon thread and announce work every two seconds
        while the queue is non-empty.  With firethread=True the loop is run
        in a new daemon thread and this call returns immediately.
        """
        if firethread:
            thr = threading.Thread(target=self.runServer)
            thr.setDaemon(True)
            thr.start()
            return

        self.cobrad.fireThread()
        while self.go:
            if len(self.queue):
                self.announceWork()
            time.sleep(2)

    def inQueueCount(self):
        """
        How long is the current work unit queue.
        """
        return len(self.queue)

    def inProgressCount(self):
        """
        How many work units are in progress?
        """
        return len(self.inprog)

    def addWork(self, work):
        """
        Add a work object to the ClusterServer.  Blocks while the queue
        already holds ``maxsize`` units (when a maxsize was given).

        Raises Exception if ``work`` is not a ClusterWork instance.
        """
        if not isinstance(work, ClusterWork):
            # The offending object is now actually interpolated.
            raise Exception("%s is not a ClusterWork extension!" % (work,))

        # If this work has no ID, give it one
        if work.id is None:
            work.id = next(self.widiter)

        # "with" guarantees the lock is released even if wait() raises.
        with self.qcond:
            if self.maxsize is not None:
                while len(self.queue) >= self.maxsize:
                    self.qcond.wait()
            self.queue.append(work)

        if self.callback:
            self.callback.workAdded(self, work)

    def getWork(self):
        """
        Pop the oldest queued work unit, mark it in progress and return it.
        Returns None when the queue is empty.
        """
        with self.qcond:
            try:
                ret = self.queue.popleft()
            except IndexError:
                return None
            # Wake any addWork() callers blocked on a full queue.
            self.qcond.notifyAll()

        self.inprog[ret.id] = ret
        self.__touchWork(ret.id)

        if self.callback:
            self.callback.workGotten(self, ret)

        return ret

    def doneWork(self, work):
        """
        Used by the clients to report work as done.
        """
        self.__cleanWork(work.id)
        work.done()
        if self.callback:
            self.callback.workDone(self, work)

    def timeoutWork(self, work):
        """
        This method may be over-ridden to handle
        work units that time out for whatever reason.
        """
        self.__cleanWork(work.id)
        if self.callback:
            self.callback.workTimeout(self, work)

    def failWork(self, work):
        """
        This is called for a work unit that is in a failed state.  This is
        most commonly that the work() method has raised an exception.
        """
        self.__cleanWork(work.id)
        if self.callback:
            self.callback.workFailed(self, work)

    def cancelAllWork(self, inprog=True):
        """
        Cancel all of the currently pending work units.  You may
        specify inprog=False to cancel all *queued* work units
        but allow inprogress work units to complete.
        """
        with self.qcond:
            canceled = list(self.queue)
            self.queue.clear()
            if inprog:
                running = self.inprog
                self.inprog = {}
                canceled.extend(running.values())
            self.qcond.notifyAll()

        if self.callback:
            for w in canceled:
                self.callback.workCanceled(self, w)

    def cancelWork(self, workid):
        """
        Cancel a work unit by ID.
        """
        cwork = self.__cleanWork(workid)

        # Remove it from the work queue
        # (if we didn't find in inprog)
        if cwork is None:
            with self.qcond:
                pending = list(self.queue)
                self.queue.clear()
                for work in pending:
                    if work.id != workid:
                        self.queue.append(work)
                    else:
                        cwork = work
                self.qcond.notifyAll()

        if cwork is None:
            return

        if self.callback:
            self.callback.workCanceled(self, cwork)

    def setWorkStatus(self, workid, status):
        """
        Set the human readable status for the given work unit.
        Raises InvalidInProgWorkId if the unit is not in progress.
        """
        self.__touchWork(workid)
        if self.callback:
            self.callback.workStatus(self, workid, status)

    def setWorkCompletion(self, workid, percent):
        """
        Set the percentage completion status for this work unit.
        Raises InvalidInProgWorkId if the unit is not in progress.
        """
        self.__touchWork(workid)
        if self.callback:
            self.callback.workCompletion(self, workid, percent)


class ClusterClient:
    """
    Listen for our name (or any name if name=="*") on the cobra cluster
    multicast address and if we find a server in need, go help.

    maxwidth is the number of work units to do in parallel
    docode will enable code sharing with the server
    """

    def __init__(self, name, maxwidth=multiprocessing.cpu_count(), docode=False):
        self.go = True
        self.name = name
        self.width = 0
        self.maxwidth = maxwidth
        self.verbose = False
        self.docode = docode

        if docode:
            dcode.enableDcodeClient()

        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(("", cluster_port))
        # "=" selects standard sizes with no padding so the request is
        # exactly two 4-byte addresses (struct ip_mreq).  Native "4sL" is
        # padded to 16 bytes on LP64 platforms.
        mreq = struct.pack("=4sL", socket.inet_aton(cluster_ip), socket.INADDR_ANY)
        self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

    def processWork(self):
        """
        Runs handing out work up to maxwidth until self.go == False.

        Announcements look like ``cobra:<name>:<object>:<port>`` with an
        optional fifth ``:<host>`` field that overrides the sender address.
        Anything that does not match is ignored.
        """
        while self.go:

            buf, sockaddr = self.sock.recvfrom(4096)
            if self.width >= self.maxwidth:
                continue

            server, svrport = sockaddr

            if not buf.startswith("cobra"):
                continue

            info = buf.split(":")

            ilen = len(info)
            if ilen == 4:
                cc, name, cobject, portstr = info
            elif ilen == 5:
                cc, name, cobject, portstr, server = info
            else:
                continue

            if (self.name != name) and (self.name != "*"):
                continue

            try:
                port = int(portstr)
            except ValueError:
                # A malformed datagram must not take the listener down.
                continue

            uri = "%s://%s:%d/%s" % (cc, server, port, cobject)
            self.fireRunner(uri)

    def fireRunner(self, uri):
        """
        Start a daemon thread which runs threadForker() for the given URI.
        """
        thr = threading.Thread(target=self.threadForker, args=(uri,))
        thr.setDaemon(True)
        thr.start()

    def threadForker(self, uri):
        """
        Run one worker subprocess for the given server URI and wait for it,
        counting it against ``width`` for as long as it is alive.
        """
        self.width += 1
        try:
            # NOTE(review): uri originates from a network datagram and is
            # formatted into Python source here; the sub_cmd template is
            # responsible for quoting it safely.
            cmd = sub_cmd % (uri, self.docode)
            sub = subprocess.Popen([sys.executable, '-c', cmd], stdin=subprocess.PIPE)
            sub.wait()
        finally:
            self.width -= 1


class ClusterQueen:
    """
    Relays work announcements from ClusterServers to the multicast group.
    """

    def __init__(self, ifip, recast=True):
        # Setup our transmission socket
        self.sendsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sendsock.bind((ifip, 0))

        # FIXME TODO make her optionally a multicast listener that re-broadcasts
        # to her subjects...

    def proxyAnnounceWork(self, name, cobraname, port):
        """
        Send out a multicast announcement to our subjects to go help
        a cluster server.
        """
        # Get the host IP from the connection information
        host, x = cobra.getCallerInfo()
        buf = "cobra:%s:%s:%d:%s" % (name, cobraname, port, host)
        self.sendsock.sendto(buf, (cluster_ip, cluster_port))


def getHostPortFromUri(uri):
    """
    Take the server URI and pull out the host and port for use in building
    the dcode uri.  Returns (host, port); port is None when the URI does
    not carry one.
    """
    x = urllib2.Request(uri)
    port = None
    hparts = x.get_host().split(":")
    host = hparts[0]
    # split() always yields at least one element, so test for a second one.
    if len(hparts) > 1:
        port = int(hparts[1])
    return host, port


def workThread(server, work):
    """
    Run a single work unit and report the outcome (done or failed) back
    to the server.
    """
    try:
        work.server = server
        work.starttime = time.time()
        work.touch()
        work.work()
        work.endtime = time.time()
        work.server.doneWork(work)

    except InvalidInProgWorkId:
        # the work was canceled
        pass  # Nothing to do, the server already knows

    except Exception:
        # Tell the server that the work unit failed
        work.excinfo = traceback.format_exc()
        work.server.failWork(work)
        traceback.print_exc()


def runAndWaitWork(server, work):
    """
    Run the work unit in a daemon thread and poll every two seconds until
    it times out, its thread finishes, or stdin gets closed.
    """
    thr = threading.Thread(target=workThread, args=(server, work))
    thr.setDaemon(True)
    thr.start()

    # Wait around for done or timeout
    while True:
        if work.isTimedOut():
            break

        # If the thread is done, lets get out.
        if not thr.is_alive():
            break

        # If our parent, or some thread closes stdin,
        # time to pack up and go.
        if sys.stdin.closed:
            break

        time.sleep(2)


def getAndDoWork(uri, docode=False):
    """
    Worker process entry point: fetch one work unit from the server at
    ``uri``, run it, and then always exit the process.
    """
    try:
        # If we wanna use dcode, set it up
        if docode:
            dcode.enableDcodeClient()
            host, port = getHostPortFromUri(uri)
            cobra.dcode.addDcodeServer(host, port=port)

        # Use a cobra proxy with timeout/maxretry so we
        # don't hang forever if the server goes away
        proxy = cobra.CobraProxy(uri, timeout=60, retrymax=3)
        work = proxy.getWork()

        # If we got work, do it.
        if work is not None:
            runAndWaitWork(proxy, work)

    except Exception:
        traceback.print_exc()

    # Any way it goes we wanna exit now.  Work units may have
    # spun up non-daemon threads, so lets GTFO.
    gc.collect()  # Try to call destructors
    sys.exit(0)   # GTFO