# ex:ts=4 # # asyncfarm # -- forkfarm extension to use Medusa Channel as a carrot # # $LinuxKorea: asyncfarm.py,v 1.14 2001/10/16 07:39:11 perky Exp $ # # Copyright 2001 Hye-Shik Chang. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # 3. Neither the name of author nor the names of its contributors # may be used to endorse or promote products derived from this software # without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # code by Hye-Shik Chang # import asyncore, asynchat import socket, signal from forkfarm import SC_SIGHUP, SC_SIGTERM, SC_SIGINT, forkfarm from errno import EALREADY, EINPROGRESS, EWOULDBLOCK class asyncfarm_carrot(asynchat.async_chat): """ the asyncfarm connection controller """ def __init__(self, sock, addr, side): asynchat.async_chat.__init__(self, sock) self.sock = sock self.addr = addr self.side = side if hasattr(self, 'init'): # help not to confuse method's name. self.run = getattr(self, 'init') def start(self): if self.access(self.addr): self.close() else: self.run() def access(self, addr): # override me return None def run(self): # please use this only for 'channel initializer'. # NEVER BLOCK AT THIS POINT. # and you can implement 'init' method instead of this. pass class asyncfarm(forkfarm): """ prefork daemon framework with medusa channel """ def __init__(self, address, controller=asyncfarm_carrot, maxprocess=4, side={}): forkfarm.__init__(self, address, controller, maxprocess, socket.SOCK_STREAM, 0, side) # limiting hard run time on AsyncFarm is so weird! self.childclass = asyncfarm_child class asyncfarm_child(asyncore.dispatcher): def __init__(self, sock, controller, cinfo, side): asyncore.dispatcher.__init__(self, sock) self.childn = cinfo[0] self.sock = sock self.side = side self.goahead = 1 self.accepting = 1 self.controller = controller def handle_accept(self): try: conn, addr = self.sock.accept() except socket.error, why: if why[0] in (EINPROGRESS, EALREADY, EWOULDBLOCK): return else: raise socket.error, why else: self.controller(conn, addr, self.side).start() # it doesn't block hehe :) def writable(self): return 0 def loop(self): signal.signal(signal.SIGTERM, self.signal_terminator) while self.goahead: # TODO: check graceful terminating on forkfarm asyncore.loop() signal.signal(signal.SIGTERM, signal.SIG_DFL) def signal_terminator(self, signum, frame): self.goahead = 0 asyncore.close_all() try: # this is heavy extension for many non-essential extensions import pqueue from time import time import select except: pass else: class fancy_asyncfarm_carrot(asyncfarm_carrot): """ the fancy asyncfarm connection controller """ def __init__(self, sock, addr, side): asyncfarm_carrot.__init__(self, sock, addr, side) self.alarmq = side['__alarmq'] def setalarm(self, interval): if interval <= 0.0: del self.alarmq[self] # this can throw KeyError else: self.alarmq[self] = time() + interval def handle_alarm(self): self.log_info('unhandled alarm event', 'warning') def close(self): asynchat.async_chat.close(self) try: del self.alarmq[self] except KeyError, IndexError: pass class fancy_asyncfarm(asyncfarm): """ ''fancy'' prefork daemon framework with medusa channel """ def __init__(self, address, controller=fancy_asyncfarm_carrot, maxprocess=4, side={}): forkfarm.__init__(self, address, controller, maxprocess, socket.SOCK_STREAM, 0, side) self.childclass = fancy_asyncfarm_child class fancy_asyncfarm_child(asyncfarm_child): def __init__(self, sock, controller, cinfo, side): if side.has_key('map'): self.map = side['map'] else: self.map = asyncore.socket_map if hasattr(select, 'poll'): self.poll = asyncore.poll3 else: self.poll = asyncore.poll if side.has_key('timeunit'): self.timeunit = side['timeunit'] else: self.timeunit = 30.0 asyncfarm_child.__init__(self, sock, controller, cinfo, side) self.alarmq = pqueue.PQueue() self.side['__alarmq'] = self.alarmq # fd: time def loop(self): signal.signal(signal.SIGTERM, self.signal_terminator) while self.goahead: # TODO: check graceful terminating on forkfarm self.poll(self.timeunit, self.map) while self.alarmq and self.alarmq.peek()[0] <= time(): self.alarmq.pop()[1].handle_alarm() signal.signal(signal.SIGTERM, signal.SIG_DFL) if __name__ == "__main__": import os class mycontrol(fancy_asyncfarm_carrot): def init(self): self.set_terminator('\n') self.push("Hello!! I'm process %d.\n" % os.getpid()) self.setalarm(5) def collect_incoming_data(self, data): print ">>>", data def found_terminator(self): self.push("Oh Yeah!\n") self.setalarm(5) def handle_alarm(self): self.push("Hurry!!\n") self.setalarm(5) fancy_asyncfarm(('', 9988), mycontrol, side={'timeunit': 5.0}).loop()