whylogs.api.logger.experimental.logger.actor.pipe_signaler#

Module Contents#

Classes#

PipeSignaler

A thread that listens on a pipe for messages and signals the corresponding futures.

Attributes#

T

whylogs.api.logger.experimental.logger.actor.pipe_signaler.T#
class whylogs.api.logger.experimental.logger.actor.pipe_signaler.PipeSignaler#

Bases: threading.Thread, Generic[T]

A thread that listens on a pipe for messages and signals the corresponding futures. This class is used in the process logger to enable synchronous logging requests across processes. It’s essentially a dictionary of futures that are registered by the main process and signaled by the child process. A lot of the behavior is implicit because it involves properties of processes, so it’s worth documenting here. - This thread has to be started from the main process, which means it has to be started right before the

process logger is started (before the os.fork under the hood). It has to be started from the main process because the main process will be registering futures on it, and those can’t cross the process boundary.

  • The parent and child process each have references to the pipes and they each need to close their references,

    which means close_child has to be called from the child process and close has to be called from the parent. Calling close_child in the main processing code will have right effect.

  • The process actor does message batching so multiple ids may be signaled even though a single batch was processed

    because that batch could have contained multiple messages.

  • The signaler uses Events under the hood to know when to stop working. They can be th.Events even though this

    is being used in a multiprocessing environment because nothing the child does can affect them. Keep in mind that introducing any behavior on the child side that depends on knowing whether those events are set won’t work though, they would have to be switched to mp.Events for that.

This class should really never be used by anyone in most cases. It will just slow down the main process by making it wait for logging to complete, but it enables a lot of testing and debugging.

property name#

A string used for identification purposes only.

It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.

property ident#

Thread identifier of this thread or None if it has not been started.

This is a nonzero integer. See the get_ident() function. Thread identifiers may be recycled when a thread exits and another thread is created. The identifier is available even after the thread has exited.

property daemon#

A boolean value indicating whether this thread is a daemon thread.

This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.

The entire Python program exits when only daemon threads are left.

signal(result: Tuple[str, Optional[Exception], Optional[T]]) None#

Signal that a message was handled by sending a tuple of (message id, exception, data). data and exception can be None. This should be called from the child process.

Parameters

result (Tuple[str, Optional[Exception], Optional[T]]) –

Return type

None

signal_many(results: List[Tuple[str, Optional[Exception], Optional[T]]]) None#
Parameters

results (List[Tuple[str, Optional[Exception], Optional[T]]]) –

Return type

None

register(future: Future[T], message_id: str) None#

Register a future to be signaled when the message id is received. This should be called from the parent process.

Parameters
  • future (Future[T]) –

  • message_id (str) –

Return type

None

run() None#

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

Return type

None

close() None#

Closes the thread and all resources. This should be called from the parent side.

Return type

None

start()#

Start the thread’s activity.

It must be called at most once per thread object. It arranges for the object’s run() method to be invoked in a separate thread of control.

This method will raise a RuntimeError if called more than once on the same thread object.

join(timeout=None)#

Wait until the thread terminates.

This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.

is_alive()#

Return whether the thread is alive.

This method returns True just before the run() method starts until just after the run() method terminates. See also the module function enumerate().

isDaemon()#
setDaemon(daemonic)#
getName()#
setName(name)#