whylogs.api.logger.experimental.logger.actor.process_rolling_logger
Module Contents
Classes
BaseProcessRollingLogger | Log data asynchronously using a separate process.
ProcessRollingLogger | Log data asynchronously using a separate process.
Attributes
- whylogs.api.logger.experimental.logger.actor.process_rolling_logger.DataTypes
- whylogs.api.logger.experimental.logger.actor.process_rolling_logger.DictType
- class whylogs.api.logger.experimental.logger.actor.process_rolling_logger.WriterFactory
- abstract create_writers(dataset_id: str) -> List[whylogs.api.writer.Writer]
- Parameters
dataset_id (str)
- Return type
List[whylogs.api.writer.Writer]
- class whylogs.api.logger.experimental.logger.actor.process_rolling_logger.WhyLabsWriterFactory
Bases: WriterFactory
- create_writers(dataset_id: str) -> List[whylogs.api.writer.Writer]
- Parameters
dataset_id (str)
- Return type
List[whylogs.api.writer.Writer]
- class whylogs.api.logger.experimental.logger.actor.process_rolling_logger.LoggerOptions
- write_schedule: Optional[whylogs.api.logger.experimental.logger.actor.time_util.Schedule]
- schema: Optional[whylogs.core.schema.DatasetSchema]
- thread_queue_config: whylogs.api.logger.experimental.logger.actor.actor.QueueConfig
- writer_factory: WriterFactory
- class whylogs.api.logger.experimental.logger.actor.process_rolling_logger.LoggerFactory
- abstract create_logger(dataset_id: str, options: LoggerOptions) -> whylogs.api.logger.experimental.logger.actor.thread_rolling_logger.ThreadRollingLogger
- Parameters
dataset_id (str)
options (LoggerOptions)
- Return type
whylogs.api.logger.experimental.logger.actor.thread_rolling_logger.ThreadRollingLogger
- class whylogs.api.logger.experimental.logger.actor.process_rolling_logger.ThreadLoggerFactory
Bases: LoggerFactory
- create_logger(dataset_id: str, options: LoggerOptions) -> whylogs.api.logger.experimental.logger.actor.thread_rolling_logger.ThreadRollingLogger
- Parameters
dataset_id (str)
options (LoggerOptions)
- Return type
whylogs.api.logger.experimental.logger.actor.thread_rolling_logger.ThreadRollingLogger
- whylogs.api.logger.experimental.logger.actor.process_rolling_logger.BuiltinMessageTypes
- whylogs.api.logger.experimental.logger.actor.process_rolling_logger.AdditionalMessages
- class whylogs.api.logger.experimental.logger.actor.process_rolling_logger.BaseProcessRollingLogger(aggregate_by: whylogs.api.logger.experimental.logger.actor.time_util.TimeGranularity = TimeGranularity.Day, write_schedule: Optional[whylogs.api.logger.experimental.logger.actor.time_util.Schedule] = Schedule(cadence=TimeGranularity.Minute, interval=5), schema: Optional[whylogs.core.schema.DatasetSchema] = None, sync_enabled: bool = False, current_time_fn: Optional[Callable[[], int]] = None, queue_config: whylogs.api.logger.experimental.logger.actor.actor.QueueConfig = QueueConfig(), thread_queue_config: whylogs.api.logger.experimental.logger.actor.actor.QueueConfig = QueueConfig(), writer_factory: WriterFactory = WhyLabsWriterFactory(), queue_type: whylogs.api.logger.experimental.logger.actor.process_actor.QueueType = QueueType.FASTER_FIFO, logger_factory: LoggerFactory = ThreadLoggerFactory())
Bases: whylogs.api.logger.experimental.logger.actor.process_actor.ProcessActor[Union[AdditionalMessages, BuiltinMessageTypes], whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.ProcessLoggerStatus], whylogs.api.logger.experimental.logger.actor.data_logger.DataLogger[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.ProcessLoggerStatus], Generic[AdditionalMessages]
Log data asynchronously using a separate process.
The ProcessRollingLogger is a rolling logger that manages a separate process to do the actual logging. This means it logs data over time and periodically uploads it in the background, using a separate process so that it doesn’t block the main one.
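```python
from whylogs.api.logger.experimental.logger.actor.process_rolling_logger import ProcessRollingLogger
from whylogs.api.logger.experimental.logger.actor.time_util import Schedule, TimeGranularity

logger = ProcessRollingLogger(
    aggregate_by=TimeGranularity.Day,
    write_schedule=Schedule(cadence=TimeGranularity.Minute, interval=5),
)
logger.start()
```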
This class mostly wraps and manages several ThreadRollingLoggers that do the real logging with whylogs.
macOS users: On Python 3.8 and later you will run into issues because Python uses spawn instead of fork. You should be able to work around this by setting the environment variable OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES in the environment that the process logger runs in. It has to be set outside of Python; setting it from within Python through os.environ does not work.
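Because the variable cannot be set from inside the running program, one option is a small launcher that starts the real program as a child process with the variable already in its environment. The following is a minimal sketch of that idea, not something whylogs provides: `run_with_fork_safety_disabled` is a hypothetical helper, and the child command here only echoes the variable in place of a real application.

```python
import os
import subprocess
import sys
from typing import List

ENV_VAR = "OBJC_DISABLE_INITIALIZE_FORK_SAFETY"


def run_with_fork_safety_disabled(cmd: List[str]) -> subprocess.CompletedProcess:
    """Run cmd in a child process whose environment already contains the variable.

    Hypothetical helper for illustration; not part of whylogs.
    """
    child_env = {**os.environ, ENV_VAR: "YES"}
    return subprocess.run(cmd, env=child_env, capture_output=True, text=True, check=True)


# Stand-in for the real application: the child only reports what it sees.
child_code = f"import os; print(os.environ.get({ENV_VAR!r}))"
result = run_with_fork_safety_disabled([sys.executable, "-c", child_code])
print(result.stdout.strip())
```

Exporting the variable in the shell or in the service definition that launches Python achieves the same thing without a launcher script.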
Most of the arguments are passed through to the underlying loggers as their default options. If you supply a logger_factory, you can override those options for each dataset ID's logger.
- Parameters
aggregate_by (whylogs.api.logger.experimental.logger.actor.time_util.TimeGranularity) – The time granularity to aggregate data by, which determines how data is bucketed in time. With the Hour granularity, for example, the logger pools data into hourly profiles.
write_schedule (Optional[whylogs.api.logger.experimental.logger.actor.time_util.Schedule]) – The schedule for writing data, which determines when data is uploaded.
schema (Optional[whylogs.core.schema.DatasetSchema]) – The DatasetSchema that whylogs uses under the hood.
sync_enabled (bool) – Whether to enable synchronous logging. When enabled, you can pass sync=True to the log call. Without it, the sync flag can't be used.
queue_config (whylogs.api.logger.experimental.logger.actor.actor.QueueConfig) – Lets you change various polling and timeout parameters.
thread_queue_config (whylogs.api.logger.experimental.logger.actor.actor.QueueConfig) – Same as queue_config, but for the wrapped ThreadRollingLoggers.
writer_factory (WriterFactory) – The writer factory to use for creating writers.
queue_type (whylogs.api.logger.experimental.logger.actor.process_actor.QueueType) – The type of queue used to manage multiprocessing. By default, faster_fifo is used because it's a lot faster than the default multiprocessing queue, but you can use the built-in mp.Queue by setting this to QueueType.MP.
current_time_fn (Optional[Callable[[], int]])
logger_factory (LoggerFactory)
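To make the time bucketing behind aggregate_by concrete, here is a minimal sketch of how millisecond timestamps could be grouped by hour or by day. It is an illustration only: `Granularity` and `bucket_start_ms` are stand-ins invented for this example, not the whylogs TimeGranularity type or its internal logic, and the sketch assumes buckets are aligned to UTC.

```python
from datetime import datetime, timezone
from enum import Enum


class Granularity(Enum):
    """Stand-in for TimeGranularity; only the two values used in this sketch."""

    Hour = "hour"
    Day = "day"


# Width of each bucket in milliseconds.
BUCKET_WIDTH_MS = {
    Granularity.Hour: 3_600_000,
    Granularity.Day: 86_400_000,
}


def bucket_start_ms(timestamp_ms: int, granularity: Granularity) -> int:
    """Return the start (epoch ms, UTC) of the bucket that contains timestamp_ms."""
    width = BUCKET_WIDTH_MS[granularity]
    return timestamp_ms - timestamp_ms % width


def to_ms(dt: datetime) -> int:
    """Convert an aware datetime to epoch milliseconds."""
    return int(dt.timestamp() * 1000)


t_a = to_ms(datetime(2024, 1, 1, 10, 15, tzinfo=timezone.utc))
t_b = to_ms(datetime(2024, 1, 1, 10, 45, tzinfo=timezone.utc))
t_c = to_ms(datetime(2024, 1, 1, 11, 5, tzinfo=timezone.utc))

# 10:15 and 10:45 share an hourly bucket; 11:05 starts a new one.
same_hour = bucket_start_ms(t_a, Granularity.Hour) == bucket_start_ms(t_b, Granularity.Hour)
next_hour = bucket_start_ms(t_c, Granularity.Hour) != bucket_start_ms(t_a, Granularity.Hour)
# All three timestamps fall into the same daily bucket.
same_day = bucket_start_ms(t_a, Granularity.Day) == bucket_start_ms(t_c, Granularity.Day)
```

Rows that land in the same bucket would end up in the same profile, so a coarser granularity means fewer, larger profiles.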
- process_batch(batch: List[Union[AdditionalMessages, BuiltinMessageTypes]], batch_type: Type[Union[AdditionalMessages, BuiltinMessageTypes]]) -> None
- Parameters
batch (List[Union[AdditionalMessages, BuiltinMessageTypes]])
batch_type (Type[Union[AdditionalMessages, BuiltinMessageTypes]])
- Return type
None
- process_close_message(messages: List[whylogs.api.logger.experimental.logger.actor.actor.CloseMessage]) -> None
- Parameters
messages (List[whylogs.api.logger.experimental.logger.actor.actor.CloseMessage])
- Return type
None
- process_pubsub(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawPubSubMessage]) -> None
- Parameters
messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawPubSubMessage])
- Return type
None
- process_pubsub_embedding(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawPubSubEmbeddingMessage]) -> None
- Parameters
messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawPubSubEmbeddingMessage])
- Return type
None
- process_log_messages(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogMessage]) -> None
- Parameters
messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogMessage])
- Return type
None
- process_raw_log_dicts(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawLogMessage]) -> None
- Parameters
messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawLogMessage])
- Return type
None
- process_log_embeddings_messages(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawLogEmbeddingsMessage]) -> None
- Parameters
messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawLogEmbeddingsMessage])
- Return type
None
- process_log_embeddings_dicts(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogEmbeddingRequestDict]) -> None
- Parameters
messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogEmbeddingRequestDict])
- Return type
None
- process_log_dicts(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogRequestDict]) -> None
- Parameters
messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogRequestDict])
- Return type
None
- process_flush_message(messages: Optional[List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.FlushMessage]] = None) -> None
- Parameters
messages (Optional[List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.FlushMessage]])
- Return type
None
- log(data: whylogs.api.logger.experimental.logger.actor.data_logger.TrackData, timestamp_ms: Optional[int] = None, sync: bool = False, dataset_id: Optional[str] = None) -> None
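The sync argument of log can be pictured as follows: by default the call only enqueues the data and returns, while a synchronous call waits until the background worker has handled it. The sketch below illustrates that idea with the standard library only. `TinyAsyncLogger` is a hypothetical toy, a thread stands in for the separate process, and raising ValueError when sync is used without sync_enabled is an assumption of this example rather than documented whylogs behavior.

```python
import queue
import threading
from concurrent.futures import Future
from typing import Any, Dict, List, Optional


class TinyAsyncLogger:
    """Toy logger: a worker thread drains a queue (stand-in for the logging process)."""

    def __init__(self, sync_enabled: bool = False) -> None:
        self.sync_enabled = sync_enabled
        self.processed: List[Dict[str, Any]] = []
        self._queue: queue.Queue = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def _run(self) -> None:
        while True:
            item = self._queue.get()
            if item is None:  # Sentinel: stop the worker.
                return
            data, done = item
            self.processed.append(data)
            if done is not None:
                done.set_result(None)  # Wake up the waiting caller.

    def log(self, data: Dict[str, Any], sync: bool = False) -> None:
        if sync and not self.sync_enabled:
            # Assumption of this sketch: reject sync calls when not enabled.
            raise ValueError("sync=True requires sync_enabled=True")
        done: Optional[Future] = Future() if sync else None
        self._queue.put((data, done))
        if done is not None:
            done.result(timeout=5)  # Block until the worker has handled the data.

    def close(self) -> None:
        self._queue.put(None)
        self._worker.join(timeout=5)


logger = TinyAsyncLogger(sync_enabled=True)
logger.log({"a": 1})             # Enqueues and returns right away.
logger.log({"a": 2}, sync=True)  # Returns only after the worker handled it.
logger.close()
```

Synchronous calls trade throughput for certainty, which is mostly useful in tests or when the caller needs to know the data has been handled before continuing.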
- class whylogs.api.logger.experimental.logger.actor.process_rolling_logger.ProcessRollingLogger(aggregate_by: whylogs.api.logger.experimental.logger.actor.time_util.TimeGranularity = TimeGranularity.Day, write_schedule: Optional[whylogs.api.logger.experimental.logger.actor.time_util.Schedule] = Schedule(cadence=TimeGranularity.Minute, interval=5), schema: Optional[whylogs.core.schema.DatasetSchema] = None, sync_enabled: bool = False, current_time_fn: Optional[Callable[[], int]] = None, queue_config: whylogs.api.logger.experimental.logger.actor.actor.QueueConfig = QueueConfig(), thread_queue_config: whylogs.api.logger.experimental.logger.actor.actor.QueueConfig = QueueConfig(), writer_factory: WriterFactory = WhyLabsWriterFactory(), queue_type: whylogs.api.logger.experimental.logger.actor.process_actor.QueueType = QueueType.FASTER_FIFO, logger_factory: LoggerFactory = ThreadLoggerFactory())
Bases: BaseProcessRollingLogger[NoReturn]
Log data asynchronously using a separate process.
The ProcessRollingLogger is a rolling logger that manages a separate process to do the actual logging. This means it logs data over time and periodically uploads it in the background, using a separate process so that it doesn’t block the main one.
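```python
from whylogs.api.logger.experimental.logger.actor.process_rolling_logger import ProcessRollingLogger
from whylogs.api.logger.experimental.logger.actor.time_util import Schedule, TimeGranularity

logger = ProcessRollingLogger(
    aggregate_by=TimeGranularity.Day,
    write_schedule=Schedule(cadence=TimeGranularity.Minute, interval=5),
)
logger.start()
```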
This class mostly wraps and manages several ThreadRollingLoggers that do the real logging with whylogs.
macOS users: On Python 3.8 and later you will run into issues because Python uses spawn instead of fork. You should be able to work around this by setting the environment variable OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES in the environment that the process logger runs in. It has to be set outside of Python; setting it from within Python through os.environ does not work.
Most of the arguments are passed through to the underlying loggers as their default options. If you supply a logger_factory, you can override those options for each dataset ID's logger.
- Parameters
aggregate_by (whylogs.api.logger.experimental.logger.actor.time_util.TimeGranularity) – The time granularity to aggregate data by, which determines how data is bucketed in time. With the Hour granularity, for example, the logger pools data into hourly profiles.
write_schedule (Optional[whylogs.api.logger.experimental.logger.actor.time_util.Schedule]) – The schedule for writing data, which determines when data is uploaded.
schema (Optional[whylogs.core.schema.DatasetSchema]) – The DatasetSchema that whylogs uses under the hood.
sync_enabled (bool) – Whether to enable synchronous logging. When enabled, you can pass sync=True to the log call. Without it, the sync flag can't be used.
queue_config (whylogs.api.logger.experimental.logger.actor.actor.QueueConfig) – Lets you change various polling and timeout parameters.
thread_queue_config (whylogs.api.logger.experimental.logger.actor.actor.QueueConfig) – Same as queue_config, but for the wrapped ThreadRollingLoggers.
writer_factory (WriterFactory) – The writer factory to use for creating writers.
queue_type (whylogs.api.logger.experimental.logger.actor.process_actor.QueueType) – The type of queue used to manage multiprocessing. By default, faster_fifo is used because it's a lot faster than the default multiprocessing queue, but you can use the built-in mp.Queue by setting this to QueueType.MP.
current_time_fn (Optional[Callable[[], int]])
logger_factory (LoggerFactory)
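As a rough illustration of what a write schedule such as Schedule(cadence=TimeGranularity.Minute, interval=5) expresses, the sketch below computes when periodic writes would fire. It assumes that one write happens every interval units of cadence, counted from the start time; how whylogs actually aligns its timer is not described here. `Cadence`, `WriteSchedule`, and `write_times` are stand-ins made up for this example.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List


class Cadence(Enum):
    """Stand-in for TimeGranularity, with each value given in seconds."""

    Minute = 60
    Hour = 3600
    Day = 86400


@dataclass(frozen=True)
class WriteSchedule:
    """Stand-in for Schedule(cadence=..., interval=...)."""

    cadence: Cadence
    interval: int

    def period_seconds(self) -> int:
        """Seconds between two consecutive writes under this sketch's assumption."""
        return self.cadence.value * self.interval


def write_times(start_s: int, end_s: int, schedule: WriteSchedule) -> List[int]:
    """Return the times in (start_s, end_s] at which a periodic write would fire."""
    period = schedule.period_seconds()
    return list(range(start_s + period, end_s + 1, period))


default_schedule = WriteSchedule(cadence=Cadence.Minute, interval=5)
print(default_schedule.period_seconds())          # 300
print(write_times(0, 900, default_schedule))      # [300, 600, 900]
```

Under this reading, aggregate_by controls which profile a row belongs to, while the write schedule controls how often the accumulated profiles are uploaded.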
- process_batch(batch: List[Union[AdditionalMessages, BuiltinMessageTypes]], batch_type: Type[Union[AdditionalMessages, BuiltinMessageTypes]]) -> None
- Parameters
batch (List[Union[AdditionalMessages, BuiltinMessageTypes]])
batch_type (Type[Union[AdditionalMessages, BuiltinMessageTypes]])
- Return type
None
- process_close_message(messages: List[whylogs.api.logger.experimental.logger.actor.actor.CloseMessage]) -> None
- Parameters
messages (List[whylogs.api.logger.experimental.logger.actor.actor.CloseMessage])
- Return type
None
- process_pubsub(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawPubSubMessage]) -> None
- Parameters
messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawPubSubMessage])
- Return type
None
- process_pubsub_embedding(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawPubSubEmbeddingMessage]) -> None
- Parameters
messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawPubSubEmbeddingMessage])
- Return type
None
- process_log_messages(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogMessage]) -> None
- Parameters
messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogMessage])
- Return type
None
- process_raw_log_dicts(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawLogMessage]) -> None
- Parameters
messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawLogMessage])
- Return type
None
- process_log_embeddings_messages(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawLogEmbeddingsMessage]) -> None
- Parameters
messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawLogEmbeddingsMessage])
- Return type
None
- process_log_embeddings_dicts(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogEmbeddingRequestDict]) -> None
- Parameters
messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogEmbeddingRequestDict])
- Return type
None
- process_log_dicts(messages: List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogRequestDict]) -> None
- Parameters
messages (List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogRequestDict])
- Return type
None
- process_flush_message(messages: Optional[List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.FlushMessage]] = None) -> None
- Parameters
messages (Optional[List[whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.FlushMessage]])
- Return type
None
- log(data: whylogs.api.logger.experimental.logger.actor.data_logger.TrackData, timestamp_ms: Optional[int] = None, sync: bool = False, dataset_id: Optional[str] = None) -> None
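The shape of process_batch(batch, batch_type) next to the per-message process_* methods suggests that batches are routed to a handler according to their message type. The sketch below shows one way such routing can look. It is not the whylogs implementation: the message classes and `TinyBatchProcessor` are stand-ins defined only for this example, and the lookup-table approach is an assumption.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass
class LogMessage:
    """Stand-in message carrying data to log; not the whylogs LogMessage."""

    payload: Dict[str, Any]


@dataclass
class FlushMessage:
    """Stand-in message asking the processor to flush."""


class TinyBatchProcessor:
    """Routes each batch to the handler registered for its message type."""

    def __init__(self) -> None:
        self.events: List[str] = []
        self._handlers: Dict[type, Callable[[list], None]] = {
            LogMessage: self.process_log_messages,
            FlushMessage: self.process_flush_message,
        }

    def process_batch(self, batch: list, batch_type: type) -> None:
        handler = self._handlers.get(batch_type)
        if handler is None:
            raise ValueError(f"Unknown message type: {batch_type.__name__}")
        handler(batch)

    def process_log_messages(self, messages: List[LogMessage]) -> None:
        self.events.append(f"logged {len(messages)}")

    def process_flush_message(self, messages: List[FlushMessage]) -> None:
        self.events.append("flushed")


processor = TinyBatchProcessor()
processor.process_batch([LogMessage({"a": 1}), LogMessage({"a": 2})], LogMessage)
processor.process_batch([FlushMessage()], FlushMessage)
print(processor.events)  # ['logged 2', 'flushed']
```

Handling one message type per batch lets each handler work on a homogeneous list, which keeps the individual process_* methods simple.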