Package madrona :: Package async :: Module ProcessHandler
[hide private]

Source Code for Module madrona.async.ProcessHandler

  1  from models import URLtoTaskID 
  2  from djcelery.models import TaskMeta 
  3  from django.template.loader import render_to_string 
  4   
  5  """ 
  6  NOTES: 
  7      url entries should be unique -- a single url maps to a single task_id in a one-to-one fashion 
  8      this strategy allows the asynchronous models to serve as a cache for those apps that wish to use it as such 
  9   
 10      check_status_or_begin does not check to see if process is already complete 
 11      if the caller wants the process to be run ONLY IF it hasn't been completed already, 
 12      the caller should check process_is_complete instead 
 13   
 14      process_is_running (called on its own or from check_status_or_begin or process_is_running_or_complete) checks 
 15      only if status == 'STARTED'; this seems like the safest strategy, although it does have its shortcomings. 
 16      When a task is triggered, it is stored in the celery_taskmeta table with status == PENDING, as soon as  
 17      a worker is assigned to that task, the status becomes STARTED, when the task is complete, status becomes SUCCESS. 
 18      If something goes wrong during the STARTED phase (celeryd is shut down, etc), then status becomes FAILURE.  
 19      If celeryd is not communicating with ghettoq, or not running at all, then the status will remain as PENDING. 
 20      If we assume that PENDING means STARTED, then we may be misled into thinking a task is being processed,  
 21      when in fact it is sitting idle.  This may only become apparent after waiting way-too-long for the task 
 22      to complete.   
 23      If we only check for STARTED, then each time the task is requested, we will add a new task to the queue.  This 
 24      may back up the queue, but only in situations when there is something wrong to begin with.  This will also 
 25      likely alert the user (or the developer) more immediately that something has gone wrong with celeryd (or ghettoq, 
 26      or whatever) since the returned statement will be something like 'the process has begun', even with subsequent 
 27      requests which should have changed the statement to something like 'the process is already running'.  There is 
 28      also the slight chance that a task will be requested in such quick succession that a worker will not yet be assigned 
 29      to that task and the status will still be PENDING, in which case a new task will be added to the queue.  This seems 
 30      a small price to pay for the more immediate recognition that there is indeed a problem.  As the developer in this 
 31      case, I'd rather get the impression of a False Negative (thinking there may be a problem, when in fact there is none), 
 32      than a False Positive (given the impression that everything is fine, when in fact there is a problem).  Also, the  
 33      user is more likely to notice a problem sooner in the case when there is a hint in the returned statement (that 
 34      the process has begun...yet again), than in the case when they must figure out there is a problem only because 
 35      the process is taking an inordinately long time (in many cases this could mean hours rather than minutes).   
 36  """ 
 37   
 38  ''' 
 39  begin_process starts the background process 
 40  if the process has already been run, the related URLtoTaskID entry is cleared and re-calculated 
 41  if the process is currently running, the related URLtoTaskID entry is cleared, and the process is started again 
 42  (in this case more than one identical process may be running at the same time -- to prevent this see NOTES above) 
 43  called by check_status_or_begin 
 44  ''' 
def __begin_process(task_method, task_args=(), task_kwargs=None, polling_url=None, cache_results=True):
    """Queue task_method as a background task and return its task id.

    Any URLtoTaskID entry already stored for polling_url is removed first,
    so a finished or still-running process is simply started again (this
    may leave more than one identical process running at the same time).

    Arguments:
        task_method   -- object exposing delay(*args, **kwargs); the value
                         returned by delay must expose a task_id attribute
        task_args     -- positional arguments forwarded to delay
        task_kwargs   -- keyword arguments forwarded to delay (None means
                         no keyword arguments)
        polling_url   -- url used as the key in the URLtoTaskID table
        cache_results -- when true (and polling_url is truthy) the new
                         url -> task_id mapping is saved

    Returns the task_id of the newly queued task.
    """
    # A None default avoids sharing one dict object between calls.
    if task_kwargs is None:
        task_kwargs = {}

    # Remove stale mappings only when there is a url to look up.  Looping
    # over filter() copes with zero, one or several rows without needing a
    # catch-all except, and still goes through each instance's delete().
    if polling_url is not None:
        for stale_entry in URLtoTaskID.objects.filter(url=polling_url):
            stale_entry.delete()

    # initialize task
    task = task_method.delay(*task_args, **task_kwargs)
    if polling_url and cache_results:
        URLtoTaskID(url=polling_url, task_id=task.task_id).save()
    return task.task_id
