# ex:ts=4
#
# asyncfarm
#   -- forkfarm extension to use Medusa Channel as a carrot
#
# $LinuxKorea: asyncfarm.py,v 1.14 2001/10/16 07:39:11 perky Exp $
#
# Copyright 2001 Hye-Shik Chang. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without 
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of its contributors
#    may be used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE. 
#
# code by Hye-Shik Chang <perky@linuxkorea.co.kr>
#

import asyncore, asynchat
import socket, signal

from forkfarm import SC_SIGHUP, SC_SIGTERM, SC_SIGINT, forkfarm
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK

class asyncfarm_carrot(asynchat.async_chat):
	""" the asyncfarm connection controller """

	def __init__(self, sock, addr, side):
		asynchat.async_chat.__init__(self, sock)
		self.sock = sock
		self.addr = addr
		self.side = side
		if hasattr(self, 'init'): # help not to confuse method's name.
			self.run = getattr(self, 'init')
	
	def start(self):
		if self.access(self.addr):
			self.close()
		else:
			self.run()
	
	def access(self, addr):
		# override me
		return None
		
	def run(self):
		# please use this only for 'channel initializer'.
		# NEVER BLOCK AT THIS POINT.
		# and you can implement 'init' method instead of this.
		pass


class asyncfarm(forkfarm):
	""" prefork daemon framework with medusa channel """

	def __init__(self, address, controller=asyncfarm_carrot, maxprocess=4, side={}):
		forkfarm.__init__(self, address, controller, maxprocess, 
										socket.SOCK_STREAM, 0, side)
		# limiting hard run time on AsyncFarm is so weird!
		self.childclass = asyncfarm_child


class asyncfarm_child(asyncore.dispatcher):
	
	def __init__(self, sock, controller, cinfo, side):
		asyncore.dispatcher.__init__(self, sock)
		self.childn = cinfo[0]
		self.sock   = sock
		self.side   = side
		self.goahead = 1
		self.accepting = 1
		self.controller = controller
	
	def handle_accept(self):
		try:
			conn, addr = self.sock.accept()
		except socket.error, why:
			if why[0] in (EINPROGRESS, EALREADY, EWOULDBLOCK):
				return
			else:
				raise socket.error, why
		else:
			self.controller(conn, addr, self.side).start() # it doesn't block hehe :)

	def writable(self):
		return 0

	def loop(self):
		signal.signal(signal.SIGTERM, self.signal_terminator)
		
		while self.goahead: # TODO: check graceful terminating on forkfarm
			asyncore.loop()

		signal.signal(signal.SIGTERM, signal.SIG_DFL)
	
	def signal_terminator(self, signum, frame):
		self.goahead = 0
		asyncore.close_all()


try: # this is heavy extension for many non-essential extensions
	import pqueue
	from time import time
	import select
except:
	pass
else:
	class fancy_asyncfarm_carrot(asyncfarm_carrot):
		""" the fancy asyncfarm connection controller """
	
		def __init__(self, sock, addr, side):
			asyncfarm_carrot.__init__(self, sock, addr, side)
			self.alarmq = side['__alarmq']
		
		def setalarm(self, interval):
			if interval <= 0.0:
				del self.alarmq[self] # this can throw KeyError
			else:
				self.alarmq[self] = time() + interval
		
		def handle_alarm(self):
			self.log_info('unhandled alarm event', 'warning')
		
		def close(self):
			asynchat.async_chat.close(self)
			try:
				del self.alarmq[self]
			except KeyError, IndexError:
				pass
	
	class fancy_asyncfarm(asyncfarm):
		""" ''fancy'' prefork daemon framework with medusa channel """
	
		def __init__(self, address, controller=fancy_asyncfarm_carrot, maxprocess=4, side={}):
			forkfarm.__init__(self, address, controller, maxprocess, 
											socket.SOCK_STREAM, 0, side)
			self.childclass = fancy_asyncfarm_child
	
	class fancy_asyncfarm_child(asyncfarm_child):
		
		def __init__(self, sock, controller, cinfo, side):
			if side.has_key('map'):	self.map = side['map']
			else:					self.map = asyncore.socket_map
	
			if hasattr(select, 'poll'):	self.poll = asyncore.poll3
			else:						self.poll = asyncore.poll
	
			if side.has_key('timeunit'):	self.timeunit = side['timeunit']
			else:							self.timeunit = 30.0

			asyncfarm_child.__init__(self, sock, controller, cinfo, side)
			self.alarmq	= pqueue.PQueue()
			self.side['__alarmq'] = self.alarmq # fd: time
		
		def loop(self):
			signal.signal(signal.SIGTERM, self.signal_terminator)
			
			while self.goahead: # TODO: check graceful terminating on forkfarm
				self.poll(self.timeunit, self.map)
				while self.alarmq and self.alarmq.peek()[0] <= time():
					self.alarmq.pop()[1].handle_alarm()
	
			signal.signal(signal.SIGTERM, signal.SIG_DFL)


if __name__ == "__main__":
	import os

	class mycontrol(fancy_asyncfarm_carrot):
		def init(self):
			self.set_terminator('\n')
			self.push("Hello!! I'm process %d.\n" % os.getpid())
			self.setalarm(5)
		def collect_incoming_data(self, data):
			print ">>>", data
		def found_terminator(self):
			self.push("Oh Yeah!\n")
			self.setalarm(5)
		def handle_alarm(self):
			self.push("Hurry!!\n")
			self.setalarm(5)
	
	fancy_asyncfarm(('', 9988), mycontrol, side={'timeunit': 5.0}).loop()


