def threaded(function):
    """
    Decorator that runs a method in its own thread.

    Each call starts ``function(self, *args, **kwargs)`` in a new ``Thread``
    named after the function and appends ``(thread, exception_queue)`` to
    ``self._threads`` (the list is created on first use). Any ``Exception``
    raised inside the thread is logged through ``self.logger`` and put on
    that thread's exception queue.

    The wrapper consumes one extra keyword argument:

    join_wait (default False)
        When true, a ``Queue`` is passed to the function as the keyword
        argument ``results``, the thread is joined, and then:

        * an exception captured in the thread is re-raised in the caller;
        * otherwise the first item put on ``results`` is returned, or
          ``None`` if the function put nothing there.

        When false, the wrapper returns ``None`` right after starting the
        thread and no ``results`` queue is passed.
    """
    # Imported locally so this block does not depend on a module-level import.
    from functools import wraps

    @wraps(function)
    def wrapper(self, *args, **kwargs):
        join_wait = kwargs.pop('join_wait', False)

        if not hasattr(self, '_threads'):
            self._threads = []

        thread_exception = Queue()
        results = Queue()

        def exception_wrapper(*call_args, **call_kwargs):
            # Runs inside the new thread: capture failures instead of letting
            # them vanish with the thread.
            try:
                function(*call_args, **call_kwargs)
            except Exception as e:
                self.logger.warning("Exception in thread: %s" % function.__name__)
                thread_exception.put(e)
                self.logger.debug(e)

        if join_wait:
            kwargs['results'] = results

        thread = Thread(
            target=exception_wrapper,
            args=(self, *args),
            kwargs=kwargs,
            name=function.__name__,
        )
        thread.start()
        self._threads.append((thread, thread_exception))

        if not join_wait:
            return None

        thread.join()
        if not thread_exception.empty():
            raise thread_exception.get()
        # The thread has finished, so nothing more will arrive on the queue.
        # A blocking get() on an empty queue would hang the caller forever.
        if results.empty():
            return None
        return results.get()

    return wrapper
|
|
|
|
|
def add_thread(name, target, description=None):
    """
    Class decorator that attaches a managed thread to a class.

    Three methods are added to the decorated class:

    * ``create_<name>_thread`` builds a ``Thread`` and stores it in
      ``self.threads[name]`` (the ``threads`` dict is created on first use).
    * ``start_<name>_thread`` installs a fresh stop ``Event`` on
      ``self._stop_processing_<name>``, re-creates the thread if the previous
      one has already finished, and starts it unless it is running.
    * ``stop_<name>_thread`` sets the stop event, calls the optional
      ``stop_<name>_thread_actions`` hook, cancels the optional
      ``_<name>_timer``, and joins the thread if it was running.

    ``create_thread`` is also handed to ``update_init`` (defined elsewhere
    in this file); presumably this makes the thread get created during
    ``__init__`` -- confirm against ``update_init``.

    Parameters:
        name: key in ``self.threads``; also used to build the generated
            method and attribute names.
        target: attribute name of the thread's callable on the instance.
            A dotted path (``"a.b.c"``) is resolved one attribute at a time.
        description: passed to ``Thread`` as its ``name``.

    Thread state is read through the public ``Thread.ident`` and
    ``Thread.is_alive()`` rather than the private ``_started`` and
    ``_is_stopped`` attributes, which are implementation details.
    """
    def decorator(cls):
        def create_thread(self):
            if not hasattr(self, 'threads'):
                self.threads = {}

            # split() yields a single part for an undotted name, so one loop
            # covers both the plain and the dotted case.
            target_attr = self
            for part in target.split("."):
                target_attr = getattr(target_attr, part)

            self.threads[name] = Thread(target=target_attr, name=description)
            self.logger.info("Created thread: %s" % name)

        def start_thread(self):
            thread = self.threads[name]
            setattr(self, f"_stop_processing_{name}", Event())

            # ident is None until the thread has been started. A thread that
            # was started and is no longer alive has finished and cannot be
            # started again, so a new Thread object is needed.
            if thread.ident is not None and not thread.is_alive():
                self.logger.info("Re-creating thread")
                getattr(self, f"create_{name}_thread")()
                thread = self.threads[name]

            if thread.is_alive():
                self.logger.warning("%s thread is already started" % name)
            else:
                self.logger.info("Starting thread: %s" % name)
                thread.start()
            return True

        def stop_thread(self):
            thread = self.threads[name]
            # Decide up front whether there is anything to join; the cleanup
            # steps below still run for a thread that is not active.
            active = thread.is_alive()
            if not active:
                self.logger.warning("Thread is not active: %s" % name)

            if hasattr(self, f"_stop_processing_{name}"):
                self.logger.debug("Setting stop event for thread: %s" % name)
                getattr(self, f"_stop_processing_{name}").set()

            if hasattr(self, f"stop_{name}_thread_actions"):
                self.logger.debug("Calling: %s" % f"stop_{name}_thread_actions")
                getattr(self, f"stop_{name}_thread_actions")()

            if hasattr(self, f"_{name}_timer"):
                self.logger.info("Stopping the timer for thread: %s" % name)
                getattr(self, f"_{name}_timer").cancel()

            if active:
                self.logger.info("Waiting on thread to end: %s" % name)
                thread.join()
            return True

        setattr(cls, f"create_{name}_thread", create_thread)
        setattr(cls, f"start_{name}_thread", start_thread)
        setattr(cls, f"stop_{name}_thread", stop_thread)

        return update_init(create_thread)(cls)

    return decorator
|
|
|
|
|
def thread_wrapped(thread_name):
    """
    Turn a method into a processing loop controlled by a stop event.

    The decorated method is called repeatedly, with the arguments given to
    the wrapper, until ``self._stop_processing_<thread_name>`` is set. The
    event is looked up on ``self`` before every iteration, so a replacement
    event assigned while the loop runs is picked up. The start and end of
    the loop are logged through ``self.logger``.

    Parameters:
        thread_name: name used to find the stop event attribute and in the
            log messages.

    The wrapper returns ``None``; return values of the wrapped method are
    discarded.
    """
    # Imported locally so this block does not depend on a module-level import.
    from functools import wraps

    def decorator(function):
        @wraps(function)
        def wrapper(self, *args, **kwargs):
            self.logger.info("Starting the processing loop for thread: %s" % thread_name)
            while not getattr(self, f"_stop_processing_{thread_name}").is_set():
                function(self, *args, **kwargs)
            self.logger.info("The processing loop has ended for thread: %s" % thread_name)
        return wrapper

    return decorator
|