56 57 ''' 58 check_status_or_begin 59 if the user has supplied neither the polling_url nor the task_id, it is assumed that they wish to start the process 60 (since no process_is_running check can be made without a process identifier anyway) 61 otherwise, if check_running is left to its default value of True, then a check will be made to see if the process is 62 already running, in which case a related message is returned to the user and the process is not re-started 63 if the process is not already running (or the user doesn't wish to check for that case), then the process will be 64 added to the queue, and a message indicating the process has begun will be returned to the user 65 if the process had already completed (status == SUCCESS rather than STARTED), it will be run again 66 if the caller wants the process to be run ONLY IF it hasn't been completed already, 67 the caller should check process_is_complete instead 68 '''
def check_status_or_begin(task_method, task_args=(), task_kwargs=None, polling_url=None, task_id=None, check_running=True, cache_results=True):
    """Start the process unless it is already running; report which happened.

    When neither polling_url nor task_id is supplied no running-check is
    possible, so the process is always started.  Otherwise, with
    check_running left true, a process whose status is STARTED is not
    queued again.  A process that already completed IS run again; callers
    wanting to avoid that should consult process_is_complete first.

    Arguments:
        task_method   -- task object handed to __begin_process
        task_args     -- positional arguments for the task
        task_kwargs   -- keyword arguments for the task (None means none)
        polling_url   -- url identifying the process
        task_id       -- task id identifying the process
        check_running -- when false, skip the already-running check
        cache_results -- forwarded to __begin_process to control whether
                         the url -> task_id mapping is stored

    Returns a (rendered_html, task_id) tuple: 'already_processing.html'
    with the existing task id when the process is running, otherwise
    'starting_process.html' with the id of the newly queued task.
    """
    # A None default avoids sharing one dict object between calls.
    if task_kwargs is None:
        task_kwargs = {}

    # Without any identifier there is nothing to check against.
    if polling_url is None and task_id is None:
        check_running = False

    if check_running and process_is_running(polling_url, task_id):
        if task_id is None:
            task_id = get_taskid_from_url(polling_url)
        return render_to_string('already_processing.html', {}), task_id

    # cache_results used to be dropped here, so cache_results=False had no
    # effect; it is now passed through.
    task_id = __begin_process(task_method, task_args, task_kwargs, polling_url, cache_results)
    return render_to_string('starting_process.html', {}), task_id
79 80 #returns boolean based on process.status == 'STARTED' or 'SUCCESS' (currently running or complete)
def process_is_running_or_complete(polling_url=None, task_id=None):
    """Return True when the process is either running or complete.

    Delegates to process_is_running and, only if that is false, to
    process_is_complete; returns False when neither holds.
    """
    if process_is_running(polling_url, task_id):
        return True
    if process_is_complete(polling_url, task_id):
        return True
    return False
86 87 #returns boolean based on whether process is in cache and marked as STARTED
def process_is_running(polling_url=None, task_id=None):
    """Return True when the identified task has status 'STARTED'.

    The task is looked up through __get_asyncresult from polling_url or
    task_id; False is returned when no result object is found.
    """
    async_result = __get_asyncresult(polling_url, task_id)
    if async_result is None:
        return False
    return async_result.status == 'STARTED'
