Fugue (Spark, Ray, Dask) Integration#


Hi! Perhaps you’re already feeling confident with our library, but you really wish there was an easy way to plug our profiling into your existing Spark, Dask, or Ray clusters, or into platforms such as Databricks, Coiled, or Anyscale. Well, glad you’ve made it here, because that is exactly what we are going to cover in this example notebook 😃

If you wish to gain other insights on how to use whylogs, feel free to check our other examples, as they might be extremely useful!

For detailed questions regarding Fugue, please join Fugue’s Slack channel.

Installing the extra dependency#

As we want users to install exactly what they need from whylogs, the Fugue integration comes as an extra dependency. To make it available, install according to the following table:

| Run Whylogs on … | Installation Command |
| --- | --- |
| Any Spark cluster (including Databricks Notebooks) | pip install 'whylogs[fugue]' 'fugue[spark]' |
| Databricks (remote access) | pip install 'whylogs[fugue]' 'fugue-cloudprovider[databricks]' |
| Any Ray cluster (including Anyscale Notebooks) | pip install 'whylogs[fugue]' 'fugue[ray]' |
| Anyscale (remote access) | pip install 'whylogs[fugue]' 'fugue-cloudprovider[anyscale]' |
| Any Dask cluster | pip install 'whylogs[fugue]' 'fugue[dask]' |
| Coiled | pip install 'whylogs[fugue]' 'fugue-cloudprovider[coiled]' |

For example, in this notebook we are using a local Spark cluster, so we install:

[ ]:
# Note: you may need to restart the kernel to use updated packages.
%pip install 'whylogs[fugue]' 'fugue[spark]'

The following environment variable is set only for this example notebook; it should NOT need to be set in your own environment.

[1]:
import os

os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python"

Constructing a dataset#

[2]:
import pandas as pd
import numpy as np

n = 100
np.random.seed(0)
tdf = pd.DataFrame(
    dict(
        a=np.random.choice([1, 2, 3], n),
        b=np.random.choice(["a", "b"], n),
        c=np.random.random(n),
        d=np.random.choice(["xy", "z"], n),
    )
)
tdf.to_parquet("/tmp/test.parquet")
tdf
[2]:
a b c d
0 1 a 0.533206 xy
1 2 b 0.230533 z
2 1 a 0.394869 z
3 2 b 0.618809 z
4 2 b 0.474868 xy
... ... ... ... ...
95 1 b 0.904425 xy
96 3 a 0.645785 z
97 1 a 0.324683 xy
98 2 b 0.519711 z
99 3 a 0.000055 z

100 rows × 4 columns

Profiling using Whylogs + Fugue#

The simplest use of fugue_profile is equivalent to why.log(df).view():

[3]:
from whylogs.api.fugue import fugue_profile

fugue_profile(tdf).to_pandas()
[3]:
cardinality/est cardinality/lower_1 cardinality/upper_1 counts/inf counts/n counts/nan counts/null distribution/max distribution/mean distribution/median ... distribution/stddev frequent_items/frequent_strings ints/max ints/min type types/boolean types/fractional types/integral types/object types/string
column
a 3.000000 3.0 3.000150 0 100 0 0 3.000000 1.880000 2.000000 ... 0.807540 [FrequentItem(value='1', est=39, upper=39, low... 3.0 1.0 SummaryType.COLUMN 0 0 100 0 0
b 2.000000 2.0 2.000100 0 100 0 0 NaN 0.000000 NaN ... 0.000000 [FrequentItem(value='a', est=57, upper=57, low... NaN NaN SummaryType.COLUMN 0 0 0 0 100
c 100.000025 100.0 100.005018 0 100 0 0 0.992396 0.499929 0.487838 ... 0.294085 NaN NaN NaN SummaryType.COLUMN 0 100 0 0 0
d 2.000000 2.0 2.000100 0 100 0 0 NaN 0.000000 NaN ... 0.000000 [FrequentItem(value='xy', est=53, upper=53, lo... NaN NaN SummaryType.COLUMN 0 0 0 0 100

4 rows × 30 columns

We can select specific columns for profiling:

[4]:
fugue_profile(tdf, profile_cols=["c","d"]).to_pandas()
[4]:
cardinality/est cardinality/lower_1 cardinality/upper_1 counts/inf counts/n counts/nan counts/null distribution/max distribution/mean distribution/median ... distribution/q_95 distribution/q_99 distribution/stddev type types/boolean types/fractional types/integral types/object types/string frequent_items/frequent_strings
column
c 100.000025 100.0 100.005018 0 100 0 0 0.992396 0.499929 0.487838 ... 0.970237 0.992396 0.294085 SummaryType.COLUMN 0 100 0 0 0 NaN
d 2.000000 2.0 2.000100 0 100 0 0 NaN 0.000000 NaN ... NaN NaN 0.000000 SummaryType.COLUMN 0 0 0 0 100 [FrequentItem(value='xy', est=53, upper=53, lo...

2 rows × 28 columns

Now suppose we want to use Spark to profile the dataset in a distributed way, and that this is how we get a SparkSession:

[6]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

If we want to profile the pandas df on Spark:

[7]:
fugue_profile(tdf, engine=spark)

[7]:
<whylogs.core.view.dataset_profile_view.DatasetProfileView at 0x7effd21e5f90>

If we want to profile a Spark DataFrame:

[8]:
spark_df = spark.createDataFrame(tdf)
fugue_profile(spark_df)
[8]:
<whylogs.core.view.dataset_profile_view.DatasetProfileView at 0x7effd1ee4910>

We can also directly profile a parquet file or a folder of parquet files, either locally or on the cloud (the files will be loaded in a distributed way):

[9]:
fugue_profile("/tmp/test.parquet", engine=spark)
[9]:
<whylogs.core.view.dataset_profile_view.DatasetProfileView at 0x7effd1ef9240>

Profiling datasets or files with other backends works very similarly; there will be detailed guides in the later sections.
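
As a quick preview, the same call works with other backends simply by swapping the engine argument. This is a minimal sketch; it assumes the corresponding extras from the installation table above are installed:

fugue_profile("/tmp/test.parquet", engine="dask")  # profile on Dask
fugue_profile("/tmp/test.parquet", engine="ray")   # profile on Ray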

Profiling on logical partitions#

If we want to profile tdf grouped by columns a and b:

[10]:
fugue_profile(tdf, partition={"by":["a","b"]})
[10]:
a b __whylogs_df_profile_view
0 1 a b'WHY1\x00\xf6\x02\n\x0e \xb3\x93\x93\x80\xda0...
1 1 b b'WHY1\x00\xf6\x02\n\x0e \xc6\x93\x93\x80\xda0...
2 2 a b'WHY1\x00\xf6\x02\n\x0e \xd6\x93\x93\x80\xda0...
3 2 b b'WHY1\x00\xf6\x02\n\x0e \xe5\x93\x93\x80\xda0...
4 3 a b'WHY1\x00\xf6\x02\n\x0e \xf3\x93\x93\x80\xda0...
5 3 b b'WHY1\x00\xf6\x02\n\x0e \x82\x94\x93\x80\xda0...

We can also control the name of the output profile field:

[11]:
res = fugue_profile(tdf, partition={"by":["a","b"]}, profile_field="x")
res
[11]:
a b x
0 1 a b'WHY1\x00\xf6\x02\n\x0e \xf7\xb1\x93\x80\xda0...
1 1 b b'WHY1\x00\xf6\x02\n\x0e \x89\xb2\x93\x80\xda0...
2 2 a b'WHY1\x00\xf6\x02\n\x0e \x99\xb2\x93\x80\xda0...
3 2 b b'WHY1\x00\xf6\x02\n\x0e \xa8\xb2\x93\x80\xda0...
4 3 a b'WHY1\x00\xf6\x02\n\x0e \xb7\xb2\x93\x80\xda0...
5 3 b b'WHY1\x00\xf6\x02\n\x0e \xc5\xb2\x93\x80\xda0...

Here is how to retrieve the views:

[12]:
from whylogs import DatasetProfileView

res.x.apply(DatasetProfileView.deserialize)
[12]:
0    <whylogs.core.view.dataset_profile_view.Datase...
1    <whylogs.core.view.dataset_profile_view.Datase...
2    <whylogs.core.view.dataset_profile_view.Datase...
3    <whylogs.core.view.dataset_profile_view.Datase...
4    <whylogs.core.view.dataset_profile_view.Datase...
5    <whylogs.core.view.dataset_profile_view.Datase...
Name: x, dtype: object
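
For example (a hypothetical follow-up), we could inspect the first partition’s profile as a pandas DataFrame, just like the earlier fugue_profile(...).to_pandas() outputs:

views = res.x.apply(DatasetProfileView.deserialize)
views.iloc[0].to_pandas()  # per-column summary for the first (a, b) partition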

When we profile a large number of partitions using a distributed backend and don’t want to collect them on a local machine, we can keep the output as the native distributed dataframe, for example:

[13]:
fugue_profile(tdf, partition={"by":["a","b"]}, engine=spark, as_local=False) # returns a native pyspark dataframe
[13]:
DataFrame[a: bigint, b: string, __whylogs_df_profile_view: binary]

We may also save the output directly to a file in a distributed way:

[14]:
fugue_profile(tdf, partition={"by":["a","b"]}, save_path="/tmp/output1.parquet", engine=spark)
fugue_profile("/tmp/test.parquet", partition={"by":["a","b"]}, save_path="/tmp/output2.parquet", engine=spark)

[14]:
'/tmp/output2.parquet'
[22]:
!ls /tmp/output*.parquet
/tmp/output1.parquet:
_SUCCESS  part-00000-ed074f68-68ff-42a8-a003-57e4a0275767-c000.snappy.parquet

/tmp/output2.parquet:
_SUCCESS
part-00000-83c8a2dc-0948-4ccc-bc06-e0fe14c89abb-c000.snappy.parquet
part-00001-83c8a2dc-0948-4ccc-bc06-e0fe14c89abb-c000.snappy.parquet
part-00002-83c8a2dc-0948-4ccc-bc06-e0fe14c89abb-c000.snappy.parquet
part-00003-83c8a2dc-0948-4ccc-bc06-e0fe14c89abb-c000.snappy.parquet
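
As a side note, here is a minimal sketch of loading the saved profiles back and deserializing them; it assumes pandas (with pyarrow) can read the folder of part files directly:

import pandas as pd
from whylogs import DatasetProfileView

saved = pd.read_parquet("/tmp/output1.parquet")  # columns: a, b, __whylogs_df_profile_view
views = saved["__whylogs_df_profile_view"].apply(DatasetProfileView.deserialize)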

Using fugue_profile with the Fugue API#

The Fugue API is a collection of standalone, platform-agnostic functions for common big data operations. fugue_profile follows the same style and can be used as one of them. In the previous example we profiled data from file to file with a single function call. While compact, that overloads the function, making the code harder to read and maintain. Following best practice, we can instead write:

[21]:
import fugue.api as fa

with fa.engine_context(spark):
    df = fa.load("/tmp/test.parquet")
    res = fugue_profile(df, partition={"by":["a","b"]})
    fa.save(res, "/tmp/output2.parquet")

Execution-wise, it is the same as the previous example, but now each line of code does one thing, which is a better coding style.

Visualization in FugueSQL#

Whylogs profile visualization is an auto-registered extension in Fugue. The namespace is why and the extension name is viz.

Here is how you use the extension:

[11]:
import whylogs.api.fugue.registry  # you don't really need to import this explicitly, the registration is automatic
import fugue.api as fa

fa.fugue_sql_flow("""
-- visualize a single dataframe's profile
OUTPUT df USING why:viz
-- compare profiles, must set reference and target
OUTPUT target=df, reference=df USING why:viz
""", df = tdf).run();

If running on a distributed backend, the profiling will be done by fugue_profile.

[12]:
fa.fugue_sql_flow("""
df = LOAD "/tmp/test.parquet"

OUTPUT USING why:viz

OUTPUT target=df, reference=df USING why:viz
""").run(spark);

Performance Tips#

Spark#

Please use Fugue >= 0.8.0, which enables Pandas UDFs by default.

It is also beneficial to enable Pandas UDFs on Spark for better performance. Follow the Spark documentation to enable spark.sql.execution.arrow.pyspark.enabled.
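
For example, a minimal sketch of turning this setting on for an existing SparkSession:

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")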

When we profile a dataframe without logical partitioning, we may control the number of partitions in order to control the parallelism:

fugue_profile(..., partition={"num": 200}, engine=spark)

If we don’t specify num then the default partitioning of the input dataframe will be used. If the input is a local dataframe such as a pandas DataFrame, the default partitioning will be the number of CPUs in the Spark cluster.

When we profile a dataframe with logical partitions, we can also be explicit on how many physical partitions to use:

fugue_profile(..., partition={"by":["a","b"], "num": 200}, engine=spark)

But the convention in Spark is to set spark.sql.shuffle.partitions when starting the Spark cluster, and an ideal number is 2-4 times the total number of CPUs.
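
For example, a minimal sketch of setting it when building the SparkSession (200 is a placeholder value; tune it to 2-4 times your cluster’s total CPUs):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.sql.shuffle.partitions", "200")  # placeholder value for illustration
    .getOrCreate()
)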

Ray#

When we profile a dataframe without logical partitioning, we should control the number of partitions in order to control the parallelism:

fugue_profile(..., partition={"num": 200}, engine="ray")

If we don’t specify num then the default partitioning of the input dataframe will be used. If the input is a local dataframe such as a pandas DataFrame, the default partitioning will be 1. So in Ray, it is always a good idea to be explicit about ``num``.

When we profile a dataframe with logical partitions, we should also be explicit on how many physical partitions to use:

fugue_profile(..., partition={"by":["a","b"], "num": 200}, engine="ray")
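
For example, a minimal sketch against a local Ray runtime (ray.init() and the partition count here are placeholders for illustration):

import ray

ray.init()  # or connect to an existing Ray cluster
fugue_profile(tdf, partition={"by": ["a", "b"], "num": 4}, engine="ray")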

Dask#

When we profile a dataframe without logical partitioning, we should control the number of partitions in order to control the parallelism:

fugue_profile(..., partition={"num": 200}, engine=dask_client)

If we don’t specify num then the default partitioning of the input dataframe will be used. If the input is a local dataframe such as a pandas DataFrame, the default partitioning will be a small number representing the number of local CPUs. So in Dask, it is always a good idea to be explicit about ``num``.

When we profile a dataframe with logical partitions, we should also be explicit on how many physical partitions to use:

fugue_profile(..., partition={"by":["a","b"], "num": 200}, engine=dask_client)
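
For example, a minimal sketch of obtaining a dask_client (here a local cluster; in practice you would connect to your cluster’s scheduler, and the partition count is a placeholder):

from dask.distributed import Client

dask_client = Client()  # spins up a local Dask cluster for illustration
fugue_profile(tdf, partition={"num": 8}, engine=dask_client)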

Whether on Spark, Ray, or Dask, and regardless of how ``num`` is set, setting it to 2 times the total number of cluster CPUs will in general work very well.

Accessing distributed platforms#

In Fugue, accessing distributed platforms can be very simple. For example, with the proper setup, profiling a large S3 folder using Databricks, Anyscale, or Coiled is as simple as:

fugue_profile("s3://<path>", engine="db:<databricks_cluster_id>")
fugue_profile("s3://<path>", engine="<anyscale_cluster_uri>")
fugue_profile("s3://<path>", engine="coiled:<coiled_cluster_id>")

For details of each platform, please read the instructions for Databricks, Anyscale, and Coiled.