Module markov.common.markov_queue.queue_consumer_thread

Classes

class QueueConsumerConfig (batch_size: int, batch_processing_interval: int, register_batch_timeout: int, sync_data_timeout: int)

The config class required by QueueConsumerThread to configure consumption of queue elements.

Attributes

batch_size : int
QueueConsumerThread consumes the queue elements in batches. This attribute configures the number of elements consumed in one go.
batch_processing_interval : int
The interval, in seconds, between successive batches. Exactly one batch of queue elements is consumed per interval.
register_batch_timeout : int
The timeout in seconds for consumption of one batch of queue elements.
sync_data_timeout : int
QueueConsumerThread can also consume all remaining elements in the queue at once. sync_data_timeout is the timeout in seconds for consuming all remaining queue elements.
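
As an illustration of how these four attributes fit together, here is a minimal sketch. It assumes the config behaves like a plain dataclass; the class below is a stand-in that only mirrors the documented signature, and the values are arbitrary examples rather than recommended defaults.

```python
from dataclasses import dataclass


@dataclass
class QueueConsumerConfig:
    """Stand-in mirroring the documented signature (assumption: a plain dataclass)."""

    batch_size: int                 # number of elements consumed per batch
    batch_processing_interval: int  # seconds between successive batches
    register_batch_timeout: int     # timeout in seconds for consuming one batch
    sync_data_timeout: int          # timeout in seconds for consuming all remaining elements


# Example values only; choose them to match the workload.
config = QueueConsumerConfig(
    batch_size=50,
    batch_processing_interval=5,
    register_batch_timeout=30,
    sync_data_timeout=120,
)
```

In real code the class would be imported from markov.common.markov_queue.queue_consumer_thread instead of being redefined.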

Class variables

var batch_processing_interval : int
var batch_size : int
var register_batch_timeout : int
var sync_data_timeout : int

class QueueConsumerThread (queue_wrapper: PersistAckQueueWrapper[~QUEUE_MESSAGE_TYPE], consume_method: Callable[[Any], bool], tpe: concurrent.futures.thread.ThreadPoolExecutor, config: QueueConsumerConfig, spinner: halo.halo.Halo = None)

A consumer thread that consumes elements from a queue batch by batch using the provided consume_method.

Attributes

queue_wrapper
The wrapper around the queue that needs to be consumed.
consume_method : Callable
A method that consumes a batch of queue elements and returns True if consumption succeeded and False otherwise.
tpe
A ThreadPoolExecutor that lets each call to consume_method run in a separate thread.
config
The QueueConsumerConfig that sets the batch size, the interval between batches, and the timeouts.
spinner
Optional halo.Halo spinner used to print text to the user interface.
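
The interplay of consume_method, tpe and the batch timeout can be sketched as follows. This is not the library's implementation: consume_batch_with_timeout and store_batch are hypothetical helper names, and treating a timeout or an exception as a failed batch is an assumption.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from typing import Any, Callable, List


def consume_batch_with_timeout(
    tpe: ThreadPoolExecutor,
    consume_method: Callable[[Any], bool],
    batch: List[Any],
    timeout_seconds: float,
) -> bool:
    """Run consume_method(batch) on the pool and wait at most timeout_seconds."""
    future = tpe.submit(consume_method, batch)
    try:
        return bool(future.result(timeout=timeout_seconds))
    except FutureTimeoutError:
        # Assumption: a batch that does not finish in time counts as failed.
        return False
    except Exception:
        # Assumption: an exception raised by consume_method counts as failed.
        return False


received: List[str] = []


def store_batch(batch: List[str]) -> bool:
    """Example consume_method: record the batch and report success."""
    received.extend(batch)
    return True


with ThreadPoolExecutor(max_workers=1) as tpe:
    ok = consume_batch_with_timeout(tpe, store_batch, ["a", "b"], timeout_seconds=30)
```

Note that Future.result(timeout=...) only stops waiting; it does not cancel work that is already running in the worker thread.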

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

Ancestors

  • threading.Thread
  • typing.Generic

Methods

def is_success(self) ‑> bool
def run(self)

Schedules the consumption of batches of queue elements.

The scheduling works in two phases:

  • When flush_event is NOT set: one batch of queue elements will be consumed every config.batch_processing_interval seconds.
  • When flush_event is set: all queue elements will be consumed serially.
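
The two phases can be sketched with a threading.Event and a standard queue.Queue standing in for PersistAckQueueWrapper. The helpers take_batch and run_consumer are hypothetical names, and the exact timing and error handling of the real run() are not documented here, so treat this as an approximation.

```python
import queue
import threading
from typing import Any, Callable, List


def take_batch(q: "queue.Queue[Any]", batch_size: int) -> List[Any]:
    """Remove up to batch_size elements from q without blocking."""
    batch: List[Any] = []
    while len(batch) < batch_size:
        try:
            batch.append(q.get_nowait())
        except queue.Empty:
            break
    return batch


def run_consumer(
    q: "queue.Queue[Any]",
    consume_method: Callable[[List[Any]], bool],
    batch_size: int,
    interval_seconds: float,
    flush_event: threading.Event,
) -> bool:
    """Consume one batch per interval until flush_event is set, then drain q."""
    success = True
    # Phase 1: Event.wait returns False on timeout, so one batch is consumed
    # per interval; it returns True as soon as flush_event is set.
    while not flush_event.wait(timeout=interval_seconds):
        batch = take_batch(q, batch_size)
        if batch:
            success = consume_method(batch) and success
    # Phase 2: flush_event is set, so consume everything left, batch by batch.
    while True:
        batch = take_batch(q, batch_size)
        if not batch:
            break
        success = consume_method(batch) and success
    return success
```

Waiting on the event instead of calling time.sleep lets the loop react to a flush request immediately rather than at the end of the current interval.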

def sync_remaining_elements(self)