Package hela

You probably already have your data job scripts version controlled, but what about your data catalog? The answer: write your data catalog as code! Storing your data catalog and data documentation as code makes your catalog searchable, referenceable, reliable, platform agnostic, sets you up for easy collaboration and much more! This library is built to fit small and large data landscapes, but is happiest when included from the start.

Hela (or Hel) is the Norse mythological collector of souls, and the Swedish word for "whole" or "all of it". Hela is designed to give everyone a chance to build a data catalog, with a low entry barrier: pure Python code.

Overview

The hela package consists of four primary components:

  • Catalog: The eponymous class of this package. This inheritable class holds your entire catalog together, and you can build trees of datasets in catalogs in catalogs.
  • BaseDataset: An inheritable dataset class, the second cornerstone of the package. Your own dataset classes are where you will write most of your code, depending on how much time you want to invest in the data catalog. See PandasParquetDataset for examples.
  • Col & NestedCol: The leaves of your beautiful catalog tree. These are referenceable, reusable and (preferably) well documented column objects.
  • column_store(): The most generous store filled with columns used in multiple datasets of your catalog landscape.

And let's not forget the crown of this beautiful tree:

  • generate_webpage(): Lets you democratize and share your catalog with whomever you want. Serve the site wherever you can host a static index.html file, such as GitHub Pages.

One schema to rule them all

You have most likely at some point stumbled upon a situation where the same type of data is represented in multiple locations and in different formats. Be it JSON, a database, Parquet files or BigQuery, a datapoint called e.g. weekday will usually mean the same thing no matter where you are. With hela you can make sure these datapoints are of the same type, and described the same way, no matter the source.

Let's say you have an API that dumps JSON into some kind of blob storage. You want to dump this data into your BigQuery table and ensure that you have the correct schema end-to-end. Using the same dataset (or list of columns) you can generate a schema for both BigQuery and JSON:

from hela import Col, NestedCol, schema_generators
from hela.data_types import String, Int
columns = [
    Col('product_name', String(), 'The name of the product.'),
    NestedCol('ratings', [
        Col('taste', Int(), 'A taste rating of 1-5'),
        Col('design', Int(), 'A design rating of 1-5')
    ])
]
# Generates BigQuery schema (using BigQuery SDK)
bigquery_schema = schema_generators.bigquery_schema(columns)
# Generates JSON schema (according to json-schema.org)
json_schema = schema_generators.json_schema(columns)
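To illustrate the idea of one column list feeding several schema formats, here is a stdlib-only sketch. It does not use hela's schema_generators; the Column dataclass and the two to_* helpers are hypothetical stand-ins, and both output shapes are heavily simplified.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Column:
    """Hypothetical stand-in for a catalog column (not hela's Col)."""
    name: str
    dtype: str  # e.g. 'string' or 'integer'; ignored for nested columns
    description: str = ''
    children: List['Column'] = field(default_factory=list)


def to_json_schema(columns: List[Column]) -> Dict:
    """Render columns as a simplified JSON Schema object."""
    properties = {}
    for col in columns:
        if col.children:
            # Nested columns become nested objects.
            properties[col.name] = to_json_schema(col.children)
        else:
            properties[col.name] = {'type': col.dtype, 'description': col.description}
    return {'type': 'object', 'properties': properties}


def to_sql_ddl(columns: List[Column], table: str) -> str:
    """Render the flat (non-nested) columns as a simplified CREATE TABLE statement."""
    sql_types = {'string': 'STRING', 'integer': 'INT64'}
    fields = ', '.join(f'{c.name} {sql_types[c.dtype]}' for c in columns if not c.children)
    return f'CREATE TABLE {table} ({fields})'


columns = [
    Column('product_name', 'string', 'The name of the product.'),
    Column('ratings', 'object', children=[
        Column('taste', 'integer', 'A taste rating of 1-5'),
        Column('design', 'integer', 'A design rating of 1-5'),
    ]),
]
json_schema = to_json_schema(columns)
ddl = to_sql_ddl(columns, 'products')
```

Both outputs are derived from the same list, so a renamed or retyped column changes every target at once.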

Or if you have some data stored in Parquet read by Spark, with overlapping columns stored in S3 managed by AWS Glue:

from hela import Col, NestedCol, schema_generators, column_store
from hela.data_types import String, Int

@column_store()
class MyStore:
    product_name = Col('product_name', String(), 'The name of the product.')

glue_columns = [
    MyStore.product_name,
    Col('nbr_sold', Int(), 'Number sold of a specific product.')
]
spark_columns = [
    MyStore.product_name,
    Col('product_id', Int(), 'Integer identificator of a specific product.')
]
# Generates glue schema (using AWS CDK)
schema_generators.aws_glue_schema(glue_columns)
# Generate spark schema (using pyspark)
schema_generators.spark_schema(spark_columns)

Getting Started

Setting up, reference the infer module here?

When building your data catalog it is recommended to keep the folder structure in line with how the data will be structured in your data lake/warehouse, as in the example below (for a complete example see the showcase repo).

my_catalog/
├── rich_descriptions/
│   ├── orders.md
│   └── ...
├── MyDatasets/
│   ├── best_dataset.py
│   └── ...
├── MyOtherDatasets/
│   ├── decent_dataset.py
│   └── ...
├── my_catalog.py
└── my_column_store.py

The next step is to build your own dataset class. This is where you can put most of your code when it comes to functionality such as:

  • Authentication and permissions
  • Connections and configs
  • Write & Load functionality
  • Various partitioning and optimization logic

It is important to inherit the BaseDataset class and shadow or hard-code any of the required init fields.

from hela import BaseDataset, Col
from hela.data_types import String

class MyDatasetClass(BaseDataset):
    def __init__(
        self,
        name: str, # Required
        description: str, # Optional but recommended
        columns: list, # Optional but recommended
        rich_description_path: str = None, # Optional, used for web app
        partition_cols: list = None,  # Optional but recommended
        # folder: str = None, # Only do one of either folder or database
        database: str = None, # Optional, can also be enriched via Catalog
    ) -> None:
        super().__init__(
            name,
            data_type='bigquery',
            folder=None,
            database=database,
            description=description,
            rich_description_path=rich_description_path,
            partition_cols=partition_cols,
            columns=columns
        )
        # Do more of your own init stuff

    def my_func(self) -> None:
        # Your own dataset function
        pass

# Now instantiate your dataset class with one example column
my_dataset = MyDatasetClass('my_dataset', 'An example dataset.', [
    Col('my_column', String(), 'An example column.')
])

Now that you have a dataset class, and instantiated your first dataset, you can start populating your data catalog.

from hela import Catalog

class MyCatalog(Catalog):
    my_dataset = my_dataset

That's it! You now have a small catalog to keep building on. To view it as a web page you can add the following code to a Python script, and in the future add it to whichever CI/CD tool you use:

from hela import generate_webpage

generate_webpage(MyCatalog, output_path='.')

For further reading check out:

Highlights

In the sections below you will find some important highlights of quality-of-life improvements given by the catalog package!

Iterate through datasets

Let's say you want to change the type of your column best_column from a string to an integer everywhere the column is used. You can do that by fetching all datasets that include best_column using Catalog.get_columns_datasets(), then executing your query on these datasets:

from my_package import MyCatalog
columns_datasets_dict = MyCatalog.get_columns_datasets()
for dataset in columns_datasets_dict['best_column']:
    dataset.query('your schema changing spark query')
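The lookup above relies on a mapping from column name to the datasets using it. As a rough, stdlib-only sketch of that idea (not hela's implementation; the datasets dictionary and the columns_to_datasets helper are hypothetical, and plain strings stand in for dataset objects):

```python
from collections import defaultdict
from typing import Dict, List

# Hypothetical catalog contents: dataset name -> column names.
datasets = {
    'orders': ['order_id', 'best_column'],
    'customers': ['customer_id'],
    'returns': ['order_id', 'best_column', 'reason'],
}


def columns_to_datasets(datasets: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Invert a dataset -> columns mapping into column -> datasets."""
    index = defaultdict(list)
    for dataset_name, column_names in datasets.items():
        for column_name in column_names:
            index[column_name].append(dataset_name)
    return dict(index)


index = columns_to_datasets(datasets)
for dataset_name in index['best_column']:
    # A real migration would run the schema-changing query here.
    print(f'would migrate best_column in {dataset_name}')
```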

Anticipate errors before they happen

Everyone knows how difficult it is to name things, especially when managing multiple datasets across many similar domains. Catalog helps you keep your standards in check by making sure no column is unknowingly duplicated between different datasets.

To combat this there is a pre-built hela.test_suite module filled with helper functions. The best way to use these functions is to include them in your package test setup (e.g. pytest). For example make sure no column name is duplicated using validate_no_duplicated_columns().

On the other hand, as you build your catalog you will sometimes find columns with different names that should share one name, since they hold the same type of information. In these cases the only signal available is the descriptions: if they are similar enough, validate_description_similarity() will flag them.
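The two kinds of check can be sketched with the standard library alone. This is not the code behind hela.test_suite, whose similarity measure is not shown here; difflib.SequenceMatcher is swapped in as an assumption, and the column tuples, helper names and the 0.9 threshold are all hypothetical.

```python
from collections import Counter
from difflib import SequenceMatcher
from itertools import combinations

# Hypothetical (name, description) pairs gathered from several datasets.
columns = [
    ('product_name', 'The name of the product.'),
    ('item_name', 'The name of the product'),
    ('nbr_sold', 'Number sold of a specific product.'),
    ('product_name', 'Name shown in the web shop.'),
]


def duplicated_names(columns):
    """Return column names that occur more than once."""
    counts = Counter(name for name, _ in columns)
    return sorted(name for name, count in counts.items() if count > 1)


def similar_descriptions(columns, threshold=0.9):
    """Return pairs of differently named columns whose descriptions nearly match."""
    hits = []
    for (name_a, desc_a), (name_b, desc_b) in combinations(columns, 2):
        if name_a == name_b:
            continue
        ratio = SequenceMatcher(None, desc_a.lower(), desc_b.lower()).ratio()
        if ratio >= threshold:
            hits.append((name_a, name_b))
    return hits


duplicates = duplicated_names(columns)
similar = similar_descriptions(columns)
```

In a pytest setup, each helper would sit behind a plain assert, for example asserting that the list of duplicates is empty.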

Notebook interactivity

With hela you don't even have to leave your favorite notebook tool to study your data catalog! The Catalog and BaseDataset classes have built-in functions that, in a notebook environment, display information such as:

Columns within the catalog

This functionality also extends into sub-catalogs.

Show Columns functionality

Which dates a dataset is available on

This functionality requires the BaseDataset.get_dates() function to be implemented.

Date availability grid from show_dates function

Advanced

[Page under construction]

Sometimes things work almost, but not exactly, the way you want. Here is a brief guide on how to modify the behaviour among a variety of topics. If you improve something that you believe could be useful for other people as well, please consider contributing.

Coming soon:

  • Build your own schema generators
  • Build your own data types
"""
.. include:: ../gh_pages/hela.md
"""

from hela._catalog_class import Catalog
from hela._column_classes import Col, NestedCol
from hela._base_dataset import BaseDataset
from hela._column_store_class import column_store
from hela.web_page.generate import generate_webpage


__all__ = [
    'Catalog',
    'BaseDataset',
    'column_store',
    'Col',
    'NestedCol',
    'generate_webpage'
]

Sub-modules

hela.data_types

Module consisting of all pre-built data types.

hela.datasets

Module with pre-built datasets for demonstrational purposes.

hela.errors

Module with custom errors.

hela.infer

Includes functions to infer Catalog schemas on various data structures.

hela.math

Module for math and statistics related functions.

hela.plots

Module for plot functions.

hela.schema_generators

Module used to translate from catalog schema to other schema types.

hela.test_suite

Module covering the test suite to make sure your catalog is set up properly …

hela.web_page

This module includes function to generate a data catalog web page.

Functions

def column_store(cls=None, label: str = None) ‑> object

Decorator used to flag a class as a column store.

A column store is a referenceable class used when multiple datasets use the same column. To ensure that such a column is purposefully duplicated among datasets, we check that any duplicated column originates from the same column store.

Args

label
This string label will be passed down to all column objects within the store.

Returns

The decorated class.

Examples

>>> from hela import column_store, Col
>>> from hela.data_types import String
>>> @column_store(label='cool_columns')
... class MyStore:
...     my_column = Col('my_column', String(), 'Example column')
>>> MyStore.my_column
def column_store(cls=None, label: str = None) -> object:
    """Decorator used to flag a class as a column store.

    A column store is a referenceable class used when multiple datasets use the same column.
    In order to ensure that this column is purposefully duplicated among datasets we
    check that any duplicated column must originate from the same column store.

    Args:
        label: This string label will be passed down to all column objects within the store.

    Returns:
        The decorated class.


    Examples:
        >>> from hela import column_store, Col
        >>> from hela.data_types import String
        >>> @column_store(label='cool_columns')
        ... class MyStore:
        ...     my_column = Col('my_column', String(), 'Example column')
        >>> MyStore.my_column
    """

    def wrap(cls):
        return _make_column_store(cls, label=label)

    # This is triggered when called as @column_store() (with parentheses)
    if cls is None:
        return wrap

    # This is triggered when called as @column_store (no parentheses)
    # Only allow triggered with parenthesis, this will keep IDE hints.
    raise ValueError('A column store can only be decorated with called method: `@column_store()`')
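The "called form only" decorator pattern used above can be tried in isolation. The sketch below is a simplified stand-in: labelled_store and the _label attribute are hypothetical names, and it only stores the label on the class rather than passing it down to column objects.

```python
def labelled_store(cls=None, label=None):
    """Hypothetical decorator factory mirroring the call-only pattern."""

    def wrap(decorated_cls):
        # Attach the label to the class; a real store would pass it to its columns.
        decorated_cls._label = label
        return decorated_cls

    # Called as @labelled_store(...): no class yet, so return the real decorator.
    if cls is None:
        return wrap

    # Called as bare @labelled_store: the class arrives as the first argument.
    raise ValueError('Use the called form: `@labelled_store()`')


@labelled_store(label='cool_columns')
class MyStore:
    my_column = 'my_column'


try:
    @labelled_store
    class BrokenStore:
        pass
    bare_usage_error = None
except ValueError as error:
    bare_usage_error = str(error)
```

Rejecting the bare form keeps a single calling convention, at the cost of a runtime error for anyone who forgets the parentheses.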
def generate_webpage(catalogs: Union[hela._catalog_class.Catalog, Sequence[hela._catalog_class.Catalog]], output_path: str, overwrite_existing: bool = False, include_samples: bool = False, web_app_title: str = 'Catalog') ‑> None

Generates an index.html file that can be used as a data catalog website.

Include a Python script calling this function in your CI/CD pipeline, outputting an index.html file that you can then use to share your data catalog (e.g. on GitHub Pages). For an example see (TODO: insert example repo link here).

Args

catalogs
One or multiple objects inheriting the Catalog class. If you have a tree of catalogs, only the root catalog is required.
output_path
The folder where the index.html file should end up.
overwrite_existing
Flag whether any existing index.html file should be overwritten.
include_samples
Flag whether to attempt to fetch sample datapoints from the columns in each dataset. Requires BaseDataset.get_samples() function implemented.
web_app_title
Optional title of the web app.

Raises

FileExistsError
If the index.html file already exists under output_path and overwrite_existing=False.

Examples:

>>> from my_catalog import MyCatalog
>>> from hela import generate_webpage
>>> generate_webpage(MyCatalog, '.', overwrite_existing=True)
def generate_webpage(
    catalogs: Union[Catalog, Sequence[Catalog]],
    output_path: str,
    overwrite_existing: bool = False,
    include_samples: bool = False,
    web_app_title: str = 'Catalog'
) -> None:
    """Generates an index.html file that can be used as a data catalog website.

    Include a python script implementing this function in your CI/CD pipeline, outputting an index.html file
    that you can then use to share your data catalog (e.g. on github pages).
    For an example see (TODO: insert example repo link here).

    Args:
        catalogs:   One or multiple objects inheriting the Catalog class.
                    If you have a tree of catalogs, only the root catalog is required.
        output_path:  The folder where the index.html file should end up.
        overwrite_existing: Flag whether any existing index.html file should be overwritten.
        include_samples:    Flag whether to attempt to fetch sample datapoints from the columns in each
                            dataset. Requires `hela.BaseDataset.get_samples` function implemented.
        web_app_title:  Optional title of the web app.

    Raises:
        FileExistsError: If the index.html file already exists under `output_path` and overwrite_existing=False.

    Examples:
    >>> from my_catalog import MyCatalog
    >>> from hela import generate_webpage
    >>> generate_webpage(MyCatalog, '.', overwrite_existing=True)
    """
    if not isinstance(catalogs, Sequence):
        catalogs = [catalogs]

    jg = JsonGenerator()
    json_str = jg.generate_docs_jsons(catalogs, include_samples=include_samples)
    folder_path = Path(output_path)
    file_path = folder_path if '.html' in output_path else folder_path / 'index.html'
    if not folder_path.exists():
        folder_path.mkdir(parents=True)

    if file_path.exists():
        if not overwrite_existing:
            raise FileExistsError(f'File {file_path} already exists, delete or set overwrite_existing=True')
        file_path.unlink()

    # Replace placeholder script with actual json data
    replacement_str = f'<script>window.treeListData = {json_str}</script>'
    match_str = '<script id="tree-list-data"></script>'
    output_file = gzip.decompress(pkg_resources.resource_string(__name__, 'index.html.gz')).decode()
    if match_str not in output_file:
        raise ValueError('Could not insert data in frontend.')
    output_file = output_file.replace(match_str, replacement_str)

    if '[[ReplaceDashboard]]' not in output_file:
        raise ValueError('Could not replace title in frontend.')

    # Replace web app title with custom title
    output_file = output_file.replace('[[ReplaceTitleDashboard]]', f'<title>{web_app_title}</title>')
    output_file = output_file.replace('[[ReplaceDashboard]]', web_app_title)
    file_path.write_text(output_file)
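The core step above, swapping a placeholder script tag for the serialized catalog, can be sketched without the packaged frontend. TEMPLATE, the [[Title]] marker and render_page are hypothetical; only the tree-list-data placeholder and the window.treeListData assignment are taken from the source above.

```python
import json

# Hypothetical minimal template; the real frontend is a packaged index.html.
TEMPLATE = (
    '<html><head><title>[[Title]]</title></head>'
    '<body><script id="tree-list-data"></script></body></html>'
)
PLACEHOLDER = '<script id="tree-list-data"></script>'


def render_page(template: str, data: dict, title: str) -> str:
    """Inject JSON data and a title into a static HTML template."""
    if PLACEHOLDER not in template:
        raise ValueError('Could not insert data in frontend.')
    json_str = json.dumps(data)
    page = template.replace(PLACEHOLDER, f'<script>window.treeListData = {json_str}</script>')
    return page.replace('[[Title]]', title)


page = render_page(TEMPLATE, {'name': 'MyCatalog', 'children': []}, 'Sales catalog')
```

Since the data ends up inside a script element, any string value containing a closing script tag would need escaping first, which this sketch does not handle.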

Classes

class BaseDataset (name: str, data_type: str, folder: Optional[Union[str, Path]] = None, database: Optional[str] = None, description: Optional[str] = None, rich_description_path: Optional[str] = None, partition_cols: Optional[Sequence[str]] = None, columns: Optional[Sequence[_ColumnType]] = None)

Abstract Dataset class to be used when building your own datasets.

If you choose to build data interactivity through the data catalog, it is within your own dataset classes you would build authentication and connection logic.

For full usage of the available catalog features implement the functions BaseDataset.get_samples() and BaseDataset.get_dates().

Attributes

name
The name of the dataset
data_type
The data type of the dataset, e.g. "parquet" or "bigquery"
description
A description of the dataset as a string
partition_cols
A list of column names to be used for partitioning as strings
rich_description_path
A path to a markdown file with possibilities for longer, more detailed descriptions. Primarily used for generated catalog web page.
columns
A list of class ColumnType objects defining the columns of the dataset
path
The path to the dataset (combination of folder and name)
class BaseDataset(ABC):
    """Abstract Dataset class to be used when building your own datasets.

    If you choose to build data interactivity through the data catalog, it is within
    your own dataset classes you would build authentication and connection logic.

    For full usage of the available catalog features implement the functions
    `BaseDataset.get_samples` and `BaseDataset.get_dates`.

    Attributes:
        name: The name of the dataset
        data_type: The data type of the dataset, e.g. "parquet" or "bigquery"
        description: A description of the dataset as a string
        partition_cols: A list of column names to be used for partitioning as strings
        rich_description_path:  A path to a markdown file with possibilities for longer,
                                more detailed descriptions. Primarily used for generated catalog web page.
        columns: A list of class ColumnType objects defining the columns of the dataset
        path: The path to the dataset (combination of folder and name)
    """
    _is_dataset: bool = True
    _type: str = 'Dataset'

    def __init__(
        self,
        name: str,
        data_type: str,
        folder: Optional[Union[str, Path]] = None,
        database: Optional[str] = None,
        description: Optional[str] = None,
        rich_description_path: Optional[str] = None,
        partition_cols: Optional[Sequence[str]] = None,
        columns: Optional[Sequence[_ColumnType]] = None,
    ) -> None:
        self.name = name
        self.data_type = data_type
        self.description = description
        self.rich_description_path = rich_description_path
        self.partition_cols = partition_cols
        self.database = database
        self.folder = folder
        self.path = None
        self._set_path()
        self._set_columns(columns)
        # _id used to build links in generated catalog website
        self._id: str = str(uuid.uuid4())

    def _set_columns(self, columns: Optional[Sequence[_ColumnType]] = None) -> None:
        if columns is None:
            self.columns = columns
            return
        duplicated_columns = ', '.join(
            [f'"{col.name}"' for col, count in Counter(columns).items() if count > 1]
        )
        if duplicated_columns:
            raise DuplicationError(f'Found duplication of column(s) {duplicated_columns} in dataset "{self.name}".')
        col_list = Columns(columns)
        for c in columns:
            setattr(col_list, c.name, c)
        self.columns = col_list

    def _set_path(self) -> None:
        if self.folder is None:
            return
        path = join_paths(self.folder, self.name).with_suffix(f'.{self.data_type}')
        self.path = path
        setattr(self, _PATH_VAR, path)

    def _describe(self) -> _DatasetInfo:
        info_obj = _DatasetInfo(
            name=self.name,
            data_type=self.data_type,
            description=self.description
        )
        try:
            dates = self.get_dates()
            if dates is None:
                return info_obj
            info_obj.min_date = min(dates)
            info_obj.max_date = max(dates)
            info_obj.nbr_missing_dates = len(get_missing_dates(dates))
        except NotImplementedError:
            pass
        return info_obj

    def show_columns(self, samples: bool = True) -> pd.DataFrame:
        """Returns a dataframe with information of the columns of this dataset, one column per row.

        Args:
            samples:    When true will include a sample datapoint for all columns.
                        Requires implementation of `BaseDataset.get_samples` function.

        Returns:
            A pandas dataframe with one column per row.
        """
        if self.columns is None:
            return None

        column_df = pd.DataFrame([
            cinfo.__dict__ for c in self.columns for cinfo in c._describe()
        ])
        if samples:
            try:
                fetched_samples = self.get_samples()
                if fetched_samples:
                    fetched_samples = {**fetched_samples, **flatten_dict(fetched_samples)}
                    column_df.loc[:, 'Sample'] = column_df.name.apply(lambda x: fetched_samples.get(x, None))
            except NotImplementedError:
                pass
        return column_df

    def show_dates(self) -> None:
        """
        Will generate a grid plot of all available dates for this dataset.
        Requires `BaseDataset.get_dates` implemented.
        """
        dates = self.get_dates()
        if dates is None:
            raise ValueError(f'No dates could be fetched from dataset {self}')
        return plot_date_availability_calendar(dates)

    def check_columns(
        self,
        column_list: Sequence[str],
        raise_undefined_columns: bool = False,
        raise_missing_columns: bool = False
    ) -> None:
        """
        Will compare the sent in column list against the dataset's defined columns
        and inform (warn or raise) regarding any discrepancies.

        Args:
            column_list: A list of names of columns as strings
            raise_undefined_columns: Optional; If True will raise if columns
                found in column_list not defined in dataset
            raise_missing_columns: If True will raise if columns
                defined in dataset not found in column_list

        Raises:
            DatasetError: If any of raise flags are set to True

        Examples:
        >>> my_dataset.check_columns(df.columns, raise_undefined_columns=True)
        """
        if self.columns is None:
            warnings.warn('Dataset has no columns specified.')
            return
        undefined_columns = set(column_list) - set([c.name for c in self.columns])
        msg = f'The following columns are not defined in dataset: {list(undefined_columns)}'
        if raise_undefined_columns:
            raise DatasetError(msg)
        warnings.warn(msg)

        missing_columns = set([c.name for c in self.columns]) - set(column_list)
        msg = f'The following columns are missing from column_list: {list(missing_columns)}'
        if raise_missing_columns:
            raise DatasetError(msg)
        warnings.warn(msg)

    @property
    def _prefix(self) -> str:
        """Returns the prefix as either folder, database or empty string."""
        if self.folder:
            return self.folder
        if self.database:
            return self.database
        return ''

    def __str__(self) -> str:
        prefix = self._prefix
        if prefix:
            return f'{prefix}:{self.name}'
        return self.name

    def __repr__(self) -> str:
        return self.__str__()

    def __eq__(self, o: BaseDataset) -> bool:
        return self.name == o.name and self._id == o._id

    def __hash__(self) -> int:
        return hash(self.__str__())

    def _desc_(self) -> ShortDescription:
        return ShortDescription(name=self.name, type=self._type, description=self.description)

    def get_dates(self) -> Optional[Set[date]]:
        """Implement this function for date inspection functionality such as `BaseDataset.show_dates`.

        Should return a set of dates when called or None if dates for some reason could not be fetched.
        """
        raise NotImplementedError

    def get_samples(self) -> Optional[Dict[str, Any]]:
        """Implement this function for sample inspection functionality used in e.g. `BaseDataset.show_columns`.

        Should return a dictionary of string keys for column names with samples:
        >>> {'my_column': 123}

        Nested columns should return names with dot-notation:
        >>> {'parent_column.my_column': 123}

        Or None if samples could not be fetched:
        >>> None
        """
        raise NotImplementedError

Ancestors

  • abc.ABC

Subclasses

Methods

def check_columns(self, column_list: Sequence[str], raise_undefined_columns: bool = False, raise_missing_columns: bool = False) ‑> None

Compares the passed-in column list against the dataset's defined columns and informs (warns or raises) about any discrepancies.

Args

column_list
A list of names of columns as strings
raise_undefined_columns
Optional; if True, raises when columns found in column_list are not defined in the dataset
raise_missing_columns
If True, raises when columns defined in the dataset are not found in column_list

Raises

DatasetError
If any of the raise flags is set to True

Examples:

>>> my_dataset.check_columns(df.columns, raise_undefined_columns=True)
def check_columns(
    self,
    column_list: Sequence[str],
    raise_undefined_columns: bool = False,
    raise_missing_columns: bool = False
) -> None:
    """
    Will compare the sent in column list against the dataset's defined columns
    and inform (warn or raise) regarding any discrepancies.

    Args:
        column_list: A list of names of columns as strings
        raise_undefined_columns: Optional; If True will raise if columns
            found in column_list not defined in dataset
        raise_missing_columns: If True will raise if columns
            defined in dataset not found in column_list

    Raises:
        DatasetError: If any of raise flags are set to True

    Examples:
    >>> my_dataset.check_columns(df.columns, raise_undefined_columns=True)
    """
    if self.columns is None:
        warnings.warn('Dataset has no columns specified.')
        return
    undefined_columns = set(column_list) - set([c.name for c in self.columns])
    msg = f'The following columns are not defined in dataset: {list(undefined_columns)}'
    if raise_undefined_columns:
        raise DatasetError(msg)
    warnings.warn(msg)

    missing_columns = set([c.name for c in self.columns]) - set(column_list)
    msg = f'The following columns are missing from column_list: {list(missing_columns)}'
    if raise_missing_columns:
        raise DatasetError(msg)
    warnings.warn(msg)
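The comparison itself comes down to two set differences. A minimal sketch with a hypothetical helper, column_discrepancies, that returns both sets instead of warning or raising:

```python
from typing import Iterable, Set, Tuple


def column_discrepancies(
    defined: Iterable[str], observed: Iterable[str]
) -> Tuple[Set[str], Set[str]]:
    """Return (undefined, missing) column names.

    undefined: present in the data but not defined in the dataset.
    missing: defined in the dataset but absent from the data.
    """
    defined_set, observed_set = set(defined), set(observed)
    return observed_set - defined_set, defined_set - observed_set


undefined, missing = column_discrepancies(
    defined=['product_name', 'nbr_sold'],
    observed=['product_name', 'price'],
)
```

Returning the sets leaves it to the caller to decide whether an empty result should stay silent.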
def get_dates(self) ‑> Optional[Set[datetime.date]]

Implement this function for date inspection functionality such as BaseDataset.show_dates().

Should return a set of dates when called or None if dates for some reason could not be fetched.

def get_dates(self) -> Optional[Set[date]]:
    """Implement this function for date inspection functionality such as `BaseDataset.show_dates`.

    Should return a set of dates when called or None if dates for some reason could not be fetched.
    """
    raise NotImplementedError
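Once get_dates() returns a set of dates, gaps in availability follow from simple date arithmetic. The helper below is a hypothetical sketch of what a function like the get_missing_dates used in _describe might compute; its real implementation is not shown here.

```python
from datetime import date, timedelta
from typing import Set


def missing_dates(dates: Set[date]) -> Set[date]:
    """Return every calendar day between min(dates) and max(dates) not in dates."""
    if not dates:
        return set()
    start, end = min(dates), max(dates)
    all_days = {start + timedelta(days=offset) for offset in range((end - start).days + 1)}
    return all_days - dates


available = {date(2023, 1, 1), date(2023, 1, 2), date(2023, 1, 5)}
gaps = missing_dates(available)
```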
def get_samples(self) ‑> Optional[Dict[str, Any]]

Implement this function for sample inspection functionality used in e.g. BaseDataset.show_columns().

Should return a dictionary of string keys for column names with samples:

>>> {'my_column': 123}

Nested columns should return names with dot-notation:

>>> {'parent_column.my_column': 123}

Or None if samples could not be fetched:

>>> None
def get_samples(self) -> Optional[Dict[str, Any]]:
    """Implement this function for sample inspection functionality used in e.g. `BaseDataset.show_columns`.

    Should return a dictionary of string keys for column names with samples:
    >>> {'my_column': 123}

    Nested columns should return names with dot-notation:
    >>> {'parent_column.my_column': 123}

    Or None if samples could not be fetched:
    >>> None
    """
    raise NotImplementedError
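If your source returns nested records, the dot-notation keys described above can be produced with a small recursive helper. flatten_to_dot_keys is a hypothetical name for this sketch, not a hela function, and raw_row is made-up sample data.

```python
from typing import Any, Dict


def flatten_to_dot_keys(nested: Dict[str, Any], parent: str = '') -> Dict[str, Any]:
    """Flatten nested dictionaries into a single level with dot-separated keys."""
    flat = {}
    for key, value in nested.items():
        full_key = f'{parent}.{key}' if parent else key
        if isinstance(value, dict):
            flat.update(flatten_to_dot_keys(value, full_key))
        else:
            flat[full_key] = value
    return flat


raw_row = {'product_name': 'Chair', 'ratings': {'taste': 1, 'design': 5}}
samples = flatten_to_dot_keys(raw_row)
```

A get_samples() implementation could return a dictionary shaped like samples.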
def show_columns(self, samples: bool = True) ‑> pandas.core.frame.DataFrame

Returns a dataframe with information of the columns of this dataset, one column per row.

Args

samples
When true will include a sample datapoint for all columns. Requires implementation of BaseDataset.get_samples() function.

Returns

A pandas dataframe with one column per row.

def show_columns(self, samples: bool = True) -> pd.DataFrame:
    """Returns a dataframe with information of the columns of this dataset, one column per row.

    Args:
        samples:    When true will include a sample datapoint for all columns.
                    Requires implementation of `BaseDataset.get_samples` function.

    Returns:
        A pandas dataframe with one column per row.
    """
    if self.columns is None:
        return None

    column_df = pd.DataFrame([
        cinfo.__dict__ for c in self.columns for cinfo in c._describe()
    ])
    if samples:
        try:
            fetched_samples = self.get_samples()
            if fetched_samples:
                fetched_samples = {**fetched_samples, **flatten_dict(fetched_samples)}
                column_df.loc[:, 'Sample'] = column_df.name.apply(lambda x: fetched_samples.get(x, None))
        except NotImplementedError:
            pass
    return column_df
def show_dates(self) ‑> None

Will generate a grid plot of all available dates for this dataset. Requires BaseDataset.get_dates() implemented.

def show_dates(self) -> None:
    """
    Will generate a grid plot of all available dates for this dataset.
    Requires `BaseDataset.get_dates` implemented.
    """
    dates = self.get_dates()
    if dates is None:
        raise ValueError(f'No dates could be fetched from dataset {self}')
    return plot_date_availability_calendar(dates)
class Catalog

Inheritable Catalog class, used when building your own data catalog.

The namesake of the Python package, this class will turn your code from just being code into a data catalog. This class will make your datasets iterable, testable, referenceable and more. You can also decorate the catalog with the Catalog.setup() function, giving your catalog a description and enriching datasets within it.

Examples:

>>> # Without decorator
>>> from hela import Catalog
>>> class MyCatalog(Catalog):
...     my_dataset = Dataset(...)
>>> # With decorator
>>> from hela import Catalog
>>> @Catalog.setup(folder='sales', description='Datasets related to sales.')
... class SalesCatalog(Catalog):
...     my_sales_dataset = Dataset(...)
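To see why declaring datasets as class attributes makes a catalog iterable, consider this stdlib-only sketch. Dataset and SimpleCatalog are hypothetical stand-ins, not hela's classes; the sketch only shows how class attributes can be walked, including nested catalogs.

```python
class Dataset:
    """Hypothetical stand-in for a dataset object."""

    def __init__(self, name: str) -> None:
        self.name = name


class SimpleCatalog:
    """Hypothetical base class: datasets and sub-catalogs are class attributes."""

    @classmethod
    def datasets(cls, recursive: bool = True) -> list:
        found = []
        # vars(cls) holds the attributes declared directly on this class.
        for obj in vars(cls).values():
            if isinstance(obj, Dataset):
                found.append(obj.name)
            elif recursive and isinstance(obj, type) and issubclass(obj, SimpleCatalog):
                found.extend(obj.datasets(recursive=True))
        return found


class SalesCatalog(SimpleCatalog):
    orders = Dataset('orders')


class RootCatalog(SimpleCatalog):
    customers = Dataset('customers')
    sales = SalesCatalog


all_names = RootCatalog.datasets()
top_level_names = RootCatalog.datasets(recursive=False)
```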
class Catalog:
    """Inheritable Catalog class, used when building your own data catalog.

    The namesake of the python package, this class will turn your code from just being code into a data catalog.
    This class will make your datasets iterable, testable, referenceable and more.
    You can also decorate the catalog with the `Catalog.setup` function, giving your catalog a description and
    enriching datasets within it.

    Examples:
    >>> # Without decorator
    >>> from hela import Catalog
    >>> class MyCatalog(Catalog):
    ...     my_dataset = Dataset(...)

    >>> # With decorator
    >>> from hela import Catalog
    >>> @Catalog.setup(folder='sales', description='Datasets related to sales.')
    ... class SalesCatalog(Catalog):
    ...     my_sales_dataset = Dataset(...)
    """
    _is_catalog: bool = True
    _type: str = 'Catalog'
    _folder: str = None
    _database: str = None
    _description: str = None
    _rich_description_path: str = None

    @staticmethod
    def setup(
        cls: Catalog = None,
        folder: Optional[str] = None,
        database: Optional[str] = None,
        description: Optional[str] = None,
        rich_description_path: Optional[str] = None
    ) -> Catalog:
        """Decorator enriching the catalog with a description, and optionally binding
        a folder or database to all datasets within it.

        Args:
            folder: Used for filestore style datasets (e.g. Spark);
                    builds the catalog's folder structure.
            database:   Used for database style datasets (e.g. BigQuery,
                        AWS Glue); builds the catalog's database structure.
            description:    A description of this catalog.
            rich_description_path: Path to markdown file with richer descriptions of this catalog.
        """

        def wrap(cls: Catalog):
            if not getattr(cls, '_is_catalog', False):
                raise ValueError(f'Class {cls} must inherit Catalog class.')
            cls._folder = folder
            cls._database = database
            cls._description = description
            cls._rich_description_path = rich_description_path
            _enrich_datasets(cls, folder=folder, database=database)
            return cls

        # When called with parentheses "@Catalog.setup(...)"
        if cls is None:
            return wrap

        # When called without parentheses "@Catalog.setup"
        return wrap(cls)

    @classmethod
    def get_catalogs(cls, recursive: bool = True) -> Sequence[Catalog]:
        """Get a list of all sub-catalogs of this catalog, not including self.

        Args:
            recursive:  Flag whether to search for sub-catalog in this catalog's sub-catalogs.

        Returns:
            A list of objects inheriting the Catalog class.
        """
        catalog_list = []
        for obj in cls.__dict__.values():
            catalog = is_catalog(obj)
            if catalog:
                catalog_list.append(catalog)
                if recursive:
                    catalog_list.extend(catalog.get_catalogs())
        return catalog_list

    @classmethod
    def get_datasets(cls, recursive: bool = True) -> Sequence[BaseDataset]:
        """Returns a list of all datasets within this catalog, recursively if flag is set.

        Args:
            recursive: When set to true this function will fetch datasets from sub-catalogs of this catalog.

        Returns:
            A list of dataset objects.
        """
        dataset_list = []
        for obj in cls.__dict__.values():
            dataset = is_dataset(obj)
            if dataset:
                dataset_list.append(obj)
            elif recursive:
                catalog = is_catalog(obj)
                if catalog:
                    dataset_list.extend(catalog.get_datasets())
        return dataset_list

    @classmethod
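    def get_columns_datasets(cls, recursive: bool = True) -> Dict[_ColumnType, Sequence[BaseDataset]]:
        """Maps each column to the datasets it is used in. Datasets without columns are skipped.

        Args:
            recursive: Whether to include datasets from sub-catalogs of this catalog.

        Returns:
            A dictionary with columns as keys and lists of datasets as values.
        """
        column_dict = defaultdict(list)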
        for dataset in cls.get_datasets(recursive=recursive):
            if dataset.columns is None:
                continue

            for column in dataset.columns:
                column_dict[column].append(dataset)
        return column_dict

    @classmethod
    def show_datasets(cls, recursive: bool = True) -> DFDisplay:
        """Returns a DFDisplay with a description of all datasets in this catalog, one dataset per row.

        Args:
            recursive: Whether to include datasets from sub-catalogs recursively.

        Returns:
            DFDisplay (pandas dataframe)
        """
        return DFDisplay([
            d._describe().__dict__
            for d in cls.get_datasets(recursive=recursive)
        ])

    @classmethod
    def show_columns(cls, recursive: bool = True) -> DFDisplay:
        """Returns a pandas dataframe with a description of all columns in this catalog, one column per row.

        Args:
            recursive: Whether to include columns of datasets in sub-catalogs recursively.

        Returns:
            DFDisplay (pandas dataframe)
        """
        df = DFDisplay([
            {
                **col.__dict__,
                'datasets': datasets
            }
            for col_obj, datasets in cls.get_columns_datasets(recursive=recursive).items()
            for col in col_obj._describe()
        ])
        return df.sort_values('name').reset_index(drop=True)

    @classmethod
    def _all_descriptions(cls, recursive=True) -> Sequence[ShortDescription]:
        """Collect descriptions for all Catalogs, Datasets and Columns.

        Args:
            recursive:  Whether to search recursively through sub-catalogs.

        Returns:
            A sequence of ShortDescription objects
        """
        column_descriptions = []
        for c in cls.get_columns_datasets(recursive=recursive).keys():
            desc = c._desc_()
            # Columns will sometimes give a sequence if it is a nested column
            if isinstance(desc, Sequence):
                column_descriptions.extend(desc)
            else:
                column_descriptions.append(desc)
        return [
            *column_descriptions,
            *[d._desc_() for d in cls.get_datasets(recursive=recursive)],
            *[c._desc_() for c in cls.get_catalogs(recursive=recursive)],
            cls._desc_()
        ]

    @classmethod
    def search(cls, query_str: str, recursive: bool = True, max_hits: int = 5, min_relevance: float = .1) -> DFDisplay:
        """Searches across names and descriptions of Catalogs, Datasets and Columns.

        * Search on name is based on Levenshtein distance (fuzzy search).
        * Search on description is based on cosine similarity of TF-IDF matrix.

        Args:
            query_str:  The string to base the search on.
            recursive:  Whether to search recursively through sub-catalogs.
            max_hits:   Maximum number of hits to return.
            min_relevance: Minimum required relevance score to return a hit.

        Returns:
            A DFDisplay (pandas) dataframe with hits sorted on relevance.
        """

        # Create dictionaries to collect into unique name/desc keys
        name_dict, desc_dict = defaultdict(list), defaultdict(list)
        for short_desc in cls._all_descriptions(recursive=recursive):
            name = short_desc.name
            if name is not None:
                name_dict[name].append(short_desc)
                # Take both original and split version if we're dealing with nested columns
                # E.g. ratings.taste
                if '.' in name:
                    name_dict[name.split('.')[-1]].append(short_desc)
            if short_desc.description is not None:
                desc_dict[short_desc.description].append(short_desc)

        l_search = levenshtein.sort(query_str, list(name_dict.keys()), min_similarity=.5)
        try:
            tf_idf_search = tf_idf.sort(query_str, list(desc_dict.keys()))
        except ValueError:
            # Raised when we get no vocabulary matches at all
            tf_idf_search = []

        relevance_col = 'relevance'

        hit_df = DFDisplay([
            *[
                {
                    **short_desc.__dict__,
                    relevance_col: hit.score,
                    'hit_on': 'name'
                }
                for hit in l_search
                for short_desc in name_dict[hit.match_string]
            ],
            *[
                {
                    **short_desc.__dict__,
                    relevance_col: hit.score,
                    'hit_on': 'description'
                }
                for hit in tf_idf_search
                for short_desc in desc_dict[hit.match_string]
            ]
        ])
        error_msg = f'No hits good enough found on query string: "{query_str}"'
        if len(hit_df) == 0:
            raise ValueError(error_msg)
        hit_df = hit_df[hit_df[relevance_col] > min_relevance]

        if len(hit_df) == 0:
            raise ValueError(error_msg)

        hit_df.loc[:, relevance_col] = hit_df[relevance_col].round(2)
        return (
            hit_df
            .sort_values(relevance_col, ascending=False)
            .drop_duplicates(subset=['name', 'type'])
            [:max_hits]
        )

    @classmethod
    def _desc_(cls) -> ShortDescription:
        return ShortDescription(name=cls.__name__, type=cls._type, description=cls._description)

Static methods

def get_catalogs(recursive: bool = True) ‑> Sequence[hela._catalog_class.Catalog]

Get a list of all sub-catalogs of this catalog, not including self.

Args

recursive
Flag whether to search for sub-catalog in this catalog's sub-catalogs.

Returns

A list of objects inheriting the Catalog class.
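
The recursive lookup described above can be illustrated without hela itself. The following is a minimal sketch, assuming sub-catalogs are nested classes found in the class `__dict__`; `MiniCatalog`, `Root`, `Sales`, `Europe` and `Marketing` are hypothetical names, and the `issubclass` check stands in for hela's own `is_catalog` helper, whose behaviour is not shown here.

```python
import inspect


class MiniCatalog:
    """Hypothetical stand-in for a catalog base class (not hela's Catalog)."""

    @classmethod
    def get_catalogs(cls, recursive=True):
        found = []
        # Only attributes defined directly on this class are inspected
        for obj in cls.__dict__.values():
            # Nested classes inheriting MiniCatalog count as sub-catalogs
            if inspect.isclass(obj) and issubclass(obj, MiniCatalog):
                found.append(obj)
                if recursive:
                    found.extend(obj.get_catalogs())
        return found


class Root(MiniCatalog):
    class Sales(MiniCatalog):
        class Europe(MiniCatalog):
            pass

    class Marketing(MiniCatalog):
        pass


all_names = [c.__name__ for c in Root.get_catalogs()]
direct_names = [c.__name__ for c in Root.get_catalogs(recursive=False)]
print(all_names)     # ['Sales', 'Europe', 'Marketing']
print(direct_names)  # ['Sales', 'Marketing']
```

The result is depth-first: each sub-catalog is followed by its own sub-catalogs before the next sibling is visited, and the catalog the method is called on is never part of the result.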

@classmethod
def get_catalogs(cls, recursive: bool = True) -> Sequence[Catalog]:
    """Get a list of all sub-catalogs of this catalog, not including self.

    Args:
        recursive:  Flag whether to search for sub-catalog in this catalog's sub-catalogs.

    Returns:
        A list of objects inheriting the Catalog class.
    """
    catalog_list = []
    for obj in cls.__dict__.values():
        catalog = is_catalog(obj)
        if catalog:
            catalog_list.append(catalog)
            if recursive:
                catalog_list.extend(catalog.get_catalogs())
    return catalog_list
def get_columns_datasets(recursive: bool = True) ‑> Dict[hela._column_classes._ColumnType, Sequence[hela._base_dataset.BaseDataset]]

Maps each column to the datasets it is used in. Datasets without columns are skipped.

@classmethod
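def get_columns_datasets(cls, recursive: bool = True) -> Dict[_ColumnType, Sequence[BaseDataset]]:
    """Maps each column to the datasets it is used in. Datasets without columns are skipped.

    Args:
        recursive: Whether to include datasets from sub-catalogs of this catalog.

    Returns:
        A dictionary with columns as keys and lists of datasets as values.
    """
    column_dict = defaultdict(list)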
    for dataset in cls.get_datasets(recursive=recursive):
        if dataset.columns is None:
            continue

        for column in dataset.columns:
            column_dict[column].append(dataset)
    return column_dict
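
The method above builds an inverted index from columns to datasets. A minimal sketch of the same pattern, using plain strings instead of hela's column and dataset objects (the dataset and column names below are made up for illustration):

```python
from collections import defaultdict

# Hypothetical datasets, each mapped to the column names it declares
datasets = {
    'orders': ['order_id', 'weekday', 'amount'],
    'visits': ['visit_id', 'weekday'],
    'raw_dump': None,  # a dataset without a declared schema
}

column_index = defaultdict(list)
for dataset_name, columns in datasets.items():
    if columns is None:
        continue  # datasets without columns are skipped
    for column in columns:
        column_index[column].append(dataset_name)

# Columns used by more than one dataset are candidates for a shared column store
shared = {col: used_in for col, used_in in column_index.items() if len(used_in) > 1}
print(shared)  # {'weekday': ['orders', 'visits']}
```

In the real method the dictionary keys are column objects rather than names, which presumably requires them to be hashable.
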
def get_datasets(recursive: bool = True) ‑> Sequence[hela._base_dataset.BaseDataset]

Returns a list of all datasets within this catalog, recursively if flag is set.

Args

recursive
When set to true this function will fetch datasets from sub-catalogs of this catalog.

Returns

A list of dataset objects.

@classmethod
def get_datasets(cls, recursive: bool = True) -> Sequence[BaseDataset]:
    """Returns a list of all datasets within this catalog, recursively if flag is set.

    Args:
        recursive: When set to true this function will fetch datasets from sub-catalogs of this catalog.

    Returns:
        A list of dataset objects.
    """
    dataset_list = []
    for obj in cls.__dict__.values():
        dataset = is_dataset(obj)
        if dataset:
            dataset_list.append(obj)
        elif recursive:
            catalog = is_catalog(obj)
            if catalog:
                dataset_list.extend(catalog.get_datasets())
    return dataset_list
def search(query_str: str, recursive: bool = True, max_hits: int = 5, min_relevance: float = 0.1) ‑> DFDisplay

Searches across names and descriptions of Catalogs, Datasets and Columns.

  • Search on name is based on Levenshtein distance (fuzzy search).
  • Search on description is based on cosine similarity of TF-IDF matrix.

Args

query_str
The string to base the search on.
recursive
Whether to search recursively through sub-catalogs.
max_hits
Maximum number of hits to return.
min_relevance
Minimum required relevance score to return a hit.

Returns

A DFDisplay (pandas) dataframe with hits sorted on relevance.
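
To make the name search more concrete, here is a minimal sketch of fuzzy matching on Levenshtein distance. It is not hela's `levenshtein` module: the functions `levenshtein_distance` and `name_similarity` are hypothetical, and normalising the distance by the longer string is an assumption. The 0.5 threshold mirrors the `min_similarity=.5` argument visible in the source of this method.

```python
def levenshtein_distance(a: str, b: str) -> int:
    """Number of single-character edits needed to turn a into b."""
    previous = list(range(len(b) + 1))
    for i, char_a in enumerate(a, start=1):
        current = [i]
        for j, char_b in enumerate(b, start=1):
            current.append(min(
                previous[j] + 1,                       # deletion
                current[j - 1] + 1,                    # insertion
                previous[j - 1] + (char_a != char_b),  # substitution
            ))
        previous = current
    return previous[-1]


def name_similarity(query: str, candidate: str) -> float:
    """Similarity in [0, 1], where 1 means identical strings."""
    longest = max(len(query), len(candidate))
    if longest == 0:
        return 1.0
    return 1 - levenshtein_distance(query, candidate) / longest


names = ['weekday', 'weekend', 'amount']
scored = [(name, name_similarity('wekday', name)) for name in names]
hits = sorted(
    [(name, score) for name, score in scored if score >= 0.5],
    key=lambda pair: pair[1],
    reverse=True,
)
print([name for name, _ in hits])  # ['weekday']
```

A misspelled query such as `wekday` still finds `weekday`, while `weekend` and `amount` fall below the threshold.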

@classmethod
def search(cls, query_str: str, recursive: bool = True, max_hits: int = 5, min_relevance: float = .1) -> DFDisplay:
    """Searches across names and descriptions of Catalogs, Datasets and Columns.

    * Search on name is based on Levenshtein distance (fuzzy search).
    * Search on description is based on cosine similarity of TF-IDF matrix.

    Args:
        query_str:  The string to base the search on.
        recursive:  Whether to search recursively through sub-catalogs.
        max_hits:   Maximum number of hits to return.
        min_relevance: Minimum required relevance score to return a hit.

    Returns:
        A DFDisplay (pandas) dataframe with hits sorted on relevance.
    """

    # Create dictionaries to collect into unique name/desc keys
    name_dict, desc_dict = defaultdict(list), defaultdict(list)
    for short_desc in cls._all_descriptions(recursive=recursive):
        name = short_desc.name
        if name is not None:
            name_dict[name].append(short_desc)
            # Take both original and split version if we're dealing with nested columns
            # E.g. ratings.taste
            if '.' in name:
                name_dict[name.split('.')[-1]].append(short_desc)
        if short_desc.description is not None:
            desc_dict[short_desc.description].append(short_desc)

    l_search = levenshtein.sort(query_str, list(name_dict.keys()), min_similarity=.5)
    try:
        tf_idf_search = tf_idf.sort(query_str, list(desc_dict.keys()))
    except ValueError:
        # Raised when we get no vocabulary matches at all
        tf_idf_search = []

    relevance_col = 'relevance'

    hit_df = DFDisplay([
        *[
            {
                **short_desc.__dict__,
                relevance_col: hit.score,
                'hit_on': 'name'
            }
            for hit in l_search
            for short_desc in name_dict[hit.match_string]
        ],
        *[
            {
                **short_desc.__dict__,
                relevance_col: hit.score,
                'hit_on': 'description'
            }
            for hit in tf_idf_search
            for short_desc in desc_dict[hit.match_string]
        ]
    ])
    error_msg = f'No hits good enough found on query string: "{query_str}"'
    if len(hit_df) == 0:
        raise ValueError(error_msg)
    hit_df = hit_df[hit_df[relevance_col] > min_relevance]

    if len(hit_df) == 0:
        raise ValueError(error_msg)

    hit_df.loc[:, relevance_col] = hit_df[relevance_col].round(2)
    return (
        hit_df
        .sort_values(relevance_col, ascending=False)
        .drop_duplicates(subset=['name', 'type'])
        [:max_hits]
    )
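
The description search ranks hits by cosine similarity between TF-IDF vectors. The sketch below shows that idea with the standard library only. It is not hela's `tf_idf` module: the whitespace tokenizer, the smoothed IDF formula and all function names are assumptions chosen for illustration.

```python
import math
from collections import Counter


def tokenize(text):
    return text.lower().split()


def tf_idf_vectors(documents):
    """Returns one sparse TF-IDF vector per document, plus a vectorizer for queries."""
    tokenized = [tokenize(doc) for doc in documents]
    n_docs = len(tokenized)
    doc_freq = Counter(term for tokens in tokenized for term in set(tokens))
    # Smoothed inverse document frequency: rare terms get higher weights
    idf = {term: math.log((1 + n_docs) / (1 + df)) + 1 for term, df in doc_freq.items()}

    def vectorize(text):
        # Terms outside the vocabulary are ignored
        counts = Counter(t for t in tokenize(text) if t in idf)
        return {term: count * idf[term] for term, count in counts.items()}

    return [vectorize(doc) for doc in documents], vectorize


def cosine(u, v):
    dot = sum(weight * v.get(term, 0.0) for term, weight in u.items())
    norm_u = math.sqrt(sum(w * w for w in u.values()))
    norm_v = math.sqrt(sum(w * w for w in v.values()))
    if norm_u == 0 or norm_v == 0:
        return 0.0
    return dot / (norm_u * norm_v)


descriptions = [
    'Day of the week the order was placed',
    'Total order amount in euro',
    'Unique identifier of the customer',
]
doc_vectors, vectorize = tf_idf_vectors(descriptions)
query_vector = vectorize('order amount')
scores = [cosine(query_vector, vec) for vec in doc_vectors]
best = descriptions[scores.index(max(scores))]
print(best)  # Total order amount in euro
```

The second description wins because it contains both query terms; the third shares no term with the query and scores zero.
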
def setup(cls: Catalog = None, folder: Optional[str] = None, database: Optional[str] = None, description: Optional[str] = None, rich_description_path: Optional[str] = None) ‑> hela._catalog_class.Catalog

Decorator enriching the catalog with a description, and optionally binding a folder or database to all datasets within it.

Args

folder
Used for filestore style datasets (e.g. Spark); builds the catalog's folder structure.
database
Used for database style datasets (e.g. BigQuery, AWS Glue); builds the catalog's database structure.
description
A description of this catalog.
rich_description_path
Path to markdown file with richer descriptions of this catalog.
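
The decorator can be applied both with and without parentheses. A minimal sketch of that pattern, independent of hela (the `describe` decorator and the `Plain` and `Sales` classes are hypothetical):

```python
from typing import Optional


def describe(cls=None, *, description: Optional[str] = None):
    """Class decorator usable as @describe or @describe(description=...)."""

    def wrap(target):
        target._description = description
        return target

    # Called with parentheses: no class was passed yet, so return the decorator
    if cls is None:
        return wrap
    # Called without parentheses: cls is the decorated class itself
    return wrap(cls)


@describe
class Plain:
    pass


@describe(description='Datasets related to sales.')
class Sales:
    pass


print(Plain._description)  # None
print(Sales._description)  # Datasets related to sales.
```

The first positional parameter defaults to `None`, which is how the function tells the two call styles apart.
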
@staticmethod
def setup(
    cls: Catalog = None,
    folder: Optional[str] = None,
    database: Optional[str] = None,
    description: Optional[str] = None,
    rich_description_path: Optional[str] = None
) -> Catalog:
    """Decorator enriching the catalog with a description, and optionally binding
    a folder or database to all datasets within it.

    Args:
        folder: Used for filestore style datasets (e.g. Spark);
                builds the catalog's folder structure.
        database:   Used for database style datasets (e.g. BigQuery,
                    AWS Glue); builds the catalog's database structure.
        description:    A description of this catalog.
        rich_description_path: Path to markdown file with richer descriptions of this catalog.
    """

    def wrap(cls: Catalog):
        if not getattr(cls, '_is_catalog', False):
            raise ValueError(f'Class {cls} must inherit Catalog class.')
        cls._folder = folder
        cls._database = database
        cls._description = description
        cls._rich_description_path = rich_description_path
        _enrich_datasets(cls, folder=folder, database=database)
        return cls

    # When called with parentheses "@Catalog.setup(...)"
    if cls is None:
        return wrap

    # When called without parentheses "@Catalog.setup"
    return wrap(cls)
def show_columns(recursive: bool = True) ‑> DFDisplay

Returns a pandas dataframe with a description of all columns in this catalog, one column per row.

Args

recursive
Whether to include columns of datasets in sub-catalogs recursively.

Returns

DFDisplay (pandas dataframe)

@classmethod
def show_columns(cls, recursive: bool = True) -> DFDisplay:
    """Returns a pandas dataframe with a description of all columns in this catalog, one column per row.

    Args:
        recursive: Whether to include columns of datasets in sub-catalogs recursively.

    Returns:
        DFDisplay (pandas dataframe)
    """
    df = DFDisplay([
        {
            **col.__dict__,
            'datasets': datasets
        }
        for col_obj, datasets in cls.get_columns_datasets(recursive=recursive).items()
        for col in col_obj._describe()
    ])
    return df.sort_values('name').reset_index(drop=True)
def show_datasets(recursive: bool = True) ‑> DFDisplay

Returns a DFDisplay with a description of all datasets in this catalog, one dataset per row.

Args

recursive
Whether to include datasets from sub-catalogs recursively.

Returns

DFDisplay (pandas dataframe)

@classmethod
def show_datasets(cls, recursive: bool = True) -> DFDisplay:
    """Returns a DFDisplay with a description of all datasets in this catalog, one dataset per row.

    Args:
        recursive: Whether to include datasets from sub-catalogs recursively.

    Returns:
        DFDisplay (pandas dataframe)
    """
    return DFDisplay([
        d._describe().__dict__
        for d in cls.get_datasets(recursive=recursive)
    ])
class Col (name: str, data_type: PrimitiveType, description: str = None)

A basic column object, for nested columns see NestedCol.

This class is used to define columns within a BaseDataset or column_store(). Each defined column will be searchable, testable and referenceable.

If you want to give further functionality to your columns, please inherit from this class.

Attributes

name
The name of the column.
data_type
The data type of the column, should be one of types found in hela.data_types
description
A description of this column as a string, better descriptions yield a more secure catalog.

Examples:

>>> from hela import Col
>>> from hela.data_types import String
>>> my_col = Col('my_col', String(), 'This is an example column')
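
A column defined once can be rendered into several backend-specific schema formats, which is what the private `_json_type`, `_glue_type` and `_bigquery_type` methods in the source of this class do. The sketch below shows the idea with plain dictionaries only. `SimpleCol`, `TYPE_NAMES` and the output shapes are hypothetical and much simpler than what hela produces.

```python
from dataclasses import dataclass

# Illustrative type names per backend; a real mapping would cover many more types
TYPE_NAMES = {
    'string': {'json': 'string', 'bigquery': 'STRING', 'glue': 'string'},
    'int': {'json': 'integer', 'bigquery': 'INT64', 'glue': 'int'},
}


@dataclass(frozen=True)
class SimpleCol:
    """Hypothetical column holding one definition for all backends."""
    name: str
    data_type: str
    description: str = ''

    def to_json(self):
        return {self.name: TYPE_NAMES[self.data_type]['json']}

    def to_bigquery(self):
        return {
            'name': self.name,
            'type': TYPE_NAMES[self.data_type]['bigquery'],
            'description': self.description,
        }

    def to_glue(self):
        return {
            'Name': self.name,
            'Type': TYPE_NAMES[self.data_type]['glue'],
            'Comment': self.description,
        }


weekday = SimpleCol('weekday', 'string', 'Day of the week')
print(weekday.to_json())      # {'weekday': 'string'}
print(weekday.to_bigquery())  # {'name': 'weekday', 'type': 'STRING', 'description': 'Day of the week'}
```

Because the name, type and description live in one place, every backend representation stays consistent when the definition changes.
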
class Col(_ColumnType):
    """A basic column object, for nested columns see `hela.NestedCol`.

    This class is used to define columns within a `hela.BaseDataset` or `hela.column_store`.
    Each defined column will be searchable, testable and referenceable.

    If you want to give further functionality to your columns, please inherit from this class.

    Attributes:
        name:   The name of the column.
        data_type:  The data type of the column, should be one of types found in `hela.data_types`
        description: A description of this column as a string, better descriptions yield a more secure catalog.

    Examples:
    >>> from hela import Col
    >>> from hela.data_types import String
    >>> my_col = Col('my_col', String(), 'This is an example column')
    """

    def __init__(
        self,
        name: str,
        data_type: PrimitiveType,
        description: str = None,
    ) -> None:
        super().__init__(name=name, data_type=data_type, description=description)

    def _describe(self) -> Sequence[_ColInfo]:
        return [_ColInfo(
            name=self.name,
            data_type=str(self.data_type),
            description=self.description,
            from_store=self.from_store
        )]

    def _spark_type(self):
        return StructField(name=self.name, dataType=self.data_type._spark_type())

    def _glue_type(self):
        return GlueColumn(name=self.name, type=self.data_type._glue_type(), comment=self.description)

    def _json_type(self):
        return {self.name: self.data_type._json_type()}

    def _bigquery_type(self):
        return bigquery.SchemaField(
            name=self.name,
            description=self.description,
            **self.data_type._bigquery_type().__dict__
        )

    def __str__(self) -> str:
        return f'Col(name="{self.name}", data_type={self.data_type}, description={self.description})'

    def __repr__(self) -> str:
        return self.__str__()

Ancestors

  • hela._column_classes._ColumnType
  • hela._base_data_type.BaseDataType
  • abc.ABC
class NestedCol (name: str, columns: Sequence[_ColumnType])

A nested style column object, should be instantiated with sub columns.

Most data stores support nested style columns, which can be built using this column class. These columns will be referenced with dot-notation when shown in the catalog. For dict/struct style columns see Struct.

Attributes

name
The name of the column.
columns
A sequence of columns nested within this column. Can be Col or NestedCol objects.

Examples:

>>> from hela import NestedCol, Col
>>> from hela.data_types import String, Int
>>> my_col = NestedCol('my_nested_col', [
...     Col('nested_string', String(), 'Nested string column'),
...     Col('nested_int', Int(), 'Nested int column')
... ])
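
The dot-notation mentioned above amounts to flattening the column tree into prefixed names. A minimal sketch, assuming hypothetical `Leaf` and `Nested` classes in place of `Col` and `NestedCol`:

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass
class Leaf:
    name: str


@dataclass
class Nested:
    name: str
    columns: List[Union['Leaf', 'Nested']]


def flatten(column, prefix=''):
    """Returns the dot-separated names of all leaf columns below `column`."""
    full_name = f'{prefix}{column.name}'
    if isinstance(column, Leaf):
        return [full_name]
    names = []
    for sub in column.columns:
        # Every level adds its own name as a prefix
        names.extend(flatten(sub, prefix=f'{full_name}.'))
    return names


ratings = Nested('ratings', [
    Leaf('taste'),
    Nested('service', [Leaf('speed'), Leaf('friendliness')]),
])
flat = flatten(ratings)
print(flat)  # ['ratings.taste', 'ratings.service.speed', 'ratings.service.friendliness']
```

Only leaf columns appear in the flattened list; the nested column itself contributes just its name as a prefix.
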
class NestedCol(_ColumnType):
    """A nested style column object, should be instantiated with sub columns.

    Most data stores support nested style columns, which can be built using this column class.
    These columns will be referenced with dot-notation when shown in the **catalog**.
    For dict/struct style columns see `hela.data_types.Struct`.

    Attributes:
        name:   The name of the column.
        columns:    A sequence of columns nested within this column. Can be Col or NestedCol objects.

    Examples:
    >>> from hela import NestedCol, Col
    >>> from hela.data_types import String, Int
    >>> my_col = NestedCol('my_nested_col', [
    ...     Col('nested_string', String(), 'Nested string column'),
    ...     Col('nested_int', Int(), 'Nested int column')
    ... ])
    """

    def __init__(self, name: str, columns: Sequence[_ColumnType]) -> None:
        self.columns = columns
        data_type = Struct({c.name: c.data_type for c in columns})
        super().__init__(name=name, data_type=data_type, description='A subset of columns.')

    def _describe(self) -> Sequence[_ColInfo]:
        col_info_list = []
        for c in self.columns:
            for c_desc in c._describe():
                desc = copy(c_desc)
                desc.name = f'{self.name}.{desc.name}'
                col_info_list.append(desc)
        return col_info_list

    def _spark_type(self):
        return StructField(name=self.name, dataType=self.data_type._spark_type())

    def _glue_type(self):
        return GlueColumn(name=self.name, type=self.data_type._glue_type())

    def _json_type(self):
        return {self.name: self.data_type._json_type()}

    def _bigquery_type(self):
        return bigquery.SchemaField(
            name=self.name,
            field_type='RECORD',
            mode=BigqueryMode.NULLABLE,
            fields=[c._bigquery_type() for c in self.columns]
        )

    def __str__(self) -> str:
        subs = [str(c) for c in self.columns]
        return f'NestedCol(name="{self.name}", subcols={subs})'

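    def _desc_(self) -> Sequence[ShortDescription]:
        desc_list = []
        for col in self.columns:
            col_desc = col._desc_()
            # A nested sub-column returns a sequence of descriptions, a plain column a single one
            sub_descs = col_desc if isinstance(col_desc, Sequence) else [col_desc]
            for sub_desc in sub_descs:
                desc = copy(sub_desc)
                desc.name = f'{self.name}.{sub_desc.name}'
                desc_list.append(desc)
        return desc_list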

    def __repr__(self) -> str:
        return self.__str__()

Ancestors

  • hela._column_classes._ColumnType
  • hela._base_data_type.BaseDataType
  • abc.ABC