webpyapp: installation du service

This commit is contained in:
Jephté Clain 2017-07-18 09:32:17 +04:00
parent ce52d25855
commit eca8c11900
25 changed files with 5755 additions and 4 deletions

View File

@ -0,0 +1,97 @@
#!/bin/bash
# -*- coding: utf-8 mode: sh -*- vim:sw=4:sts=4:et:ai:si:sta:fenc=utf-8
source /etc/ulibauto || exit 1
urequire debian service conf
function display_help() {
uecho "$scriptname: installer le service
USAGE
$scriptname [options]
OPTIONS
-n, --name NAME
Spécifier le nom du service
-p, --port PORT
Spécifier le port sur lequel doit écouter le serveur
-d, --destdir DESTDIR
Spécifier le répertoire d'installation. Par défaut, l'installation se
fait dans le répertoire $DEFAULT_DESTDIR
-s, --start
Activer et démarrer le service après installation"
}
DEFAULT_DESTDIR=/opt/webpyapps
name=
port=
destdir=
owner=root:
mode=u=rwX,go=rX
overwrite_config=
enable=
start=
args=(
--help '$exit_with display_help'
-n:,--name: name=
-p:,--port: port=
-d:,--destdir: destdir=
--overwrite-config overwrite_config=1
-s,--start '$enable=1; start=1'
)
parse_args "$@"; set -- "${args[@]}"
[ -n "$name" ] || setx name=basename "$scriptdir"
[ -n "$destdir" ] || destdir="$DEFAULT_DESTDIR"
run_as_root -n "$name" ${port:+-p "$port"} \
-d "$destdir" ${overwrite_config:+--overwrite-config} \
${start:+-s}
"$@"
if service "$name" check; then
estep "Arrêt du service"
service "$name" stop
start=1
fi
etitle "Copie des fichiers"
destdir="$destdir/$name"
mkdir -p "$destdir" || die
rsync -a --exclude /server.conf --exclude /install-or-update.sh "$scriptdir/" "$destdir"
eend
etitle "Vérification de la configuration"
if [ -n "$overwrite_config" -o ! -f "$destdir/server.conf" ]; then
estep "Copie de la configuration initiale"
cp "$scriptdir/server.conf" "$destdir"
if [ -n "$port" ]; then
estep "Configuration du port d'écoute $port"
conf_enable "$destdir/server.conf" PORT="$port"
fi
else
enote "Refus d'écraser la configuration existante $destdir/server.conf"
fi
eend
etitle "Correction des permissions"
chown -R "$owner" "$destdir"
chmod -R "$mode" "$destdir"
eend
etitle "Configuration du service"
estep "Copie des fichiers init..."
if [ -d /etc/systemd/system ]; then
estep "... /etc/systemd/system/$name.service"
sed "\
s|@@destdir@@|$destdir|g
s|@@name@@|$name|g" "$destdir/lib/server.service" >"/etc/systemd/system/$name.service"
fi
if [ -n "$enable" ]; then
estep "Activation du service"
service_enable "$name"
fi
if [ -n "$start" ]; then
estep "Démarrage du service"
service "$name" start
fi
eend

View File

@ -0,0 +1 @@
#

View File

@ -0,0 +1 @@
#

View File

@ -0,0 +1,461 @@
# Copyright (c) 2006 Allan Saddi <allan@saddi.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# $Id$
__author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
import select
import struct
import socket
import errno
__all__ = ['FCGIApp']
# Constants from the spec.
FCGI_LISTENSOCK_FILENO = 0
FCGI_HEADER_LEN = 8
FCGI_VERSION_1 = 1
FCGI_BEGIN_REQUEST = 1
FCGI_ABORT_REQUEST = 2
FCGI_END_REQUEST = 3
FCGI_PARAMS = 4
FCGI_STDIN = 5
FCGI_STDOUT = 6
FCGI_STDERR = 7
FCGI_DATA = 8
FCGI_GET_VALUES = 9
FCGI_GET_VALUES_RESULT = 10
FCGI_UNKNOWN_TYPE = 11
FCGI_MAXTYPE = FCGI_UNKNOWN_TYPE
FCGI_NULL_REQUEST_ID = 0
FCGI_KEEP_CONN = 1
FCGI_RESPONDER = 1
FCGI_AUTHORIZER = 2
FCGI_FILTER = 3
FCGI_REQUEST_COMPLETE = 0
FCGI_CANT_MPX_CONN = 1
FCGI_OVERLOADED = 2
FCGI_UNKNOWN_ROLE = 3
FCGI_MAX_CONNS = 'FCGI_MAX_CONNS'
FCGI_MAX_REQS = 'FCGI_MAX_REQS'
FCGI_MPXS_CONNS = 'FCGI_MPXS_CONNS'
FCGI_Header = '!BBHHBx'
FCGI_BeginRequestBody = '!HB5x'
FCGI_EndRequestBody = '!LB3x'
FCGI_UnknownTypeBody = '!B7x'
FCGI_BeginRequestBody_LEN = struct.calcsize(FCGI_BeginRequestBody)
FCGI_EndRequestBody_LEN = struct.calcsize(FCGI_EndRequestBody)
FCGI_UnknownTypeBody_LEN = struct.calcsize(FCGI_UnknownTypeBody)
if __debug__:
import time
# Set non-zero to write debug output to a file.
DEBUG = 0
DEBUGLOG = '/tmp/fcgi_app.log'
def _debug(level, msg):
if DEBUG < level:
return
try:
f = open(DEBUGLOG, 'a')
f.write('%sfcgi: %s\n' % (time.ctime()[4:-4], msg))
f.close()
except:
pass
def decode_pair(s, pos=0):
"""
Decodes a name/value pair.
The number of bytes decoded as well as the name/value pair
are returned.
"""
nameLength = ord(s[pos])
if nameLength & 128:
nameLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff
pos += 4
else:
pos += 1
valueLength = ord(s[pos])
if valueLength & 128:
valueLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff
pos += 4
else:
pos += 1
name = s[pos:pos+nameLength]
pos += nameLength
value = s[pos:pos+valueLength]
pos += valueLength
return (pos, (name, value))
def encode_pair(name, value):
"""
Encodes a name/value pair.
The encoded string is returned.
"""
nameLength = len(name)
if nameLength < 128:
s = chr(nameLength)
else:
s = struct.pack('!L', nameLength | 0x80000000L)
valueLength = len(value)
if valueLength < 128:
s += chr(valueLength)
else:
s += struct.pack('!L', valueLength | 0x80000000L)
return s + name + value
class Record(object):
"""
A FastCGI Record.
Used for encoding/decoding records.
"""
def __init__(self, type=FCGI_UNKNOWN_TYPE, requestId=FCGI_NULL_REQUEST_ID):
self.version = FCGI_VERSION_1
self.type = type
self.requestId = requestId
self.contentLength = 0
self.paddingLength = 0
self.contentData = ''
def _recvall(sock, length):
"""
Attempts to receive length bytes from a socket, blocking if necessary.
(Socket may be blocking or non-blocking.)
"""
dataList = []
recvLen = 0
while length:
try:
data = sock.recv(length)
except socket.error, e:
if e[0] == errno.EAGAIN:
select.select([sock], [], [])
continue
else:
raise
if not data: # EOF
break
dataList.append(data)
dataLen = len(data)
recvLen += dataLen
length -= dataLen
return ''.join(dataList), recvLen
_recvall = staticmethod(_recvall)
def read(self, sock):
"""Read and decode a Record from a socket."""
try:
header, length = self._recvall(sock, FCGI_HEADER_LEN)
except:
raise EOFError
if length < FCGI_HEADER_LEN:
raise EOFError
self.version, self.type, self.requestId, self.contentLength, \
self.paddingLength = struct.unpack(FCGI_Header, header)
if __debug__: _debug(9, 'read: fd = %d, type = %d, requestId = %d, '
'contentLength = %d' %
(sock.fileno(), self.type, self.requestId,
self.contentLength))
if self.contentLength:
try:
self.contentData, length = self._recvall(sock,
self.contentLength)
except:
raise EOFError
if length < self.contentLength:
raise EOFError
if self.paddingLength:
try:
self._recvall(sock, self.paddingLength)
except:
raise EOFError
def _sendall(sock, data):
"""
Writes data to a socket and does not return until all the data is sent.
"""
length = len(data)
while length:
try:
sent = sock.send(data)
except socket.error, e:
if e[0] == errno.EAGAIN:
select.select([], [sock], [])
continue
else:
raise
data = data[sent:]
length -= sent
_sendall = staticmethod(_sendall)
def write(self, sock):
"""Encode and write a Record to a socket."""
self.paddingLength = -self.contentLength & 7
if __debug__: _debug(9, 'write: fd = %d, type = %d, requestId = %d, '
'contentLength = %d' %
(sock.fileno(), self.type, self.requestId,
self.contentLength))
header = struct.pack(FCGI_Header, self.version, self.type,
self.requestId, self.contentLength,
self.paddingLength)
self._sendall(sock, header)
if self.contentLength:
self._sendall(sock, self.contentData)
if self.paddingLength:
self._sendall(sock, '\x00'*self.paddingLength)
class FCGIApp(object):
def __init__(self, command=None, connect=None, host=None, port=None,
filterEnviron=True):
if host is not None:
assert port is not None
connect=(host, port)
assert (command is not None and connect is None) or \
(command is None and connect is not None)
self._command = command
self._connect = connect
self._filterEnviron = filterEnviron
#sock = self._getConnection()
#print self._fcgiGetValues(sock, ['FCGI_MAX_CONNS', 'FCGI_MAX_REQS', 'FCGI_MPXS_CONNS'])
#sock.close()
def __call__(self, environ, start_response):
# For sanity's sake, we don't care about FCGI_MPXS_CONN
# (connection multiplexing). For every request, we obtain a new
# transport socket, perform the request, then discard the socket.
# This is, I believe, how mod_fastcgi does things...
sock = self._getConnection()
# Since this is going to be the only request on this connection,
# set the request ID to 1.
requestId = 1
# Begin the request
rec = Record(FCGI_BEGIN_REQUEST, requestId)
rec.contentData = struct.pack(FCGI_BeginRequestBody, FCGI_RESPONDER, 0)
rec.contentLength = FCGI_BeginRequestBody_LEN
rec.write(sock)
# Filter WSGI environ and send it as FCGI_PARAMS
if self._filterEnviron:
params = self._defaultFilterEnviron(environ)
else:
params = self._lightFilterEnviron(environ)
# TODO: Anything not from environ that needs to be sent also?
self._fcgiParams(sock, requestId, params)
self._fcgiParams(sock, requestId, {})
# Transfer wsgi.input to FCGI_STDIN
content_length = int(environ.get('CONTENT_LENGTH') or 0)
while True:
chunk_size = min(content_length, 4096)
s = environ['wsgi.input'].read(chunk_size)
content_length -= len(s)
rec = Record(FCGI_STDIN, requestId)
rec.contentData = s
rec.contentLength = len(s)
rec.write(sock)
if not s: break
# Empty FCGI_DATA stream
rec = Record(FCGI_DATA, requestId)
rec.write(sock)
# Main loop. Process FCGI_STDOUT, FCGI_STDERR, FCGI_END_REQUEST
# records from the application.
result = []
while True:
inrec = Record()
inrec.read(sock)
if inrec.type == FCGI_STDOUT:
if inrec.contentData:
result.append(inrec.contentData)
else:
# TODO: Should probably be pedantic and no longer
# accept FCGI_STDOUT records?
pass
elif inrec.type == FCGI_STDERR:
# Simply forward to wsgi.errors
environ['wsgi.errors'].write(inrec.contentData)
elif inrec.type == FCGI_END_REQUEST:
# TODO: Process appStatus/protocolStatus fields?
break
# Done with this transport socket, close it. (FCGI_KEEP_CONN was not
# set in the FCGI_BEGIN_REQUEST record we sent above. So the
# application is expected to do the same.)
sock.close()
result = ''.join(result)
# Parse response headers from FCGI_STDOUT
status = '200 OK'
headers = []
pos = 0
while True:
eolpos = result.find('\n', pos)
if eolpos < 0: break
line = result[pos:eolpos-1]
pos = eolpos + 1
# strip in case of CR. NB: This will also strip other
# whitespace...
line = line.strip()
# Empty line signifies end of headers
if not line: break
# TODO: Better error handling
header, value = line.split(':', 1)
header = header.strip().lower()
value = value.strip()
if header == 'status':
# Special handling of Status header
status = value
if status.find(' ') < 0:
# Append a dummy reason phrase if one was not provided
status += ' FCGIApp'
else:
headers.append((header, value))
result = result[pos:]
# Set WSGI status, headers, and return result.
start_response(status, headers)
return [result]
def _getConnection(self):
if self._connect is not None:
# The simple case. Create a socket and connect to the
# application.
if type(self._connect) is str:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(self._connect)
return sock
# To be done when I have more time...
raise NotImplementedError, 'Launching and managing FastCGI programs not yet implemented'
def _fcgiGetValues(self, sock, vars):
# Construct FCGI_GET_VALUES record
outrec = Record(FCGI_GET_VALUES)
data = []
for name in vars:
data.append(encode_pair(name, ''))
data = ''.join(data)
outrec.contentData = data
outrec.contentLength = len(data)
outrec.write(sock)
# Await response
inrec = Record()
inrec.read(sock)
result = {}
if inrec.type == FCGI_GET_VALUES_RESULT:
pos = 0
while pos < inrec.contentLength:
pos, (name, value) = decode_pair(inrec.contentData, pos)
result[name] = value
return result
def _fcgiParams(self, sock, requestId, params):
rec = Record(FCGI_PARAMS, requestId)
data = []
for name,value in params.items():
data.append(encode_pair(name, value))
data = ''.join(data)
rec.contentData = data
rec.contentLength = len(data)
rec.write(sock)
_environPrefixes = ['SERVER_', 'HTTP_', 'REQUEST_', 'REMOTE_', 'PATH_',
'CONTENT_']
_environCopies = ['SCRIPT_NAME', 'QUERY_STRING', 'AUTH_TYPE']
_environRenames = {}
def _defaultFilterEnviron(self, environ):
result = {}
for n in environ.keys():
for p in self._environPrefixes:
if n.startswith(p):
result[n] = environ[n]
if n in self._environCopies:
result[n] = environ[n]
if n in self._environRenames:
result[self._environRenames[n]] = environ[n]
return result
def _lightFilterEnviron(self, environ):
result = {}
for n in environ.keys():
if n.upper() == n:
result[n] = environ[n]
return result
if __name__ == '__main__':
from flup.server.ajp import WSGIServer
app = FCGIApp(connect=('localhost', 4242))
#import paste.lint
#app = paste.lint.middleware(app)
WSGIServer(app).run()

