whylogs.api.logger.experimental.logger.actor.thread_rolling_logger#

Module Contents#

Classes#

DatasetProfileContainer

A container that abstracts over different types of profiles.

TrackMessage

Send some data to be tracked.

FlushMessage

Trigger a flush, converting all managed profiles to result sets and attempting to write them if there are writers.

GetResultsMessage

LoggerStatus

Various status metrics.

StatusMessage

Get various status metrics.

PendingWritable

ThreadRollingLogger

A logger that manages profiles and segments for various dataset timestamps.

Attributes#

whylogs.api.logger.experimental.logger.actor.thread_rolling_logger.pd: Any#
whylogs.api.logger.experimental.logger.actor.thread_rolling_logger.Row#
class whylogs.api.logger.experimental.logger.actor.thread_rolling_logger.DatasetProfileContainer(dataset_timestamp: int, schema: Optional[whylogs.core.DatasetSchema])#

A container that abstracts over different types of profiles.

This does the work of deciding how to track data and how to create profiles given a DatasetSchema. This can only be used to manage a single entity for a given time. For example, this can represent a normal DatasetProfile or segment that has a given dataset timestamp.

Parameters
  • dataset_timestamp (int) –

  • schema (Optional[whylogs.core.DatasetSchema]) –

has_segments() bool#
Return type

bool

track(data: whylogs.api.logger.experimental.logger.actor.data_logger.TrackData) None#

Track data against the contained profile or segment.

Parameters

data (whylogs.api.logger.experimental.logger.actor.data_logger.TrackData) –

Return type

None

to_result_set() whylogs.api.logger.result_set.ResultSet#

Get the ResultSet of the contained profile/segment.

This doesn’t have any side effects. It generates a ResultSet of whatever is inside when this is called.

Return type

whylogs.api.logger.result_set.ResultSet

to_views() List[whylogs.core.DatasetProfileView]#
Return type

List[whylogs.core.DatasetProfileView]

to_serialized_views() List[bytes]#
Return type

List[bytes]

class whylogs.api.logger.experimental.logger.actor.thread_rolling_logger.TrackMessage#

Send some data to be tracked.

data#

The data to be tracked.

timestamp_ms#

The time in milliseconds when the data occurred.

result#

An optional Future that is fulfilled when the track has completed. It resolves to either a success (None) or a failure (Exception).

data: whylogs.api.logger.experimental.logger.actor.data_logger.TrackData#
timestamp_ms: int#
result: Optional[Future[None]]#
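
The optional result Future described above can be illustrated with a small standard-library sketch. It is not the whylogs implementation: SketchTrackMessage and handle_track are hypothetical names, and the dict check only stands in for whatever data-format validation the real logger performs.

```python
from concurrent.futures import Future
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class SketchTrackMessage:
    """Hypothetical stand-in for TrackMessage, for illustration only."""

    data: Any
    timestamp_ms: int
    result: Optional["Future[None]"] = None


def handle_track(message: SketchTrackMessage) -> None:
    """Process one message and report the outcome through its Future, if any."""
    try:
        # Assumption: a row must be a dict. Real validation may differ.
        if not isinstance(message.data, dict):
            raise TypeError("expected a row (dict)")
        # Tracking against a profile would happen here.
        if message.result is not None:
            message.result.set_result(None)  # success is signalled with None
    except Exception as error:
        if message.result is not None:
            message.result.set_exception(error)  # failure carries the exception
```

A caller that passes a Future can block on `message.result.result(timeout=...)` to learn whether tracking succeeded. A caller that passes None gets fire-and-forget behaviour.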
class whylogs.api.logger.experimental.logger.actor.thread_rolling_logger.FlushMessage#

Trigger a flush, converting all managed profiles to result sets and attempting to write them if there are writers.

class whylogs.api.logger.experimental.logger.actor.thread_rolling_logger.GetResultsMessage#
result: Future[Dict[int, List[DatasetProfileView]]]#
class whylogs.api.logger.experimental.logger.actor.thread_rolling_logger.LoggerStatus#

Various status metrics.

This returns various metadata about the current state. Useful for logging, testing, and debugging.

dataset_timestamps#

The number of dataset timestamps being managed. Each of these will map to either a profile or a segment.

dataset_profiles#

The number of dataset profiles being managed. One of these is created for each time period that the logger is configured to manage. For example, if the logger is configured to aggregate by hour and TrackMessages come in for two different hours, then there will be two of these.

segment_caches#

Same as dataset_profiles, but for segments.

writers#

The number of writers that the logger is configured to have.

pending_writables#

The number of items that have been flushed but have not yet been written.

dataset_timestamps: int#
dataset_profiles: int#
segment_caches: int#
writers: int#
pending_writables: int#
pending_views: List[bytes]#
views: List[bytes]#
class whylogs.api.logger.experimental.logger.actor.thread_rolling_logger.StatusMessage#

Get various status metrics.

result: Future[LoggerStatus]#
class whylogs.api.logger.experimental.logger.actor.thread_rolling_logger.PendingWritable#
attempts: int#
writable: whylogs.api.writer.writer.Writable#
whylogs.api.logger.experimental.logger.actor.thread_rolling_logger.LoggerMessage#
class whylogs.api.logger.experimental.logger.actor.thread_rolling_logger.ThreadRollingLogger(aggregate_by: whylogs.api.logger.experimental.logger.actor.time_util.TimeGranularity = TimeGranularity.Hour, write_schedule: Optional[whylogs.api.logger.experimental.logger.actor.time_util.Schedule] = Schedule(cadence=TimeGranularity.Minute, interval=10), schema: Optional[whylogs.core.DatasetSchema] = None, writers: List[whylogs.api.writer.Writer] = [], current_time_fn: Optional[Callable[[], int]] = None, queue_config: whylogs.api.logger.experimental.logger.actor.actor.QueueConfig = QueueConfig())#

Bases: whylogs.api.logger.experimental.logger.actor.thread_actor.ThreadActor[LoggerMessage], whylogs.api.logger.experimental.logger.actor.data_logger.DataLogger[LoggerStatus]

A logger that manages profiles and segments for various dataset timestamps.

This logger manages a map of dataset timestamp to dataset profile/segment and handles proper logging to each type. Given a TimeGranularity to aggregate by, for each call to track(), roughly the following will happen:

  • The timestamp_ms will be truncated to the start of the day/hour (depending on aggregate_by). This is the dataset timestamp.

  • That dataset timestamp is used as the key to either create a new dataset profile/segment or find the existing one to add the current data to.
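
The two steps above can be sketched with plain Python. This is a minimal illustration, not the whylogs code: truncate_ms and group_rows are hypothetical helpers, and truncation is assumed to be relative to the Unix epoch in UTC.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple

MS_PER_HOUR = 60 * 60 * 1000
MS_PER_DAY = 24 * MS_PER_HOUR

Row = Dict[str, Any]


def truncate_ms(timestamp_ms: int, bucket_ms: int) -> int:
    """Round a millisecond timestamp down to the start of its bucket."""
    return timestamp_ms - (timestamp_ms % bucket_ms)


def group_rows(rows: Iterable[Tuple[int, Row]], bucket_ms: int) -> Dict[int, List[Row]]:
    """Group rows by dataset timestamp, creating a bucket on first use."""
    buckets: Dict[int, List[Row]] = defaultdict(list)
    for timestamp_ms, row in rows:
        dataset_timestamp = truncate_ms(timestamp_ms, bucket_ms)
        buckets[dataset_timestamp].append(row)
    return dict(buckets)
```

With hourly buckets, rows whose timestamps fall in two different hours end up under two keys. The list of rows stands in for the profile or segment that the real logger would keep per key.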

The logger also periodically attempts to write out the internal state according to the write_schedule. It will attempt to write three times before considering a result set unwritable and dropping it.
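
The retry-then-drop behaviour can be pictured with the sketch below. It is an assumption-laden illustration rather than the library's code: SketchPending loosely mirrors the attempts and writable fields of PendingWritable, and write_pending and MAX_ATTEMPTS are hypothetical names.

```python
from dataclasses import dataclass
from typing import Callable, List

MAX_ATTEMPTS = 3  # taken from the "three times" in the description


@dataclass
class SketchPending:
    """Hypothetical stand-in for PendingWritable."""

    writable: str
    attempts: int = 0


def write_pending(
    pending: List[SketchPending], write: Callable[[str], None]
) -> List[SketchPending]:
    """Try to write every pending item once and return the items to retry later."""
    remaining: List[SketchPending] = []
    for item in pending:
        item.attempts += 1
        try:
            write(item.writable)
        except Exception:
            # Keep the item for the next scheduled write unless it has
            # already used up all of its attempts, in which case it is dropped.
            if item.attempts < MAX_ATTEMPTS:
                remaining.append(item)
    return remaining
```

Calling write_pending once per scheduled write means an item that keeps failing is retried on the next two runs and is gone after the third.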

The logger is also associated with at most one dataset schema. The schema determines whether the logger creates normal profiles or segments internally, among other things.

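Parameters
  • aggregate_by (whylogs.api.logger.experimental.logger.actor.time_util.TimeGranularity) –

  • write_schedule (Optional[whylogs.api.logger.experimental.logger.actor.time_util.Schedule]) –

  • schema (Optional[whylogs.core.DatasetSchema]) –

  • writers (List[whylogs.api.writer.Writer]) –

  • current_time_fn (Optional[Callable[[], int]]) –

  • queue_config (whylogs.api.logger.experimental.logger.actor.actor.QueueConfig) –
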
process_batch(batch: List[LoggerMessage], batch_type: Type[LoggerMessage]) None#
Parameters
  • batch (List[LoggerMessage]) –

  • batch_type (Type[LoggerMessage]) –

Return type

None

status(timeout: Optional[float] = None) LoggerStatus#

Get the status of the logger. This is always synchronous.

Parameters

timeout (Optional[float]) –

Return type

LoggerStatus
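
One way to picture a synchronous status call on top of a message-processing thread is the sketch below. It uses only the standard library and is not the ThreadActor implementation: SketchActor, its message tuples, and the "tracked" counter are all hypothetical.

```python
import queue
import threading
from concurrent.futures import Future
from typing import Any, Dict, Optional


class SketchActor:
    """Hypothetical actor: one worker thread drains a FIFO inbox."""

    def __init__(self) -> None:
        self._inbox: "queue.Queue[Any]" = queue.Queue()
        self._tracked = 0
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        while True:
            message = self._inbox.get()
            if message is None:  # sentinel used by close()
                break
            kind, payload = message
            if kind == "track":
                self._tracked += 1
            elif kind == "status":
                # Answer the caller by fulfilling the Future it sent along.
                payload.set_result({"tracked": self._tracked})

    def track(self) -> None:
        """Asynchronous: enqueue and return immediately."""
        self._inbox.put(("track", None))

    def status(self, timeout: Optional[float] = None) -> Dict[str, int]:
        """Synchronous: enqueue a request and block until it is answered."""
        result: "Future[Dict[str, int]]" = Future()
        self._inbox.put(("status", result))
        return result.result(timeout=timeout)

    def close(self) -> None:
        self._inbox.put(None)
        self._thread.join()
```

Because the inbox is first-in, first-out, a status request in this sketch is answered only after every message enqueued before it has been handled. If no answer arrives in time, `Future.result` raises a timeout error.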

log(data: whylogs.api.logger.experimental.logger.actor.data_logger.TrackData, timestamp_ms: Optional[int] = None, sync: bool = False) None#

Log some data.

Parameters
  • data (whylogs.api.logger.experimental.logger.actor.data_logger.TrackData) – The data to log. This can be a pandas DataFrame, a row (dictionary of str to str/int/float/etc), or a list of rows.

  • timestamp_ms (Optional[int]) – The timestamp of the data in milliseconds. If this isn’t supplied, the data is assumed to have occurred now.

  • sync (bool) – Whether or not to perform this action synchronously. By default, this is an asynchronous operation. You can make it synchronous in order to react to errors. This is mostly useful when initially setting up logging, since the only errors that can be responded to are data-format related.

Return type

None

flush() None#

Flush the internal state, causing everything to be written using the configured writers.

Return type

None

get_profile_views() Dict[int, List[whylogs.core.DatasetProfileView]]#

Get all of the profile views for each dataset timestamp being maintained.

Return type

Dict[int, List[whylogs.core.DatasetProfileView]]

close() None#

Close the logger, causing all resources to be released.

Return type

None
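
The methods above might be combined as in the following sketch. It relies only on the signatures documented on this page and assumes whylogs is installed. demo_thread_rolling_logger is a hypothetical helper name, the imports are deferred so the function can be defined without whylogs present, and the behaviour at runtime has not been verified here.

```python
def demo_thread_rolling_logger() -> int:
    """Log two rows an hour apart and return the managed dataset timestamp count."""
    # Deferred imports: module paths are taken from the signatures on this page.
    from whylogs.api.logger.experimental.logger.actor.thread_rolling_logger import (
        ThreadRollingLogger,
    )
    from whylogs.api.logger.experimental.logger.actor.time_util import (
        Schedule,
        TimeGranularity,
    )

    logger = ThreadRollingLogger(
        aggregate_by=TimeGranularity.Hour,
        write_schedule=Schedule(cadence=TimeGranularity.Minute, interval=10),
        writers=[],  # no writers configured, so nothing is written out
    )
    try:
        base_ms = 1_700_000_000_000  # arbitrary example timestamp
        hour_ms = 60 * 60 * 1000
        # sync=True so data-format errors surface in the calling thread.
        logger.log({"a": 1, "b": "x"}, timestamp_ms=base_ms, sync=True)
        logger.log({"a": 2, "b": "y"}, timestamp_ms=base_ms + hour_ms, sync=True)
        status = logger.status(timeout=5.0)
        logger.flush()
        return status.dataset_timestamps
    finally:
        # Always release the logger's resources.
        logger.close()
```

Logging with sync=True is a deliberate choice for a first run, as the log() description suggests. Once the data format is known to be valid, the default asynchronous mode avoids blocking the caller.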