# blob: 3feae1fa46b00f5fce3e41e8498471ea008bb6bb [file] [log] [blame]
"""
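Implement a maildir-style on-disk queue: each item is a file that moves
between the tmp, new, cur and dead sub-directories of the queue directory.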
"""
import os
import tempfile
import time
import logging
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# pyinotify is an optional dependency.  When it cannot be imported the name
# is bound to None so that other code can test for its availability.
try:
    import pyinotify
    # Reference the module right away so the import is never seen as unused.
    assert pyinotify
except ImportError:
    pyinotify = None
else:
    class _MovedHandler(pyinotify.ProcessEvent):
        """Event handler that deliberately ignores IN_MOVED_TO events."""

        def process_IN_MOVED_TO(self, event):
            # Nothing to do with the event itself.
            return None
class QueueDir(object):
    """
    A maildir-style work queue stored in a directory tree.

    The following sub-directories are created below ``queue_dir``:

      tmp   items that are still being written by add()
      new   items waiting to be handed out by pop()
      cur   items a consumer is currently working on
      logs  per-item log files
      dead  items (and their logs) given up on via murder()

    An item id is the item's file name.  Every time an item is requeued a
    ".N" retry counter is appended to (or incremented at the end of) its id.
    """
    # How long before things are considered to be "old"
    # Also how long between cleanup jobs
    cleanup_time = 300  # 5 minutes

    # Should the producer do cleanup?
    producer_cleanup = True

    # Mapping of names to QueueDir instances
    _objects = {}

    def __init__(self, name, queue_dir):
        """
        Creates the queue directories below queue_dir (if needed) and
        registers this instance under `name` for later lookup via getQueue().
        """
        self.queue_dir = queue_dir

        # pid, start time and a running counter make up the file name prefix
        # of the items added by this instance
        self.pid = os.getpid()
        self.started = int(time.time())
        self.count = 0
        self.last_cleanup = 0
        # List of time, item_id for items to move from cur back into new
        self.to_requeue = []

        self.tmp_dir = os.path.join(self.queue_dir, 'tmp')
        self.new_dir = os.path.join(self.queue_dir, 'new')
        self.cur_dir = os.path.join(self.queue_dir, 'cur')
        self.log_dir = os.path.join(self.queue_dir, 'logs')
        self.dead_dir = os.path.join(self.queue_dir, 'dead')

        self.setup()
        # Only register instances whose directories could be set up
        self._objects[name] = self

    @classmethod
    def getQueue(cls, name):
        """
        Returns the QueueDir instance registered under `name`.
        Raises KeyError if no such instance has been created.
        """
        return cls._objects[name]

    def setup(self):
        """
        Creates the queue's directories, then runs cleanup().

        Missing directories are created one level at a time so that each
        newly created level can be given mode 0755.  Directories that already
        exist are left untouched.  Relative paths are resolved against the
        current working directory.
        """
        for d in (self.tmp_dir, self.new_dir, self.cur_dir, self.log_dir,
                  self.dead_dir):
            # Walk up from the leaf, collecting the levels that don't exist
            missing = []
            path = os.path.normpath(d)
            while path and not os.path.exists(path):
                missing.append(path)
                path = os.path.dirname(path)

            # Create them outermost first
            for path in reversed(missing):
                os.mkdir(path)
                os.chmod(path, 0o755)

        self.cleanup()

    def _older_than(self, directory, cutoff):
        """
        Returns the names in `directory` whose mtime is before `cutoff`.
        Entries that can't be stat'ed (e.g. they were just moved away by
        another process) are skipped.
        """
        old = []
        for f in os.listdir(directory):
            try:
                if os.path.getmtime(os.path.join(directory, f)) < cutoff:
                    old.append(f)
            except OSError:
                pass
        return old

    def cleanup(self):
        """
        Removes old items from tmp
        Removes old logs from log_dir
        Moves old items from cur into new

        'old' is defined by the cleanup_time property

        Does nothing if the last cleanup ran less than cleanup_time seconds
        ago.
        """
        now = time.time()
        if now - self.last_cleanup < self.cleanup_time:
            return
        self.last_cleanup = now
        cutoff = now - self.cleanup_time

        for d in (self.tmp_dir, self.log_dir):
            for f in self._older_than(d, cutoff):
                try:
                    os.unlink(os.path.join(d, f))
                except OSError:
                    # Already gone, or not a plain file; best effort only
                    pass

        for f in self._older_than(self.cur_dir, cutoff):
            self.requeue(f)

    ###
    # For producers
    ###
    def add(self, data):
        """
        Adds a new item to the queue.

        data is written to a uniquely named file in tmp, which is then
        renamed into new so that consumers never see a partially written
        item.  If writing or renaming fails the temporary file is removed
        and the error is re-raised.
        """
        prefix = "%i-%i-%i" % (self.started, self.count, self.pid)
        fd, tmp_name = tempfile.mkstemp(prefix=prefix, dir=self.tmp_dir)
        dst_name = os.path.join(self.new_dir, os.path.basename(tmp_name))
        try:
            # A file object writes all of data (a bare os.write() may stop
            # short) and closes the descriptor even if the write fails
            with os.fdopen(fd, "wb") as f:
                f.write(data)
            os.rename(tmp_name, dst_name)
        except Exception:
            try:
                os.unlink(tmp_name)
            except OSError:
                pass
            raise

        self.count += 1
        if self.producer_cleanup:
            self.cleanup()

    ###
    # For consumers
    ###
    def _by_mtime(self, items):
        """
        Returns the names in `items` (entries of new) ordered oldest first.
        Entries that can no longer be stat'ed, typically because another
        consumer claimed them in the meantime, are left out.
        """
        stamped = []
        for item in items:
            try:
                mtime = os.path.getmtime(os.path.join(self.new_dir, item))
            except OSError:
                continue
            stamped.append((mtime, item))
        stamped.sort()
        return [item for _, item in stamped]

    def pop(self, sorted=True):
        """
        Moves an item from new into cur
        Returns item_id, file handle
        Returns None if queue is empty

        If sorted is True, then the earliest item is returned

        Pending delayed requeues and cleanup are processed first.
        """
        self._check_to_requeue()
        self.cleanup()

        items = os.listdir(self.new_dir)
        if sorted:
            items = self._by_mtime(items)

        for item in items:
            dst_name = os.path.join(self.cur_dir, item)
            try:
                # The rename is what claims the item; if it fails somebody
                # else got there first, so try the next one
                os.rename(os.path.join(self.new_dir, item), dst_name)
                # Refresh the mtime so cleanup() doesn't consider it old
                os.utime(dst_name, None)
                return item, open(dst_name, 'rb')
            except OSError:
                pass
        return None

    def peek(self):
        """
        Returns True if there are new items in the queue
        """
        return len(os.listdir(self.new_dir)) > 0

    def touch(self, item_id):
        """
        Indicate that we're still working on this item
        """
        fn = os.path.join(self.cur_dir, item_id)
        try:
            os.utime(fn, None)
        except OSError:
            # Somebody else moved this; that's probably ok
            pass

    @staticmethod
    def _split_item_id(item_id):
        """
        Splits item_id into (core_item_id, retry_count).

        The retry count is the integer after the last "."; if there is no
        such suffix the whole id is the core id and the count is 0.
        """
        core_item_id, dot, suffix = item_id.rpartition(".")
        if dot:
            try:
                return core_item_id, int(suffix)
            except ValueError:
                pass
        return item_id, 0

    def getcount(self, item_id):
        """
        Returns how many times this item has been run
        """
        return self._split_item_id(item_id)[1]

    def getlogname(self, item_id):
        """
        Returns the path of the log file for this item.  Everything from the
        first "." in item_id onwards is ignored, so all retries of an item
        share one log file.
        """
        if "." in item_id:
            item_id = item_id.split(".")[0]
        fn = os.path.join(self.log_dir, "%s.log" % item_id)
        return fn

    def getlog(self, item_id):
        """
        Creates and returns a file object for a log file for this item
        The caller is responsible for closing it.
        """
        return open(self.getlogname(item_id), "a+")

    def log(self, item_id, msg):
        """
        Appends msg, followed by a newline, to this item's log file
        """
        with self.getlog(item_id) as fp:
            fp.write(msg + "\n")

    def remove(self, item_id):
        """
        Removes item_id from cur
        """
        os.unlink(os.path.join(self.cur_dir, item_id))

    def _check_to_requeue(self):
        """
        Requeues the delayed items whose time has come, and touches the ones
        still waiting so that cleanup() leaves them in cur.
        """
        if not self.to_requeue:
            return
        now = time.time()
        waiting = []
        for t, item_id in self.to_requeue:
            if now > t:
                self.requeue(item_id)
            else:
                self.touch(item_id)
                waiting.append((t, item_id))
        self.to_requeue[:] = waiting

    def requeue(self, item_id, delay=None, max_retries=None):
        """
        Moves item_id from cur back into new, incrementing the counter at the
        end.

        If max_retries is set and the incremented counter would exceed it,
        the item is moved to dead (see murder) instead.

        If delay is set, it is a number of seconds to wait before moving the
        item back into new. It will remain in cur until the appropriate time
        has expired.
        The state for this is kept in the QueueDir instance, so if the instance
        goes away, the item won't be requeued on schedule. It will eventually
        be moved out of cur when the cleanup time expires however.
        You must call pop() at some point in the future for requeued items
        to be processed.
        """
        core_item_id, count = self._split_item_id(item_id)
        count += 1

        if max_retries is not None and count > max_retries:
            log.info("Maximum retry count exceeded; murdering %s", item_id)
            self.murder(item_id)
            return

        if delay:
            self.to_requeue.append((time.time() + delay, item_id))
            # Keep the soonest deadline first; wait() relies on that
            self.to_requeue.sort()
            return

        dst_name = os.path.join(self.new_dir, "%s.%i" % (core_item_id, count))
        try:
            os.rename(os.path.join(self.cur_dir, item_id), dst_name)
            os.utime(dst_name, None)
        except OSError:
            # Somebody else got to it first
            pass

    def murder(self, item_id):
        """
        Moves item_id and log from cur into dead for future inspection
        """
        dst_name = os.path.join(self.dead_dir, item_id)
        os.rename(os.path.join(self.cur_dir, item_id), dst_name)
        if os.path.exists(self.getlogname(item_id)):
            dst_name = os.path.join(self.dead_dir, "%s.log" % item_id)
            os.rename(self.getlogname(item_id), dst_name)

    def wait(self, timeout=None):
        """
        Waits for new items to arrive in new.
        timeout is in seconds, and is the maximum amount of time to wait. we
        might return before that.

        A timeout of None (or 0) means no limit.  If delayed requeues are
        pending, we wait no longer than until the first of them is due.
        Uses inotify when pyinotify is available and polls otherwise.
        """
        # Check if we have any items to requeue
        if self.to_requeue:
            reque_time = self.to_requeue[0][0] - time.time()
            # Need to do something right now!
            if reque_time <= 0:
                return
            if timeout:
                timeout = min(reque_time, timeout)
            else:
                timeout = reque_time

        log.debug("Sleeping for %s", timeout)
        if pyinotify:
            self._wait_inotify(timeout)
        else:
            self._wait_poll(timeout)

    def _wait_inotify(self, timeout):
        """
        Blocks until something is moved into new or timeout seconds passed.
        """
        wm = pyinotify.WatchManager()
        try:
            wm.add_watch(self.new_dir, pyinotify.IN_MOVED_TO)
            # Items that arrived before the watch was in place generate no
            # event, so look for them explicitly
            if self.peek():
                return
            notifier = pyinotify.Notifier(wm, _MovedHandler())
            # check_events() is given milliseconds
            notifier.check_events(timeout * 1000 if timeout else timeout)
            notifier.process_events()
        finally:
            wm.close()

    def _wait_poll(self, timeout):
        """
        Polls new about once a second until it has items or timeout seconds
        passed.
        """
        start = time.time()
        while True:
            if self.peek():
                return
            if not timeout:
                time.sleep(1)
                continue
            remaining = timeout - (time.time() - start)
            if remaining <= 0:
                return
            # Never sleep past the deadline
            time.sleep(min(1, remaining))