View File

@ -0,0 +1,176 @@
# Copyright (c) 2006 Allan Saddi <allan@saddi.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# $Id$
__author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
import select
import struct
import socket
import errno
__all__ = ['SCGIApp']
def encodeNetstring(s):
return ''.join([str(len(s)), ':', s, ','])
class SCGIApp(object):
def __init__(self, connect=None, host=None, port=None,
filterEnviron=True):
if host is not None:
assert port is not None
connect=(host, port)
assert connect is not None
self._connect = connect
self._filterEnviron = filterEnviron
def __call__(self, environ, start_response):
sock = self._getConnection()
outfile = sock.makefile('w')
infile = sock.makefile('r')
sock.close()
# Filter WSGI environ and send as request headers
if self._filterEnviron:
headers = self._defaultFilterEnviron(environ)
else:
headers = self._lightFilterEnviron(environ)
# TODO: Anything not from environ that needs to be sent also?
content_length = int(environ.get('CONTENT_LENGTH') or 0)
if headers.has_key('CONTENT_LENGTH'):
del headers['CONTENT_LENGTH']
headers_out = ['CONTENT_LENGTH', str(content_length), 'SCGI', '1']
for k,v in headers.items():
headers_out.append(k)
headers_out.append(v)
headers_out.append('') # For trailing NUL
outfile.write(encodeNetstring('\x00'.join(headers_out)))
# Transfer wsgi.input to outfile
while True:
chunk_size = min(content_length, 4096)
s = environ['wsgi.input'].read(chunk_size)
content_length -= len(s)
outfile.write(s)
if not s: break
outfile.close()
# Read result from SCGI server
result = []
while True:
buf = infile.read(4096)
if not buf: break
result.append(buf)
infile.close()
result = ''.join(result)
# Parse response headers
status = '200 OK'
headers = []
pos = 0
while True:
eolpos = result.find('\n', pos)
if eolpos < 0: break
line = result[pos:eolpos-1]
pos = eolpos + 1
# strip in case of CR. NB: This will also strip other
# whitespace...
line = line.strip()
# Empty line signifies end of headers
if not line: break
# TODO: Better error handling
header, value = line.split(':', 1)
header = header.strip().lower()
value = value.strip()
if header == 'status':
# Special handling of Status header
status = value
if status.find(' ') < 0:
# Append a dummy reason phrase if one was not provided
status += ' SCGIApp'
else:
headers.append((header, value))
result = result[pos:]
# Set WSGI status, headers, and return result.
start_response(status, headers)
return [result]
def _getConnection(self):
if type(self._connect) is str:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(self._connect)
return sock
_environPrefixes = ['SERVER_', 'HTTP_', 'REQUEST_', 'REMOTE_', 'PATH_',
'CONTENT_']
_environCopies = ['SCRIPT_NAME', 'QUERY_STRING', 'AUTH_TYPE']
_environRenames = {}
def _defaultFilterEnviron(self, environ):
result = {}
for n in environ.keys():
for p in self._environPrefixes:
if n.startswith(p):
result[n] = environ[n]
if n in self._environCopies:
result[n] = environ[n]
if n in self._environRenames:
result[self._environRenames[n]] = environ[n]
return result
def _lightFilterEnviron(self, environ):
result = {}
for n in environ.keys():
if n.upper() == n:
result[n] = environ[n]
return result
if __name__ == '__main__':
from flup.server.ajp import WSGIServer
app = SCGIApp(connect=('localhost', 4000))
#import paste.lint
#app = paste.lint.middleware(app)
WSGIServer(app).run()

View File

@ -0,0 +1 @@
#

View File

@ -0,0 +1,197 @@
# Copyright (c) 2005, 2006 Allan Saddi <allan@saddi.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# $Id$
"""
ajp - an AJP 1.3/WSGI gateway.
For more information about AJP and AJP connectors for your web server, see
<http://jakarta.apache.org/tomcat/connectors-doc/>.
For more information about the Web Server Gateway Interface, see
<http://www.python.org/peps/pep-0333.html>.
Example usage:
#!/usr/bin/env python
import sys
from myapplication import app # Assume app is your WSGI application object
from ajp import WSGIServer
ret = WSGIServer(app).run()
sys.exit(ret and 42 or 0)
See the documentation for WSGIServer for more information.
About the bit of logic at the end:
Upon receiving SIGHUP, the python script will exit with status code 42. This
can be used by a wrapper script to determine if the python script should be
re-run. When a SIGINT or SIGTERM is received, the script exits with status
code 0, possibly indicating a normal exit.
Example wrapper script:
#!/bin/sh
STATUS=42
while test $STATUS -eq 42; do
python "$@" that_script_above.py
STATUS=$?
done
Example workers.properties (for mod_jk):
worker.list=foo
worker.foo.port=8009
worker.foo.host=localhost
worker.foo.type=ajp13
Example httpd.conf (for mod_jk):
JkWorkersFile /path/to/workers.properties
JkMount /* foo
Note that if you mount your ajp application anywhere but the root ("/"), you
SHOULD specifiy scriptName to the WSGIServer constructor. This will ensure
that SCRIPT_NAME/PATH_INFO are correctly deduced.
"""
__author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
import socket
import logging
from flup.server.ajp_base import BaseAJPServer, Connection
from flup.server.threadedserver import ThreadedServer
__all__ = ['WSGIServer']
class WSGIServer(BaseAJPServer, ThreadedServer):
"""
AJP1.3/WSGI server. Runs your WSGI application as a persistant program
that understands AJP1.3. Opens up a TCP socket, binds it, and then
waits for forwarded requests from your webserver.
Why AJP? Two good reasons are that AJP provides load-balancing and
fail-over support. Personally, I just wanted something new to
implement. :)
Of course you will need an AJP1.3 connector for your webserver (e.g.
mod_jk) - see <http://jakarta.apache.org/tomcat/connectors-doc/>.
"""
def __init__(self, application, scriptName='', environ=None,
multithreaded=True, multiprocess=False,
bindAddress=('localhost', 8009), allowedServers=None,
loggingLevel=logging.INFO, debug=True, **kw):
"""
scriptName is the initial portion of the URL path that "belongs"
to your application. It is used to determine PATH_INFO (which doesn't
seem to be passed in). An empty scriptName means your application
is mounted at the root of your virtual host.
environ, which must be a dictionary, can contain any additional
environment variables you want to pass to your application.
bindAddress is the address to bind to, which must be a tuple of
length 2. The first element is a string, which is the host name
or IPv4 address of a local interface. The 2nd element is the port
number.
allowedServers must be None or a list of strings representing the
IPv4 addresses of servers allowed to connect. None means accept
connections from anywhere.
loggingLevel sets the logging level of the module-level logger.
"""
BaseAJPServer.__init__(self, application,
scriptName=scriptName,
environ=environ,
multithreaded=multithreaded,
multiprocess=multiprocess,
bindAddress=bindAddress,
allowedServers=allowedServers,
loggingLevel=loggingLevel,
debug=debug)
for key in ('jobClass', 'jobArgs'):
if kw.has_key(key):
del kw[key]
ThreadedServer.__init__(self, jobClass=Connection, jobArgs=(self,),
**kw)
def run(self):
"""
Main loop. Call this after instantiating WSGIServer. SIGHUP, SIGINT,
SIGQUIT, SIGTERM cause it to cleanup and return. (If a SIGHUP
is caught, this method returns True. Returns False otherwise.)
"""
self.logger.info('%s starting up', self.__class__.__name__)
try:
sock = self._setupSocket()
except socket.error, e:
self.logger.error('Failed to bind socket (%s), exiting', e[1])
return False
ret = ThreadedServer.run(self, sock)
self._cleanupSocket(sock)
self.logger.info('%s shutting down%s', self.__class__.__name__,
self._hupReceived and ' (reload requested)' or '')
return ret
if __name__ == '__main__':
def test_app(environ, start_response):
"""Probably not the most efficient example."""
import cgi
start_response('200 OK', [('Content-Type', 'text/html')])
yield '<html><head><title>Hello World!</title></head>\n' \
'<body>\n' \
'<p>Hello World!</p>\n' \
'<table border="1">'
names = environ.keys()
names.sort()
for name in names:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
name, cgi.escape(`environ[name]`))
form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
keep_blank_values=1)
if form.list:
yield '<tr><th colspan="2">Form data</th></tr>'
for field in form.list:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
field.name, field.value)
yield '</table>\n' \
'</body></html>\n'
from wsgiref import validate
test_app = validate.validator(test_app)
# Explicitly set bindAddress to *:8009 for testing.
WSGIServer(test_app,
bindAddress=('', 8009), allowedServers=None,
loggingLevel=logging.DEBUG).run()

View File