94 95 #returns boolean based on whether process is in cache and marked as PENDING 96 # i.e. it has been sent to the queue but processing has not yet begun
def process_is_pending(polling_url=None, task_id=None):
    """Return True when the identified task has status 'PENDING'.

    PENDING means the task was sent to the queue but processing has not
    yet begun.  False is returned when no result object is found.
    """
    async_result = __get_asyncresult(polling_url, task_id)
    if async_result is None:
        return False
    return async_result.status == 'PENDING'
103 104 #returns boolean value based on result=='SUCCESS' from celery table
def process_is_complete(polling_url=None, task_id=None):
    """Return True when the identified task has status 'SUCCESS'.

    The task is looked up through __get_asyncresult from polling_url or
    task_id; False is returned when no result object is found.
    """
    async_result = __get_asyncresult(polling_url, task_id)
    if async_result is None:
        return False
    return async_result.status == 'SUCCESS'
111 112 #returns status string
def get_process_status(polling_url=None, task_id=None):
    """Return the status of the identified task.

    The value is the status attribute of the object returned by
    __get_asyncresult, or None when no result object is found.
    """
    async_result = __get_asyncresult(polling_url, task_id)
    if async_result is None:
        return None
    return async_result.status
119 120 #returns result.result from celery table
def get_process_result(polling_url=None, task_id=None):
    """Return the stored result of the identified task.

    The value is the result attribute of the object returned by
    __get_asyncresult, or None when no result object is found.
    """
    async_result = __get_asyncresult(polling_url, task_id)
    if async_result is None:
        return None
    return async_result.result
127 128 #get task_id from URLtoTaskID table
def get_taskid_from_url(polling_url):
    """Return the task_id stored in URLtoTaskID for polling_url.

    Returns None (rather than raising) when the url does not resolve to
    exactly one entry.
    """
    try:
        entry = URLtoTaskID.objects.get(url=polling_url)
    except (URLtoTaskID.DoesNotExist, URLtoTaskID.MultipleObjectsReturned):
        # Only the lookup failures are treated as "no known task"; a bare
        # except here would also hide unrelated errors.
        # NOTE(review): assumes URLtoTaskID is a Django model exposing
        # these exception classes -- confirm against models.py.
        return None
    return entry.task_id
136 137 #get url from URLtoTaskID table
def get_url_from_taskid(task_id):
    """Return the url stored in URLtoTaskID for task_id.

    Raises ValueError when task_id does not resolve to exactly one entry.
    """
    try:
        entry = URLtoTaskID.objects.get(task_id=task_id)
    except (URLtoTaskID.DoesNotExist, URLtoTaskID.MultipleObjectsReturned):
        # Only lookup failures become ValueError; a bare except here would
        # also relabel unrelated errors as an unknown task_id.
        # NOTE(review): assumes URLtoTaskID is a Django model exposing
        # these exception classes -- confirm against models.py.
        raise ValueError("Given task_id does not map to any known URL")
    return entry.url
144 145 #get the AsyncResult object associated with the given (directly or indirectly) task_id 146 #(this object provides us access to the status field)
def __get_asyncresult(polling_url=None, task_id=None):
    """Return the celery AsyncResult for the given task, or None.

    The task is identified directly by task_id or indirectly by
    polling_url (resolved through get_taskid_from_url).  The returned
    object gives access to the task's status and result fields.

    Returns None when polling_url does not resolve to a task id, or when
    the AsyncResult carries no task id.
    Raises ValueError when both polling_url and task_id are None.
    """
    if polling_url is None and task_id is None:
        raise ValueError("Either polling_url or task_id must be passed a value")

    if task_id is None:
        task_id = get_taskid_from_url(polling_url)
        if task_id is None:
            # Nothing to look up; avoid constructing an AsyncResult from a
            # None id, which some Celery releases reject with an error.
            return None

    # Imported locally, as in the original; aliased so the module is not
    # shadowed by the result object below.
    from celery import result as celery_result
    async_result = celery_result.AsyncResult(task_id)
    if async_result.task_id is None:
        return None
    return async_result
157