"""
Cobra RMI Framework
Cobra is a remote method invocation interface that is very "pythony". It is
MUCH like its inspiration pyro, but slimmer and safer for things like threading
and object de-registration. Essentially, cobra allows you to call methods from
and get/set attributes on objects that exist on a remote system.
"""
# Copyright (C) 2011 Invisigoth - See LICENSE file for details
import os
import sys
import time
import errno
import types
import socket
import struct
import urllib2
import traceback
import cPickle as pickle
from threading import currentThread,Thread,RLock
from SocketServer import ThreadingTCPServer, BaseRequestHandler
daemon = None
verbose = False
version = "Cobra2"
COBRA_PORT=5656
COBRASSL_PORT=5653
cobra_retrymax = None # Optional *global* retry max count
# Message Types
COBRA_HELLO = 0
COBRA_CALL = 1
COBRA_GETATTR = 2
COBRA_SETATTR = 3
COBRA_ERROR = 4
COBRA_GOODBYE = 5
[docs]class CobraException(Exception):
"""Base for Cobra exceptions"""
pass
[docs]class CobraClosedException(CobraException):
"""Raised when a connection is unexpectedly closed."""
pass
[docs]class CobraRetryException(Exception):
"""Raised when the retrymax (if present) for a proxy object is exceeded."""
pass
[docs]class CobraPickleException(Exception):
"""Raised when pickling fails."""
pass
[docs]def connectSocket(host, port, timeout=None):
"""
Make the long names go away....
"""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if timeout is not None:
s.settimeout(timeout)
s.connect((host, port))
return s
[docs]def getCallerInfo():
"""
This function may be used from *inside* a method being called
by a remote caller. It will return a tuple of host,port for the
other side of the connection... use wisely ;)
"""
return getattr(currentThread(),"_cobra_caller_info",None)
[docs]def getLocalInfo():
"""
This function returns the local host,port combination being
used in the socket servicing the current request
"""
return getattr(currentThread(), "_cobra_local_info", None)
[docs]def setCallerInfo(callerinfo):
"""
This is necissary because of crazy python method call
name munging for thread attributes ;)
"""
currentThread()._cobra_caller_info = callerinfo
[docs]def setLocalInfo(localinfo):
currentThread()._cobra_local_info = localinfo
[docs]class CobraMethod:
def __init__(self, proxy, methname):
self.proxy = proxy
self.methname = methname
def __call__(self, *args, **kwargs):
name = self.proxy._cobra_name
if verbose: print "CALLING:",name,self.methname,repr(args)[:20],repr(kwargs)[:20]
csock = self.proxy._cobra_getsock()
mtype, name, data = csock.cobraTransaction(COBRA_CALL, name, (self.methname, args, kwargs))
if mtype == COBRA_CALL:
return data
raise data
[docs]class CobraSocket:
def __init__(self, socket):
self.socket = socket
[docs] def getSockName(self):
return self.socket.getsockname()
[docs] def getPeerName(self):
return self.socket.getpeername()
[docs] def sendMessage(self, mtype, objname, data):
"""
Send message is responsable for transmission of cobra messages,
and socket reconnection in the event that the send fails for network
reasons.
"""
try:
buf = pickle.dumps(data)
except pickle.PickleError, e:
raise CobraPickleException("The arguments/attributes must be pickleable: %s" % e)
self.sendExact(struct.pack("<III", mtype, len(objname), len(buf)) + objname + buf)
[docs] def recvMessage(self):
"""
Returns tuple of mtype, objname, and data
This method is *NOT* responsable for re-connection, because there
is not context on the server side for what to send on re-connect.
Client side uses of the CobraSocket object should use cobraTransaction
to ensure re-tranmission of the request on reception errors.
"""
s = self.socket
hdr = self.recvExact(12)
mtype, nsize, dsize = struct.unpack("<III", hdr)
name = self.recvExact(nsize)
data = pickle.loads(self.recvExact(dsize))
return (mtype, name, data)
[docs] def recvExact(self, size):
buf = ""
s = self.socket
while len(buf) != size:
x = s.recv(size - len(buf))
if len(x) == 0:
raise CobraClosedException("Socket closed in recvExact...")
buf += x
return buf
[docs] def sendExact(self, buf):
self.socket.sendall(buf)
[docs]class SocketBuilder:
'''
For internal use by CobraClientSocket
'''
def __init__(self, scheme, host, port, timeout, kwargs):
self.scheme = scheme
self.host = host
self.port = port
self.timeout = timeout
self.kwargs = kwargs
def __call__(self):
host = self.host
port = self.port
if verbose: print "CONNECTING TO:",self.host,self.port
timeout = self.timeout
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if self.timeout is not None:
sock.settimeout(self.timeout)
if self.scheme == 'cobrassl':
# A cobra SSL socket can take a few keywords in the
# CobraProxy constructor...
import ssl
sslkwargs = {}
sslca = self.kwargs.get('sslca')
if sslca != None:
sslkwargs['ca_certs'] = sslca
sslkwargs['cert_reqs']=ssl.CERT_REQUIRED
# Check for specified client certificate info
sslkey = self.kwargs.get('sslkey')
if sslkey:
sslkwargs['keyfile'] = sslkey
sslcrt = self.kwargs.get('sslcrt')
if sslcrt:
sslkwargs['certfile'] = sslcrt
sock = ssl.wrap_socket(sock, **sslkwargs)
sock.connect((self.host, self.port))
if verbose:'CONNECTED!'
return sock
[docs]class CobraClientSocket(CobraSocket):
def __init__(self, sockctor, retrymax=None):
CobraSocket.__init__(self, sockctor())
self.sockctor = sockctor
self.retries = 0
self.retrymax = retrymax
if self.retrymax == None:
self.retrymax = cobra_retrymax
[docs] def reConnect(self):
"""
Handle the event where we need to reconnect
"""
while self.retrymax is None or self.retries < self.retrymax:
if verbose: sys.stderr.write("COBRA: Reconnection Attempt\n")
try:
self.socket = self.sockctor()
return
except Exception, e:
time.sleep(2 ** self.retries)
self.retries += 1
raise CobraRetryException()
[docs] def cobraTransaction(self, mtype, objname, data):
"""
This is an API for clients to use. It will retransmit
a sendMessage() automagically on recpt of an exception
in recvMessage()
"""
while True:
try:
self.sendMessage(mtype, objname, data)
return self.recvMessage()
except CobraClosedException, e:
self.reConnect()
except socket.error, e:
self.reConnect()
[docs]class CobraDaemon(ThreadingTCPServer):
def __init__(self, host="", port=COBRA_PORT, sslcrt=None, sslkey=None, sslca=None):
'''
Construct a cobra daemon object.
Parameters:
host - Optional hostname/ip to bind the service to (default: inaddr_any)
port - The port to bind (Default: COBRA_PORT)
# SSL Options
sslcrt / sslkey - Specify sslcrt and sslkey to enable SSL server side
sslca - Specify an SSL CA key to use validating client certs
'''
self.shared = {}
self.host = host
self.port = port
self.reflock = RLock()
self.refcnts = {}
# SSL Options
self.sslca = sslca
self.sslcrt = sslcrt
self.sslkey = sslkey
if sslcrt and not os.path.isfile(sslcrt):
raise Exception('CobraDaemon: sslcrt param must be a file!')
if sslkey and not os.path.isfile(sslkey):
raise Exception('CobraDaemon: sslkey param must be a file!')
if sslca and not os.path.isfile(sslca):
raise Exception('CobraDaemon: sslca param must be a file!')
self.allow_reuse_address = True
ThreadingTCPServer.__init__(self, (host, port), CobraRequestHandler)
if port == 0:
self.port = self.socket.getsockname()[1]
self.daemon_threads = True
[docs] def fireThread(self):
thr = Thread(target=self.serve_forever)
thr.setDaemon(True)
thr.start()
[docs] def getSharedObject(self, name):
return self.shared.get(name, None)
[docs] def getSharedObjects(self):
'''
Return a list of (name, obj) for the currently shared objects.
Example:
for name,obj in daemon.getSharedObjects():
print('%s: %r' % (name,obj))
'''
return self.shared.items()
[docs] def getSharedName(self, obj):
'''
If this object is shared already, get the name...
'''
for name, sobj in self.shared.items():
if sobj == obj:
return name
return None
[docs] def getRandomName(self):
ret = ""
for byte in os.urandom(16):
ret += "%.2x" % ord(byte)
return ret
[docs] def shareObject(self, obj, name=None, doref=False):
"""
Share an object in this cobra server. By specifying
doref=True you will let CobraProxy objects decide that
the object is done and should be un-shared. Also, if
name == None a random name is chosen.
Returns: name (or the newly generated random one)
"""
refcnt = None
if doref:
refcnt = 0
if name == None:
name = self.getRandomName()
self.shared[name] = obj
self.refcnts[name] = refcnt
return name
[docs] def getObjectRefCount(self, name):
return self.refcnts.get(name)
[docs] def decrefObject(self, name):
"""
Decref this object and if it reaches 0, unshare it.
"""
if verbose: print "DECREF:",name
self.reflock.acquire()
try:
refcnt = self.refcnts.get(name, None)
if refcnt != None:
refcnt -= 1
self.refcnts[name] = refcnt
if refcnt == 0:
self.unshareObject(name)
finally:
self.reflock.release()
[docs] def increfObject(self, name):
if verbose: print "INCREF:",name
self.reflock.acquire()
try:
refcnt = self.refcnts.get(name, None)
if refcnt != None:
refcnt += 1
self.refcnts[name] = refcnt
finally:
self.reflock.release()
[docs] def unshareObject(self, name):
if verbose: print 'UNSHARE',name
self.refcnts.pop(name, None)
return self.shared.pop(name, None)
[docs]class CobraRequestHandler(BaseRequestHandler):
[docs] def handle(self):
c = CobraConnectionHandler(self.server, self.request)
c.handleClient()
[docs]class CobraConnectionHandler:
def __init__(self, daemon, socket):
self.daemon = daemon
self.socket = socket
self.handlers = (
self.handleHello,
self.handleCall,
self.handleGetAttr,
self.handleSetAttr,
self.handleError,
self.handleGoodbye)
[docs] def handleClient(self):
peer = self.socket.getpeername()
me = self.socket.getsockname()
if verbose: print "GOT A CONNECTIONN",peer
sock = self.socket
if self.daemon.sslkey:
import ssl
sslca = self.daemon.sslca
keyfile = self.daemon.sslkey
certfile = self.daemon.sslcrt
sslreq = ssl.CERT_NONE
# If they specify a CA key, require valid client certs
if sslca:
sslreq=ssl.CERT_REQUIRED
sock = ssl.wrap_socket(sock,
keyfile=keyfile, certfile=certfile,
ca_certs=sslca, cert_reqs=sslreq,
server_side=True)
csock = CobraSocket(sock)
setCallerInfo(peer)
setLocalInfo(me)
while True:
try:
mtype,name,data = csock.recvMessage()
except CobraClosedException:
break
except socket.error:
if verbose: traceback.print_exc()
break
obj = self.daemon.getSharedObject(name)
if verbose: print "MSG FOR:",name,type(obj)
if obj == None:
try:
csock.sendMessage(COBRA_ERROR, name, Exception("Unknown object requested: %s" % name))
except CobraClosedException:
pass
if verbose: print "WARNING: Got request for unknown object",name
continue
try:
handler = self.handlers[mtype]
except:
try:
csock.sendMessage(COBRA_ERROR, name, Exception("Invalid Message Type"))
except CobraClosedException:
pass
if verbose: print "WARNING: Got Invalid Message Type: %d for %s" % (mtype, data)
continue
try:
handler(csock, name, obj, data)
except Exception, e:
if verbose: traceback.print_exc()
try:
csock.sendMessage(COBRA_ERROR, name, e)
except TypeError, typee:
# Probably about pickling...
csock.sendMessage(COBRA_ERROR, name, Exception(str(e)))
except CobraClosedException:
pass
[docs] def handleError(self, csock, oname, obj, data):
print "THIS SHOULD NEVER HAPPEN"
[docs] def handleHello(self, csock, oname, obj, data):
"""
Hello messages are used to get the initial cache of
method names for the newly connected object.
"""
if verbose: print "GOT A HELLO"
self.daemon.increfObject(oname)
ret = {}
for name in dir(obj):
if type(getattr(obj,name)) in (types.MethodType, types.BuiltinMethodType):
ret[name] = True
try:
csock.sendMessage(COBRA_HELLO, version, ret)
except CobraClosedException:
pass
[docs] def handleCall(self, csock, oname, obj, data):
if verbose: print "GOT A CALL",data
methodname, args, kwargs = data
meth = getattr(obj, methodname)
try:
csock.sendMessage(COBRA_CALL, "", meth(*args, **kwargs))
except CobraClosedException:
pass
[docs] def handleGetAttr(self, csock, oname, obj, name):
if verbose: print "GETTING ATTRIBUTE:",name
try:
csock.sendMessage(COBRA_GETATTR, "", getattr(obj, name))
except CobraClosedException:
pass
[docs] def handleSetAttr(self, csock, oname, obj, data):
if verbose: print "SETTING ATTRIBUTE:",data
name,value = data
setattr(obj, name, value)
try:
csock.sendMessage(COBRA_SETATTR, "", "")
except CobraClosedException:
pass
[docs] def handleGoodbye(self, csock, oname, obj, data):
if verbose: print 'GOODBYE!',oname,obj,data
self.daemon.decrefObject(oname)
try:
csock.sendMessage(COBRA_GOODBYE, "", "")
except CobraClosedException:
pass
[docs]def isCobraUri(uri):
try:
x = urllib2.Request(uri)
if x.get_type() not in ["cobra","cobrassl"]:
return False
except Exception, e:
return False
return True
[docs]class CobraProxy:
"""
A proxy object for remote objects shared with Cobra
Additional keyword arguments may depend on protocol.
cobra://
Only the standard args
cobrassl://
sslca - File path to a CA certs file. Causes server validation.
sslcrt / sslkey - Client side cert info
"""
def __init__(self, URI, retrymax=None, timeout=None, **kwargs):
port = COBRA_PORT
req = urllib2.Request(URI)
scheme = req.get_type()
host = req.get_host()
name = req.get_selector().strip("/")
if scheme not in ["cobra","cobrassl"]:
raise Exception("Invalid scheme: %s" % scheme)
if ":" in host:
host,portstr = host.split(":")
port = int(portstr)
if verbose: print "HOST",host,"PORT",port,"OBJ",name
self._cobra_uri = URI
self._cobra_scheme = scheme
self._cobra_host = host
self._cobra_port = port
self._cobra_slookup = (host,port)
self._cobra_name = name
self._cobra_retrymax = retrymax
self._cobra_timeout = timeout
self._cobra_kwargs = kwargs
if kwargs.get('sslkey') and not os.path.isfile(kwargs.get('sslkey')):
raise Exception('CobraProxy: sslkey must be a file!')
if kwargs.get('sslcrt') and not os.path.isfile(kwargs.get('sslcrt')):
raise Exception('CobraProxy: sslcrt must be a file!')
if kwargs.get('sslca') and not os.path.isfile(kwargs.get('sslca')):
raise Exception('CobraProxy: sslca must be a file!')
csock = self._cobra_getsock()
mtype,rver,data = csock.cobraTransaction(COBRA_HELLO, name, "")
if mtype == COBRA_ERROR:
raise data
if rver != version:
raise Exception("Server Version Not Supported: %s" % rver)
if mtype != COBRA_HELLO:
raise Exception("Invalid Cobra Hello Response")
self._cobra_methods = data
def _cobra_getsock(self):
thr = currentThread()
tsocks = getattr(thr, 'cobrasocks', None)
if tsocks == None:
tsocks = {}
thr.cobrasocks = tsocks
sock = tsocks.get(self._cobra_slookup)
if not sock:
sock = self._cobra_newsock()
tsocks[self._cobra_slookup] = sock
return sock
def _cobra_newsock(self):
"""
This is only used by *clients*
"""
retrymax = self._cobra_retrymax
builder = SocketBuilder( self._cobra_scheme,
self._cobra_host,
self._cobra_port,
self._cobra_timeout,
self._cobra_kwargs)
return CobraClientSocket(builder, retrymax=retrymax)
def __getstate__(self):
return self.__dict__
def __setstate__(self, sdict):
self.__dict__.update(sdict)
def __hash__(self):
return hash(self._cobra_uri)
def __nonzero__(self):
return True
def __repr__(self):
return str(self)
def __str__(self):
return "<CobraProxy %s>" % self._cobra_uri
def __eq__(self, obj):
ouri = getattr(obj, '_cobra_uri', None)
return self._cobra_uri == ouri
def __ne__(self, obj):
if self == obj:
return False
return True
def __setattr__(self, name, value):
if verbose: print "SETATTR %s %s" % (name, repr(value)[:20])
if name.startswith('_cobra_'):
self.__dict__[name] = value
return
csock = self._cobra_getsock()
mtype,name,data = csock.cobraTransaction(COBRA_SETATTR, self._cobra_name, (name, value))
if mtype == COBRA_ERROR:
raise data
elif mtype == COBRA_SETATTR:
return
else:
raise Exception("Invalid Cobra Response")
def __getattr__(self, name):
if verbose: print "GETATTR",name
if name == "__getinitargs__":
raise AttributeError()
# Handle methods
if self._cobra_methods.get(name, False):
return CobraMethod(self, name)
csock = self._cobra_getsock()
mtype,name,data = csock.cobraTransaction(COBRA_GETATTR, self._cobra_name, name)
if mtype == COBRA_ERROR:
raise data
return data
def __del__(self):
"""
Tell the server we're done with our reference in case it's refcnt'd
"""
try:
csock = self._cobra_getsock()
csock.cobraTransaction(COBRA_GOODBYE, self._cobra_name, "")
except socket.error, e:
if verbose: print "Probably Harmless: %s" % e
except CobraException, e:
if verbose: print "Probably Harmless: %s" % e
[docs]def startCobraServer(host="", port=COBRA_PORT):
global daemon
if daemon == None:
daemon = CobraDaemon(host,port)
daemon.fireThread()
return daemon
[docs]def runCobraServer(host='', port=COBRA_PORT):
daemon = CobraDaemon(host,port)
daemon.serve_forever()
[docs]def shareObject(obj, name=None, doref=False):
"""
If shareObject is called before startCobraServer
or startCobraSslServer, it will call startCobraServer
"""
global daemon
if daemon == None:
startCobraServer()
return daemon.shareObject(obj, name, doref=doref)
[docs]def unshareObject(name):
return daemon.unshareObject(name)