@ -0,0 +1,956 @@
# Copyright (c) 2005, 2006 Allan Saddi <allan@saddi.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# $Id$
__author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
import sys
import socket
import select
import struct
import signal
import logging
import errno
import datetime
import time
# Unfortunately, for now, threads are required.
import thread
import threading
__all__ = ['BaseAJPServer']
class NoDefault(object):
pass
# Packet header prefixes.
SERVER_PREFIX = '\x12\x34'
CONTAINER_PREFIX = 'AB'
# Server packet types.
PKTTYPE_FWD_REQ = '\x02'
PKTTYPE_SHUTDOWN = '\x07'
PKTTYPE_PING = '\x08'
PKTTYPE_CPING = '\x0a'
# Container packet types.
PKTTYPE_SEND_BODY = '\x03'
PKTTYPE_SEND_HEADERS = '\x04'
PKTTYPE_END_RESPONSE = '\x05'
PKTTYPE_GET_BODY = '\x06'
PKTTYPE_CPONG = '\x09'
# Code tables for methods/headers/attributes.
methodTable = [
None,
'OPTIONS',
'GET',
'HEAD',
'POST',
'PUT',
'DELETE',
'TRACE',
'PROPFIND',
'PROPPATCH',
'MKCOL',
'COPY',
'MOVE',
'LOCK',
'UNLOCK',
'ACL',
'REPORT',
'VERSION-CONTROL',
'CHECKIN',
'CHECKOUT',
'UNCHECKOUT',
'SEARCH',
'MKWORKSPACE',
'UPDATE',
'LABEL',
'MERGE',
'BASELINE_CONTROL',
'MKACTIVITY'
]
requestHeaderTable = [
None,
'Accept',
'Accept-Charset',
'Accept-Encoding',
'Accept-Language',
'Authorization',
'Connection',
'Content-Type',
'Content-Length',
'Cookie',
'Cookie2',
'Host',
'Pragma',
'Referer',
'User-Agent'
]
attributeTable = [
None,
'CONTEXT',
'SERVLET_PATH',
'REMOTE_USER',
'AUTH_TYPE',
'QUERY_STRING',
'JVM_ROUTE',
'SSL_CERT',
'SSL_CIPHER',
'SSL_SESSION',
None, # name follows
'SSL_KEY_SIZE'
]
responseHeaderTable = [
None,
'content-type',
'content-language',
'content-length',
'date',
'last-modified',
'location',
'set-cookie',
'set-cookie2',
'servlet-engine',
'status',
'www-authenticate'
]
# The main classes use this name for logging.
LoggerName = 'ajp-wsgi'
# Set up module-level logger.
console = logging.StreamHandler()
console.setLevel(logging.DEBUG)
console.setFormatter(logging.Formatter('%(asctime)s : %(message)s',
'%Y-%m-%d %H:%M:%S'))
logging.getLogger(LoggerName).addHandler(console)
del console
class ProtocolError(Exception):
"""
Exception raised when the server does something unexpected or
sends garbled data. Usually leads to a Connection closing.
"""
pass
def decodeString(data, pos=0):
"""Decode a string."""
try:
length = struct.unpack('>H', data[pos:pos+2])[0]
pos += 2
if length == 0xffff: # This was undocumented!
return '', pos
s = data[pos:pos+length]
return s, pos+length+1 # Don't forget NUL
except Exception, e:
raise ProtocolError, 'decodeString: '+str(e)
def decodeRequestHeader(data, pos=0):
"""Decode a request header/value pair."""
try:
if data[pos] == '\xa0':
# Use table
i = ord(data[pos+1])
name = requestHeaderTable[i]
if name is None:
raise ValueError, 'bad request header code'
pos += 2
else:
name, pos = decodeString(data, pos)
value, pos = decodeString(data, pos)
return name, value, pos
except Exception, e:
raise ProtocolError, 'decodeRequestHeader: '+str(e)
def decodeAttribute(data, pos=0):
"""Decode a request attribute."""
try:
i = ord(data[pos])
pos += 1
if i == 0xff:
# end
return None, None, pos
elif i == 0x0a:
# name follows
name, pos = decodeString(data, pos)
elif i == 0x0b:
# Special handling of SSL_KEY_SIZE.
name = attributeTable[i]
# Value is an int, not a string.
value = struct.unpack('>H', data[pos:pos+2])[0]
return name, str(value), pos+2
else:
name = attributeTable[i]
if name is None:
raise ValueError, 'bad attribute code'
value, pos = decodeString(data, pos)
return name, value, pos
except Exception, e:
raise ProtocolError, 'decodeAttribute: '+str(e)
def encodeString(s):
"""Encode a string."""
return struct.pack('>H', len(s)) + s + '\x00'
def encodeResponseHeader(name, value):
"""Encode a response header/value pair."""
lname = name.lower()
if lname in responseHeaderTable:
# Use table
i = responseHeaderTable.index(lname)
out = '\xa0' + chr(i)
else:
out = encodeString(name)
out += encodeString(value)
return out
class Packet(object):
"""An AJP message packet."""
def __init__(self):
self.data = ''
# Don't set this on write, it will be calculated automatically.
self.length = 0
def _recvall(sock, length):
"""
Attempts to receive length bytes from a socket, blocking if necessary.
(Socket may be blocking or non-blocking.)
"""
dataList = []
recvLen = 0
while length:
try:
data = sock.recv(length)
except socket.error, e:
if e[0] == errno.EAGAIN:
select.select([sock], [], [])
continue
else:
raise
if not data: # EOF
break
dataList.append(data)
dataLen = len(data)
recvLen += dataLen
length -= dataLen
return ''.join(dataList), recvLen
_recvall = staticmethod(_recvall)
def read(self, sock):
"""Attempt to read a packet from the server."""
try:
header, length = self._recvall(sock, 4)
except socket.error:
# Treat any sort of socket errors as EOF (close Connection).
raise EOFError
if length < 4:
raise EOFError
if header[:2] != SERVER_PREFIX:
raise ProtocolError, 'invalid header'
self.length = struct.unpack('>H', header[2:4])[0]
if self.length:
try:
self.data, length = self._recvall(sock, self.length)
except socket.error:
raise EOFError
if length < self.length:
raise EOFError
def _sendall(sock, data):
"""
Writes data to a socket and does not return until all the data is sent.
"""
length = len(data)
while length:
try:
sent = sock.send(data)
except socket.error, e:
if e[0] == errno.EAGAIN:
select.select([], [sock], [])
continue
else:
raise
data = data[sent:]
length -= sent
_sendall = staticmethod(_sendall)
def write(self, sock):
"""Send a packet to the server."""
self.length = len(self.data)
self._sendall(sock, CONTAINER_PREFIX + struct.pack('>H', self.length))
if self.length:
self._sendall(sock, self.data)
class InputStream(object):
"""
File-like object that represents the request body (if any). Supports
the bare mininum methods required by the WSGI spec. Thanks to
StringIO for ideas.
"""
def __init__(self, conn):
self._conn = conn
# See WSGIServer.
self._shrinkThreshold = conn.server.inputStreamShrinkThreshold
self._buf = ''
self._bufList = []
self._pos = 0 # Current read position.
self._avail = 0 # Number of bytes currently available.
self._length = 0 # Set to Content-Length in request.
self.logger = logging.getLogger(LoggerName)
def bytesAvailForAdd(self):
return self._length - self._avail
def _shrinkBuffer(self):
"""Gets rid of already read data (since we can't rewind)."""
if self._pos >= self._shrinkThreshold:
self._buf = self._buf[self._pos:]
self._avail -= self._pos
self._length -= self._pos
self._pos = 0
assert self._avail >= 0 and self._length >= 0
def _waitForData(self):
toAdd = min(self.bytesAvailForAdd(), 0xffff)
assert toAdd > 0
pkt = Packet()
pkt.data = PKTTYPE_GET_BODY + \
struct.pack('>H', toAdd)
self._conn.writePacket(pkt)
self._conn.processInput()
def read(self, n=-1):
if self._pos == self._length:
return ''
while True:
if n < 0 or (self._avail - self._pos) < n:
# Not enough data available.
if not self.bytesAvailForAdd():
# And there's no more coming.
newPos = self._avail
break
else:
# Ask for more data and wait.
self._waitForData()
continue
else:
newPos = self._pos + n
break
# Merge buffer list, if necessary.
if self._bufList:
self._buf += ''.join(self._bufList)
self._bufList = []
r = self._buf[self._pos:newPos]
self._pos = newPos
self._shrinkBuffer()
return r
def readline(self, length=None):
if self._pos == self._length:
return ''
while True:
# Unfortunately, we need to merge the buffer list early.
if self._bufList:
self._buf += ''.join(self._bufList)
self._bufList = []
# Find newline.
i = self._buf.find('\n', self._pos)
if i < 0:
# Not found?
if not self.bytesAvailForAdd():
# No more data coming.
newPos = self._avail
break
else:
if length is not None and len(self._buf) >= length + self._pos:
newPos = self._pos + length
break
# Wait for more to come.
self._waitForData()
continue
else:
newPos = i + 1
break
r = self._buf[self._pos:newPos]
self._pos = newPos
self._shrinkBuffer()
return r
def readlines(self, sizehint=0):
total = 0
lines = []
line = self.readline()
while line:
lines.append(line)
total += len(line)
if 0 < sizehint <= total:
break
line = self.readline()
return lines
def __iter__(self):
return self
def next(self):
r = self.readline()
if not r:
raise StopIteration
return r
def setDataLength(self, length):
"""
Once Content-Length is known, Request calls this method to set it.
"""
self._length = length
def addData(self, data):
"""
Adds data from the server to this InputStream. Note that we never ask
the server for data beyond the Content-Length, so the server should
never send us an EOF (empty string argument).
"""
if not data:
raise ProtocolError, 'short data'
self._bufList.append(data)
length = len(data)
self._avail += length
if self._avail > self._length:
raise ProtocolError, 'too much data'
class Request(object):
"""
A Request object. A more fitting name would probably be Transaction, but
it's named Request to mirror my FastCGI driver. :) This object
encapsulates all the data about the HTTP request and allows the handler
to send a response.
The only attributes/methods that the handler should concern itself
with are: environ, input, startResponse(), and write().
"""
# Do not ever change the following value.
_maxWrite = 8192 - 4 - 3 - 1 # 8k - pkt header - send body header - NUL
def __init__(self, conn):
self._conn = conn
self.environ = {}
self.input = InputStream(conn)
self._headersSent = False
self.logger = logging.getLogger(LoggerName)
def run(self):
self.logger.info('%s %s',
self.environ['REQUEST_METHOD'],
self.environ['REQUEST_URI'])
start = datetime.datetime.now()
try:
self._conn.server.handler(self)
except:
self.logger.exception('Exception caught from handler')
if not self._headersSent:
self._conn.server.error(self)
end = datetime.datetime.now()
# Notify server of end of response (reuse flag is set to true).
pkt = Packet()
pkt.data = PKTTYPE_END_RESPONSE + '\x01'
self._conn.writePacket(pkt)
handlerTime = end - start
self.logger.debug('%s %s done (%.3f secs)',
self.environ['REQUEST_METHOD'],
self.environ['REQUEST_URI'],
handlerTime.seconds +
handlerTime.microseconds / 1000000.0)
# The following methods are called from the Connection to set up this
# Request.
def setMethod(self, value):
self.environ['REQUEST_METHOD'] = value
def setProtocol(self, value):
self.environ['SERVER_PROTOCOL'] = value
def setRequestURI(self, value):
self.environ['REQUEST_URI'] = value
def setRemoteAddr(self, value):
self.environ['REMOTE_ADDR'] = value
def setRemoteHost(self, value):
self.environ['REMOTE_HOST'] = value
def setServerName(self, value):
self.environ['SERVER_NAME'] = value
def setServerPort(self, value):
self.environ['SERVER_PORT'] = str(value)
def setIsSSL(self, value):
if value:
self.environ['HTTPS'] = 'on'
def addHeader(self, name, value):
name = name.replace('-', '_').upper()
if name in ('CONTENT_TYPE', 'CONTENT_LENGTH'):
self.environ[name] = value
if name == 'CONTENT_LENGTH':
length = int(value)
self.input.setDataLength(length)
else:
self.environ['HTTP_'+name] = value
def addAttribute(self, name, value):
self.environ[name] = value
# The only two methods that should be called from the handler.
def startResponse(self, statusCode, statusMsg, headers):
"""
Begin the HTTP response. This must only be called once and it
must be called before any calls to write().
statusCode is the integer status code (e.g. 200). statusMsg
is the associated reason message (e.g.'OK'). headers is a list
of 2-tuples - header name/value pairs. (Both header name and value
must be strings.)
"""
assert not self._headersSent, 'Headers already sent!'
pkt = Packet()
pkt.data = PKTTYPE_SEND_HEADERS + \
struct.pack('>H', statusCode) + \
encodeString(statusMsg) + \
struct.pack('>H', len(headers)) + \
''.join([encodeResponseHeader(name, value)
for name,value in headers])
self._conn.writePacket(pkt)
self._headersSent = True
def write(self, data):
"""
Write data (which comprises the response body). Note that due to
restrictions on AJP packet size, we limit our writes to 8185 bytes
each packet.
"""
assert self._headersSent, 'Headers must be sent first!'
bytesLeft = len(data)
while bytesLeft:
toWrite = min(bytesLeft, self._maxWrite)
pkt = Packet()
pkt.data = PKTTYPE_SEND_BODY + \
struct.pack('>H', toWrite) + \
data[:toWrite] + '\x00' # Undocumented
self._conn.writePacket(pkt)
data = data[toWrite:]
bytesLeft -= toWrite
class Connection(object):
"""
A single Connection with the server. Requests are not multiplexed over the
same connection, so at any given time, the Connection is either
waiting for a request, or processing a single request.
"""
def __init__(self, sock, addr, server):
self.server = server
self._sock = sock
self._addr = addr
self._request = None
self.logger = logging.getLogger(LoggerName)
def run(self):
self.logger.debug('Connection starting up (%s:%d)',
self._addr[0], self._addr[1])
# Main loop. Errors will cause the loop to be exited and
# the socket to be closed.
while True:
try:
self.processInput()
except ProtocolError, e:
self.logger.error("Protocol error '%s'", str(e))
break
except (EOFError, KeyboardInterrupt):
break
except:
self.logger.exception('Exception caught in Connection')
break
self.logger.debug('Connection shutting down (%s:%d)',
self._addr[0], self._addr[1])
self._sock.close()
def processInput(self):
"""Wait for and process a single packet."""
pkt = Packet()
select.select([self._sock], [], [])
pkt.read(self._sock)
# Body chunks have no packet type code.
if self._request is not None:
self._processBody(pkt)
return
if not pkt.length:
raise ProtocolError, 'unexpected empty packet'
pkttype = pkt.data[0]
if pkttype == PKTTYPE_FWD_REQ:
self._forwardRequest(pkt)
elif pkttype == PKTTYPE_SHUTDOWN:
self._shutdown(pkt)
elif pkttype == PKTTYPE_PING:
self._ping(pkt)
elif pkttype == PKTTYPE_CPING:
self._cping(pkt)
else:
raise ProtocolError, 'unknown packet type'
def _forwardRequest(self, pkt):
"""
Creates a Request object, fills it in from the packet, then runs it.
"""
assert self._request is None
req = self.server.requestClass(self)
i = ord(pkt.data[1])
method = methodTable[i]
if method is None:
raise ValueError, 'bad method field'
req.setMethod(method)
value, pos = decodeString(pkt.data, 2)
req.setProtocol(value)
value, pos = decodeString(pkt.data, pos)
req.setRequestURI(value)
value, pos = decodeString(pkt.data, pos)
req.setRemoteAddr(value)
value, pos = decodeString(pkt.data, pos)
req.setRemoteHost(value)
value, pos = decodeString(pkt.data, pos)
req.setServerName(value)
value = struct.unpack('>H', pkt.data[pos:pos+2])[0]
req.setServerPort(value)
i = ord(pkt.data[pos+2])
req.setIsSSL(i != 0)
# Request headers.
numHeaders = struct.unpack('>H', pkt.data[pos+3:pos+5])[0]
pos += 5
for i in range(numHeaders):
name, value, pos = decodeRequestHeader(pkt.data, pos)
req.addHeader(name, value)
# Attributes.
while True:
name, value, pos = decodeAttribute(pkt.data, pos)
if name is None:
break
req.addAttribute(name, value)
self._request = req
# Read first body chunk, if needed.
if req.input.bytesAvailForAdd():
self.processInput()
# Run Request.
req.run()
self._request = None
def _shutdown(self, pkt):
"""Not sure what to do with this yet."""
self.logger.info('Received shutdown request from server')
def _ping(self, pkt):
"""I have no idea what this packet means."""
self.logger.debug('Received ping')
def _cping(self, pkt):
"""Respond to a PING (CPING) packet."""
self.logger.debug('Received PING, sending PONG')
pkt = Packet()
pkt.data = PKTTYPE_CPONG
self.writePacket(pkt)
def _processBody(self, pkt):
"""
Handles a body chunk from the server by appending it to the
InputStream.
"""
if pkt.length:
length = struct.unpack('>H', pkt.data[:2])[0]
self._request.input.addData(pkt.data[2:2+length])
else:
# Shouldn't really ever get here.
self._request.input.addData('')
def writePacket(self, pkt):
"""Sends a Packet to the server."""
pkt.write(self._sock)
class BaseAJPServer(object):
# What Request class to use.
requestClass = Request
# Limits the size of the InputStream's string buffer to this size + 8k.
# Since the InputStream is not seekable, we throw away already-read
# data once this certain amount has been read. (The 8k is there because
# it is the maximum size of new data added per chunk.)
inputStreamShrinkThreshold = 102400 - 8192
def __init__(self, application, scriptName='', environ=None,
multithreaded=True, multiprocess=False,
bindAddress=('localhost', 8009), allowedServers=NoDefault,
loggingLevel=logging.INFO, debug=True):
"""
scriptName is the initial portion of the URL path that "belongs"
to your application. It is used to determine PATH_INFO (which doesn't
seem to be passed in). An empty scriptName means your application
is mounted at the root of your virtual host.
environ, which must be a dictionary, can contain any additional
environment variables you want to pass to your application.
Set multithreaded to False if your application is not thread-safe.
Set multiprocess to True to explicitly set wsgi.multiprocess to
True. (Only makes sense with threaded servers.)
bindAddress is the address to bind to, which must be a tuple of
length 2. The first element is a string, which is the host name
or IPv4 address of a local interface. The 2nd element is the port
number.
allowedServers must be None or a list of strings representing the
IPv4 addresses of servers allowed to connect. None means accept
connections from anywhere. By default, it is a list containing
the single item '127.0.0.1'.
loggingLevel sets the logging level of the module-level logger.
"""
if environ is None:
environ = {}
self.application = application
self.scriptName = scriptName
self.environ = environ
self.multithreaded = multithreaded
self.multiprocess = multiprocess
self.debug = debug
self._bindAddress = bindAddress
if allowedServers is NoDefault:
allowedServers = ['127.0.0.1']
self._allowedServers = allowedServers
# Used to force single-threadedness.
self._appLock = thread.allocate_lock()
self.logger = logging.getLogger(LoggerName)
self.logger.setLevel(loggingLevel)
def _setupSocket(self):
"""Creates and binds the socket for communication with the server."""
sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(self._bindAddress)
sock.listen(socket.SOMAXCONN)
return sock
def _cleanupSocket(self, sock):
"""Closes the main socket."""
sock.close()
def _isClientAllowed(self, addr):
ret = self._allowedServers is None or addr[0] in self._allowedServers
if not ret:
self.logger.warning('Server connection from %s disallowed',
addr[0])
return ret
def handler(self, request):
"""
WSGI handler. Sets up WSGI environment, calls the application,
and sends the application's response.
"""
environ = request.environ
environ.update(self.environ)
environ['wsgi.version'] = (1,0)
environ['wsgi.input'] = request.input
environ['wsgi.errors'] = sys.stderr
environ['wsgi.multithread'] = self.multithreaded
environ['wsgi.multiprocess'] = self.multiprocess
environ['wsgi.run_once'] = False
if environ.get('HTTPS', 'off') in ('on', '1'):
environ['wsgi.url_scheme'] = 'https'
else:
environ['wsgi.url_scheme'] = 'http'
self._sanitizeEnv(environ)
headers_set = []
headers_sent = []
result = None
def write(data):
assert type(data) is str, 'write() argument must be string'
assert headers_set, 'write() before start_response()'
if not headers_sent:
status, responseHeaders = headers_sent[:] = headers_set
statusCode = int(status[:3])
statusMsg = status[4:]
found = False
for header,value in responseHeaders:
if header.lower() == 'content-length':
found = True
break
if not found and result is not None:
try:
if len(result) == 1:
responseHeaders.append(('Content-Length',
str(len(data))))
except:
pass
request.startResponse(statusCode, statusMsg, responseHeaders)
request.write(data)
def start_response(status, response_headers, exc_info=None):
if exc_info:
try:
if headers_sent:
# Re-raise if too late
raise exc_info[0], exc_info[1], exc_info[2]
finally:
exc_info = None # avoid dangling circular ref
else:
assert not headers_set, 'Headers already set!'
assert type(status) is str, 'Status must be a string'
assert len(status) >= 4, 'Status must be at least 4 characters'
assert int(status[:3]), 'Status must begin with 3-digit code'
assert status[3] == ' ', 'Status must have a space after code'
assert type(response_headers) is list, 'Headers must be a list'
if __debug__:
for name,val in response_headers:
assert type(name) is str, 'Header name "%s" must be a string' % name
assert type(val) is str, 'Value of header "%s" must be a string' % name
headers_set[:] = [status, response_headers]
return write
if not self.multithreaded:
self._appLock.acquire()
try:
try:
result = self.application(environ, start_response)
try:
for data in result:
if data:
write(data)
if not headers_sent:
write('') # in case body was empty
finally:
if hasattr(result, 'close'):
result.close()
except socket.error, e:
if e[0] != errno.EPIPE:
raise # Don't let EPIPE propagate beyond server
finally:
if not self.multithreaded:
self._appLock.release()
def _sanitizeEnv(self, environ):
"""Fill-in/deduce missing values in environ."""
# Namely SCRIPT_NAME/PATH_INFO
value = environ['REQUEST_URI']
scriptName = environ.get('WSGI_SCRIPT_NAME', self.scriptName)
if not value.startswith(scriptName):
self.logger.warning('scriptName does not match request URI')
environ['PATH_INFO'] = value[len(scriptName):]
environ['SCRIPT_NAME'] = scriptName
reqUri = None
if environ.has_key('REQUEST_URI'):
reqUri = environ['REQUEST_URI'].split('?', 1)
if not environ.has_key('QUERY_STRING') or not environ['QUERY_STRING']:
if reqUri is not None and len(reqUri) > 1:
environ['QUERY_STRING'] = reqUri[1]
else:
environ['QUERY_STRING'] = ''
def error(self, request):
"""
Override to provide custom error handling. Ideally, however,
all errors should be caught at the application level.
"""
if self.debug:
request.startResponse(200, 'OK', [('Content-Type', 'text/html')])
import cgitb
request.write(cgitb.html(sys.exc_info()))
else:
errorpage = """<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html><head>
<title>Unhandled Exception</title>
</head><body>
<h1>Unhandled Exception</h1>
<p>An unhandled exception was thrown by the application.</p>
</body></html>
"""
request.startResponse(200, 'OK', [('Content-Type', 'text/html')])
request.write(errorpage)

