# ex:ts=4 # # LikoRPC # -- the Mutual Calling RPC Framework # # $LinuxKorea: likorpc.py,v 2.4 2001/10/29 07:34:46 perky Exp $ # # Copyright 2001 Hye-Shik Chang. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # 3. Neither the name of author nor the names of its contributors # may be used to endorse or promote products derived from this software # without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # code by Hye-Shik Chang # # standard modules import socket, os import string # medusa modules import asyncore import fifo import cw_medusa # standard C extensions try: from cPickle import dumps, loads except: from pickle import dumps, loads try: from cStringIO import StringIO except: from StringIO import StringIO MTYPE_CALL = 0 MTYPE_RETURN = 1 class likorpc_root: _server = None def connect(addr, root=None): return likorpc_channel(root, None, addr) class HeavyBuffer: """ buffer collector for heavy transport """ def __init__(self): self.buffer = StringIO() def set(self, data): self.truncate() self.buffer.write(data) def append(self, data): self.buffer.write(data) def get(self): return self.buffer.getvalue() def truncate(self): self.buffer.seek(0) self.buffer.truncate() class LightBuffer: """ buffer collector for light transport (most cases) """ def __init__(self): self.buffer = '' def set(self, data): self.buffer = data def append(self, data): self.buffer = self.buffer + data def get(self): return self.buffer def truncate(self): self.buffer = '' class likorpc_channel(cw_medusa.async_chat): def __init__(self, root, conn, addr, map=None, heavy=0): if conn: cw_medusa.async_chat.__init__ (self, conn, map) self._connected = 1 else: cw_medusa.async_chat.__init__ (self) self.open(addr) self.root = root self.map = map self.addr = addr self.set_terminator ('\000') self.request_fifo = fifo.fifo() if heavy: self.buffer = HeavyBuffer() else: self.buffer = LightBuffer() def open(self, address): if type(address) is type(''): family = socket.AF_UNIX else: family = socket.AF_INET self.create_socket (family, socket.SOCK_STREAM) self.connect(address) pass def collect_incoming_data(self, data): self.buffer.append(data) def handle_connect(self): self._connected = 1 def handle_close(self): if self.map: self.del_channel(self.map) cw_medusa.async_chat.handle_close(self) def close(self): self._connected = 0 self.flush_pending_requests ('lost connection to rpc server') cw_medusa.async_chat.close(self) def flush_pending_requests(self, why): f = self.request_fifo while len(f): callback = f.pop() callback(why, None) def found_terminator(self): (mtype, code1, code2) = loads(self.buffer.get()) self.buffer.truncate() if mtype == MTYPE_CALL: o = self.root e = None try: for p in code1: o = getattr(o, p) result = apply(o, code2) except: e = repr(asyncore.compact_traceback()) result = None # these two lines must be splitted for prohibit copying big object self.push(dumps((MTYPE_RETURN, e, result))) self.push('\000') elif mtype == MTYPE_RETURN: callback = self.request_fifo.pop() # must be serial, since server do blocking call :) callback(code1, code2) # error, result def call(self, method, args, callback): if not self._connected: raise "OOPS" # How can? #self.open(self.addr) path = string.split (method, '.') self.push(dumps((MTYPE_CALL, path, args)) + '\000') self.request_fifo.push(callback) class likorpc_server(asyncore.dispatcher): def __init__(self, root, address, channel=likorpc_channel, heavy=0): if type(address) is type(()): family = socket.AF_INET else: family = socket.AF_UNIX self.channel = channel self.address = address self.heavytrans = heavy self.root = root self.create_socket(family, socket.SOCK_STREAM) self.set_reuse_addr() self.bind(address) self.listen(128) self.listening = 1 def __del__(self): import os if type(self.address) == type(''): os.unlink(self.address) def handle_accept(self): if self.listening: conn, addr = self.accept() likorpc_channel(self.root, conn, addr) def writable(self): return 0 def loop(self): asyncore.loop() from socketfarm import Control class likorpc_control(Control): heavy = 1 root = likorpc_root def run(self): ownmap = {} likorpc_channel(self.root(), self.sock, self.addr, ownmap, self.heavy) asyncore.loop(map=ownmap) if __name__ == "__main__": class myroot(likorpc_root): def hello(self): return "hello!!!" def mycallback(e, r): print "Called Back!" print e, r # k = likorpc_server(myroot(), "test.sock") # # m = connect("test.sock") # m.call("hello", (), mycallback) # # asyncore.loop() import socketfarm s = socketfarm.SocketFarm(procnum=4, limit=(10, 10, 0), boom=(1.0, 10)) s.add_service(likorpc_control, "test.sock", root=myroot) s.loop()