đźš© Create a free WhyLabs account to get more value out of whylogs!

Did you know you can store, visualize, and monitor whylogs profiles with theWhyLabs Observability Platform? Sign up for afree WhyLabs accountto leverage the power of whylogs and WhyLabs together!

Rolling Logs with Streaming Data#

Open in Colab

Now that you’ve become family with the “Getting Started” and the basic examples, let’s see what else whylogs can be used for! So far, you’ve seen it ingest rows and dataframes during the logging process, but now let’s look at ways to handle large amounts of changing data such as streaming with … rolling logs! (sometimes also called log rotation)

Instead of needing to plan out how you log in intervals with batching we handle all of that for you. The Logger will create your session, log information at the requested intervals of seconds, minutes, hours, or days and at that interval write out your profile to a .bin file and flush the log getting ready to receive more data.

Why would you want this?#

In production, however, data flows into the model in real-time, unbounded by business hours or other natural boundaries.

There are at least two challenges to monitoring continuous streams of data. The first is segmenting the stream so intermediate results can be made available. The second is scaling the monitoring solution to match the rate at which data is consumed with the rate at which data enters the stream. Monitoring with whylogs can easily meet both these challenges, and still provide complete statistical profile of the entire stream.

By using the rolling logs you can get data as it’s coming in and it will ingest at whatever rate the data enters without have any slow down or memory problems.

We recommend that you have multiple intervals per timeline of your analysis. For example, if you want to look at the changes daily taking it at least hourly will help get a good profile estimation. Doing it too frequently where a profile may only have a couple lines is not preferred so play around with the balance that is right for your needs.

Simple Example using Bitcoin Ticker#

To start off, let’s see how logging works; this will be an extremely basic example to show the syntax. We’ll get data from BlockChain’s ticker as this Jupyter notebook runs. To make you not wait for too long I’ll have it run while constantly gathering data and rolling over the file every 20 seconds. This will give enough data for an example for the notebook without making you wait too long.

The data picked is just a pull of the json API from the given website being used over time. This allows for easy streaming into a Jupyter that is quick and consistently changing, but in reality this is where you’d want to hook up your predictive models, larger data, CSV, etc.

Imports#

First let’s make sure we have everything installed and ready for input. We will be using the file structure to record the .bin files, and “psutil” to get the CPU information.

If you don’t have whylogs installed already, let’s install it:

[1]:
# Note: you may need to restart the kernel to use updated packages.
%pip install whylogs;
Requirement already satisfied: whylogs in /home/jamie/projects/v1/whylogs/python/.venv/lib/python3.8/site-packages (1.1.32)
Requirement already satisfied: whylogs-sketching>=3.4.1.dev3 in /home/jamie/projects/v1/whylogs/python/.venv/lib/python3.8/site-packages (from whylogs) (3.4.1.dev3)
Requirement already satisfied: protobuf>=3.19.4 in /home/jamie/projects/v1/whylogs/python/.venv/lib/python3.8/site-packages (from whylogs) (4.22.1)
Requirement already satisfied: typing-extensions>=3.10 in /home/jamie/projects/v1/whylogs/python/.venv/lib/python3.8/site-packages (from whylogs) (4.5.0)
Note: you may need to restart the kernel to use updated packages.
[2]:
import os
from os import listdir
from os.path import isfile

import pandas as pd
import random
import time
import datetime
import whylogs as why

tmp_path = "example_output"
input_path = "mock_input"
cwd = os.getcwd()

if not os.path.isdir(tmp_path):
    os.makedirs(tmp_path)

if not os.path.isdir(input_path):
    os.makedirs(input_path)

Here is a super simple function to see the amount of files that are here before and after the logging.

[3]:
def count_files(tmp_path):
    only_files = [f for f in listdir(tmp_path) if isfile(os.path.join(tmp_path, f))]
    return len(only_files)

print(count_files(tmp_path))
0

Now it’s on to the actual logging! We will first create the logger, mark it as “rolling”, and set the interval in terms of Seconds, Minutes, Hours, or Days. Lastly we want to make sure we give it the base file name, and create a writer. For this example we will be using the local writer to put files on the local system. The following will be broken into two sections: Production and Playground.