View File

@ -0,0 +1,195 @@
# Copyright (c) 2005, 2006 Allan Saddi <allan@saddi.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# $Id$
"""
ajp - an AJP 1.3/WSGI gateway.
For more information about AJP and AJP connectors for your web server, see
<http://jakarta.apache.org/tomcat/connectors-doc/>.
For more information about the Web Server Gateway Interface, see
<http://www.python.org/peps/pep-0333.html>.
Example usage:
#!/usr/bin/env python
import sys
from myapplication import app # Assume app is your WSGI application object
from ajp import WSGIServer
ret = WSGIServer(app).run()
sys.exit(ret and 42 or 0)
See the documentation for WSGIServer for more information.
About the bit of logic at the end:
Upon receiving SIGHUP, the python script will exit with status code 42. This
can be used by a wrapper script to determine if the python script should be
re-run. When a SIGINT or SIGTERM is received, the script exits with status
code 0, possibly indicating a normal exit.
Example wrapper script:
#!/bin/sh
STATUS=42
while test $STATUS -eq 42; do
python "$@" that_script_above.py
STATUS=$?
done
Example workers.properties (for mod_jk):
worker.list=foo
worker.foo.port=8009
worker.foo.host=localhost
worker.foo.type=ajp13
Example httpd.conf (for mod_jk):
JkWorkersFile /path/to/workers.properties
JkMount /* foo
Note that if you mount your ajp application anywhere but the root ("/"), you
SHOULD specifiy scriptName to the WSGIServer constructor. This will ensure
that SCRIPT_NAME/PATH_INFO are correctly deduced.
"""
__author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
import socket
import logging
from flup.server.ajp_base import BaseAJPServer, Connection
from flup.server.preforkserver import PreforkServer
__all__ = ['WSGIServer']
class WSGIServer(BaseAJPServer, PreforkServer):
"""
AJP1.3/WSGI server. Runs your WSGI application as a persistant program
that understands AJP1.3. Opens up a TCP socket, binds it, and then
waits for forwarded requests from your webserver.
Why AJP? Two good reasons are that AJP provides load-balancing and
fail-over support. Personally, I just wanted something new to
implement. :)
Of course you will need an AJP1.3 connector for your webserver (e.g.
mod_jk) - see <http://jakarta.apache.org/tomcat/connectors-doc/>.
"""
def __init__(self, application, scriptName='', environ=None,
bindAddress=('localhost', 8009), allowedServers=None,
loggingLevel=logging.INFO, debug=True, **kw):
"""
scriptName is the initial portion of the URL path that "belongs"
to your application. It is used to determine PATH_INFO (which doesn't
seem to be passed in). An empty scriptName means your application
is mounted at the root of your virtual host.
environ, which must be a dictionary, can contain any additional
environment variables you want to pass to your application.
bindAddress is the address to bind to, which must be a tuple of
length 2. The first element is a string, which is the host name
or IPv4 address of a local interface. The 2nd element is the port
number.
allowedServers must be None or a list of strings representing the
IPv4 addresses of servers allowed to connect. None means accept
connections from anywhere.
loggingLevel sets the logging level of the module-level logger.
"""
BaseAJPServer.__init__(self, application,
scriptName=scriptName,
environ=environ,
multithreaded=False,
multiprocess=True,
bindAddress=bindAddress,
allowedServers=allowedServers,
loggingLevel=loggingLevel,
debug=debug)
for key in ('multithreaded', 'multiprocess', 'jobClass', 'jobArgs'):
if kw.has_key(key):
del kw[key]
PreforkServer.__init__(self, jobClass=Connection, jobArgs=(self,), **kw)
def run(self):
"""
Main loop. Call this after instantiating WSGIServer. SIGHUP, SIGINT,
SIGQUIT, SIGTERM cause it to cleanup and return. (If a SIGHUP
is caught, this method returns True. Returns False otherwise.)
"""
self.logger.info('%s starting up', self.__class__.__name__)
try:
sock = self._setupSocket()
except socket.error, e:
self.logger.error('Failed to bind socket (%s), exiting', e[1])
return False
ret = PreforkServer.run(self, sock)
self._cleanupSocket(sock)
self.logger.info('%s shutting down%s', self.__class__.__name__,
self._hupReceived and ' (reload requested)' or '')
return ret
if __name__ == '__main__':
def test_app(environ, start_response):
"""Probably not the most efficient example."""
import cgi
start_response('200 OK', [('Content-Type', 'text/html')])
yield '<html><head><title>Hello World!</title></head>\n' \
'<body>\n' \
'<p>Hello World!</p>\n' \
'<table border="1">'
names = environ.keys()
names.sort()
for name in names:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
name, cgi.escape(`environ[name]`))
form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
keep_blank_values=1)
if form.list:
yield '<tr><th colspan="2">Form data</th></tr>'
for field in form.list:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
field.name, field.value)
yield '</table>\n' \
'</body></html>\n'
from wsgiref import validate
test_app = validate.validator(test_app)
# Explicitly set bindAddress to *:8009 for testing.
WSGIServer(test_app,
bindAddress=('', 8009), allowedServers=None,
loggingLevel=logging.DEBUG).run()

View File

@ -0,0 +1,71 @@
# Taken from <http://www.python.org/dev/peps/pep-0333/>
# which was placed in the public domain.
import os, sys
__all__ = ['WSGIServer']
class WSGIServer(object):
def __init__(self, application):
self.application = application
def run(self):
environ = dict(os.environ.items())
environ['wsgi.input'] = sys.stdin
environ['wsgi.errors'] = sys.stderr
environ['wsgi.version'] = (1,0)
environ['wsgi.multithread'] = False
environ['wsgi.multiprocess'] = True
environ['wsgi.run_once'] = True
if environ.get('HTTPS','off') in ('on','1'):
environ['wsgi.url_scheme'] = 'https'
else:
environ['wsgi.url_scheme'] = 'http'
headers_set = []
headers_sent = []
def write(data):
if not headers_set:
raise AssertionError("write() before start_response()")
elif not headers_sent:
# Before the first output, send the stored headers
status, response_headers = headers_sent[:] = headers_set
sys.stdout.write('Status: %s\r\n' % status)
for header in response_headers:
sys.stdout.write('%s: %s\r\n' % header)
sys.stdout.write('\r\n')
sys.stdout.write(data)
sys.stdout.flush()
def start_response(status,response_headers,exc_info=None):
if exc_info:
try:
if headers_sent:
# Re-raise original exception if headers sent
raise exc_info[0], exc_info[1], exc_info[2]
finally:
exc_info = None # avoid dangling circular ref
elif headers_set:
raise AssertionError("Headers already set!")
headers_set[:] = [status,response_headers]
return write
result = self.application(environ, start_response)
try:
for data in result:
if data: # don't send headers until body appears
write(data)
if not headers_sent:
write('') # send headers now if body was empty
finally:
if hasattr(result,'close'):
result.close()

View File

