|
from StringIO import StringIO
|
|
from urlparse import urljoin
|
|
import logging
|
|
|
|
logger = logging.getLogger()
|
|
logger.addHandler(logging.StreamHandler())
|
|
|
|
from nectar import config, listener, request as nectar_request
|
|
from nectar.downloaders import threaded
|
|
from twisted.internet import reactor, endpoints
|
|
from twisted.web import server, resource
|
|
from twisted.web.server import NOT_DONE_YET
|
|
|
|
class StreamerNectarListener(listener.DownloadEventListener):
|
|
|
|
def __init__(self, request):
|
|
super(StreamerNectarListener, self).__init__(request)
|
|
self.request = request
|
|
|
|
def download_headers(self, report):
|
|
print report.headers
|
|
# TODO these headers need to be sent to the client before nectar makes the first call to Responder.write()
|
|
# I think this needs to be done by having reactor.callFromThread call self.request.write
|
|
# The cache control header needs to be set to the value from server.conf and included even if the server did not return a cache-control value
|
|
# The headers need to be written just as the HTTP spec says they should be, IIRC its HEADER_NAME: HEADER_VALUE
|
|
# maybe this will be done with request.setHeader like the commented out line below?
|
|
|
|
|
|
HOST = 'http://localhost:8000' # TODO This value really should come from the catalog per request
|
|
|
|
|
|
class Streamer(resource.Resource):
|
|
isLeaf = True
|
|
|
|
def render_GET(self, request):
|
|
reactor.callInThread(self._download, request)
|
|
return NOT_DONE_YET
|
|
|
|
def _download(self, request):
|
|
# example of the cache-control header
|
|
#request.setHeader('cache-control', 'public, max-age=31536000')
|
|
responder = Responder(request)
|
|
d_config = config.DownloaderConfig()
|
|
listner = StreamerNectarListner(request)
|
|
request.downloader = threaded.HTTPThreadedDownloader(d_config, listner)
|
|
download_request = nectar_request.DownloadRequest(
|
|
urljoin(HOST, request.path), responder) # TODO: This needs to come from the importer config w/ the catalog
|
|
request.downloader.download_one(download_request, events=True)
|
|
responder.close()
|
|
|
|
|
|
class Responder(StringIO):
|
|
def __init__(self, request):
|
|
self.request = request
|
|
|
|
def close(self): # TODO someone needs to determine if close is called even if nectar experiences an error. If it doesn't maybe use the download_error callback on StreamerNectarListener
|
|
reactor.callFromThread(self.request.finish)
|
|
|
|
def write(self, data):
|
|
reactor.callFromThread(self.request.write, data)
|
|
|
|
|
|
endpoints.serverFromString(reactor, "tcp:8080").listen(server.Site(Streamer())) # TODO: Needs to be interface and port configurable from server.conf
|
|
reactor.run() # TODO: be able to start/stop/restart/status on EL6 and EL7 using init scripts / systemd
|