
Messages that the process logger can process.

These types are core types that you can send to the process logger. They are either subclasses of TypedDict or wrappers around serialized byte versions of those dicts. Everything in here is a TypedDict because we use orjson to deserialize them for performance reasons and that library prefers to output everything as dictionaries.

The dataclass containers for those messages have helper methods to extract/deserialize the dicts. It’s important to not raise exceptions in those data classes because the actor does a lot of large batch processing and exceptions would result in losing the entire batch. Instead, they return None and log errors if there is some issue deserializing or validating.

Module Contents#



dict() -> new empty dictionary


dict() -> new empty dictionary


dict() -> new empty dictionary


dict() -> new empty dictionary


dict() -> new empty dictionary







A message can be sent synchronously if it has an id and it has a sync flag set to True.




dict() -> new empty dictionary




data_dict_from_pandas(→ DataDict)

get_columns(→ List[str])

log_dict_to_data_frame(→ Tuple[pandas.DataFrame, int])

log_dict_to_embedding_matrix(→ Tuple[Dict[str, ...)

reduce_log_requests(→ LogRequestDict)

Reduce requests, assuming that each request has the same columns.

reduce_embeddings_request(→ LogEmbeddingRequestDict)

determine_dataset_timestamp(→ Optional[int])


class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.DataDict#

Bases: TypedDict

dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object’s

(key, value) pairs

dict(iterable) -> new dictionary initialized as if via:

d = {} for k, v in iterable:

d[k] = v

dict(**kwargs) -> new dictionary initialized with the name=value pairs

in the keyword argument list. For example: dict(one=1, two=2)

columns: List[str]#
data: List[List[DataTypes]]#

D.clear() -> None. Remove all items from D.


D.copy() -> a shallow copy of D


Return the value for key if key is in the dictionary, else default.


D.items() -> a set-like object providing a view on D’s items


D.keys() -> a set-like object providing a view on D’s keys


D.pop(k[,d]) -> v, remove specified key and return the corresponding value.

If key is not found, default is returned if given, otherwise KeyError is raised


Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.


Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.


D.update([E, ]**F) -> None. Update D from dict/iterable E and F. If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]


D.values() -> an object providing a view on D’s values

whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.data_dict_from_pandas(df: pandas.DataFrame) DataDict#

df (pandas.DataFrame) –

Return type


class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogRequestDict#

Bases: TypedDict

dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object’s

(key, value) pairs

dict(iterable) -> new dictionary initialized as if via:

d = {} for k, v in iterable:

d[k] = v

dict(**kwargs) -> new dictionary initialized with the name=value pairs

in the keyword argument list. For example: dict(one=1, two=2)

datasetId: str#
timestamp: Optional[int]#
multiple: DataDict#

D.clear() -> None. Remove all items from D.


D.copy() -> a shallow copy of D


Return the value for key if key is in the dictionary, else default.


D.items() -> a set-like object providing a view on D’s items


D.keys() -> a set-like object providing a view on D’s keys


D.pop(k[,d]) -> v, remove specified key and return the corresponding value.

If key is not found, default is returned if given, otherwise KeyError is raised


Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.


Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.


D.update([E, ]**F) -> None. Update D from dict/iterable E and F. If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]


D.values() -> an object providing a view on D’s values

class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogEmbeddingRequestDict#

Bases: TypedDict

dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object’s

(key, value) pairs

dict(iterable) -> new dictionary initialized as if via:

d = {} for k, v in iterable:

d[k] = v

dict(**kwargs) -> new dictionary initialized with the name=value pairs

in the keyword argument list. For example: dict(one=1, two=2)

datasetId: str#
timestamp: Optional[int]#
embeddings: Dict[str, List[DataTypes]]#

D.clear() -> None. Remove all items from D.


D.copy() -> a shallow copy of D


Return the value for key if key is in the dictionary, else default.


D.items() -> a set-like object providing a view on D’s items


D.keys() -> a set-like object providing a view on D’s keys


D.pop(k[,d]) -> v, remove specified key and return the corresponding value.

If key is not found, default is returned if given, otherwise KeyError is raised


Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.


Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.


D.update([E, ]**F) -> None. Update D from dict/iterable E and F. If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]


D.values() -> an object providing a view on D’s values

class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.PubSubMessage#

Bases: TypedDict

dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object’s

(key, value) pairs

dict(iterable) -> new dictionary initialized as if via:

d = {} for k, v in iterable:

d[k] = v

dict(**kwargs) -> new dictionary initialized with the name=value pairs

in the keyword argument list. For example: dict(one=1, two=2)

attributes: Dict[str, str]#
data: str#
message_id: str#
publish_time: str#

D.clear() -> None. Remove all items from D.


D.copy() -> a shallow copy of D


Return the value for key if key is in the dictionary, else default.


D.items() -> a set-like object providing a view on D’s items


D.keys() -> a set-like object providing a view on D’s keys