@ -0,0 +1,149 @@
# Copyright (c) 2005, 2006 Allan Saddi <allan@saddi.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# $Id$
"""
fcgi - a FastCGI/WSGI gateway.
For more information about FastCGI, see <http://www.fastcgi.com/>.
For more information about the Web Server Gateway Interface, see
<http://www.python.org/peps/pep-0333.html>.
Example usage:
#!/usr/bin/env python
from myapplication import app # Assume app is your WSGI application object
from fcgi import WSGIServer
WSGIServer(app).run()
See the documentation for WSGIServer for more information.
On most platforms, fcgi will fallback to regular CGI behavior if run in a
non-FastCGI context. If you want to force CGI behavior, set the environment
variable FCGI_FORCE_CGI to "Y" or "y".
"""
__author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
import os
from flup.server.fcgi_base import BaseFCGIServer, FCGI_RESPONDER
from flup.server.threadedserver import ThreadedServer
__all__ = ['WSGIServer']
class WSGIServer(BaseFCGIServer, ThreadedServer):
"""
FastCGI server that supports the Web Server Gateway Interface. See
<http://www.python.org/peps/pep-0333.html>.
"""
def __init__(self, application, environ=None,
multithreaded=True, multiprocess=False,
bindAddress=None, umask=None, multiplexed=False,
debug=True, roles=(FCGI_RESPONDER,), forceCGI=False, **kw):
"""
environ, if present, must be a dictionary-like object. Its
contents will be copied into application's environ. Useful
for passing application-specific variables.
bindAddress, if present, must either be a string or a 2-tuple. If
present, run() will open its own listening socket. You would use
this if you wanted to run your application as an 'external' FastCGI
app. (i.e. the webserver would no longer be responsible for starting
your app) If a string, it will be interpreted as a filename and a UNIX
socket will be opened. If a tuple, the first element, a string,
is the interface name/IP to bind to, and the second element (an int)
is the port number.
"""
BaseFCGIServer.__init__(self, application,
environ=environ,
multithreaded=multithreaded,
multiprocess=multiprocess,
bindAddress=bindAddress,
umask=umask,
multiplexed=multiplexed,
debug=debug,
roles=roles,
forceCGI=forceCGI)
for key in ('jobClass', 'jobArgs'):
if kw.has_key(key):
del kw[key]
ThreadedServer.__init__(self, jobClass=self._connectionClass,
jobArgs=(self,), **kw)
def _isClientAllowed(self, addr):
return self._web_server_addrs is None or \
(len(addr) == 2 and addr[0] in self._web_server_addrs)
def run(self):
"""
The main loop. Exits on SIGHUP, SIGINT, SIGTERM. Returns True if
SIGHUP was received, False otherwise.
"""
self._web_server_addrs = os.environ.get('FCGI_WEB_SERVER_ADDRS')
if self._web_server_addrs is not None:
self._web_server_addrs = map(lambda x: x.strip(),
self._web_server_addrs.split(','))
sock = self._setupSocket()
ret = ThreadedServer.run(self, sock)
self._cleanupSocket(sock)
return ret
if __name__ == '__main__':
def test_app(environ, start_response):
"""Probably not the most efficient example."""
import cgi
start_response('200 OK', [('Content-Type', 'text/html')])
yield '<html><head><title>Hello World!</title></head>\n' \
'<body>\n' \
'<p>Hello World!</p>\n' \
'<table border="1">'
names = environ.keys()
names.sort()
for name in names:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
name, cgi.escape(`environ[name]`))
form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
keep_blank_values=1)
if form.list:
yield '<tr><th colspan="2">Form data</th></tr>'
for field in form.list:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
field.name, field.value)
yield '</table>\n' \
'</body></html>\n'
from wsgiref import validate
test_app = validate.validator(test_app)
WSGIServer(test_app).run()

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,168 @@
# Copyright (c) 2005, 2006 Allan Saddi <allan@saddi.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# $Id$
"""
fcgi - a FastCGI/WSGI gateway.
For more information about FastCGI, see <http://www.fastcgi.com/>.
For more information about the Web Server Gateway Interface, see
<http://www.python.org/peps/pep-0333.html>.
Example usage:
#!/usr/bin/env python
from myapplication import app # Assume app is your WSGI application object
from fcgi import WSGIServer
WSGIServer(app).run()
See the documentation for WSGIServer for more information.
On most platforms, fcgi will fallback to regular CGI behavior if run in a
non-FastCGI context. If you want to force CGI behavior, set the environment
variable FCGI_FORCE_CGI to "Y" or "y".
"""
__author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
import os
from flup.server.fcgi_base import BaseFCGIServer, FCGI_RESPONDER, \
FCGI_MAX_CONNS, FCGI_MAX_REQS, FCGI_MPXS_CONNS
from flup.server.preforkserver import PreforkServer
__all__ = ['WSGIServer']
class WSGIServer(BaseFCGIServer, PreforkServer):
"""
FastCGI server that supports the Web Server Gateway Interface. See
<http://www.python.org/peps/pep-0333.html>.
"""
def __init__(self, application, environ=None,
bindAddress=None, umask=None, multiplexed=False,
debug=True, roles=(FCGI_RESPONDER,), forceCGI=False, **kw):
"""
environ, if present, must be a dictionary-like object. Its
contents will be copied into application's environ. Useful
for passing application-specific variables.
bindAddress, if present, must either be a string or a 2-tuple. If
present, run() will open its own listening socket. You would use
this if you wanted to run your application as an 'external' FastCGI
app. (i.e. the webserver would no longer be responsible for starting
your app) If a string, it will be interpreted as a filename and a UNIX
socket will be opened. If a tuple, the first element, a string,
is the interface name/IP to bind to, and the second element (an int)
is the port number.
"""
BaseFCGIServer.__init__(self, application,
environ=environ,
multithreaded=False,
multiprocess=True,
bindAddress=bindAddress,
umask=umask,
multiplexed=multiplexed,
debug=debug,
roles=roles,
forceCGI=forceCGI)
for key in ('multithreaded', 'multiprocess', 'jobClass', 'jobArgs'):
if kw.has_key(key):
del kw[key]
PreforkServer.__init__(self, jobClass=self._connectionClass,
jobArgs=(self,), **kw)
try:
import resource
# Attempt to glean the maximum number of connections
# from the OS.
try:
maxProcs = resource.getrlimit(resource.RLIMIT_NPROC)[0]
maxConns = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
maxConns = min(maxConns, maxProcs)
except AttributeError:
maxConns = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
except ImportError:
maxConns = 100 # Just some made up number.
maxReqs = maxConns
self.capability = {
FCGI_MAX_CONNS: maxConns,
FCGI_MAX_REQS: maxReqs,
FCGI_MPXS_CONNS: 0
}
def _isClientAllowed(self, addr):
return self._web_server_addrs is None or \
(len(addr) == 2 and addr[0] in self._web_server_addrs)
def run(self):
"""
The main loop. Exits on SIGHUP, SIGINT, SIGTERM. Returns True if
SIGHUP was received, False otherwise.
"""
self._web_server_addrs = os.environ.get('FCGI_WEB_SERVER_ADDRS')
if self._web_server_addrs is not None:
self._web_server_addrs = map(lambda x: x.strip(),
self._web_server_addrs.split(','))
sock = self._setupSocket()
ret = PreforkServer.run(self, sock)
self._cleanupSocket(sock)
return ret
if __name__ == '__main__':
def test_app(environ, start_response):
"""Probably not the most efficient example."""
import cgi
start_response('200 OK', [('Content-Type', 'text/html')])
yield '<html><head><title>Hello World!</title></head>\n' \
'<body>\n' \
'<p>Hello World!</p>\n' \
'<table border="1">'
names = environ.keys()
names.sort()
for name in names:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
name, cgi.escape(`environ[name]`))
form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
keep_blank_values=1)
if form.list:
yield '<tr><th colspan="2">Form data</th></tr>'
for field in form.list:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
field.name, field.value)
yield '</table>\n' \
'</body></html>\n'
from wsgiref import validate
test_app = validate.validator(test_app)
WSGIServer(test_app).run()

View File

@ -0,0 +1,154 @@
# Copyright (c) 2005, 2006 Allan Saddi <allan@saddi.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# $Id$
"""
fcgi - a FastCGI/WSGI gateway.
For more information about FastCGI, see <http://www.fastcgi.com/>.
For more information about the Web Server Gateway Interface, see
<http://www.python.org/peps/pep-0333.html>.
Example usage:
#!/usr/bin/env python
from myapplication import app # Assume app is your WSGI application object
from fcgi import WSGIServer
WSGIServer(app).run()
See the documentation for WSGIServer for more information.
On most platforms, fcgi will fallback to regular CGI behavior if run in a
non-FastCGI context. If you want to force CGI behavior, set the environment
variable FCGI_FORCE_CGI to "Y" or "y".
"""
__author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
import os
from flup.server.fcgi_base import BaseFCGIServer, FCGI_RESPONDER, \
FCGI_MAX_CONNS, FCGI_MAX_REQS, FCGI_MPXS_CONNS
from flup.server.singleserver import SingleServer
__all__ = ['WSGIServer']
class WSGIServer(BaseFCGIServer, SingleServer):
"""
FastCGI server that supports the Web Server Gateway Interface. See
<http://www.python.org/peps/pep-0333.html>.
"""
def __init__(self, application, environ=None,
bindAddress=None, umask=None, multiplexed=False,
debug=True, roles=(FCGI_RESPONDER,), forceCGI=False, **kw):
"""
environ, if present, must be a dictionary-like object. Its
contents will be copied into application's environ. Useful
for passing application-specific variables.
bindAddress, if present, must either be a string or a 2-tuple. If
present, run() will open its own listening socket. You would use
this if you wanted to run your application as an 'external' FastCGI
app. (i.e. the webserver would no longer be responsible for starting
your app) If a string, it will be interpreted as a filename and a UNIX
socket will be opened. If a tuple, the first element, a string,
is the interface name/IP to bind to, and the second element (an int)
is the port number.
"""
BaseFCGIServer.__init__(self, application,
environ=environ,
multithreaded=False,
multiprocess=False,
bindAddress=bindAddress,
umask=umask,
multiplexed=multiplexed,
debug=debug,
roles=roles,
forceCGI=forceCGI)
for key in ('jobClass', 'jobArgs'):
if kw.has_key(key):
del kw[key]
SingleServer.__init__(self, jobClass=self._connectionClass,
jobArgs=(self,), **kw)
self.capability = {
FCGI_MAX_CONNS: 1,
FCGI_MAX_REQS: 1,
FCGI_MPXS_CONNS: 0
}
def _isClientAllowed(self, addr):
return self._web_server_addrs is None or \
(len(addr) == 2 and addr[0] in self._web_server_addrs)
def run(self):
"""
The main loop. Exits on SIGHUP, SIGINT, SIGTERM. Returns True if
SIGHUP was received, False otherwise.
"""
self._web_server_addrs = os.environ.get('FCGI_WEB_SERVER_ADDRS')
if self._web_server_addrs is not None:
self._web_server_addrs = map(lambda x: x.strip(),
self._web_server_addrs.split(','))
sock = self._setupSocket()
ret = SingleServer.run(self, sock)
self._cleanupSocket(sock)
return ret
if __name__ == '__main__':
def test_app(environ, start_response):
"""Probably not the most efficient example."""
import cgi
start_response('200 OK', [('Content-Type', 'text/html')])
yield '<html><head><title>Hello World!</title></head>\n' \
'<body>\n' \
'<p>Hello World!</p>\n' \
'<table border="1">'
names = environ.keys()
names.sort()
for name in names:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
name, cgi.escape(`environ[name]`))
form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
keep_blank_values=1)
if form.list:
yield '<tr><th colspan="2">Form data</th></tr>'
for field in form.list:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
field.name, field.value)
yield '</table>\n' \
'</body></html>\n'
from wsgiref import validate
test_app = validate.validator(test_app)
WSGIServer(test_app).run()

View File

@ -0,0 +1,121 @@
# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
def asbool(obj):
if isinstance(obj, (str, unicode)):
obj = obj.strip().lower()
if obj in ['true', 'yes', 'on', 'y', 't', '1']:
return True
elif obj in ['false', 'no', 'off', 'n', 'f', '0']:
return False
else:
raise ValueError(
"String is not true/false: %r" % obj)
return bool(obj)
def aslist(obj, sep=None, strip=True):
if isinstance(obj, (str, unicode)):
lst = obj.split(sep)
if strip:
lst = [v.strip() for v in lst]
return lst
elif isinstance(obj, (list, tuple)):
return obj
elif obj is None:
return []
else:
return [obj]
def run_ajp_thread(wsgi_app, global_conf,
scriptName='', host='localhost', port='8009',
allowedServers='127.0.0.1'):
import flup.server.ajp
addr = (host, int(port))
s = flup.server.ajp.WSGIServer(
wsgi_app,
scriptName=scriptName,
bindAddress=addr,
allowedServers=aslist(allowedServers),
)
s.run()
def run_ajp_fork(wsgi_app, global_conf,
scriptName='', host='localhost', port='8009',
allowedServers='127.0.0.1'):
import flup.server.ajp_fork
addr = (host, int(port))
s = flup.server.ajp_fork.WSGIServer(
wsgi_app,
scriptName=scriptName,
bindAddress=addr,
allowedServers=aslist(allowedServers),
)
s.run()
def run_fcgi_thread(wsgi_app, global_conf,
host=None, port=None,
socket=None, umask=None,
multiplexed=False):
import flup.server.fcgi
if socket:
assert host is None and port is None
sock = socket
elif host:
assert host is not None and port is not None
sock = (host, int(port))
else:
sock = None
if umask is not None:
umask = int(umask)
s = flup.server.fcgi.WSGIServer(
wsgi_app,
bindAddress=sock, umask=umask,
multiplexed=asbool(multiplexed))
s.run()
def run_fcgi_fork(wsgi_app, global_conf,
host=None, port=None,
socket=None, umask=None,
multiplexed=False):
import flup.server.fcgi_fork
if socket:
assert host is None and port is None
sock = socket
elif host:
assert host is not None and port is not None
sock = (host, int(port))
else:
sock = None
if umask is not None:
umask = int(umask)
s = flup.server.fcgi_fork.WSGIServer(
wsgi_app,
bindAddress=sock, umask=umask,
multiplexed=asbool(multiplexed))
s.run()
def run_scgi_thread(wsgi_app, global_conf,
scriptName='', host='localhost', port='4000',
allowedServers='127.0.0.1'):
import flup.server.scgi
addr = (host, int(port))
s = flup.server.scgi.WSGIServer(
wsgi_app,
scriptName=scriptName,
bindAddress=addr,
allowedServers=aslist(allowedServers),
)
s.run()
def run_scgi_fork(wsgi_app, global_conf,
scriptName='', host='localhost', port='4000',
allowedServers='127.0.0.1'):
import flup.server.scgi_fork
addr = (host, int(port))
s = flup.server.scgi_fork.WSGIServer(
wsgi_app,
scriptName=scriptName,
bindAddress=addr,
allowedServers=aslist(allowedServers),
)
s.run()

View File

