🚩 Create a free WhyLabs account to get more value out of whylogs!

Did you know you can store, visualize, and monitor whylogs profiles with the WhyLabs Observability Platform? Sign up for a free WhyLabs account to leverage the power of whylogs and WhyLabs together!

Profiling with whylogs from a Kafka topic

In this example we will show how you can profile messages from a Kafka topic and merge the resulting profiles. To keep the example simple and reproducible anywhere, we will create a Kafka topic, generate data from an existing CSV file and ingest it into the topic, consume the messages, and then profile them.

NOTE: To get this example going, we will run Apache ZooKeeper and Apache Kafka locally with Docker Compose, so be sure to have both Docker and Docker Compose installed and ready in your environment. If you want to read more on how this YAML file was built, check out this blogpost.
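In case you don't have a Compose file at hand, here is a minimal sketch of what a docker-compose.yml for this setup could look like. The images, versions, and listener configuration below are assumptions for illustration, not the exact file from the blogpost; the broker container is named kafka so the docker exec command below works as-is:

version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1  # assumed image/version
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.0.1  # assumed image/version
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1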

To get things going, we will bring the services up and create the topic in Kafka with the following commands:

$ docker-compose up -d

$ docker exec -ti kafka bash

root@kafka: kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic whylogs-stream
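As an optional sanity check, you can describe the topic from the same shell to confirm it was created with the expected partition count:

root@kafka: kafka-topics --describe --bootstrap-server localhost:9092 --topic whylogs-stream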

If you haven’t already, make sure to also install kafka-python and whylogs in your environment by running the following cell.

[1]:
# Note: you may need to restart the kernel to use updated packages.
%pip install whylogs
%pip install kafka-python

Generating Data

To generate the data, we will fetch a small CSV file from a publicly available S3 endpoint and then use the KafkaProducer to send its rows over to the topic we created above.

[2]:
import json
import warnings

import pandas as pd
from kafka import KafkaProducer


warnings.simplefilter("ignore")

# Serialize each message as a UTF-8 encoded JSON string
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))


data_url = "https://whylabs-public.s3.us-west-2.amazonaws.com/datasets/tour/current.csv"
full_data = pd.read_csv(data_url)

# Send each row of the DataFrame to the topic as a separate message
for i, row in full_data.iterrows():
    producer.send('whylogs-stream', row.to_dict())

# Block until all buffered messages have been delivered
producer.flush()
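Since producer.send is asynchronous, it can be reassuring to confirm the messages actually reached the topic. A minimal sketch, using kafka-python's end_offsets with a throwaway consumer of our own:

from kafka import KafkaConsumer, TopicPartition

# Throwaway consumer used only to inspect the topic's offsets
offset_checker = KafkaConsumer(bootstrap_servers='localhost:9092')

# end_offsets returns the next offset to be written per partition,
# i.e. the message count if the topic started out empty
print(offset_checker.end_offsets([TopicPartition('whylogs-stream', 0)]))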

Consuming the messages with KafkaConsumer

[3]:
from kafka import KafkaConsumer, TopicPartition


# Deserialize each message back from a UTF-8 encoded JSON string
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')))

# Assign the consumer to every partition of the topic manually,
# so we can control its offsets later with seek_to_beginning
assignments = []
topics = ['whylogs-stream']

for topic in topics:
    partitions = consumer.partitions_for_topic(topic)
    for p in partitions:
        print(f'topic {topic} - partition {p}')
        assignments.append(TopicPartition(topic, p))
consumer.assign(assignments)
topic whylogs-stream - partition 0

Profiling with whylogs

For the sake of simplicity, we will build a pandas.DataFrame from the consumed messages, then profile and write the profiles locally until there are no more messages in the topic. This is done with our log rotation implementation, which we will see in the code block below. You will also need a directory called “profiles”, which is where the logger will save the profiles, so let’s go ahead and create it as well.

[4]:
import os

import whylogs as why
import pandas as pd


# Create the directory where the rolling logger will write profiles
os.makedirs("profiles", exist_ok=True)

# Start reading the topic from its very first message
consumer.seek_to_beginning()

total = 0
# The rolling logger rotates (writes out) a new profile every 5 minutes
with why.logger(mode="rolling", interval=5, when="M", base_name="whylogs-kafka") as logger:
    logger.append_writer("local", base_dir="profiles")
    while True:
        finished = True
        record = consumer.poll(timeout_ms=500, max_records=100, update_offsets=True)

        for k, v in record.items():
            print(f'{k} - {len(v)}')
            # Build a DataFrame from this batch of messages and profile it
            df = pd.DataFrame([row.value for row in v])
            logger.log(df)
            total += len(v)
            finished = False

        if finished:
            print(f"total {total}")
            break
TopicPartition(topic='whylogs-stream', partition=0) - 100
TopicPartition(topic='whylogs-stream', partition=0) - 100
TopicPartition(topic='whylogs-stream', partition=0) - 100
TopicPartition(topic='whylogs-stream', partition=0) - 100
TopicPartition(topic='whylogs-stream', partition=0) - 100
TopicPartition(topic='whylogs-stream', partition=0) - 100
TopicPartition(topic='whylogs-stream', partition=0) - 100
TopicPartition(topic='whylogs-stream', partition=0) - 100
TopicPartition(topic='whylogs-stream', partition=0) - 100
TopicPartition(topic='whylogs-stream', partition=0) - 45
total 945
[5]:
import whylogs as why
from glob import glob

# Read every written binary profile back into a DatasetProfileView
profiles_binaries = glob("profiles/*")
profiles_list = []

for profile in profiles_binaries:
    profiles_list.append(why.read(profile).view())
[6]:
from functools import reduce

# Merge all profile views pairwise into a single profile view
merged_profile = reduce((lambda x, y: x.merge(y)), profiles_list)
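Before looking at the whole metrics table, you can also peek at a single column to check that the merge worked. This is a small sketch assuming whylogs’ get_column and to_summary_dict accessors and the Age column from this dataset:

# Flatten the merged metrics of one column into a plain dict
age_summary = merged_profile.get_column("Age").to_summary_dict()
print(age_summary["counts/n"], age_summary["distribution/mean"])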
[7]:
merged_profile.to_pandas()
[7]:
| column | types/integral | types/fractional | types/boolean | types/string | types/object | cardinality/est | cardinality/upper_1 | cardinality/lower_1 | distribution/mean | distribution/stddev | ... | distribution/q_75 | distribution/q_90 | distribution/q_95 | distribution/q_99 | counts/n | counts/null | type | frequent_items/frequent_strings | ints/max | ints/min |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| Age | 0 | 945 | 0 | 0 | 0 | 25.000001 | 25.001250 | 25.000000 | 31.609524 | 6.747796 | ... | 38.0000 | 41.000 | 42.0000 | 43.0000 | 945 | 0 | SummaryType.COLUMN | NaN | NaN | NaN |
| Customer ID | 0 | 0 | 0 | 945 | 0 | 869.683985 | 881.067672 | 858.577213 | NaN | NaN | ... | NaN | NaN | NaN | NaN | 945 | 0 | SummaryType.COLUMN | [FrequentItem(value='C268100', est=3, upper=2,... | NaN | NaN |
| Gender | 0 | 0 | 0 | 945 | 0 | 2.000000 | 2.000100 | 2.000000 | NaN | NaN | ... | NaN | NaN | NaN | NaN | 945 | 0 | SummaryType.COLUMN | [FrequentItem(value='M', est=489, upper=489, l... | NaN | NaN |
| Item Price | 0 | 945 | 0 | 0 | 0 | 705.028228 | 714.256661 | 696.024282 | 79.848148 | 41.921716 | ... | 116.6000 | 138.200 | 145.1000 | 149.0000 | 945 | 0 | SummaryType.COLUMN | NaN | NaN | NaN |
| Product Category | 0 | 0 | 0 | 945 | 0 | 6.000000 | 6.000300 | 6.000000 | NaN | NaN | ... | NaN | NaN | NaN | NaN | 945 | 0 | SummaryType.COLUMN | [FrequentItem(value='Books', est=243, upper=24... | NaN | NaN |
| Product Subcategory | 0 | 0 | 0 | 945 | 0 | 18.000001 | 18.000899 | 18.000000 | NaN | NaN | ... | NaN | NaN | NaN | NaN | 945 | 0 | SummaryType.COLUMN | [FrequentItem(value='Mens', est=141, upper=141... | NaN | NaN |
| Quantity | 945 | 0 | 0 | 0 | 0 | 10.000000 | 10.000500 | 10.000000 | 2.450794 | 2.279227 | ... | 4.0000 | 5.000 | 5.0000 | 5.0000 | 945 | 0 | SummaryType.COLUMN | [FrequentItem(value='2.000000', est=183, upper... | 5.0 | -5.0 |
| Store Type | 0 | 0 | 0 | 945 | 0 | 4.000000 | 4.000200 | 4.000000 | NaN | NaN | ... | NaN | NaN | NaN | NaN | 945 | 0 | SummaryType.COLUMN | [FrequentItem(value='e-Shop', est=392, upper=3... | NaN | NaN |
| Total Amount | 0 | 945 | 0 | 0 | 0 | 844.069184 | 855.117588 | 833.289540 | 214.615556 | 261.215174 | ... | 361.5560 | 580.346 | 656.8120 | 804.4400 | 945 | 0 | SummaryType.COLUMN | NaN | NaN | NaN |
| Total Tax | 0 | 945 | 0 | 0 | 0 | 828.657950 | 839.504628 | 818.075123 | 25.664756 | 19.314519 | ... | 36.7605 | 57.834 | 62.7375 | 76.5975 | 945 | 0 | SummaryType.COLUMN | NaN | NaN | NaN |
| Transaction ID | 0 | 0 | 0 | 945 | 0 | 935.275741 | 947.517988 | 923.331294 | NaN | NaN | ... | NaN | NaN | NaN | NaN | 945 | 0 | SummaryType.COLUMN | [] | NaN | NaN |
| Transaction Type | 0 | 0 | 0 | 945 | 0 | 2.000000 | 2.000100 | 2.000000 | NaN | NaN | ... | NaN | NaN | NaN | NaN | 945 | 0 | SummaryType.COLUMN | [FrequentItem(value='Purchase', est=859, upper... | NaN | NaN |

12 rows × 28 columns
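If you prefer a visual report over the raw metrics table, whylogs also ships a notebook visualizer. A short optional sketch (you may need the viz extra, installed with pip install "whylogs[viz]"):

from whylogs.viz import NotebookProfileVisualizer

# Render an interactive HTML summary of the merged profile
visualization = NotebookProfileVisualizer()
visualization.set_profiles(target_profile_view=merged_profile)
visualization.profile_summary()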

[8]:
import shutil

# Clean up the local profiles directory created for this example
shutil.rmtree("profiles")

And voilà! With just a few lines of code we were able to profile and track incoming messages from a Kafka topic. Hopefully this tutorial will get you going with your existing streaming pipelines. If there are any other integrations you would like to see, or if you want to see how other users are getting the most out of whylogs, please check out our community Slack.