In Production you’ll see code that is more in line with what you’d see in an every day environment. This will still need to be customized for your use case as the time period of a log is dependent on how often your data is pulled and how often you’ll be observing. Although you’re more than welcome to run this it will take quite a while as typically you’d be logging over a dedicated time span such as hours or days or further.

In Playground you’ll get to use our example at fast speed. This will be modified to run continuously. This is the best place to try things out and learn more about how the logging works.

In both examples you’ll see a with which enables your data to be written on exit even if it’s not at the interval time.

Data Set#

Alright, I know blockchain is big right now, but that’s not why we picked it. We wanted to have an very fast, allows up ticker so the play ground could be messed with at any time of the day. This public data source allows us to do just that. Now you don’t need to be a blockchain user or enthusiast at all. This ticker is just like the US stocks or Currency exchange, all it’s doing is showing the exchange rate for certain type of bitcoin in USD. The code block below shows an example of one of the messages.

In the comments you’ll see a placeholder where you’d add your ML model and log it’s output into whylogs as well!

Please note, we don’t do anything directly with block chains or bitcoins in any way.

[4]:
%%writefile mock_input/moc_message.json
{
  "ARS": {
    "last": 4629222.76
  },
  "AUD": {
    "last": 29167.28
  },
  "BRL": {
    "last": 105202.49
  },
  "CAD": {
    "last": 26189.14
  },
  "CHF": {
    "last": 19424.96
  },
  "CLP": {
    "last": 1.816303349e7
  },
  "CNY": {
    "last": 146442.74
  },
  "CZK": {
    "last": 481208.62
  },
  "DKK": {
    "last": 155741.55
  },
  "EUR": {
    "last": 19170.13
  },
  "GBP": {
    "last": 16520.15
  },
  "HKD": {
    "last": 158850.21
  },
  "HRK": {
    "last": 147280.03
  },
  "HUF": {
    "last": 8483533.11
  },
  "INR": {
    "last": 1592864.31
  },
  "ISK": {
    "last": 5805855.21
  },
  "JPY": {
    "last": 2765175.89
  },
  "KRW": {
    "last": 2.654004532e7
  },
  "NZD": {
    "last": 32247.65
  },
  "PLN": {
    "last": 89876.17
  },
  "RON": {
    "last": 72010.47
  },
  "RUB": {
    "last": 1115075.7
  },
  "SEK": {
    "last": 204322.49
  },
  "SGD": {
    "last": 28201.47
  },
  "THB": {
    "last": 715312.44
  },
  "TRY": {
    "last": 354510.87
  },
  "TWD": {
    "last": 2.63539628e7
  },
  "USD": {
    "last": 20270.23
  }
}
Writing mock_input/moc_message.json
[5]:
example_path = os.path.join("mock_input", "moc_message.json")
example_df = pd.read_json(example_path)
example_df
[5]:
ARS AUD BRL CAD CHF CLP CNY CZK DKK EUR ... NZD PLN RON RUB SEK SGD THB TRY TWD USD
last 4629222.76 29167.28 105202.49 26189.14 19424.96 18163033.49 146442.74 481208.62 155741.55 19170.13 ... 32247.65 89876.17 72010.47 1115075.7 204322.49 28201.47 715312.44 354510.87 26353962.8 20270.23

1 rows Ă— 28 columns

Example#

This example will be more like what you’ll see in an environment. Imagine we want to see our data every hour you’ll want to have at logs every 5-15 min. It’s a balance between making sure you log often enough for better analysis, but not so often that you’re rolling the log over with little or no logs in in it.

If you choose to run the example below you’ll have it run roll the log every 15 min with simulated traffic comming in through the moc json we created above. We use a random in the test_driver to simulate how really traffic can come quickly or with big games. If you’d like to try it with live data just set the live_feed=True and it will pull in live data from the ticker to work with.