@ -0,0 +1,433 @@
# Copyright (c) 2005 Allan Saddi <allan@saddi.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# $Id$
__author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
import sys
import os
import socket
import select
import errno
import signal
import random
import time
try:
import fcntl
except ImportError:
def setCloseOnExec(sock):
pass
else:
def setCloseOnExec(sock):
fcntl.fcntl(sock.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC)
# If running Python < 2.4, require eunuchs module for socket.socketpair().
# See <http://www.inoi.fi/open/trac/eunuchs>.
if not hasattr(socket, 'socketpair'):
try:
import eunuchs.socketpair
except ImportError:
# TODO: Other alternatives? Perhaps using os.pipe()?
raise ImportError, 'Requires eunuchs module for Python < 2.4'
def socketpair():
s1, s2 = eunuchs.socketpair.socketpair()
p, c = (socket.fromfd(s1, socket.AF_UNIX, socket.SOCK_STREAM),
socket.fromfd(s2, socket.AF_UNIX, socket.SOCK_STREAM))
os.close(s1)
os.close(s2)
return p, c
socket.socketpair = socketpair
class PreforkServer(object):
"""
A preforked server model conceptually similar to Apache httpd(2). At
any given time, ensures there are at least minSpare children ready to
process new requests (up to a maximum of maxChildren children total).
If the number of idle children is ever above maxSpare, the extra
children are killed.
If maxRequests is positive, each child will only handle that many
requests in its lifetime before exiting.
jobClass should be a class whose constructor takes at least two
arguments: the client socket and client address. jobArgs, which
must be a list or tuple, is any additional (static) arguments you
wish to pass to the constructor.
jobClass should have a run() method (taking no arguments) that does
the actual work. When run() returns, the request is considered
complete and the child process moves to idle state.
"""
def __init__(self, minSpare=1, maxSpare=5, maxChildren=50,
maxRequests=0, jobClass=None, jobArgs=()):
self._minSpare = minSpare
self._maxSpare = maxSpare
self._maxChildren = max(maxSpare, maxChildren)
self._maxRequests = maxRequests
self._jobClass = jobClass
self._jobArgs = jobArgs
# Internal state of children. Maps pids to dictionaries with two
# members: 'file' and 'avail'. 'file' is the socket to that
# individidual child and 'avail' is whether or not the child is
# free to process requests.
self._children = {}
def run(self, sock):
"""
The main loop. Pass a socket that is ready to accept() client
connections. Return value will be True or False indiciating whether
or not the loop was exited due to SIGHUP.
"""
# Set up signal handlers.
self._keepGoing = True
self._hupReceived = False
self._installSignalHandlers()
# Don't want operations on main socket to block.
sock.setblocking(0)
# Set close-on-exec
setCloseOnExec(sock)
# Main loop.
while self._keepGoing:
# Maintain minimum number of children.
while len(self._children) < self._maxSpare:
if not self._spawnChild(sock): break
# Wait on any socket activity from live children.
r = [x['file'] for x in self._children.values()
if x['file'] is not None]
if len(r) == len(self._children):
timeout = None
else:
# There are dead children that need to be reaped, ensure
# that they are by timing out, if necessary.
timeout = 2
try:
r, w, e = select.select(r, [], [], timeout)
except select.error, e:
if e[0] != errno.EINTR:
raise
# Scan child sockets and tend to those that need attention.
for child in r:
# Receive status byte.
try:
state = child.recv(1)
except socket.error, e:
if e[0] in (errno.EAGAIN, errno.EINTR):
# Guess it really didn't need attention?
continue
raise
# Try to match it with a child. (Do we need a reverse map?)
for pid,d in self._children.items():
if child is d['file']:
if state:
# Set availability status accordingly.
self._children[pid]['avail'] = state != '\x00'
else:
# Didn't receive anything. Child is most likely
# dead.
d = self._children[pid]
d['file'].close()
d['file'] = None
d['avail'] = False
# Reap children.
self._reapChildren()
# See who and how many children are available.
availList = filter(lambda x: x[1]['avail'], self._children.items())
avail = len(availList)
if avail < self._minSpare:
# Need to spawn more children.
while avail < self._minSpare and \
len(self._children) < self._maxChildren:
if not self._spawnChild(sock): break
avail += 1
elif avail > self._maxSpare:
# Too many spares, kill off the extras.
pids = [x[0] for x in availList]
pids.sort()
pids = pids[self._maxSpare:]
for pid in pids:
d = self._children[pid]
d['file'].close()
d['file'] = None
d['avail'] = False
# Clean up all child processes.
self._cleanupChildren()
# Restore signal handlers.
self._restoreSignalHandlers()
# Return bool based on whether or not SIGHUP was received.
return self._hupReceived
def _cleanupChildren(self):
"""
Closes all child sockets (letting those that are available know
that it's time to exit). Sends SIGINT to those that are currently
processing (and hopes that it finishses ASAP).
Any children remaining after 10 seconds is SIGKILLed.
"""
# Let all children know it's time to go.
for pid,d in self._children.items():
if d['file'] is not None:
d['file'].close()
d['file'] = None
if not d['avail']:
# Child is unavailable. SIGINT it.
try:
os.kill(pid, signal.SIGINT)
except OSError, e:
if e[0] != errno.ESRCH:
raise
def alrmHandler(signum, frame):
pass
# Set up alarm to wake us up after 10 seconds.
oldSIGALRM = signal.getsignal(signal.SIGALRM)
signal.signal(signal.SIGALRM, alrmHandler)
signal.alarm(10)
# Wait for all children to die.
while len(self._children):
try:
pid, status = os.wait()
except OSError, e:
if e[0] in (errno.ECHILD, errno.EINTR):
break
if self._children.has_key(pid):
del self._children[pid]
signal.signal(signal.SIGALRM, oldSIGALRM)
# Forcefully kill any remaining children.
for pid in self._children.keys():
try:
os.kill(pid, signal.SIGKILL)
except OSError, e:
if e[0] != errno.ESRCH:
raise
def _reapChildren(self):
"""Cleans up self._children whenever children die."""
while True:
try:
pid, status = os.waitpid(-1, os.WNOHANG)
except OSError, e:
if e[0] == errno.ECHILD:
break
raise
if pid <= 0:
break
if self._children.has_key(pid): # Sanity check.
if self._children[pid]['file'] is not None:
self._children[pid]['file'].close()
del self._children[pid]
def _spawnChild(self, sock):
"""
Spawn a single child. Returns True if successful, False otherwise.
"""
# This socket pair is used for very simple communication between
# the parent and its children.
parent, child = socket.socketpair()
parent.setblocking(0)
setCloseOnExec(parent)
child.setblocking(0)
setCloseOnExec(child)
try:
pid = os.fork()
except OSError, e:
if e[0] in (errno.EAGAIN, errno.ENOMEM):
return False # Can't fork anymore.
raise
if not pid:
# Child
child.close()
# Put child into its own process group.
pid = os.getpid()
os.setpgid(pid, pid)
# Restore signal handlers.
self._restoreSignalHandlers()
# Close copies of child sockets.
for f in [x['file'] for x in self._children.values()
if x['file'] is not None]:
f.close()
self._children = {}
try:
# Enter main loop.
self._child(sock, parent)
except KeyboardInterrupt:
pass
sys.exit(0)
else:
# Parent
parent.close()
d = self._children[pid] = {}
d['file'] = child
d['avail'] = True
return True
def _isClientAllowed(self, addr):
"""Override to provide access control."""
return True
def _notifyParent(self, parent, msg):
"""Send message to parent, ignoring EPIPE and retrying on EAGAIN"""
while True:
try:
parent.send(msg)
return True
except socket.error, e:
if e[0] == errno.EPIPE:
return False # Parent is gone
if e[0] == errno.EAGAIN:
# Wait for socket change before sending again
select.select([], [parent], [])
else:
raise
def _child(self, sock, parent):
"""Main loop for children."""
requestCount = 0
# Re-seed random module
preseed = ''
# urandom only exists in Python >= 2.4
if hasattr(os, 'urandom'):
try:
preseed = os.urandom(16)
except NotImplementedError:
pass
# Have doubts about this. random.seed will just hash the string
random.seed('%s%s%s' % (preseed, os.getpid(), time.time()))
del preseed
while True:
# Wait for any activity on the main socket or parent socket.
r, w, e = select.select([sock, parent], [], [])
for f in r:
# If there's any activity on the parent socket, it
# means the parent wants us to die or has died itself.
# Either way, exit.
if f is parent:
return
# Otherwise, there's activity on the main socket...
try:
clientSock, addr = sock.accept()
except socket.error, e:
if e[0] == errno.EAGAIN:
# Or maybe not.
continue
raise
setCloseOnExec(clientSock)
# Check if this client is allowed.
if not self._isClientAllowed(addr):
clientSock.close()
continue
# Notify parent we're no longer available.
self._notifyParent(parent, '\x00')
# Do the job.
self._jobClass(clientSock, addr, *self._jobArgs).run()
# If we've serviced the maximum number of requests, exit.
if self._maxRequests > 0:
requestCount += 1
if requestCount >= self._maxRequests:
break
# Tell parent we're free again.
if not self._notifyParent(parent, '\xff'):
return # Parent is gone.
# Signal handlers
def _hupHandler(self, signum, frame):
self._keepGoing = False
self._hupReceived = True
def _intHandler(self, signum, frame):
self._keepGoing = False
def _chldHandler(self, signum, frame):
# Do nothing (breaks us out of select and allows us to reap children).
pass
def _installSignalHandlers(self):
supportedSignals = [signal.SIGINT, signal.SIGTERM]
if hasattr(signal, 'SIGHUP'):
supportedSignals.append(signal.SIGHUP)
self._oldSIGs = [(x,signal.getsignal(x)) for x in supportedSignals]
for sig in supportedSignals:
if hasattr(signal, 'SIGHUP') and sig == signal.SIGHUP:
signal.signal(sig, self._hupHandler)
else:
signal.signal(sig, self._intHandler)
def _restoreSignalHandlers(self):
"""Restores previous signal handlers."""
for signum,handler in self._oldSIGs:
signal.signal(signum, handler)
if __name__ == '__main__':
class TestJob(object):
def __init__(self, sock, addr):
self._sock = sock
self._addr = addr
def run(self):
print "Client connection opened from %s:%d" % self._addr
self._sock.send('Hello World!\n')
self._sock.setblocking(1)
self._sock.recv(1)
self._sock.close()
print "Client connection closed from %s:%d" % self._addr
sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', 8080))
sock.listen(socket.SOMAXCONN)
PreforkServer(maxChildren=10, jobClass=TestJob).run(sock)

View File

@ -0,0 +1,190 @@
# Copyright (c) 2005, 2006 Allan Saddi <allan@saddi.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# $Id$
"""
scgi - an SCGI/WSGI gateway.
For more information about SCGI and mod_scgi for Apache1/Apache2, see
<http://www.mems-exchange.org/software/scgi/>.
For more information about the Web Server Gateway Interface, see
<http://www.python.org/peps/pep-0333.html>.
Example usage:
#!/usr/bin/env python
import sys
from myapplication import app # Assume app is your WSGI application object
from scgi import WSGIServer
ret = WSGIServer(app).run()
sys.exit(ret and 42 or 0)
See the documentation for WSGIServer for more information.
About the bit of logic at the end:
Upon receiving SIGHUP, the python script will exit with status code 42. This
can be used by a wrapper script to determine if the python script should be
re-run. When a SIGINT or SIGTERM is received, the script exits with status
code 0, possibly indicating a normal exit.
Example wrapper script:
#!/bin/sh
STATUS=42
while test $STATUS -eq 42; do
python "$@" that_script_above.py
STATUS=$?
done
"""
__author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
import logging
import socket
from flup.server.scgi_base import BaseSCGIServer, Connection, NoDefault
from flup.server.threadedserver import ThreadedServer
__all__ = ['WSGIServer']
class WSGIServer(BaseSCGIServer, ThreadedServer):
"""
SCGI/WSGI server. For information about SCGI (Simple Common Gateway
Interface), see <http://www.mems-exchange.org/software/scgi/>.
This server is similar to SWAP <http://www.idyll.org/~t/www-tools/wsgi/>,
another SCGI/WSGI server.
It differs from SWAP in that it isn't based on scgi.scgi_server and
therefore, it allows me to implement concurrency using threads. (Also,
this server was written from scratch and really has no other depedencies.)
Which server to use really boils down to whether you want multithreading
or forking. (But as an aside, I've found scgi.scgi_server's implementation
of preforking to be quite superior. So if your application really doesn't
mind running in multiple processes, go use SWAP. ;)
"""
def __init__(self, application, scriptName=NoDefault, environ=None,
multithreaded=True, multiprocess=False,
bindAddress=('localhost', 4000), umask=None,
allowedServers=None,
loggingLevel=logging.INFO, debug=True, **kw):
"""
scriptName is the initial portion of the URL path that "belongs"
to your application. It is used to determine PATH_INFO (which doesn't
seem to be passed in). An empty scriptName means your application
is mounted at the root of your virtual host.
environ, which must be a dictionary, can contain any additional
environment variables you want to pass to your application.
bindAddress is the address to bind to, which must be a string or
a tuple of length 2. If a tuple, the first element must be a string,
which is the host name or IPv4 address of a local interface. The
2nd element of the tuple is the port number. If a string, it will
be interpreted as a filename and a UNIX socket will be opened.
If binding to a UNIX socket, umask may be set to specify what
the umask is to be changed to before the socket is created in the
filesystem. After the socket is created, the previous umask is
restored.
allowedServers must be None or a list of strings representing the
IPv4 addresses of servers allowed to connect. None means accept
connections from anywhere.
loggingLevel sets the logging level of the module-level logger.
"""
BaseSCGIServer.__init__(self, application,
scriptName=scriptName,
environ=environ,
multithreaded=multithreaded,
multiprocess=multiprocess,
bindAddress=bindAddress,
umask=umask,
allowedServers=allowedServers,
loggingLevel=loggingLevel,
debug=debug)
for key in ('jobClass', 'jobArgs'):
if kw.has_key(key):
del kw[key]
ThreadedServer.__init__(self, jobClass=Connection, jobArgs=(self,),
**kw)
def run(self):
"""
Main loop. Call this after instantiating WSGIServer. SIGHUP, SIGINT,
SIGQUIT, SIGTERM cause it to cleanup and return. (If a SIGHUP
is caught, this method returns True. Returns False otherwise.)
"""
self.logger.info('%s starting up', self.__class__.__name__)
try:
sock = self._setupSocket()
except socket.error, e:
self.logger.error('Failed to bind socket (%s), exiting', e[1])
return False
ret = ThreadedServer.run(self, sock)
self._cleanupSocket(sock)
self.logger.info('%s shutting down%s', self.__class__.__name__,
self._hupReceived and ' (reload requested)' or '')
return ret
if __name__ == '__main__':
def test_app(environ, start_response):
"""Probably not the most efficient example."""
import cgi
start_response('200 OK', [('Content-Type', 'text/html')])
yield '<html><head><title>Hello World!</title></head>\n' \
'<body>\n' \
'<p>Hello World!</p>\n' \
'<table border="1">'
names = environ.keys()
names.sort()
for name in names:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
name, cgi.escape(`environ[name]`))
form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
keep_blank_values=1)
if form.list:
yield '<tr><th colspan="2">Form data</th></tr>'
for field in form.list:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
field.name, field.value)
yield '</table>\n' \
'</body></html>\n'
from wsgiref import validate
test_app = validate.validator(test_app)
WSGIServer(test_app,
loggingLevel=logging.DEBUG).run()

View File