D.pop(k[,d]) -> v, remove specified key and return the corresponding value.

If key is not found, default is returned if given, otherwise KeyError is raised


Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.


Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.


D.update([E, ]**F) -> None. Update D from dict/iterable E and F. If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]


D.values() -> an object providing a view on D’s values

class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.PubSubDict#

Bases: TypedDict

dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object’s

(key, value) pairs

dict(iterable) -> new dictionary initialized as if via:

d = {} for k, v in iterable:

d[k] = v

dict(**kwargs) -> new dictionary initialized with the name=value pairs

in the keyword argument list. For example: dict(one=1, two=2)

subscription: str#
message: PubSubMessage#
log_request: LogRequestDict#

D.clear() -> None. Remove all items from D.


D.copy() -> a shallow copy of D


Return the value for key if key is in the dictionary, else default.


D.items() -> a set-like object providing a view on D’s items


D.keys() -> a set-like object providing a view on D’s keys


D.pop(k[,d]) -> v, remove specified key and return the corresponding value.

If key is not found, default is returned if given, otherwise KeyError is raised


Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.


Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.


D.update([E, ]**F) -> None. Update D from dict/iterable E and F. If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]


D.values() -> an object providing a view on D’s values

class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.DebugMessage#
class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.FlushMessage#
class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.ProcessLoggerStatus#
statuses: Dict[str, whylogs.api.logger.experimental.logger.actor.thread_rolling_logger.LoggerStatus]#
class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.ProcessStatusMessage#
id: str#
timeout: float = 1.0#
sync: bool = True#
class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogMessage#
request_time: int#
log: LogRequestDict#
id: str#
sync: bool = False#
class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.SyncMessage#

Bases: Protocol

A message can be sent synchronously if it has an id and it has a sync flag set to True. It doesnt magically make the message synchronous, but allows us to create a synchronous convenience method for that message type. See log and status on the ProcessRollingLogger.

id: str#
sync: bool#
class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawLogMessage#
request: bytes#

Bytes that represent json stringified LogRequestDict

request_time: int#
id: str#
sync: bool = False#
to_log_request_dict() Optional[LogRequestDict]#
Return type


whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.get_columns(request: Union[LogRequestDict, LogEmbeddingRequestDict]) List[str]#

request (Union[LogRequestDict, LogEmbeddingRequestDict]) –

Return type


class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawPubSubMessage#
request: bytes#
request_time: int#
to_pubsub_message() Optional[PubSubDict]#
Return type


class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.PubSubEmbeddingDict#

Bases: TypedDict

dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object’s

(key, value) pairs

dict(iterable) -> new dictionary initialized as if via:

d = {} for k, v in iterable:

d[k] = v

dict(**kwargs) -> new dictionary initialized with the name=value pairs

in the keyword argument list. For example: dict(one=1, two=2)

subscription: str#
message: PubSubMessage#
log_embedding_request: LogEmbeddingRequestDict#

D.clear() -> None. Remove all items from D.


D.copy() -> a shallow copy of D


Return the value for key if key is in the dictionary, else default.


D.items() -> a set-like object providing a view on D’s items


D.keys() -> a set-like object providing a view on D’s keys


D.pop(k[,d]) -> v, remove specified key and return the corresponding value.

If key is not found, default is returned if given, otherwise KeyError is raised


Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.


Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.


D.update([E, ]**F) -> None. Update D from dict/iterable E and F. If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]


D.values() -> an object providing a view on D’s values

class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawPubSubEmbeddingMessage#
request: bytes#
request_time: int#
to_pubsub_embedding_message() Optional[PubSubEmbeddingDict]#
Return type


class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawLogEmbeddingsMessage#
request: bytes#
request_time: int#
to_log_embeddings_request_dict() Optional[LogEmbeddingRequestDict]#
Return type


whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.log_dict_to_data_frame(request: LogRequestDict) Tuple[pandas.DataFrame, int]#

request (LogRequestDict) –

Return type

Tuple[pandas.DataFrame, int]

whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.log_dict_to_embedding_matrix(request: LogEmbeddingRequestDict) Tuple[Dict[str, numpy.ndarray[Any, Any]], int]#

request (LogEmbeddingRequestDict) –

Return type

Tuple[Dict[str, numpy.ndarray[Any, Any]], int]

whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.reduce_log_requests(acc: LogRequestDict, cur: LogRequestDict) LogRequestDict#

Reduce requests, assuming that each request has the same columns. That assumption should be enforced before this is used by grouping by set of columns.

Return type


whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.reduce_embeddings_request(acc: LogEmbeddingRequestDict, cur: LogEmbeddingRequestDict) LogEmbeddingRequestDict#
Return type


whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.determine_dataset_timestamp(cadence: whylogs.api.logger.experimental.logger.actor.time_util.TimeGranularity, request: Union[LogRequestDict, LogEmbeddingRequestDict]) Optional[int]#
Return type
