whylogs.api.logger.experimental.logger.actor.process_rolling_logger

Module Contents

Classes

WriterFactory

WhyLabsWriterFactory

LoggerOptions

LoggerFactory

ThreadLoggerFactory

BaseProcessRollingLogger

Log data asynchronously using a separate process.

ProcessRollingLogger

Log data asynchronously using a separate process.

Attributes

whylogs.api.logger.experimental.logger.actor.process_rolling_logger.DataTypes
whylogs.api.logger.experimental.logger.actor.process_rolling_logger.DictType
class whylogs.api.logger.experimental.logger.actor.process_rolling_logger.WriterFactory
abstract create_writers(dataset_id: str) → List[whylogs.api.writer.Writer]
Parameters

dataset_id (str) –

Return type

List[whylogs.api.writer.Writer]
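WriterFactory is an abstract factory: given a dataset ID, create_writers returns the writers that will receive that dataset's profiles. The following is a minimal sketch of that contract, not whylogs code. To keep it runnable without whylogs installed, it assumes a local stand-in ABC with the same create_writers(dataset_id) signature and a hypothetical PrintWriter in place of whylogs.api.writer.Writer. A real factory would subclass WriterFactory and return actual whylogs writers.

```python
from abc import ABC, abstractmethod
from typing import List


class PrintWriter:
    """Hypothetical stand-in for whylogs.api.writer.Writer."""

    def __init__(self, dataset_id: str) -> None:
        self.dataset_id = dataset_id

    def write(self, payload: str) -> str:
        # A real writer would persist or upload a profile; this one only formats a message.
        return f"[{self.dataset_id}] {payload}"


class WriterFactoryStandIn(ABC):
    """Stand-in mirroring the documented create_writers(dataset_id) contract."""

    @abstractmethod
    def create_writers(self, dataset_id: str) -> List[PrintWriter]:
        ...


class PrintWriterFactory(WriterFactoryStandIn):
    def create_writers(self, dataset_id: str) -> List[PrintWriter]:
        # One writer per dataset here; a factory may return several (e.g. local and remote).
        return [PrintWriter(dataset_id)]


writers = PrintWriterFactory().create_writers("model-1")
print(writers[0].write("profile ready"))  # [model-1] profile ready
```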

class whylogs.api.logger.experimental.logger.actor.process_rolling_logger.WhyLabsWriterFactory

Bases: WriterFactory

create_writers(dataset_id: str) → List[whylogs.api.writer.Writer]
Parameters

dataset_id (str) –

Return type

List[whylogs.api.writer.Writer]

class whylogs.api.logger.experimental.logger.actor.process_rolling_logger.LoggerOptions
aggregate_by: whylogs.api.logger.experimental.logger.actor.time_util.TimeGranularity
write_schedule: Optional[whylogs.api.logger.experimental.logger.actor.time_util.Schedule]
schema: Optional[whylogs.core.schema.DatasetSchema]
sync_enabled: bool = False
current_time_fn: Optional[Callable[[], int]]
queue_config: whylogs.api.logger.experimental.logger.actor.actor.QueueConfig
thread_queue_config: whylogs.api.logger.experimental.logger.actor.actor.QueueConfig
writer_factory: WriterFactory
queue_type: whylogs.api.logger.experimental.logger.actor.process_actor.QueueType
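The timing options can be pictured this way. aggregate_by sets the width of the time window that each profile covers, write_schedule sets how often accumulated profiles are written, and current_time_fn supplies the clock. The sketch below models that arithmetic under explicit assumptions that do not come from the whylogs source. Timestamps are epoch milliseconds, which is consistent with the timestamp_ms parameter of log. Windows and write times align to UTC boundaries. The helpers window_start_ms and next_write_ms are hypothetical.

```python
from typing import Dict

# Window widths in milliseconds, keyed by names that mirror TimeGranularity members (assumption).
GRANULARITY_MS: Dict[str, int] = {
    "Second": 1_000,
    "Minute": 60_000,
    "Hour": 3_600_000,
    "Day": 86_400_000,
}


def window_start_ms(timestamp_ms: int, aggregate_by: str) -> int:
    """Floor a timestamp to the start of its UTC-aligned aggregation window (hypothetical helper)."""
    width = GRANULARITY_MS[aggregate_by]
    return timestamp_ms - timestamp_ms % width


def next_write_ms(now_ms: int, cadence: str, interval: int) -> int:
    """First aligned write time strictly after now_ms for a cadence/interval schedule (hypothetical helper)."""
    period = GRANULARITY_MS[cadence] * interval
    return now_ms - now_ms % period + period


def fixed_clock() -> int:
    """Deterministic stand-in for current_time_fn: 2024-01-01T01:00:00Z in epoch ms."""
    return 1_704_070_800_000


now = fixed_clock()
print(window_start_ms(now, "Day"))      # 1704067200000 (2024-01-01T00:00:00Z)
print(next_write_ms(now, "Minute", 5))  # 1704071100000 (five minutes later)
```

A fixed clock like this one is mainly useful in tests, where real wall-clock time would make window boundaries unpredictable.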
class whylogs.api.logger.experimental.logger.actor.process_rolling_logger.LoggerFactory
abstract create_logger(dataset_id: str, options: LoggerOptions) → whylogs.api.logger.experimental.logger.actor.thread_rolling_logger.ThreadRollingLogger
Parameters
  • dataset_id (str) –

  • options (LoggerOptions) –

Return type

whylogs.api.logger.experimental.logger.actor.thread_rolling_logger.ThreadRollingLogger

class whylogs.api.logger.experimental.logger.actor.process_rolling_logger.ThreadLoggerFactory

Bases: LoggerFactory

create_logger(dataset_id: str, options: LoggerOptions) → whylogs.api.logger.experimental.logger.actor.thread_rolling_logger.ThreadRollingLogger
Parameters
  • dataset_id (str) –

  • options (LoggerOptions) –

Return type

whylogs.api.logger.experimental.logger.actor.thread_rolling_logger.ThreadRollingLogger

whylogs.api.logger.experimental.logger.actor.process_rolling_logger.BuiltinMessageTypes
whylogs.api.logger.experimental.logger.actor.process_rolling_logger.AdditionalMessages
class whylogs.api.logger.experimental.logger.actor.process_rolling_logger.BaseProcessRollingLogger(aggregate_by: whylogs.api.logger.experimental.logger.actor.time_util.TimeGranularity = TimeGranularity.Day, write_schedule: Optional[whylogs.api.logger.experimental.logger.actor.time_util.Schedule] = Schedule(cadence=TimeGranularity.Minute, interval=5), schema: Optional[whylogs.core.schema.DatasetSchema] = None, sync_enabled: bool = False, current_time_fn: Optional[Callable[[], int]] = None, queue_config: whylogs.api.logger.experimental.logger.actor.actor.QueueConfig = QueueConfig(), thread_queue_config: whylogs.api.logger.experimental.logger.actor.actor.QueueConfig = QueueConfig(), writer_factory: WriterFactory = WhyLabsWriterFactory(), queue_type: whylogs.api.logger.experimental.logger.actor.process_actor.QueueType = QueueType.FASTER_FIFO, logger_factory: LoggerFactory = ThreadLoggerFactory())

Bases: whylogs.api.logger.experimental.logger.actor.process_actor.ProcessActor[Union[AdditionalMessages, BuiltinMessageTypes], whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.ProcessLoggerStatus], whylogs.api.logger.experimental.logger.actor.data_logger.DataLogger[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.ProcessLoggerStatus], Generic[AdditionalMessages]

Log data asynchronously using a separate process.

The ProcessRollingLogger is a rolling logger that manages a separate process to do the actual logging. It accumulates data over time and periodically uploads it in the background. Because this work happens in a separate process, it does not block the main one.

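```python
import pandas as pd

from whylogs.api.logger.experimental.logger.actor.process_rolling_logger import ProcessRollingLogger
from whylogs.api.logger.experimental.logger.actor.time_util import Schedule, TimeGranularity

if __name__ == "__main__":  # required when multiprocessing uses the spawn start method
    logger = ProcessRollingLogger(
        aggregate_by=TimeGranularity.Day,
        write_schedule=Schedule(cadence=TimeGranularity.Minute, interval=5),
    )
    logger.start()  # launch the background logging process

    data_frame = pd.DataFrame({"feature": [1.0, 2.0, 3.0]})
    logger.log(data_frame)

    logger.close()  # shut down the logger process when finished
```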

This class mostly wraps and manages several ThreadRollingLoggers that do the real logging with whylogs.
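One way to picture this wrapping is a dispatcher that keeps one logger per dataset ID and creates each logger the first time that ID appears. The sketch below illustrates that assumed design and is not the whylogs implementation. ListLogger is a hypothetical stand-in for ThreadRollingLogger, and the factory argument plays the role of the logger factory.

```python
from typing import Any, Callable, Dict, List


class ListLogger:
    """Hypothetical stand-in for ThreadRollingLogger that just records its input."""

    def __init__(self, dataset_id: str) -> None:
        self.dataset_id = dataset_id
        self.received: List[Any] = []

    def log(self, data: Any) -> None:
        self.received.append(data)


class PerDatasetRouter:
    """Keeps one logger per dataset ID and creates each on first use (assumed design)."""

    def __init__(self, factory: Callable[[str], ListLogger]) -> None:
        self._factory = factory
        self.loggers: Dict[str, ListLogger] = {}

    def log(self, data: Any, dataset_id: str) -> None:
        # Lazily create the logger for a dataset ID the first time it is seen.
        if dataset_id not in self.loggers:
            self.loggers[dataset_id] = self._factory(dataset_id)
        self.loggers[dataset_id].log(data)


router = PerDatasetRouter(factory=ListLogger)
router.log({"a": 1}, dataset_id="model-1")
router.log({"a": 2}, dataset_id="model-1")
router.log({"b": 3}, dataset_id="model-2")
print(sorted(router.loggers))  # ['model-1', 'model-2']
```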

MAC USERS: On Python 3.8 and later, the multiprocessing module uses the spawn start method on macOS by default instead of fork, and you may run into issues running this logger. You should be able to work around them by setting the environment variable OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES in the environment that the process logger runs in. Note that this variable cannot be set from within Python (for example, through os.environ).
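The note says the variable cannot be set from inside Python, so it has to be in the environment before the interpreter running the logger starts. From a shell, you can export it before launching Python. From Python, one option is to launch the program as a child process with an extended environment, as sketched below. The inline -c script is a placeholder for your actual program.

```python
import os
import subprocess
import sys

# Copy the current environment and add the macOS fork-safety override.
env = {**os.environ, "OBJC_DISABLE_INITIALIZE_FORK_SAFETY": "YES"}

# Placeholder for the real program (e.g. a script that starts a ProcessRollingLogger).
child_code = "import os; print(os.environ.get('OBJC_DISABLE_INITIALIZE_FORK_SAFETY'))"

result = subprocess.run(
    [sys.executable, "-c", child_code],
    env=env,
    capture_output=True,
    text=True,
    check=True,
)
print(result.stdout.strip())  # YES
```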

Most of the constructor arguments are forwarded to the underlying loggers and serve as their default options. If you supply a logger_factory, you can override those options for each dataset ID's logger.
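A per-dataset override might look like the sketch below. The factory receives the shared default options and builds each logger from a modified copy for the dataset IDs you choose. Options, FakeLogger, and OverridingFactory are hypothetical stand-ins for LoggerOptions, ThreadRollingLogger, and a LoggerFactory subclass, and only two option fields are modeled. Copying with dataclasses.replace instead of mutating the options in place keeps the defaults intact for every other dataset. The document does not say whether LoggerOptions itself is a dataclass, so check before applying replace to it.

```python
from dataclasses import dataclass, replace
from typing import Dict


@dataclass(frozen=True)
class Options:
    """Hypothetical stand-in for a subset of LoggerOptions."""

    aggregate_by: str = "Day"
    sync_enabled: bool = False


@dataclass(frozen=True)
class FakeLogger:
    """Hypothetical stand-in for ThreadRollingLogger; it only remembers its configuration."""

    dataset_id: str
    options: Options


class OverridingFactory:
    """Hypothetical LoggerFactory-like class with per-dataset option overrides."""

    def __init__(self, overrides: Dict[str, Dict[str, object]]) -> None:
        self._overrides = overrides

    def create_logger(self, dataset_id: str, options: Options) -> FakeLogger:
        # Apply overrides to a copy so the shared defaults stay untouched.
        effective = replace(options, **self._overrides.get(dataset_id, {}))
        return FakeLogger(dataset_id, effective)


defaults = Options()
factory = OverridingFactory({"hourly-model": {"aggregate_by": "Hour"}})
print(factory.create_logger("hourly-model", defaults).options.aggregate_by)  # Hour
print(factory.create_logger("daily-model", defaults).options.aggregate_by)   # Day
```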

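Parameters
  • aggregate_by (whylogs.api.logger.experimental.logger.actor.time_util.TimeGranularity) –

  • write_schedule (Optional[whylogs.api.logger.experimental.logger.actor.time_util.Schedule]) –

  • schema (Optional[whylogs.core.schema.DatasetSchema]) –

  • sync_enabled (bool) –

  • current_time_fn (Optional[Callable[[], int]]) –

  • queue_config (whylogs.api.logger.experimental.logger.actor.actor.QueueConfig) –

  • thread_queue_config (whylogs.api.logger.experimental.logger.actor.actor.QueueConfig) –

  • writer_factory (WriterFactory) –

  • queue_type (whylogs.api.logger.experimental.logger.actor.process_actor.QueueType) –

  • logger_factory (LoggerFactory) –

process_batch(batch: List[Union[AdditionalMessages, BuiltinMessageTypes]], batch_type: Type[Union[AdditionalMessages, BuiltinMessageTypes]]) → None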
Parameters
  • batch (List[Union[AdditionalMessages, BuiltinMessageTypes]]) –

  • batch_type (Type[Union[AdditionalMessages, BuiltinMessageTypes]]) –

Return type

None
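process_batch receives a batch together with its batch_type. This suggests the actor drains its queue, groups messages by type, and hands each homogeneous group to the matching process_* handler. The sketch below models that grouping and dispatch with hypothetical message classes (LogMsg, FlushMsg) and a hypothetical MiniActor. This document does not describe the real grouping logic inside whylogs, so the sketch illustrates the pattern and is not the implementation.

```python
from dataclasses import dataclass
from itertools import groupby
from typing import Callable, Dict, List, Type, Union


@dataclass
class LogMsg:  # hypothetical stand-in for LogMessage
    payload: str


@dataclass
class FlushMsg:  # hypothetical stand-in for FlushMessage
    pass


Message = Union[LogMsg, FlushMsg]


class MiniActor:
    def __init__(self) -> None:
        self.events: List[str] = []
        # Map each message type to the handler for a homogeneous batch of that type.
        self._handlers: Dict[type, Callable[[List[Message]], None]] = {
            LogMsg: self.process_log_messages,
            FlushMsg: self.process_flush_message,
        }

    def process_log_messages(self, messages: List[Message]) -> None:
        self.events.append(f"log x{len(messages)}")

    def process_flush_message(self, messages: List[Message]) -> None:
        self.events.append("flush")

    def process_batch(self, batch: List[Message], batch_type: Type[Message]) -> None:
        self._handlers[batch_type](batch)

    def drain(self, queue: List[Message]) -> None:
        # Group only consecutive messages of the same type, so ordering across types is kept.
        for msg_type, group in groupby(queue, key=type):
            self.process_batch(list(group), msg_type)


actor = MiniActor()
actor.drain([LogMsg("a"), LogMsg("b"), FlushMsg(), LogMsg("c")])
print(actor.events)  # ['log x2', 'flush', 'log x1']
```

Grouping only consecutive runs, rather than all messages of a type, ensures a flush is never reordered ahead of log messages that arrived before it.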

process_close_message(messages: List[whylogs.api.logger.experimental.logger.actor.actor.CloseMessage]) → None
Parameters

messages (List[whylogs.api.logger.experimental.logger.actor.actor.CloseMessage]) –

Return type

None

process_pubsub(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawPubSubMessage]) → None
Parameters

messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawPubSubMessage]) –

Return type

None

process_pubsub_embedding(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawPubSubEmbeddingMessage]) → None
Parameters

messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawPubSubEmbeddingMessage]) –

Return type

None

process_log_messages(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogMessage]) → None
Parameters

messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogMessage]) –

Return type

None

process_raw_log_dicts(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawLogMessage]) → None
Parameters

messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawLogMessage]) –

Return type

None

process_log_embeddings_messages(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawLogEmbeddingsMessage]) → None
Parameters

messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawLogEmbeddingsMessage]) –

Return type

None

process_log_embeddings_dicts(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogEmbeddingRequestDict]) → None
Parameters

messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogEmbeddingRequestDict]) –

Return type

None

process_log_dicts(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogRequestDict]) → None
Parameters

messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogRequestDict]) –

Return type

None

process_flush_message(messages: Optional[List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.FlushMessage]] = None) → None
Parameters

messages (Optional[List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.FlushMessage]]) –

Return type

None

log(data: whylogs.api.logger.experimental.logger.actor.data_logger.TrackData, timestamp_ms: Optional[int] = None, sync: bool = False, dataset_id: Optional[str] = None) → None
Parameters
  • data (whylogs.api.logger.experimental.logger.actor.data_logger.TrackData) –

  • timestamp_ms (Optional[int]) –

  • sync (bool) –

  • dataset_id (Optional[str]) –

Return type

None
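If your event times are datetime objects, a conversion like the one below produces an integer value for timestamp_ms. This sketch assumes, as the _ms suffix suggests, that timestamp_ms is milliseconds since the Unix epoch. The helper to_timestamp_ms is hypothetical, and treating naive datetimes as UTC is a choice made by this sketch. The helper uses timedelta integer division instead of float multiplication so the result is exact.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_timestamp_ms(moment: datetime) -> int:
    """Convert a datetime to integer milliseconds since the Unix epoch (hypothetical helper)."""
    if moment.tzinfo is None:
        # Assumption: naive datetimes are UTC. Adjust if your data uses local time.
        moment = moment.replace(tzinfo=timezone.utc)
    # timedelta // timedelta yields an exact int, avoiding float rounding.
    return (moment - EPOCH) // timedelta(milliseconds=1)


event_time = datetime(2024, 1, 1, 12, 30, tzinfo=timezone.utc)
print(to_timestamp_ms(event_time))  # 1704112200000
# e.g. logger.log(row, timestamp_ms=to_timestamp_ms(event_time), dataset_id="model-1")
```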

flush() → None

Flush the internal state, causing everything to be written using the configured writers.

Return type

None

run() → None
Return type

None

close() → None
Return type

None

class whylogs.api.logger.experimental.logger.actor.process_rolling_logger.ProcessRollingLogger(aggregate_by: whylogs.api.logger.experimental.logger.actor.time_util.TimeGranularity = TimeGranularity.Day, write_schedule: Optional[whylogs.api.logger.experimental.logger.actor.time_util.Schedule] = Schedule(cadence=TimeGranularity.Minute, interval=5), schema: Optional[whylogs.core.schema.DatasetSchema] = None, sync_enabled: bool = False, current_time_fn: Optional[Callable[[], int]] = None, queue_config: whylogs.api.logger.experimental.logger.actor.actor.QueueConfig = QueueConfig(), thread_queue_config: whylogs.api.logger.experimental.logger.actor.actor.QueueConfig = QueueConfig(), writer_factory: WriterFactory = WhyLabsWriterFactory(), queue_type: whylogs.api.logger.experimental.logger.actor.process_actor.QueueType = QueueType.FASTER_FIFO, logger_factory: LoggerFactory = ThreadLoggerFactory())

Bases: BaseProcessRollingLogger[NoReturn]
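Parameterizing the base with NoReturn fills the AdditionalMessages type parameter with a type that has no values. ProcessRollingLogger therefore accepts only the built-in message types, while BaseProcessRollingLogger stays generic so that subclasses can extend the set of messages they handle. The sketch below shows that Generic/TypeVar pattern with hypothetical classes (Builtin, Ping, BaseLogger, PlainLogger, PingLogger). It is an analogy to the documented bases, not whylogs code.

```python
from dataclasses import dataclass
from typing import Generic, NoReturn, TypeVar, Union

Extra = TypeVar("Extra")


@dataclass
class Builtin:  # hypothetical stand-in for BuiltinMessageTypes
    text: str


@dataclass
class Ping:  # hypothetical additional message type
    pass


class BaseLogger(Generic[Extra]):
    def handle(self, msg: Union[Extra, Builtin]) -> str:
        # Built-in messages are handled here; anything else goes to the extension hook.
        if isinstance(msg, Builtin):
            return f"builtin:{msg.text}"
        return self.handle_extra(msg)

    def handle_extra(self, msg: Extra) -> str:
        raise TypeError(f"unsupported message: {msg!r}")


class PlainLogger(BaseLogger[NoReturn]):
    """Analogous to ProcessRollingLogger: built-in messages only."""


class PingLogger(BaseLogger[Ping]):
    """A subclass that opts into one extra message type."""

    def handle_extra(self, msg: Ping) -> str:
        return "pong"


print(PlainLogger().handle(Builtin("hi")))  # builtin:hi
print(PingLogger().handle(Ping()))          # pong
```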

Log data asynchronously using a separate process.

The ProcessRollingLogger is a rolling logger that manages a separate process to do the actual logging. It accumulates data over time and periodically uploads it in the background. Because this work happens in a separate process, it does not block the main one.

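```python
import pandas as pd

from whylogs.api.logger.experimental.logger.actor.process_rolling_logger import ProcessRollingLogger
from whylogs.api.logger.experimental.logger.actor.time_util import Schedule, TimeGranularity

if __name__ == "__main__":  # required when multiprocessing uses the spawn start method
    logger = ProcessRollingLogger(
        aggregate_by=TimeGranularity.Day,
        write_schedule=Schedule(cadence=TimeGranularity.Minute, interval=5),
    )
    logger.start()  # launch the background logging process

    data_frame = pd.DataFrame({"feature": [1.0, 2.0, 3.0]})
    logger.log(data_frame)

    logger.close()  # shut down the logger process when finished
```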

This class mostly wraps and manages several ThreadRollingLoggers that do the real logging with whylogs.

MAC USERS: On Python 3.8 and later, the multiprocessing module uses the spawn start method on macOS by default instead of fork, and you may run into issues running this logger. You should be able to work around them by setting the environment variable OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES in the environment that the process logger runs in. Note that this variable cannot be set from within Python (for example, through os.environ).

Most of the constructor arguments are forwarded to the underlying loggers and serve as their default options. If you supply a logger_factory, you can override those options for each dataset ID's logger.

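Parameters
  • aggregate_by (whylogs.api.logger.experimental.logger.actor.time_util.TimeGranularity) –

  • write_schedule (Optional[whylogs.api.logger.experimental.logger.actor.time_util.Schedule]) –

  • schema (Optional[whylogs.core.schema.DatasetSchema]) –

  • sync_enabled (bool) –

  • current_time_fn (Optional[Callable[[], int]]) –

  • queue_config (whylogs.api.logger.experimental.logger.actor.actor.QueueConfig) –

  • thread_queue_config (whylogs.api.logger.experimental.logger.actor.actor.QueueConfig) –

  • writer_factory (WriterFactory) –

  • queue_type (whylogs.api.logger.experimental.logger.actor.process_actor.QueueType) –

  • logger_factory (LoggerFactory) –

process_batch(batch: List[Union[AdditionalMessages, BuiltinMessageTypes]], batch_type: Type[Union[AdditionalMessages, BuiltinMessageTypes]]) → None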
Parameters
  • batch (List[Union[AdditionalMessages, BuiltinMessageTypes]]) –

  • batch_type (Type[Union[AdditionalMessages, BuiltinMessageTypes]]) –

Return type

None

process_close_message(messages: List[whylogs.api.logger.experimental.logger.actor.actor.CloseMessage]) → None
Parameters

messages (List[whylogs.api.logger.experimental.logger.actor.actor.CloseMessage]) –

Return type

None

process_pubsub(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawPubSubMessage]) → None
Parameters

messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawPubSubMessage]) –

Return type

None

process_pubsub_embedding(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawPubSubEmbeddingMessage]) → None
Parameters

messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawPubSubEmbeddingMessage]) –

Return type

None

process_log_messages(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogMessage]) → None
Parameters

messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogMessage]) –

Return type

None

process_raw_log_dicts(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawLogMessage]) → None
Parameters

messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawLogMessage]) –

Return type

None

process_log_embeddings_messages(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawLogEmbeddingsMessage]) → None
Parameters

messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawLogEmbeddingsMessage]) –

Return type

None

process_log_embeddings_dicts(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogEmbeddingRequestDict]) → None
Parameters

messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogEmbeddingRequestDict]) –

Return type

None

process_log_dicts(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogRequestDict]) → None
Parameters

messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogRequestDict]) –

Return type

None

process_flush_message(messages: Optional[List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.FlushMessage]] = None) → None
Parameters

messages (Optional[List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.FlushMessage]]) –

Return type

None

log(data: whylogs.api.logger.experimental.logger.actor.data_logger.TrackData, timestamp_ms: Optional[int] = None, sync: bool = False, dataset_id: Optional[str] = None) → None
Parameters
  • data (whylogs.api.logger.experimental.logger.actor.data_logger.TrackData) –

  • timestamp_ms (Optional[int]) –

  • sync (bool) –

  • dataset_id (Optional[str]) –

Return type

None

flush() → None

Flush the internal state, causing everything to be written using the configured writers.

Return type

None

run() → None
Return type

None

close() → None
Return type

None