whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages

Messages that the process logger can process.

These are the core types you can send to the process logger. They are mostly TypedDict subclasses or wrappers around serialized byte versions of those dicts. The dictionary types are TypedDicts because we deserialize them with orjson for performance, and orjson produces plain dictionaries.

The dataclass containers for those messages have helper methods that extract and deserialize the dicts. It's important not to raise exceptions in those dataclasses: the actor processes messages in large batches, and a single exception would lose the entire batch. Instead, the helpers log an error and return None when deserialization or validation fails.
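A minimal sketch of that never-raise pattern, assuming a message that carries a JSON-serialized LogRequestDict as bytes. It uses the standard-library json module instead of orjson so it runs without extra dependencies. RawLogMessageSketch and deserialize_batch are hypothetical names, not whylogs' own RawLogMessage, and the validation is deliberately minimal.

```python
import json
import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


@dataclass
class RawLogMessageSketch:
    """Hypothetical container: JSON bytes for one log request plus metadata."""

    request: bytes
    request_time: int

    def to_log_request_dict(self) -> Optional[Dict[str, Any]]:
        # Never raise here: one bad message must not sink the whole batch.
        try:
            parsed = json.loads(self.request)
        except ValueError:  # JSONDecodeError and UnicodeDecodeError both subclass ValueError
            logger.error("Could not deserialize request: %r", self.request[:100])
            return None
        # Minimal shape check; log and return None rather than raising.
        if not isinstance(parsed, dict) or not all(k in parsed for k in ("datasetId", "multiple")):
            logger.error("Request is not a dict with datasetId and multiple")
            return None
        return parsed


def deserialize_batch(messages: List[RawLogMessageSketch]) -> List[Dict[str, Any]]:
    # Bad messages are logged and skipped; the rest of the batch survives.
    results = (m.to_log_request_dict() for m in messages)
    return [r for r in results if r is not None]


batch = [
    RawLogMessageSketch(b'{"datasetId": "model-1", "timestamp": null, '
                        b'"multiple": {"columns": ["a"], "data": [[1]]}}', 0),
    RawLogMessageSketch(b"not json", 0),
    RawLogMessageSketch(b"[1, 2, 3]", 0),
]
good = deserialize_batch(batch)
```

Here good contains only the first request; the other two messages are logged as errors and dropped without interrupting the batch.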

Module Contents

Classes

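DataDict

Tabular data as columns (the column names) and data (the values as a list of lists).

LogRequestDict

A request to log tabular data for the dataset named by datasetId.

LogEmbeddingRequestDict

A request to log embeddings for the dataset named by datasetId.

PubSubMessage

A Pub/Sub message: attributes, data payload, message ID, and publish time.

PubSubDict

A Pub/Sub subscription and message paired with a LogRequestDict.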

DebugMessage

FlushMessage

ProcessLoggerStatus

ProcessStatusMessage

LogMessage

SyncMessage

A message can be sent synchronously if it has an id and its sync flag is set to True.

RawLogMessage

RawPubSubMessage

PubSubEmbeddingDict

A Pub/Sub subscription and message paired with a LogEmbeddingRequestDict.

RawPubSubEmbeddingMessage

RawLogEmbeddingsMessage

Functions

data_dict_from_pandas(→ DataDict)

get_columns(→ List[str])

log_dict_to_data_frame(→ Tuple[pandas.DataFrame, int])

log_dict_to_embedding_matrix(→ Tuple[Dict[str, ...)

reduce_log_requests(→ LogRequestDict)

Reduce requests, assuming that each request has the same columns.

reduce_embeddings_request(→ LogEmbeddingRequestDict)

determine_dataset_timestamp(→ Optional[int])

Attributes

whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.DataTypes

class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.DataDict

Bases: TypedDict

A batch of tabular data: columns holds the column names and data holds the values as a list of lists. data_dict_from_pandas builds one from a pandas DataFrame.

columns: List[str]
data: List[List[DataTypes]]
Inherits the standard dict methods: clear(), copy(), get(), items(), keys(), pop(), popitem(), setdefault(), update(), values().

whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.data_dict_from_pandas(df: pandas.DataFrame) → DataDict
Parameters

df (pandas.DataFrame) –

Return type

DataDict
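A minimal sketch of what a conversion like data_dict_from_pandas could look like, assuming data is row-major (one inner list per row, the layout that pandas' DataFrame.values.tolist() produces). data_dict_from_pandas_sketch is a hypothetical name; check the whylogs source for the actual behavior.

```python
from typing import Any, List, TypedDict

import pandas as pd


class DataDict(TypedDict):
    columns: List[str]
    data: List[List[Any]]


def data_dict_from_pandas_sketch(df: pd.DataFrame) -> DataDict:
    # Column names in frame order, then one list of values per row.
    return {"columns": [str(c) for c in df.columns], "data": df.values.tolist()}


df = pd.DataFrame({"age": [34, 41], "city": ["Oslo", "Lima"]})
data_dict = data_dict_from_pandas_sketch(df)
# data_dict == {"columns": ["age", "city"], "data": [[34, "Oslo"], [41, "Lima"]]}
```

The result has the DataDict shape, so it can be placed under multiple in a LogRequestDict.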

class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogRequestDict

Bases: TypedDict

A request to log tabular data: datasetId identifies the target dataset, timestamp is optional, and multiple carries the data itself.

datasetId: str
timestamp: Optional[int]
multiple: DataDict
Inherits the standard dict methods: clear(), copy(), get(), items(), keys(), pop(), popitem(), setdefault(), update(), values().
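For illustration, a LogRequestDict built by hand and serialized to the kind of JSON bytes a raw message would carry. The dataset ID, columns, and values are made up, and the DataTypes alias below is a simplified stand-in for the module's own DataTypes, whose exact definition is not shown here.

```python
import json
from typing import List, Optional, TypedDict, Union

# Simplified stand-in for the module's DataTypes alias (an assumption).
DataTypes = Union[str, int, float, bool, None]


class DataDict(TypedDict):
    columns: List[str]
    data: List[List[DataTypes]]


class LogRequestDict(TypedDict):
    datasetId: str
    timestamp: Optional[int]
    multiple: DataDict


request: LogRequestDict = {
    "datasetId": "model-1",  # hypothetical dataset ID
    "timestamp": None,  # optional
    "multiple": {"columns": ["age", "income"], "data": [[34, 52000.0], [41, 61000.0]]},
}

# A TypedDict is an ordinary dict at runtime, so it serializes directly.
payload: bytes = json.dumps(request).encode("utf-8")
```

Because TypedDict adds no runtime behavior, the same dict shape comes back out of json.loads (or orjson.loads) without any conversion step.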

class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogEmbeddingRequestDict

Bases: TypedDict

A request to log embeddings: datasetId identifies the target dataset, timestamp is optional, and embeddings maps each name to a list of values.

datasetId: str
timestamp: Optional[int]
embeddings: Dict[str, List[DataTypes]]
Inherits the standard dict methods: clear(), copy(), get(), items(), keys(), pop(), popitem(), setdefault(), update(), values().

class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.PubSubMessage

Bases: TypedDict

A Pub/Sub message: its string attributes, the data payload, the message ID, and the publish time.

attributes: Dict[str, str]
data: str
message_id: str
publish_time: str
Inherits the standard dict methods: clear(), copy(), get(), items(), keys(), pop(), popitem(), setdefault(), update(), values().

class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.PubSubDict

Bases: TypedDict

A Pub/Sub subscription name and message, paired with the LogRequestDict in log_request.

subscription: str
message: PubSubMessage
log_request: LogRequestDict
Inherits the standard dict methods: clear(), copy(), get(), items(), keys(), pop(), popitem(), setdefault(), update(), values().

class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.DebugMessage
class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.FlushMessage
class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.ProcessLoggerStatus
statuses: Dict[str, whylogs.api.logger.experimental.logger.actor.thread_rolling_logger.LoggerStatus]
class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.ProcessStatusMessage
id: str
timeout: float = 1.0
sync: bool = True
class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.LogMessage
request_time: int
log: LogRequestDict
id: str
sync: bool = False
class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.SyncMessage

Bases: Protocol

A message can be sent synchronously if it has an id and its sync flag is set to True. This doesn't make the message synchronous by itself; it lets us build a synchronous convenience method for that message type. See the log and status methods on ProcessRollingLogger.

id: str
sync: bool
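Because SyncMessage is a typing.Protocol, any message class with id and sync attributes satisfies it structurally, without inheriting from it. The sketch below uses a hypothetical copy of the protocol with runtime_checkable added, two hypothetical message classes, and a hypothetical can_send_sync helper to show how a logger might decide whether to offer a synchronous path. It is not how ProcessRollingLogger is implemented.

```python
from dataclasses import dataclass
from typing import Protocol, runtime_checkable


@runtime_checkable
class SyncMessageSketch(Protocol):
    # Same two members as SyncMessage; runtime_checkable is added here.
    id: str
    sync: bool


@dataclass
class StatusRequest:
    # Hypothetical message modeled on ProcessStatusMessage's fields.
    id: str
    timeout: float = 1.0
    sync: bool = True


@dataclass
class FireAndForget:
    # Hypothetical message with no id, so it cannot be sent synchronously.
    payload: str


def can_send_sync(message: object) -> bool:
    # Structural check: no inheritance needed, just the attributes and the flag.
    return isinstance(message, SyncMessageSketch) and bool(message.sync)
```

By the same check, a LogMessage (whose sync defaults to False) satisfies the protocol but would not take the synchronous path under this helper unless its sync flag is set.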
class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawLogMessage
request: bytes

A JSON-serialized LogRequestDict, as bytes.

request_time: int
id: str
sync: bool = False
to_log_request_dict() → Optional[LogRequestDict]

Returns None and logs an error if the request cannot be deserialized or validated.

Return type

Optional[LogRequestDict]

whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.get_columns(request: Union[LogRequestDict, LogEmbeddingRequestDict]) → List[str]
Parameters

request (Union[LogRequestDict, LogEmbeddingRequestDict]) –

Return type

List[str]
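get_columns accepts either request type. A plausible sketch, assuming tabular requests report multiple.columns and embedding requests report the keys of embeddings. get_columns_sketch is hypothetical and may differ from the real function.

```python
from typing import Any, List, Mapping


def get_columns_sketch(request: Mapping[str, Any]) -> List[str]:
    # LogRequestDict: the column names travel with the data under "multiple".
    if "multiple" in request:
        return list(request["multiple"]["columns"])
    # LogEmbeddingRequestDict: each embedding is keyed by its name.
    return list(request["embeddings"].keys())


tabular = {"datasetId": "model-1", "timestamp": None,
           "multiple": {"columns": ["age", "income"], "data": [[34, 52000.0]]}}
embedding = {"datasetId": "model-1", "timestamp": None,
             "embeddings": {"text_vec": [[0.1, 0.2]]}}
```

Distinguishing the two shapes by key works because, at runtime, both TypedDicts are plain dicts with no type tag.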

class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawPubSubMessage
request: bytes
request_time: int
to_pubsub_message() → Optional[PubSubDict]

Returns None and logs an error if the request cannot be deserialized or validated.

Return type

Optional[PubSubDict]

class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.PubSubEmbeddingDict

Bases: TypedDict

A Pub/Sub subscription name and message, paired with the LogEmbeddingRequestDict in log_embedding_request.

subscription: str
message: PubSubMessage
log_embedding_request: LogEmbeddingRequestDict
Inherits the standard dict methods: clear(), copy(), get(), items(), keys(), pop(), popitem(), setdefault(), update(), values().

class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawPubSubEmbeddingMessage
request: bytes
request_time: int
to_pubsub_embedding_message() → Optional[PubSubEmbeddingDict]

Returns None and logs an error if the request cannot be deserialized or validated.

Return type

Optional[PubSubEmbeddingDict]

class whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.RawLogEmbeddingsMessage
request: bytes
request_time: int
to_log_embeddings_request_dict() → Optional[LogEmbeddingRequestDict]

Returns None and logs an error if the request cannot be deserialized or validated.

Return type

Optional[LogEmbeddingRequestDict]

whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.log_dict_to_data_frame(request: LogRequestDict) → Tuple[pandas.DataFrame, int]
Parameters

request (LogRequestDict) –

Return type

Tuple[pandas.DataFrame, int]
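A minimal sketch of a conversion that presumably reverses data_dict_from_pandas, assuming row-major data and assuming the returned int is the number of rows. log_dict_to_data_frame_sketch is a hypothetical name, and the real return value may mean something else.

```python
from typing import Any, Dict, Tuple

import pandas as pd


def log_dict_to_data_frame_sketch(request: Dict[str, Any]) -> Tuple[pd.DataFrame, int]:
    multiple = request["multiple"]
    # Rebuild the frame from row-major data plus the column names.
    df = pd.DataFrame(multiple["data"], columns=multiple["columns"])
    # Assumption: the int is the row count.
    return df, len(df)


request = {"datasetId": "model-1", "timestamp": None,
           "multiple": {"columns": ["age", "city"], "data": [[34, "Oslo"], [41, "Lima"]]}}
frame, rows = log_dict_to_data_frame_sketch(request)
```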

whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.log_dict_to_embedding_matrix(request: LogEmbeddingRequestDict) → Tuple[Dict[str, numpy.ndarray[Any, Any]], int]
Parameters

request (LogEmbeddingRequestDict) –

Return type

Tuple[Dict[str, numpy.ndarray[Any, Any]], int]
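A sketch under stated assumptions: each entry in embeddings is a list of equal-length vectors that numpy can stack into a 2-D matrix, and the returned int is the total number of vectors. Both are guesses from the signature, and log_dict_to_embedding_matrix_sketch is hypothetical.

```python
from typing import Any, Dict, Tuple

import numpy as np


def log_dict_to_embedding_matrix_sketch(
    request: Dict[str, Any],
) -> Tuple[Dict[str, np.ndarray], int]:
    # One matrix per embedding name: rows are vectors, columns are dimensions.
    matrices = {
        name: np.asarray(vectors, dtype=float)
        for name, vectors in request["embeddings"].items()
    }
    # Assumption: the int counts vectors across all embedding names.
    count = sum(m.shape[0] for m in matrices.values())
    return matrices, count


request = {"datasetId": "model-1", "timestamp": None,
           "embeddings": {"text_vec": [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]}}
matrices, count = log_dict_to_embedding_matrix_sketch(request)
```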

whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.reduce_log_requests(acc: LogRequestDict, cur: LogRequestDict) → LogRequestDict

Reduce requests, assuming that each request has the same columns. Callers should enforce that assumption beforehand by grouping requests by their set of columns.

Parameters

acc (LogRequestDict) –

cur (LogRequestDict) –

Return type

LogRequestDict
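A minimal sketch of the reduction described above, assuming it appends cur's rows to acc. The grouping key here is the dataset ID plus the ordered tuple of column names; an ordered tuple rather than a set keeps row values aligned with their columns. reduce_log_requests_sketch, merge_requests, and req are hypothetical, and the merged request keeps the first request's timestamp.

```python
import copy
from collections import defaultdict
from functools import reduce
from typing import Any, Dict, List, Tuple

LogRequest = Dict[str, Any]


def reduce_log_requests_sketch(acc: LogRequest, cur: LogRequest) -> LogRequest:
    # Assumes acc and cur have identical columns in identical order.
    acc["multiple"]["data"].extend(cur["multiple"]["data"])
    return acc


def merge_requests(requests: List[LogRequest]) -> List[LogRequest]:
    # Group so the same-columns assumption holds inside each group.
    groups: Dict[Tuple[str, Tuple[str, ...]], List[LogRequest]] = defaultdict(list)
    for r in requests:
        groups[(r["datasetId"], tuple(r["multiple"]["columns"]))].append(r)
    # Deep-copy each group's first request so the inputs are left untouched.
    return [
        reduce(reduce_log_requests_sketch, group[1:], copy.deepcopy(group[0]))
        for group in groups.values()
    ]


def req(columns: List[str], rows: List[List[Any]]) -> LogRequest:
    return {"datasetId": "model-1", "timestamp": None,
            "multiple": {"columns": columns, "data": rows}}


inputs = [req(["a"], [[1]]), req(["a"], [[2]]), req(["b"], [[3]])]
merged = merge_requests(inputs)
```

Here the two requests with column a collapse into one request with two rows, while the request with column b stays separate.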

whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.reduce_embeddings_request(acc: LogEmbeddingRequestDict, cur: LogEmbeddingRequestDict) → LogEmbeddingRequestDict
Parameters

acc (LogEmbeddingRequestDict) –

cur (LogEmbeddingRequestDict) –

Return type

LogEmbeddingRequestDict
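By analogy with reduce_log_requests, a sketch that appends cur's vectors to acc for each embedding name and adds names that acc lacks. This merge rule is an assumption, not taken from whylogs, and reduce_embeddings_request_sketch is hypothetical.

```python
from functools import reduce
from typing import Any, Dict

EmbeddingRequest = Dict[str, Any]


def reduce_embeddings_request_sketch(acc: EmbeddingRequest, cur: EmbeddingRequest) -> EmbeddingRequest:
    # Mutates and returns acc, which keeps its own datasetId and timestamp.
    for name, vectors in cur["embeddings"].items():
        acc["embeddings"].setdefault(name, []).extend(vectors)
    return acc


first = {"datasetId": "model-1", "timestamp": None,
         "embeddings": {"text_vec": [[0.1, 0.2]]}}
second = {"datasetId": "model-1", "timestamp": None,
          "embeddings": {"text_vec": [[0.3, 0.4]], "img_vec": [[1.0]]}}
third = {"datasetId": "model-1", "timestamp": None,
         "embeddings": {"text_vec": [[0.5, 0.6]]}}
combined = reduce(reduce_embeddings_request_sketch, [second, third], first)
```

This sketch mutates the accumulator in place, so pass a copy as the initial value if the first request must stay unchanged.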

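whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages.determine_dataset_timestamp(cadence: whylogs.api.logger.experimental.logger.actor.time_util.TimeGranularity, request: Union[LogRequestDict, LogEmbeddingRequestDict]) → Optional[int]
Parameters

cadence (whylogs.api.logger.experimental.logger.actor.time_util.TimeGranularity) –

request (Union[LogRequestDict, LogEmbeddingRequestDict]) –

Return type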

Optional[int]
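The signature suggests bucketing a request's timestamp by the logger's cadence. A sketch under several assumptions: timestamps are epoch milliseconds in UTC, a missing timestamp yields None, and the cadence is represented by a hypothetical Granularity enum whose values are bucket widths in milliseconds. The real TimeGranularity type and the real fallback behavior may differ.

```python
from enum import Enum
from typing import Any, Mapping, Optional


class Granularity(Enum):
    # Hypothetical stand-in for TimeGranularity: bucket width in milliseconds.
    MINUTE = 60_000
    HOUR = 3_600_000
    DAY = 86_400_000


def determine_dataset_timestamp_sketch(cadence: Granularity, request: Mapping[str, Any]) -> Optional[int]:
    ts = request.get("timestamp")
    if ts is None:
        return None  # assumption: nothing to bucket without a timestamp
    # Floor the timestamp to the start of its cadence bucket.
    return ts - ts % cadence.value
```

With this rule, every request whose timestamp falls in the same hour (for an hourly cadence) maps to the same dataset timestamp, which is what lets requests be grouped into one profile per bucket.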