@ -0,0 +1,544 @@
# Copyright (c) 2005, 2006 Allan Saddi <allan@saddi.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# $Id$
__author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
import sys
import logging
import socket
import select
import errno
import cStringIO as StringIO
import signal
import datetime
import os
import warnings
# Threads are required. If you want a non-threaded (forking) version, look at
# SWAP <http://www.idyll.org/~t/www-tools/wsgi/>.
import thread
import threading
__all__ = ['BaseSCGIServer']
class NoDefault(object):
pass
# The main classes use this name for logging.
LoggerName = 'scgi-wsgi'
# Set up module-level logger.
console = logging.StreamHandler()
console.setLevel(logging.DEBUG)
console.setFormatter(logging.Formatter('%(asctime)s : %(message)s',
'%Y-%m-%d %H:%M:%S'))
logging.getLogger(LoggerName).addHandler(console)
del console
class ProtocolError(Exception):
"""
Exception raised when the server does something unexpected or
sends garbled data. Usually leads to a Connection closing.
"""
pass
def recvall(sock, length):
"""
Attempts to receive length bytes from a socket, blocking if necessary.
(Socket may be blocking or non-blocking.)
"""
dataList = []
recvLen = 0
while length:
try:
data = sock.recv(length)
except socket.error, e:
if e[0] == errno.EAGAIN:
select.select([sock], [], [])
continue
else:
raise
if not data: # EOF
break
dataList.append(data)
dataLen = len(data)
recvLen += dataLen
length -= dataLen
return ''.join(dataList), recvLen
def readNetstring(sock):
"""
Attempt to read a netstring from a socket.
"""
# First attempt to read the length.
size = ''
while True:
try:
c = sock.recv(1)
except socket.error, e:
if e[0] == errno.EAGAIN:
select.select([sock], [], [])
continue
else:
raise
if c == ':':
break
if not c:
raise EOFError
size += c
# Try to decode the length.
try:
size = int(size)
if size < 0:
raise ValueError
except ValueError:
raise ProtocolError, 'invalid netstring length'
# Now read the string.
s, length = recvall(sock, size)
if length < size:
raise EOFError
# Lastly, the trailer.
trailer, length = recvall(sock, 1)
if length < 1:
raise EOFError
if trailer != ',':
raise ProtocolError, 'invalid netstring trailer'
return s
class StdoutWrapper(object):
"""
Wrapper for sys.stdout so we know if data has actually been written.
"""
def __init__(self, stdout):
self._file = stdout
self.dataWritten = False
def write(self, data):
if data:
self.dataWritten = True
self._file.write(data)
def writelines(self, lines):
for line in lines:
self.write(line)
def __getattr__(self, name):
return getattr(self._file, name)
class Request(object):
"""
Encapsulates data related to a single request.
Public attributes:
environ - Environment variables from web server.
stdin - File-like object representing the request body.
stdout - File-like object for writing the response.
"""
def __init__(self, conn, environ, input, output):
self._conn = conn
self.environ = environ
self.stdin = input
self.stdout = StdoutWrapper(output)
self.logger = logging.getLogger(LoggerName)
def run(self):
self.logger.info('%s %s%s',
self.environ['REQUEST_METHOD'],
self.environ.get('SCRIPT_NAME', ''),
self.environ.get('PATH_INFO', ''))
start = datetime.datetime.now()
try:
self._conn.server.handler(self)
except:
self.logger.exception('Exception caught from handler')
if not self.stdout.dataWritten:
self._conn.server.error(self)
end = datetime.datetime.now()
handlerTime = end - start
self.logger.debug('%s %s%s done (%.3f secs)',
self.environ['REQUEST_METHOD'],
self.environ.get('SCRIPT_NAME', ''),
self.environ.get('PATH_INFO', ''),
handlerTime.seconds +
handlerTime.microseconds / 1000000.0)
class Connection(object):
"""
Represents a single client (web server) connection. A single request
is handled, after which the socket is closed.
"""
def __init__(self, sock, addr, server):
self._sock = sock
self._addr = addr
self.server = server
self.logger = logging.getLogger(LoggerName)
def run(self):
if len(self._addr) == 2:
self.logger.debug('Connection starting up (%s:%d)',
self._addr[0], self._addr[1])
try:
self.processInput()
except (EOFError, KeyboardInterrupt):
pass
except ProtocolError, e:
self.logger.error("Protocol error '%s'", str(e))
except:
self.logger.exception('Exception caught in Connection')
if len(self._addr) == 2:
self.logger.debug('Connection shutting down (%s:%d)',
self._addr[0], self._addr[1])
# All done!
self._sock.close()
def processInput(self):
# Read headers
headers = readNetstring(self._sock)
headers = headers.split('\x00')[:-1]
if len(headers) % 2 != 0:
raise ProtocolError, 'invalid headers'
environ = {}
for i in range(len(headers) / 2):
environ[headers[2*i]] = headers[2*i+1]
clen = environ.get('CONTENT_LENGTH')
if clen is None:
raise ProtocolError, 'missing CONTENT_LENGTH'
try:
clen = int(clen)
if clen < 0:
raise ValueError
except ValueError:
raise ProtocolError, 'invalid CONTENT_LENGTH'
self._sock.setblocking(1)
if clen:
input = self._sock.makefile('r')
else:
# Empty input.
input = StringIO.StringIO()
# stdout
output = self._sock.makefile('w')
# Allocate Request
req = Request(self, environ, input, output)
# Run it.
req.run()
output.close()
input.close()
class BaseSCGIServer(object):
# What Request class to use.
requestClass = Request
def __init__(self, application, scriptName=NoDefault, environ=None,
multithreaded=True, multiprocess=False,
bindAddress=('localhost', 4000), umask=None,
allowedServers=NoDefault,
loggingLevel=logging.INFO, debug=True):
"""
scriptName is the initial portion of the URL path that "belongs"
to your application. It is used to determine PATH_INFO (which doesn't
seem to be passed in). An empty scriptName means your application
is mounted at the root of your virtual host.
environ, which must be a dictionary, can contain any additional
environment variables you want to pass to your application.
Set multithreaded to False if your application is not thread-safe.
Set multiprocess to True to explicitly set wsgi.multiprocess to
True. (Only makes sense with threaded servers.)
bindAddress is the address to bind to, which must be a string or
a tuple of length 2. If a tuple, the first element must be a string,
which is the host name or IPv4 address of a local interface. The
2nd element of the tuple is the port number. If a string, it will
be interpreted as a filename and a UNIX socket will be opened.
If binding to a UNIX socket, umask may be set to specify what
the umask is to be changed to before the socket is created in the
filesystem. After the socket is created, the previous umask is
restored.
allowedServers must be None or a list of strings representing the
IPv4 addresses of servers allowed to connect. None means accept
connections from anywhere. By default, it is a list containing
the single item '127.0.0.1'.
loggingLevel sets the logging level of the module-level logger.
"""
if environ is None:
environ = {}
self.application = application
self.scriptName = scriptName
self.environ = environ
self.multithreaded = multithreaded
self.multiprocess = multiprocess
self.debug = debug
self._bindAddress = bindAddress
self._umask = umask
if allowedServers is NoDefault:
allowedServers = ['127.0.0.1']
self._allowedServers = allowedServers
# Used to force single-threadedness.
self._appLock = thread.allocate_lock()
self.logger = logging.getLogger(LoggerName)
self.logger.setLevel(loggingLevel)
def _setupSocket(self):
"""Creates and binds the socket for communication with the server."""
oldUmask = None
if type(self._bindAddress) is str:
# Unix socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
os.unlink(self._bindAddress)
except OSError:
pass
if self._umask is not None:
oldUmask = os.umask(self._umask)
else:
# INET socket
assert type(self._bindAddress) is tuple
assert len(self._bindAddress) == 2
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(self._bindAddress)
sock.listen(socket.SOMAXCONN)
if oldUmask is not None:
os.umask(oldUmask)
return sock
def _cleanupSocket(self, sock):
"""Closes the main socket."""
sock.close()
def _isClientAllowed(self, addr):
ret = self._allowedServers is None or \
len(addr) != 2 or \
(len(addr) == 2 and addr[0] in self._allowedServers)
if not ret:
self.logger.warning('Server connection from %s disallowed',
addr[0])
return ret
def handler(self, request):
"""
WSGI handler. Sets up WSGI environment, calls the application,
and sends the application's response.
"""
environ = request.environ
environ.update(self.environ)
environ['wsgi.version'] = (1,0)
environ['wsgi.input'] = request.stdin
environ['wsgi.errors'] = sys.stderr
environ['wsgi.multithread'] = self.multithreaded
environ['wsgi.multiprocess'] = self.multiprocess
environ['wsgi.run_once'] = False
if environ.get('HTTPS', 'off') in ('on', '1'):
environ['wsgi.url_scheme'] = 'https'
else:
environ['wsgi.url_scheme'] = 'http'
self._sanitizeEnv(environ)
headers_set = []
headers_sent = []
result = None
def write(data):
assert type(data) is str, 'write() argument must be string'
assert headers_set, 'write() before start_response()'
if not headers_sent:
status, responseHeaders = headers_sent[:] = headers_set
found = False
for header,value in responseHeaders:
if header.lower() == 'content-length':
found = True
break
if not found and result is not None:
try:
if len(result) == 1:
responseHeaders.append(('Content-Length',
str(len(data))))
except:
pass
s = 'Status: %s\r\n' % status
for header in responseHeaders:
s += '%s: %s\r\n' % header
s += '\r\n'
request.stdout.write(s)
request.stdout.write(data)
request.stdout.flush()
def start_response(status, response_headers, exc_info=None):
if exc_info:
try:
if headers_sent:
# Re-raise if too late
raise exc_info[0], exc_info[1], exc_info[2]
finally:
exc_info = None # avoid dangling circular ref
else:
assert not headers_set, 'Headers already set!'
assert type(status) is str, 'Status must be a string'
assert len(status) >= 4, 'Status must be at least 4 characters'
assert int(status[:3]), 'Status must begin with 3-digit code'
assert status[3] == ' ', 'Status must have a space after code'
assert type(response_headers) is list, 'Headers must be a list'
if __debug__:
for name,val in response_headers:
assert type(name) is str, 'Header name "%s" must be a string' % name
assert type(val) is str, 'Value of header "%s" must be a string' % name
headers_set[:] = [status, response_headers]
return write
if not self.multithreaded:
self._appLock.acquire()
try:
try:
result = self.application(environ, start_response)
try:
for data in result:
if data:
write(data)
if not headers_sent:
write('') # in case body was empty
finally:
if hasattr(result, 'close'):
result.close()
except socket.error, e:
if e[0] != errno.EPIPE:
raise # Don't let EPIPE propagate beyond server
finally:
if not self.multithreaded:
self._appLock.release()
def _sanitizeEnv(self, environ):
"""Fill-in/deduce missing values in environ."""
reqUri = None
if environ.has_key('REQUEST_URI'):
reqUri = environ['REQUEST_URI'].split('?', 1)
# Ensure QUERY_STRING exists
if not environ.has_key('QUERY_STRING') or not environ['QUERY_STRING']:
if reqUri is not None and len(reqUri) > 1:
environ['QUERY_STRING'] = reqUri[1]
else:
environ['QUERY_STRING'] = ''
# Check WSGI_SCRIPT_NAME
scriptName = environ.get('WSGI_SCRIPT_NAME')
if scriptName is None:
scriptName = self.scriptName
else:
warnings.warn('WSGI_SCRIPT_NAME environment variable for scgi '
'servers is deprecated',
DeprecationWarning)
if scriptName.lower() == 'none':
scriptName = None
if scriptName is None:
# Do nothing (most likely coming from cgi2scgi)
return
if scriptName is NoDefault:
# Pull SCRIPT_NAME/PATH_INFO from environment, with empty defaults
if not environ.has_key('SCRIPT_NAME'):
environ['SCRIPT_INFO'] = ''
if not environ.has_key('PATH_INFO') or not environ['PATH_INFO']:
if reqUri is not None:
environ['PATH_INFO'] = reqUri[0]
else:
environ['PATH_INFO'] = ''
else:
# Configured scriptName
warnings.warn('Configured SCRIPT_NAME is deprecated\n'
'Do not use WSGI_SCRIPT_NAME or the scriptName\n'
'keyword parameter -- they will be going away',
DeprecationWarning)
value = environ['SCRIPT_NAME']
value += environ.get('PATH_INFO', '')
if not value.startswith(scriptName):
self.logger.warning('scriptName does not match request URI')
environ['PATH_INFO'] = value[len(scriptName):]
environ['SCRIPT_NAME'] = scriptName
def error(self, request):
"""
Override to provide custom error handling. Ideally, however,
all errors should be caught at the application level.
"""
if self.debug:
import cgitb
request.stdout.write('Content-Type: text/html\r\n\r\n' +
cgitb.html(sys.exc_info()))
else:
errorpage = """<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html><head>
<title>Unhandled Exception</title>
</head><body>
<h1>Unhandled Exception</h1>
<p>An unhandled exception was thrown by the application.</p>
</body></html>
"""
request.stdout.write('Content-Type: text/html\r\n\r\n' +
errorpage)

View File

