This paste expires on 2023-08-14 15:31:48.796417. Repaste, or download this paste. . Pasted through v1-api.

def threaded(function):
    """
    Simply starts a function in a thread.
    Adds it to an internal _threads list for handling.
    If join_wait is True, the thread will be joined before the function returns.
      This will also pass a queue "results" to the function, for returning the results.
    """
    def wrapper(self, *args, **kwargs):
        join_wait = kwargs.pop('join_wait', False)
        if not hasattr(self, '_threads'):
            self._threads = list()
        thread_exception = Queue()
        results = Queue()
        def exception_wrapper(*args, **kwargs):
            try:
                function(*args, **kwargs)
            except Exception as e:
                self.logger.warning("Exception in thread: %s" % function.__name__)
                thread_exception.put(e)
                self.logger.debug(e)
        if join_wait:
            kwargs['results'] = results
        thread = Thread(target=exception_wrapper, args=(self, *args), kwargs=kwargs, name=function.__name__)
        thread.start()
        self._threads.append((thread, thread_exception))
        if join_wait:
            thread.join()
            if not thread_exception.empty():
                raise thread_exception.get()
            return results.get()
    return wrapper
def add_thread(name, target, description=None):
    """
    Adds a thread of a class which targets the target
    Creates a dict that contains the name of the thread as a key, with the thread as a value
    Cteates basic helper functions to manage the thread
    """
    def decorator(cls):
        def create_thread(self):
            if not hasattr(self, 'threads'):
                self.threads = dict()
            if "." in target:
                target_parts = target.split(".")
                target_attr = self
                for part in target_parts:
                    target_attr = getattr(target_attr, part)
            else:
                target_attr = getattr(self, target)
            self.threads[name] = Thread(target=target_attr, name=description)
            self.logger.info("Created thread: %s" % name)
        def start_thread(self):
            thread = self.threads[name]
            setattr(self, f"_stop_processing_{name}", Event())
            if thread._is_stopped:
                self.logger.info("Re-creating thread")
                getattr(self, f"create_{name}_thread")()
                thread = self.threads[name]
            if thread._started.is_set() and not thread._is_stopped:
                self.logger.warning("%s thread is already started" % name)
            else:
                self.logger.info("Starting thread: %s" % name)
                thread.start()
                return True
        def stop_thread(self):
            thread = self.threads[name]
            dont_join = False
            if not thread._started.is_set() or thread._is_stopped:
                self.logger.warning("Thread is not active: %s" % name)
                dont_join = True
            if hasattr(self, f"_stop_processing_{name}"):
                self.logger.debug("Setting stop event for thread: %s" % name)
                getattr(self, f"_stop_processing_{name}").set()
            if hasattr(self, f"stop_{name}_thread_actions"):
                self.logger.debug("Calling: %s" % f"stop_{name}_thread_actions")
                getattr(self, f"stop_{name}_thread_actions")()
            if hasattr(self, f"_{name}_timer"):
                self.logger.info("Stopping the timer for thread: %s" % name)
                getattr(self, f"_{name}_timer").cancel()
            if not dont_join:
                self.logger.info("Waiting on thread to end: %s" % name)
                thread.join()
            return True
        setattr(cls, f"create_{name}_thread", create_thread)
        setattr(cls, f"start_{name}_thread", start_thread)
        setattr(cls, f"stop_{name}_thread", stop_thread)
        return update_init(create_thread)(cls)
    return decorator
def thread_wrapped(thread_name):
    """
    Wrap a class function to be used with add_thread
    """
    def decorator(function):
        def wrapper(self, *args, **kwargs):
            self.logger.info("Starting the processing loop for thread: %s" % thread_name)
            while not getattr(self, f"_stop_processing_{thread_name}").is_set():
                function(self, *args, **kwargs)
            self.logger.info("The processing loop has ended for thread: %s" % thread_name)
        return wrapper
    return decorator
Filename: stdin. Size: 4kb. View raw, , hex, or download this file.