[6]:
class MyApp:
    def __init__(self):
        # example of the rolilng logger at a 15 min interval
        self.logger = why.logger(mode="rolling", interval=15, when="M",
                                base_name="message_profile_")
        # write to our local path, there are other writers though
        self.logger.append_writer("local", base_dir="example_output")

        self.dataset_logged=0    # this is simple for our logging

    def close(self):
        # On exit the rest of the logging will be saved
        self.logger.close()

    def consume(self, data_df):
        self.logger.log(data_df)     # log it into our data set profile
        self.dataset_logged += 1

        ## fancy_output = fancy_ml.predict(data_df),    use your ML model
        ## app.logger.log(fancy_output),                log your ML output as well

        # We are printing the log to stdout for the example, substitute how you work with logging
        print("Inputs Processed: " + str(app.dataset_logged) +
              "    Dataset Files Written to Local: " + str(count_files(tmp_path)))
[7]:
def data_feeder(live_feed=False):
    # Feel free to turn this on to play with live data
    if live_feed:
        url = "https://blockchain.info/ticker"
        data_df = pd.read_json(url)
        data_df = data_df.drop(['15m', 'sell','buy', 'symbol'])
    # brings in moc messages as show in the Data Section
    else:
        example_path = os.path.join("mock_input", "moc_message.json")
        data_df = pd.read_json(example_path)
        data_df
    return data_df

# This driver mimics data coming in from a source.
def test_driver(running_app, live_feed=False):
    for i in range(30):
        data_df = data_feeder(live_feed)
        running_app.consume(data_df)
        # This is only random to simulate how data
        # would be coming in at unknown times.
        time.sleep(random.randrange(0, 2))
[8]:
app = MyApp()
test_driver(app)
app.close()
Inputs Processed: 1    Dataset Files Written to Local: 0
Inputs Processed: 2    Dataset Files Written to Local: 0
Inputs Processed: 3    Dataset Files Written to Local: 0
Inputs Processed: 4    Dataset Files Written to Local: 0
Inputs Processed: 5    Dataset Files Written to Local: 0
Inputs Processed: 6    Dataset Files Written to Local: 0
Inputs Processed: 7    Dataset Files Written to Local: 0
Inputs Processed: 8    Dataset Files Written to Local: 0
Inputs Processed: 9    Dataset Files Written to Local: 0
Inputs Processed: 10    Dataset Files Written to Local: 0
Inputs Processed: 11    Dataset Files Written to Local: 0
Inputs Processed: 12    Dataset Files Written to Local: 0
Inputs Processed: 13    Dataset Files Written to Local: 0
Inputs Processed: 14    Dataset Files Written to Local: 0
Inputs Processed: 15    Dataset Files Written to Local: 0
Inputs Processed: 16    Dataset Files Written to Local: 0
Inputs Processed: 17    Dataset Files Written to Local: 0
Inputs Processed: 18    Dataset Files Written to Local: 0
Inputs Processed: 19    Dataset Files Written to Local: 0
Inputs Processed: 20    Dataset Files Written to Local: 0
Inputs Processed: 21    Dataset Files Written to Local: 0
Inputs Processed: 22    Dataset Files Written to Local: 0
Inputs Processed: 23    Dataset Files Written to Local: 0
Inputs Processed: 24    Dataset Files Written to Local: 0
Inputs Processed: 25    Dataset Files Written to Local: 0
Inputs Processed: 26    Dataset Files Written to Local: 0
Inputs Processed: 27    Dataset Files Written to Local: 0
Inputs Processed: 28    Dataset Files Written to Local: 0
Inputs Processed: 29    Dataset Files Written to Local: 0
Inputs Processed: 30    Dataset Files Written to Local: 0

Next steps - the .bin#

Congrats! Now you’ve got data safely stored away, but what exactly are these .bin files? As you are logging datasets the session tracks many inputs (done through why.log()) into a dataset profile. When we use the rolling logger it will write out the dataset profile to the .bin then flush it to start logging again. This allows you to have that data safely stored in an incremental fashion which you can then merge back together as one piece, individually, or any number.

For example let’s bring up the just one of the files to see what’s in it.

[9]:
# Get the first file
all_files = [f for f in os.listdir(tmp_path) if not f.startswith('.')]
if len(all_files) > 0:
    example = all_files[0]
    path = os.path.join(tmp_path, example)

    # This .bin can be read using the path
    result_view = why.read(path).view()
    result_view.to_pandas()

Merging Profiles from .bin#

Ok, so we have saved .bin!! Huzzah! These .bins can now be loaded into whylogs individually or merged for any time frame you’d like to analyze. Head over to “Merging Profiles” to understand how.

What’s next?#