@ -0,0 +1,188 @@
# Copyright (c) 2005, 2006 Allan Saddi <allan@saddi.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# $Id$
"""
scgi - an SCGI/WSGI gateway.
For more information about SCGI and mod_scgi for Apache1/Apache2, see
<http://www.mems-exchange.org/software/scgi/>.
For more information about the Web Server Gateway Interface, see
<http://www.python.org/peps/pep-0333.html>.
Example usage:
#!/usr/bin/env python
import sys
from myapplication import app # Assume app is your WSGI application object
from scgi import WSGIServer
ret = WSGIServer(app).run()
sys.exit(ret and 42 or 0)
See the documentation for WSGIServer for more information.
About the bit of logic at the end:
Upon receiving SIGHUP, the python script will exit with status code 42. This
can be used by a wrapper script to determine if the python script should be
re-run. When a SIGINT or SIGTERM is received, the script exits with status
code 0, possibly indicating a normal exit.
Example wrapper script:
#!/bin/sh
STATUS=42
while test $STATUS -eq 42; do
python "$@" that_script_above.py
STATUS=$?
done
"""
__author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
import logging
import socket
from flup.server.scgi_base import BaseSCGIServer, Connection, NoDefault
from flup.server.preforkserver import PreforkServer
__all__ = ['WSGIServer']
class WSGIServer(BaseSCGIServer, PreforkServer):
"""
SCGI/WSGI server. For information about SCGI (Simple Common Gateway
Interface), see <http://www.mems-exchange.org/software/scgi/>.
This server is similar to SWAP <http://www.idyll.org/~t/www-tools/wsgi/>,
another SCGI/WSGI server.
It differs from SWAP in that it isn't based on scgi.scgi_server and
therefore, it allows me to implement concurrency using threads. (Also,
this server was written from scratch and really has no other depedencies.)
Which server to use really boils down to whether you want multithreading
or forking. (But as an aside, I've found scgi.scgi_server's implementation
of preforking to be quite superior. So if your application really doesn't
mind running in multiple processes, go use SWAP. ;)
"""
def __init__(self, application, scriptName=NoDefault, environ=None,
bindAddress=('localhost', 4000), umask=None,
allowedServers=None,
loggingLevel=logging.INFO, debug=True, **kw):
"""
scriptName is the initial portion of the URL path that "belongs"
to your application. It is used to determine PATH_INFO (which doesn't
seem to be passed in). An empty scriptName means your application
is mounted at the root of your virtual host.
environ, which must be a dictionary, can contain any additional
environment variables you want to pass to your application.
bindAddress is the address to bind to, which must be a string or
a tuple of length 2. If a tuple, the first element must be a string,
which is the host name or IPv4 address of a local interface. The
2nd element of the tuple is the port number. If a string, it will
be interpreted as a filename and a UNIX socket will be opened.
If binding to a UNIX socket, umask may be set to specify what
the umask is to be changed to before the socket is created in the
filesystem. After the socket is created, the previous umask is
restored.
allowedServers must be None or a list of strings representing the
IPv4 addresses of servers allowed to connect. None means accept
connections from anywhere.
loggingLevel sets the logging level of the module-level logger.
"""
BaseSCGIServer.__init__(self, application,
scriptName=scriptName,
environ=environ,
multithreaded=False,
multiprocess=True,
bindAddress=bindAddress,
umask=umask,
allowedServers=allowedServers,
loggingLevel=loggingLevel,
debug=debug)
for key in ('multithreaded', 'multiprocess', 'jobClass', 'jobArgs'):
if kw.has_key(key):
del kw[key]
PreforkServer.__init__(self, jobClass=Connection, jobArgs=(self,), **kw)
def run(self):
"""
Main loop. Call this after instantiating WSGIServer. SIGHUP, SIGINT,
SIGQUIT, SIGTERM cause it to cleanup and return. (If a SIGHUP
is caught, this method returns True. Returns False otherwise.)
"""
self.logger.info('%s starting up', self.__class__.__name__)
try:
sock = self._setupSocket()
except socket.error, e:
self.logger.error('Failed to bind socket (%s), exiting', e[1])
return False
ret = PreforkServer.run(self, sock)
self._cleanupSocket(sock)
self.logger.info('%s shutting down%s', self.__class__.__name__,
self._hupReceived and ' (reload requested)' or '')
return ret
if __name__ == '__main__':
def test_app(environ, start_response):
"""Probably not the most efficient example."""
import cgi
start_response('200 OK', [('Content-Type', 'text/html')])
yield '<html><head><title>Hello World!</title></head>\n' \
'<body>\n' \
'<p>Hello World!</p>\n' \
'<table border="1">'
names = environ.keys()
names.sort()
for name in names:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
name, cgi.escape(`environ[name]`))
form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
keep_blank_values=1)
if form.list:
yield '<tr><th colspan="2">Form data</th></tr>'
for field in form.list:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
field.name, field.value)
yield '</table>\n' \
'</body></html>\n'
from wsgiref import validate
test_app = validate.validator(test_app)
WSGIServer(test_app,
loggingLevel=logging.DEBUG).run()

View File

@ -0,0 +1,166 @@
# Copyright (c) 2005 Allan Saddi <allan@saddi.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# $Id$
__author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
import sys
import socket
import select
import signal
import errno
try:
import fcntl
except ImportError:
def setCloseOnExec(sock):
pass
else:
def setCloseOnExec(sock):
fcntl.fcntl(sock.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC)
__all__ = ['SingleServer']
class SingleServer(object):
def __init__(self, jobClass=None, jobArgs=(), **kw):
self._jobClass = jobClass
self._jobArgs = jobArgs
def run(self, sock, timeout=1.0):
"""
The main loop. Pass a socket that is ready to accept() client
connections. Return value will be True or False indiciating whether
or not the loop was exited due to SIGHUP.
"""
# Set up signal handlers.
self._keepGoing = True
self._hupReceived = False
# Might need to revisit this?
if not sys.platform.startswith('win'):
self._installSignalHandlers()
# Set close-on-exec
setCloseOnExec(sock)
# Main loop.
while self._keepGoing:
try:
r, w, e = select.select([sock], [], [], timeout)
except select.error, e:
if e[0] == errno.EINTR:
continue
raise
if r:
try:
clientSock, addr = sock.accept()
except socket.error, e:
if e[0] in (errno.EINTR, errno.EAGAIN):
continue
raise
setCloseOnExec(clientSock)
if not self._isClientAllowed(addr):
clientSock.close()
continue
# Hand off to Connection.
conn = self._jobClass(clientSock, addr, *self._jobArgs)
conn.run()
self._mainloopPeriodic()
# Restore signal handlers.
self._restoreSignalHandlers()
# Return bool based on whether or not SIGHUP was received.
return self._hupReceived
def _mainloopPeriodic(self):
"""
Called with just about each iteration of the main loop. Meant to
be overridden.
"""
pass
def _exit(self, reload=False):
"""
Protected convenience method for subclasses to force an exit. Not
really thread-safe, which is why it isn't public.
"""
if self._keepGoing:
self._keepGoing = False
self._hupReceived = reload
def _isClientAllowed(self, addr):
"""Override to provide access control."""
return True
# Signal handlers
def _hupHandler(self, signum, frame):
self._hupReceived = True
self._keepGoing = False
def _intHandler(self, signum, frame):
self._keepGoing = False
def _installSignalHandlers(self):
supportedSignals = [signal.SIGINT, signal.SIGTERM]
if hasattr(signal, 'SIGHUP'):
supportedSignals.append(signal.SIGHUP)
self._oldSIGs = [(x,signal.getsignal(x)) for x in supportedSignals]
for sig in supportedSignals:
if hasattr(signal, 'SIGHUP') and sig == signal.SIGHUP:
signal.signal(sig, self._hupHandler)
else:
signal.signal(sig, self._intHandler)
def _restoreSignalHandlers(self):
for signum,handler in self._oldSIGs:
signal.signal(signum, handler)
if __name__ == '__main__':
class TestJob(object):
def __init__(self, sock, addr):
self._sock = sock
self._addr = addr
def run(self):
print "Client connection opened from %s:%d" % self._addr
self._sock.send('Hello World!\n')
self._sock.setblocking(1)
self._sock.recv(1)
self._sock.close()
print "Client connection closed from %s:%d" % self._addr
sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', 8080))
sock.listen(socket.SOMAXCONN)
SingleServer(jobClass=TestJob).run(sock)

View File

@ -0,0 +1,175 @@
# Copyright (c) 2005 Allan Saddi <allan@saddi.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# $Id$
__author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
import sys
import socket
import select
import signal
import errno
try:
import fcntl
except ImportError:
def setCloseOnExec(sock):
pass
else:
def setCloseOnExec(sock):
fcntl.fcntl(sock.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC)
from flup.server.threadpool import ThreadPool
__all__ = ['ThreadedServer']
class ThreadedServer(object):
def __init__(self, jobClass=None, jobArgs=(), **kw):
self._jobClass = jobClass
self._jobArgs = jobArgs
self._threadPool = ThreadPool(**kw)
def run(self, sock, timeout=1.0):
"""
The main loop. Pass a socket that is ready to accept() client
connections. Return value will be True or False indiciating whether
or not the loop was exited due to SIGHUP.
"""
# Set up signal handlers.
self._keepGoing = True
self._hupReceived = False
# Might need to revisit this?
if not sys.platform.startswith('win'):
self._installSignalHandlers()
# Set close-on-exec
setCloseOnExec(sock)
# Main loop.
while self._keepGoing:
try:
r, w, e = select.select([sock], [], [], timeout)
except select.error, e:
if e[0] == errno.EINTR:
continue
raise
if r:
try:
clientSock, addr = sock.accept()
except socket.error, e:
if e[0] in (errno.EINTR, errno.EAGAIN):
continue
raise
setCloseOnExec(clientSock)
if not self._isClientAllowed(addr):
clientSock.close()
continue
# Hand off to Connection.
conn = self._jobClass(clientSock, addr, *self._jobArgs)
if not self._threadPool.addJob(conn, allowQueuing=False):
# No thread left, immediately close the socket to hopefully
# indicate to the web server that we're at our limit...
# and to prevent having too many opened (and useless)
# files.
clientSock.close()
self._mainloopPeriodic()
# Restore signal handlers.
self._restoreSignalHandlers()
# Return bool based on whether or not SIGHUP was received.
return self._hupReceived
def _mainloopPeriodic(self):
"""
Called with just about each iteration of the main loop. Meant to
be overridden.
"""
pass
def _exit(self, reload=False):
"""
Protected convenience method for subclasses to force an exit. Not
really thread-safe, which is why it isn't public.
"""
if self._keepGoing:
self._keepGoing = False
self._hupReceived = reload
def _isClientAllowed(self, addr):
"""Override to provide access control."""
return True
# Signal handlers
def _hupHandler(self, signum, frame):
self._hupReceived = True
self._keepGoing = False
def _intHandler(self, signum, frame):
self._keepGoing = False
def _installSignalHandlers(self):
supportedSignals = [signal.SIGINT, signal.SIGTERM]
if hasattr(signal, 'SIGHUP'):
supportedSignals.append(signal.SIGHUP)
self._oldSIGs = [(x,signal.getsignal(x)) for x in supportedSignals]
for sig in supportedSignals:
if hasattr(signal, 'SIGHUP') and sig == signal.SIGHUP:
signal.signal(sig, self._hupHandler)
else:
signal.signal(sig, self._intHandler)
def _restoreSignalHandlers(self):
for signum,handler in self._oldSIGs:
signal.signal(signum, handler)
if __name__ == '__main__':
class TestJob(object):
def __init__(self, sock, addr):
self._sock = sock
self._addr = addr
def run(self):
print "Client connection opened from %s:%d" % self._addr
self._sock.send('Hello World!\n')
self._sock.setblocking(1)
self._sock.recv(1)
self._sock.close()
print "Client connection closed from %s:%d" % self._addr
sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', 8080))
sock.listen(socket.SOMAXCONN)
ThreadedServer(maxThreads=10, jobClass=TestJob).run(sock)

View File

@ -0,0 +1,121 @@
# Copyright (c) 2005 Allan Saddi <allan@saddi.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# $Id$
__author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
import sys
import thread
import threading
class ThreadPool(object):
"""
Thread pool that maintains the number of idle threads between
minSpare and maxSpare inclusive. By default, there is no limit on
the number of threads that can be started, but this can be controlled
by maxThreads.
"""
def __init__(self, minSpare=1, maxSpare=5, maxThreads=sys.maxint):
self._minSpare = minSpare
self._maxSpare = maxSpare
self._maxThreads = max(minSpare, maxThreads)
self._lock = threading.Condition()
self._workQueue = []
self._idleCount = self._workerCount = maxSpare
# Start the minimum number of worker threads.
for i in range(maxSpare):
thread.start_new_thread(self._worker, ())
def addJob(self, job, allowQueuing=True):
"""
Adds a job to the work queue. The job object should have a run()
method. If allowQueuing is True (the default), the job will be
added to the work queue regardless if there are any idle threads
ready. (The only way for there to be no idle threads is if maxThreads
is some reasonable, finite limit.)
Otherwise, if allowQueuing is False, and there are no more idle
threads, the job will not be queued.
Returns True if the job was queued, False otherwise.
"""
self._lock.acquire()
try:
# Maintain minimum number of spares.
while self._idleCount < self._minSpare and \
self._workerCount < self._maxThreads:
self._workerCount += 1
self._idleCount += 1
thread.start_new_thread(self._worker, ())
# Hand off the job.
if self._idleCount or allowQueuing:
self._workQueue.append(job)
self._lock.notify()
return True
else:
return False
finally:
self._lock.release()
def _worker(self):
"""
Worker thread routine. Waits for a job, executes it, repeat.
"""
self._lock.acquire()
while True:
while not self._workQueue:
self._lock.wait()
# We have a job to do...
job = self._workQueue.pop(0)
assert self._idleCount > 0
self._idleCount -= 1
self._lock.release()
try:
job.run()
except:
# FIXME: This should really be reported somewhere.
# But we can't simply report it to stderr because of fcgi
pass
self._lock.acquire()
if self._idleCount == self._maxSpare:
break # NB: lock still held
self._idleCount += 1
assert self._idleCount <= self._maxSpare
# Die off...
assert self._workerCount > self._maxSpare
self._workerCount -= 1
self._lock.release()

View File

@ -3,7 +3,7 @@ Description=@@name@@ server
[Service] [Service]
Type=simple Type=simple
ExecStart=/usr/bin/python2 @@basedir@@/server.py ExecStart=@@destdir@@/server.py
Restart=on-failure Restart=on-failure
[Install] [Install]

View File

@ -3,9 +3,6 @@
__all__ = ('Server',) __all__ = ('Server',)
try: True, False
except: True, False = 1, 0
from ulib.web import web, Application, Page, defaults from ulib.web import web, Application, Page, defaults
class Server(Application): class Server(Application):
@ -20,4 +17,5 @@ if __name__ == '__main__':
from os import path from os import path
basedir = path.abspath(path.split(__file__)[0]) basedir = path.abspath(path.split(__file__)[0])
sys.path.append(path.join(basedir, 'lib'))
Server(basedir).run(sys.argv[1:]) Server(basedir).run(sys.argv[1:])