Package hela
You probably already have your data job scripts version controlled, but what about your data catalog? The answer: write your data catalog as code! Storing your data catalog and data documentation as code makes your catalog searchable, referenceable, reliable, platform agnostic, sets you up for easy collaboration and much more! This library is built to fit small and large data landscapes, but is happiest when included from the start.
Hela (or Hel) is the Norse mythological collector of souls, and the Swedish word for "whole" or "all of it". Hela
is designed to give everyone a chance to build a data catalog, with a low entry barrier: pure Python code.
- Interested in contributing? Find hela on GitHub
- For an example of a larger data catalog, view this showcase catalog
Overview
The hela package consists of four primary components:
- Catalog: The central class of this package. This inheritable class holds your entire catalog together, and you can build trees of datasets in catalogs in catalogs.
- BaseDataset: An inheritable dataset class, the second cornerstone of the package. Depending on how much time you want to invest in this data catalog, it is within your own datasets that you would write the most code. See PandasParquetDataset for examples.
- Col & NestedCol: The leaves of your beautiful catalog tree. These are referenceable, reusable and (preferably) well-documented column objects.
- column_store(): The most generous store filled with columns used in multiple datasets of your catalog landscape.
And let's not forget the crown of this beautiful tree:
- generate_webpage(): Gives you the possibility to democratize and share your catalog with all the recipients you want. Serve the site wherever you can host a static index.html file, such as GitHub Pages.
One schema to rule them all
You have most likely at some point stumbled upon a situation where the same type of data is represented in multiple locations and in different formats. Be it JSON, a database, Parquet files or BigQuery, a datapoint called e.g. weekday will usually mean the same thing no matter where you are. With hela you can make sure these datapoints are of the same type and described the same way no matter the source.
Let's say you have an API that dumps JSON into some kind of blob storage. You want to dump this data into your BigQuery table and ensure that you have the correct schema end-to-end. Using the same dataset (or list of columns) you can generate a schema for both BigQuery and JSON:
from hela import Col, NestedCol, schema_generators
from hela.data_types import String, Int

columns = [
    Col('product_name', String(), 'The name of the product.'),
    NestedCol('ratings', [
        Col('taste', Int(), 'A taste rating of 1-5'),
        Col('design', Int(), 'A design rating of 1-5')
    ])
]

# Generates BigQuery schema (using BigQuery SDK)
bigquery_schema = schema_generators.bigquery_schema(columns)

# Generates JSON schema (according to json-schema.org)
json_schema = schema_generators.json_schema(columns)
Or if you have some data stored in Parquet and read by Spark, with overlapping columns stored in S3 and managed by AWS Glue:
from hela import Col, NestedCol, schema_generators, column_store
from hela.data_types import String, Int

@column_store()
class MyStore:
    product_name = Col('product_name', String(), 'The name of the product.')

glue_columns = [
    MyStore.product_name,
    Col('nbr_sold', Int(), 'Number sold of a specific product.')
]

spark_columns = [
    MyStore.product_name,
    Col('product_id', Int(), 'Integer identifier of a specific product.')
]

# Generates glue schema (using AWS CDK)
schema_generators.aws_glue_schema(glue_columns)

# Generate spark schema (using pyspark)
schema_generators.spark_schema(spark_columns)
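The idea of one column list feeding several schema formats can also be sketched without hela installed. The snippet below is only an illustration of the principle, not how hela.schema_generators is implemented: SketchCol, SketchNestedCol, to_json_schema and to_field_list are hypothetical stand-ins invented for this example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchCol:
    """Hypothetical stand-in for a flat column (not hela's Col)."""
    name: str
    json_type: str
    description: str


@dataclass(frozen=True)
class SketchNestedCol:
    """Hypothetical stand-in for a nested column holding child columns."""
    name: str
    columns: tuple


def to_json_schema(columns) -> dict:
    """Translate a column list into a JSON-schema-style dictionary."""
    properties = {}
    for col in columns:
        if isinstance(col, SketchNestedCol):
            # Nested columns become nested objects.
            properties[col.name] = to_json_schema(col.columns)
        else:
            properties[col.name] = {
                'type': col.json_type,
                'description': col.description,
            }
    return {'type': 'object', 'properties': properties}


def to_field_list(columns, prefix: str = '') -> list:
    """Translate the same column list into flat (dotted_name, type) pairs."""
    fields = []
    for col in columns:
        if isinstance(col, SketchNestedCol):
            fields.extend(to_field_list(col.columns, prefix=f'{prefix}{col.name}.'))
        else:
            fields.append((f'{prefix}{col.name}', col.json_type))
    return fields


# One definition of the columns ...
columns = [
    SketchCol('product_name', 'string', 'The name of the product.'),
    SketchNestedCol('ratings', (
        SketchCol('taste', 'integer', 'A taste rating of 1-5'),
        SketchCol('design', 'integer', 'A design rating of 1-5'),
    )),
]

# ... and two different target formats generated from it.
json_schema = to_json_schema(columns)
field_list = to_field_list(columns)
```

Because both outputs are derived from the same list, a change to a column's type or description only has to be made in one place.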
Getting Started
Setting up, reference the infer module here?
When building your data catalog it is recommended to keep the folder structure in line with how the data will be structured in your data lake/warehouse, as in the example below (for a complete example see the showcase repo).
my_catalog/
├── rich_descriptions/
│ ├── orders.md
│ └── ...
├── MyDatasets/
│ ├── best_dataset.py
│ └── ...
├── MyOtherDatasets/
│ ├── decent_dataset.py
│ └── ...
├── my_catalog.py
└── my_column_store.py
The next step is to build your own dataset. This is where you can put most of your code when it comes to functionality such as:
- Authentication and permissions
- Connections and configs
- Write & Load functionality
- Various partitioning and optimization logic
Make sure to inherit the BaseDataset class and shadow or hard-code any of the required init fields.
from hela import BaseDataset, Col
from hela.data_types import String

class MyDatasetClass(BaseDataset):
    def __init__(
        self,
        name: str,  # Required
        description: str,  # Optional but recommended
        columns: list,  # Optional but recommended
        rich_description_path: str = None,  # Optional, used for web app
        partition_cols: list = None,  # Optional but recommended
        # folder: str = None,  # Only do one of either folder or database
        database: str = None,  # Optional, can also be enriched via Catalog
    ) -> None:
        super().__init__(
            name,
            data_type='bigquery',
            folder=None,
            database=database,
            description=description,
            rich_description_path=rich_description_path,
            partition_cols=partition_cols,
            columns=columns
        )
        # Do more of your own init stuff

    def my_func(self) -> None:
        # Your own dataset function
        pass

# Now instantiate your dataset class with one example column
my_dataset = MyDatasetClass('my_dataset', 'An example dataset.', [
    Col('my_column', String(), 'An example column.')
])
Now that you have a dataset class, and instantiated your first dataset, you can start populating your data catalog.
from hela import Catalog

class MyCatalog(Catalog):
    my_dataset = my_dataset
That's it! You now have a small catalog to keep building on. To view it as a web page you can add the following code to a python script, and in the future add it in whichever CI/CD tool you use:
from hela import generate_webpage

# The second parameter of generate_webpage is named output_path
generate_webpage(MyCatalog, output_path='.')
For further reading check out:
- Catalog.search(): Smart search across your catalog!
- hela.test_suite: Quality assurance and smart validations for your testing pipeline.
- Catalog Showcase Repo: See a bigger catalog in action.
Highlights
In the sections below you will find some highlights of the quality-of-life improvements provided by the hela package!
Iterate through datasets
Let's say you want to change the type of your column best_column from a string to an integer
everywhere the column is used. You can do that by fetching all datasets that
include best_column using Catalog.get_columns_datasets(), then executing your query
on these datasets:
from my_package import MyCatalog

columns_datasets_dict = MyCatalog.get_columns_datasets()
for dataset in columns_datasets_dict['best_column']:
    dataset.query('your schema changing spark query')
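According to the signature of Catalog.get_columns_datasets() documented further down this page, the returned dictionary is keyed by column objects and maps each one to the datasets using it. The following is a minimal, self-contained sketch of that structure and of a lookup by column name. SketchCol, SketchDataset, columns_to_datasets and datasets_using are hypothetical names made up for this illustration and are not part of hela.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class SketchCol:
    """Hypothetical stand-in for a column object (hashable, so usable as a key)."""
    name: str
    data_type: str


@dataclass(frozen=True)
class SketchDataset:
    """Hypothetical stand-in for a dataset holding a tuple of columns."""
    name: str
    columns: Tuple[SketchCol, ...]


def columns_to_datasets(datasets):
    """Build a column -> list-of-datasets mapping."""
    mapping = defaultdict(list)
    for dataset in datasets:
        for column in dataset.columns:
            mapping[column].append(dataset)
    return mapping


def datasets_using(mapping, column_name):
    """Collect every dataset whose column list contains the given column name."""
    return [
        dataset
        for column, datasets in mapping.items()
        if column.name == column_name
        for dataset in datasets
    ]


best_column = SketchCol('best_column', 'string')
orders = SketchDataset('orders', (best_column, SketchCol('order_id', 'int')))
sales = SketchDataset('sales', (best_column,))

mapping = columns_to_datasets([orders, sales])
affected = [dataset.name for dataset in datasets_using(mapping, 'best_column')]
```

Looking up by name rather than by key avoids assuming anything about how column objects compare with plain strings.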
Anticipate errors before they happen
Everyone knows how difficult it is to name things, especially when managing multiple datasets across many similar domains. Hela helps you keep your standards in check by making sure no column is unknowingly duplicated between different datasets.
To combat this there is a pre-built hela.test_suite module filled with helper functions. The best
way to use these functions is to include them in your package test setup (e.g. pytest). For example,
make sure no column name is duplicated using validate_no_duplicated_columns().
On the other hand, as you build your catalog you will sometimes find columns that you would want to have the same name,
as they might include the same type of information. In these cases we can only rely on the descriptions being
similar enough to get a hit using validate_description_similarity().
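To make the duplication check concrete, here is a rough sketch of the underlying idea written as a pytest-style test. It is not the implementation of validate_no_duplicated_columns(): hela checks that duplicated columns originate from the same column store, while this simplified version treats a column as shared only when the very same object is reused. SketchCol and find_unshared_duplicates are hypothetical names.

```python
from collections import defaultdict


class SketchCol:
    """Hypothetical stand-in for a column with a name and a description."""

    def __init__(self, name: str, description: str) -> None:
        self.name = name
        self.description = description


def find_unshared_duplicates(datasets: dict) -> list:
    """Return column names defined by more than one distinct column object.

    `datasets` maps a dataset name to its list of column objects.
    Reusing the same object in several datasets counts as intentional sharing.
    """
    objects_by_name = defaultdict(set)
    for columns in datasets.values():
        for col in columns:
            objects_by_name[col.name].add(id(col))
    return sorted(name for name, ids in objects_by_name.items() if len(ids) > 1)


# A shared column object, reused on purpose in both datasets.
product_name = SketchCol('product_name', 'The name of the product.')

clean_catalog = {
    'orders': [product_name, SketchCol('nbr_sold', 'Number sold.')],
    'products': [product_name, SketchCol('product_id', 'Product identifier.')],
}

messy_catalog = {
    'orders': [product_name, SketchCol('price', 'Price in EUR.')],
    'products': [product_name, SketchCol('price', 'Listed price.')],
}


def test_no_unshared_duplicates() -> None:
    # Would be collected by pytest when placed in a test module.
    assert find_unshared_duplicates(clean_catalog) == []


clean_result = find_unshared_duplicates(clean_catalog)
messy_result = find_unshared_duplicates(messy_catalog)
```

In the messy catalog the two separately defined price columns are flagged, which is the situation a shared column store is meant to prevent.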
Notebook interactivity
With hela you don't even have to leave your favorite notebook tool to study your data catalog!
The Catalog and BaseDataset classes have built-in functions that, in a notebook environment,
display information such as:
Columns within the catalog
This functionality also extends into sub-catalogs.

Which dates a dataset is available on
This functionality requires the BaseDataset.get_dates() function to be implemented.
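What a get_dates() implementation might compute can be sketched independently of hela. The example below assumes, purely for illustration, that a dataset is stored in partition folders named date=YYYY-MM-DD. The helper names dates_from_partitions and missing_dates are hypothetical; in a real catalog the first function's body would live inside your dataset class's get_dates() method, which according to this page should return a set of dates or None.

```python
from datetime import date, timedelta
from typing import Iterable, Optional, Set


def dates_from_partitions(partition_names: Iterable[str]) -> Optional[Set[date]]:
    """Parse 'date=YYYY-MM-DD' partition names into a set of dates.

    Returns None when no date could be found, mirroring the documented
    contract of returning either a set of dates or None.
    """
    found = set()
    for name in partition_names:
        key, _, value = name.partition('=')
        if key != 'date':
            continue  # Not a date partition, e.g. a marker file.
        try:
            found.add(date.fromisoformat(value))
        except ValueError:
            continue  # Skip partitions with malformed dates.
    return found or None


def missing_dates(dates: Set[date]) -> Set[date]:
    """Return the days between the first and last date that are absent."""
    first, last = min(dates), max(dates)
    all_days = {first + timedelta(days=i) for i in range((last - first).days + 1)}
    return all_days - dates


partitions = [
    'date=2023-01-01',
    'date=2023-01-02',
    'date=2023-01-04',
    '_SUCCESS',
    'date=not-a-date',
]

available = dates_from_partitions(partitions)
gaps = missing_dates(available)
```

How the partition names are listed (local file system, blob storage, a metadata table) is left out on purpose, since that depends entirely on where your data lives.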

Advanced
[Page under construction]
Sometimes things work almost, but not exactly, the way you want. Here is a brief guide on how to modify the behaviour among a variety of topics. If you improve something that you believe could be useful for other people as well, please consider contributing.
Coming soon:
- Build your own schema generators
- Build your own data types
Expand source code
"""
.. include:: ../gh_pages/hela.md
"""
from hela._catalog_class import Catalog
from hela._column_classes import Col, NestedCol
from hela._base_dataset import BaseDataset
from hela._column_store_class import column_store
from hela.web_page.generate import generate_webpage
__all__ = [
    'Catalog',
    'BaseDataset',
    'column_store',
    'Col',
    'NestedCol',
    'generate_webpage'
]
Sub-modules
hela.data_types-
Module consisting of all pre-built data types.
hela.datasets-
Module with pre-built datasets for demonstrational purposes.
hela.errors-
Module with custom errors.
hela.infer-
Includes functions to infer Catalog schemas on various data structures.
hela.math-
Module for math and statistics related functions.
hela.plots-
Module for plot functions.
hela.schema_generators-
Module used to translate from catalog schema to other schema types.
hela.test_suite-
Module covering the test suite to make sure your catalog is set up properly …
hela.web_page-
This module includes function to generate a data catalog web page.
Functions
def column_store(cls=None, label: str = None) ‑> object-
Decorator used to flag a class as a column store.
A column store is a referenceable class used when multiple datasets use the same column. To ensure that such a column is purposefully duplicated among datasets, we check that any duplicated column originates from the same column store.
Args
label- This string label will be passed down to all column objects within the store.
Returns
The decorated class.
Examples
>>> from hela import column_store, Col
>>> from hela.data_types import String
>>> @column_store(label='cool_columns')
... class MyStore:
...     my_column = Col('my_column', String(), 'Example column')
>>> MyStore.my_column
Expand source code
def column_store(cls=None, label: str = None) -> object:
    """Decorator used to flag a class as a column store.

    A column store is a referenceable class used when multiple datasets use the same column.
    In order to ensure that this column is purposefully duplicated among datasets we check that
    any duplicated column must originate from the same column store.

    Args:
        label: This string label will be passed down to all column objects within the store.

    Returns:
        The decorated class.

    Examples:
    >>> from hela import column_store, Col
    >>> from hela.data_types import String
    >>> @column_store(label='cool_columns')
    ... class MyStore:
    ...     my_column = Col('my_column', String(), 'Example column')
    >>> MyStore.my_column
    """
    def wrap(cls):
        return _make_column_store(cls, label=label)

    # This is triggered when called as @column_store() (with parentheses)
    if cls is None:
        return wrap

    # This is triggered when called as @column_store (no parentheses)
    # Only allow triggered with parenthesis, this will keep IDE hints.
    raise ValueError('A column store can only be decorated with called method: `@column_store()`')
def generate_webpage(catalogs: Union[hela._catalog_class.Catalog, Sequence[hela._catalog_class.Catalog]], output_path: str, overwrite_existing: bool = False, include_samples: bool = False, web_app_title: str = 'Catalog') ‑> None-
Generates an index.html file that can be used as a data catalog website.
Include a python script implementing this function in your CI/CD pipeline, outputting an index.html file that you can then use to share your data catalog (e.g. on github pages). For an example see (TODO: insert example repo link here).
Args
catalogs- One or multiple objects inheriting the Catalog class. If you have a tree of catalogs, only the root catalog is required.
output_path- The folder where index.html file should end up.
overwrite_existing- Flag whether any potential index.html file should be overwritten if existing.
include_samples- Flag whether to attempt to fetch sample datapoints from the columns in each dataset. Requires the BaseDataset.get_samples() function to be implemented.
web_app_title- Optional title of the web app.
Raises
FileExistsError- If the index.html file already exists under output_path and overwrite_existing=False.
Examples:
>>> from my_catalog import MyCatalog
>>> from hela import generate_webpage
>>> generate_webpage(MyCatalog, '.', overwrite_existing=True)
Expand source code
def generate_webpage(
    catalogs: Union[Catalog, Sequence[Catalog]],
    output_path: str,
    overwrite_existing: bool = False,
    include_samples: bool = False,
    web_app_title: str = 'Catalog'
) -> None:
    """Generates an index.html file that can be used as a data catalog website.

    Include a python script implementing this function in your CI/CD pipeline, outputting an index.html file
    that you can then use to share your data catalog (e.g. on github pages).
    For an example see (TODO: insert example repo link here).

    Args:
        catalogs: One or multiple objects inheriting the Catalog class.
            If you have a tree of catalogs, only the root catalog is required.
        output_path: The folder where index.html file should end up.
        overwrite_existing: Flag whether any potential index.html file should be overwritten if existing.
        include_samples: Flag whether to attempt to fetch sample datapoints from the columns in each
            dataset. Requires `hela.BaseDataset.get_samples` function implemented.
        web_app_title: Optional title of the web app.

    Raises:
        FileExistsError: If the index.html file already exists under `output_path`
            and overwrite_existing=False.

    Examples:
    >>> from my_catalog import MyCatalog
    >>> from hela import generate_webpage
    >>> generate_webpage(MyCatalog, '.', overwrite_existing=True)
    """
    if not isinstance(catalogs, Sequence):
        catalogs = [catalogs]
    jg = JsonGenerator()
    json_str = jg.generate_docs_jsons(catalogs, include_samples=include_samples)

    folder_path = Path(output_path)
    file_path = folder_path if '.html' in output_path else folder_path / 'index.html'
    if not folder_path.exists():
        folder_path.mkdir(parents=True)
    if file_path.exists():
        if not overwrite_existing:
            raise FileExistsError(f'File {file_path} already exists, delete or set overwrite_existing=True')
        file_path.unlink()

    # Replace placeholder script with actual json data
    replacement_str = f'<script>window.treeListData = {json_str}</script>'
    match_str = '<script id="tree-list-data"></script>'
    output_file = gzip.decompress(pkg_resources.resource_string(__name__, 'index.html.gz')).decode()
    if match_str not in output_file:
        raise ValueError('Could not insert data in frontend.')
    output_file = output_file.replace(match_str, replacement_str)
    if '[[ReplaceDashboard]]' not in output_file:
        raise ValueError(f'Could not replace title in frontend.')

    # Replace web app title with custom title
    output_file = output_file.replace('[[ReplaceTitleDashboard]]', f'<title>{web_app_title}</title>')
    output_file = output_file.replace('[[ReplaceDashboard]]', web_app_title)
    file_path.write_text(output_file)
Classes
class BaseDataset (name: str, data_type: str, folder: Optional[Union[str, Path]] = None, database: Optional[str] = None, description: Optional[str] = None, rich_description_path: Optional[str] = None, partition_cols: Optional[Sequence[str]] = None, columns: Optional[Sequence[_ColumnType]] = None)-
Abstract Dataset class to be used when building your own datasets.
If you choose to build data interactivity through the data catalog, it is within your own dataset classes you would build authentication and connection logic.
For full usage of the available catalog features implement the functions BaseDataset.get_samples() and BaseDataset.get_dates().
Attributes
name- The name of the dataset
data_type- The data type of the dataset e.g. "parquet" or "bigquery"
description- A description of the dataset as a string
partition_cols- A list of column names to be used for partitioning as strings
rich_description_path- A path to a markdown file with possibilities for longer, more detailed descriptions. Primarily used for generated catalog web page.
columns- A list of class ColumnType objects defining the columns of the dataset
path- The path to the dataset (combination of folder and name)
Expand source code
class BaseDataset(ABC):
    """Abstract Dataset class to be used when building your own datasets.

    If you choose to build data interactivity through the data catalog, it is within your own
    dataset classes you would build authentication and connection logic.

    For full usage of the available catalog features implement the functions
    `BaseDataset.get_samples` and `BaseDataset.get_dates`.

    Attributes:
        name: The name of the dataset
        data_type: The data type of the dataset e.g. "parquet" or "bigquery"
        description: A description of the dataset as a string
        partition_cols: A list of column names to be used for partitioning as strings
        rich_description_path: A path to a markdown file with possibilities for longer, more detailed
            descriptions. Primarily used for generated catalog web page.
        columns: A list of class ColumnType objects defining the columns of the dataset
        path: The path to the dataset (combination of folder and name)
    """
    _is_dataset: bool = True
    _type: str = 'Dataset'

    def __init__(
        self,
        name: str,
        data_type: str,
        folder: Optional[Union[str, Path]] = None,
        database: Optional[str] = None,
        description: Optional[str] = None,
        rich_description_path: Optional[str] = None,
        partition_cols: Optional[Sequence[str]] = None,
        columns: Optional[Sequence[_ColumnType]] = None,
    ) -> None:
        self.name = name
        self.data_type = data_type
        self.description = description
        self.rich_description_path = rich_description_path
        self.partition_cols = partition_cols
        self.database = database
        self.folder = folder
        self.path = None
        self._set_path()
        self._set_columns(columns)
        # _id used to build links in generated catalog website
        self._id: str = str(uuid.uuid4())

    def _set_columns(self, columns: Optional[Sequence[_ColumnType]] = None) -> None:
        if columns is None:
            self.columns = columns
            return
        duplicated_columns = ', '.join(
            [f'"{col.name}"' for col, count in Counter(columns).items() if count > 1]
        )
        if duplicated_columns:
            raise DuplicationError(f'Found duplication of column(s) {duplicated_columns} in dataset "{self.name}".')
        col_list = Columns(columns)
        for c in columns:
            setattr(col_list, c.name, c)
        self.columns = col_list

    def _set_path(self) -> None:
        if self.folder is None:
            return
        path = join_paths(self.folder, self.name).with_suffix(f'.{self.data_type}')
        self.path = path
        setattr(self, _PATH_VAR, path)

    def _describe(self) -> _DatasetInfo:
        info_obj = _DatasetInfo(
            name=self.name,
            data_type=self.data_type,
            description=self.description
        )
        try:
            dates = self.get_dates()
            if dates is None:
                return info_obj
            info_obj.min_date = min(dates)
            info_obj.max_date = max(dates)
            info_obj.nbr_missing_dates = len(get_missing_dates(dates))
        except NotImplementedError:
            pass
        return info_obj

    def show_columns(self, samples: bool = True) -> pd.DataFrame:
        """Returns a dataframe with information of the columns of this dataset, one column per row.

        Args:
            samples: When true will include a sample datapoint for all columns.
                Requires implementation of `BaseDataset.get_samples` function.

        Returns:
            A pandas dataframe with one column per row.
        """
        if self.columns is None:
            return None
        column_df = pd.DataFrame([
            cinfo.__dict__
            for c in self.columns
            for cinfo in c._describe()
        ])
        if samples:
            try:
                fetched_samples = self.get_samples()
                if fetched_samples:
                    fetched_samples = {**fetched_samples, **flatten_dict(fetched_samples)}
                    column_df.loc[:, 'Sample'] = column_df.name.apply(lambda x: fetched_samples.get(x, None))
            except NotImplementedError:
                pass
        return column_df

    def show_dates(self) -> None:
        """
        Will generate a grid plot of all available dates for this dataset.
        Requires `BaseDataset.get_dates` implemented.
        """
        dates = self.get_dates()
        if dates is None:
            raise ValueError(f'No dates could be fetched from dataset {self}')
        return plot_date_availability_calendar(dates)

    def check_columns(
        self,
        column_list: Sequence[str],
        raise_undefined_columns: bool = False,
        raise_missing_columns: bool = False
    ) -> None:
        """
        Will compare the sent in column list against the dataset's defined columns and
        inform (warn or raise) regarding any discrepancies.

        Args:
            column_list: A list of names of columns as strings
            raise_undefined_columns: Optional; If True will raise if columns found in column_list
                not defined in dataset
            raise_missing_columns: If True will raise if columns defined in dataset not found
                in column_list

        Raises:
            DatasetError: If any of raise flags are set to True

        Examples:
        >>> my_dataset.check_columns(df.columns, raise_undefined_columns=True)
        """
        if self.columns is None:
            warnings.warn('Dataset has no columns specified.')
            return
        undefined_columns = set(column_list) - set([c.name for c in self.columns])
        msg = f'The following columns are not defined in dataset: {list(undefined_columns)}'
        if raise_undefined_columns:
            raise DatasetError(msg)
        warnings.warn(msg)
        missing_columns = set([c.name for c in self.columns]) - set(column_list)
        msg = f'The following columns are missing from column_list: {list(missing_columns)}'
        if raise_missing_columns:
            raise DatasetError(msg)
        warnings.warn(msg)

    @property
    def _prefix(self) -> str:
        """Returns the prefix as either folder, database or empty string."""
        if self.folder:
            return self.folder
        if self.database:
            return self.database
        return ''

    def __str__(self) -> str:
        prefix = self._prefix
        if prefix:
            return f'{prefix}:{self.name}'
        return self.name

    def __repr__(self) -> str:
        return self.__str__()

    def __eq__(self, o: BaseDataset) -> bool:
        return self.name == o.name and self._id == o._id

    def __hash__(self) -> int:
        return hash(self.__str__())

    def _desc_(self) -> ShortDescription:
        return ShortDescription(name=self.name, type=self._type, description=self.description)

    def get_dates(self) -> Optional[Set[date]]:
        """Implement this function for date inspection functionality such as `BaseDataset.show_dates`.

        Should return a set of dates when called or None if dates for some reason could not be fetched.
        """
        raise NotImplementedError

    def get_samples(self) -> Optional[Dict[str, Any]]:
        """Implement this function for sample inspection functionality used in e.g. `BaseDataset.show_columns`.

        Should return a dictionary of string keys for column names with samples:
        >>> {'my_column': 123}

        Nested columns should return names with dot-notation:
        >>> {'parent_column.my_column': 123}

        Or None if samples could not be fetched:
        >>> None
        """
        raise NotImplementedError
Ancestors
- abc.ABC
Subclasses
Methods
def check_columns(self, column_list: Sequence[str], raise_undefined_columns: bool = False, raise_missing_columns: bool = False) ‑> None-
Will compare the sent in column list against the dataset's defined columns and inform (warn or raise) regarding any discrepancies.
Args
column_list- A list of names of columns as strings
raise_undefined_columns- Optional; If True will raise if columns found in column_list not defined in dataset
raise_missing_columns- If True will raise if columns defined in dataset not found in column_list
Raises
DatasetError- If any of raise flags are set to True
Examples:
>>> my_dataset.check_columns(df.columns, raise_undefined_columns=True)
Expand source code
def check_columns(
    self,
    column_list: Sequence[str],
    raise_undefined_columns: bool = False,
    raise_missing_columns: bool = False
) -> None:
    """
    Will compare the sent in column list against the dataset's defined columns and
    inform (warn or raise) regarding any discrepancies.

    Args:
        column_list: A list of names of columns as strings
        raise_undefined_columns: Optional; If True will raise if columns found in column_list
            not defined in dataset
        raise_missing_columns: If True will raise if columns defined in dataset not found
            in column_list

    Raises:
        DatasetError: If any of raise flags are set to True

    Examples:
    >>> my_dataset.check_columns(df.columns, raise_undefined_columns=True)
    """
    if self.columns is None:
        warnings.warn('Dataset has no columns specified.')
        return
    undefined_columns = set(column_list) - set([c.name for c in self.columns])
    msg = f'The following columns are not defined in dataset: {list(undefined_columns)}'
    if raise_undefined_columns:
        raise DatasetError(msg)
    warnings.warn(msg)
    missing_columns = set([c.name for c in self.columns]) - set(column_list)
    msg = f'The following columns are missing from column_list: {list(missing_columns)}'
    if raise_missing_columns:
        raise DatasetError(msg)
    warnings.warn(msg)
def get_dates(self) ‑> Optional[Set[datetime.date]]-
Implement this function for date inspection functionality such as BaseDataset.show_dates().
Should return a set of dates when called or None if dates for some reason could not be fetched.
Expand source code
def get_dates(self) -> Optional[Set[date]]:
    """Implement this function for date inspection functionality such as `BaseDataset.show_dates`.

    Should return a set of dates when called or None if dates for some reason could not be fetched.
    """
    raise NotImplementedError
def get_samples(self) ‑> Optional[Dict[str, Any]]-
Implement this function for sample inspection functionality used in e.g. BaseDataset.show_columns().
Should return a dictionary of string keys for column names with samples:
>>> {'my_column': 123}
Nested columns should return names with dot-notation:
>>> {'parent_column.my_column': 123}
Or None if samples could not be fetched:
>>> None
Expand source code
def get_samples(self) -> Optional[Dict[str, Any]]:
    """Implement this function for sample inspection functionality used in e.g. `BaseDataset.show_columns`.

    Should return a dictionary of string keys for column names with samples:
    >>> {'my_column': 123}

    Nested columns should return names with dot-notation:
    >>> {'parent_column.my_column': 123}

    Or None if samples could not be fetched:
    >>> None
    """
    raise NotImplementedError
def show_columns(self, samples: bool = True) ‑> pandas.core.frame.DataFrame-
Returns a dataframe with information of the columns of this dataset, one column per row.
Args
samples- When true will include a sample datapoint for all columns. Requires the BaseDataset.get_samples() function to be implemented.
Returns
A pandas dataframe with one column per row.
Expand source code
def show_columns(self, samples: bool = True) -> pd.DataFrame:
    """Returns a dataframe with information of the columns of this dataset, one column per row.

    Args:
        samples: When true will include a sample datapoint for all columns.
            Requires implementation of `BaseDataset.get_samples` function.

    Returns:
        A pandas dataframe with one column per row.
    """
    if self.columns is None:
        return None
    column_df = pd.DataFrame([
        cinfo.__dict__
        for c in self.columns
        for cinfo in c._describe()
    ])
    if samples:
        try:
            fetched_samples = self.get_samples()
            if fetched_samples:
                fetched_samples = {**fetched_samples, **flatten_dict(fetched_samples)}
                column_df.loc[:, 'Sample'] = column_df.name.apply(lambda x: fetched_samples.get(x, None))
        except NotImplementedError:
            pass
    return column_df
def show_dates(self) ‑> None-
Will generate a grid plot of all available dates for this dataset. Requires BaseDataset.get_dates() to be implemented.
Expand source code
def show_dates(self) -> None:
    """
    Will generate a grid plot of all available dates for this dataset.
    Requires `BaseDataset.get_dates` implemented.
    """
    dates = self.get_dates()
    if dates is None:
        raise ValueError(f'No dates could be fetched from dataset {self}')
    return plot_date_availability_calendar(dates)
class Catalog-
Inheritable Catalog class, used when building your own data catalog.
The namesake of the python package, this class will turn your code from just being code into a data catalog. This class will make your datasets iterable, testable, referenceable and more. You can also decorate the catalog with the
Catalog.setup() function, giving your catalog a description and enriching datasets within it.
Examples:
>>> # Without decorator
>>> from hela import Catalog
>>> class MyCatalog(Catalog):
...     my_dataset = Dataset(...)
>>> # With decorator
>>> from hela import Catalog
>>> @Catalog.setup(folder='sales', description='Datasets related to sales.')
... class SalesCatalog(Catalog):
...     my_sales_dataset = Dataset(...)
Expand source code
class Catalog:
    """Inheritable Catalog class, used when building your own data catalog.

    The namesake of the python package, this class will turn your code from just being code into a
    data catalog. This class will make your datasets iterable, testable, referenceable and more.
    You can also decorate the catalog with the `Catalog.setup` function, giving your catalog a
    description and enriching datasets within it.

    Examples:
    >>> # Without decorator
    >>> from hela import Catalog
    >>> class MyCatalog(Catalog):
    ...     my_dataset = Dataset(...)

    >>> # With decorator
    >>> from hela import Catalog
    >>> @Catalog.setup(folder='sales', description='Datasets related to sales.')
    ... class SalesCatalog(Catalog):
    ...     my_sales_dataset = Dataset(...)
    """
    _is_catalog: bool = True
    _type: str = 'Catalog'
    _folder: str = None
    _database: str = None
    _description: str = None
    _rich_description_path: str = None

    @staticmethod
    def setup(
        cls: Catalog = None,
        folder: Optional[str] = None,
        database: Optional[str] = None,
        description: Optional[str] = None,
        rich_description_path: Optional[str] = None
    ) -> Catalog:
        """Decorator enriching the catalog with a description, and optionally binding a
        folder or database to all datasets within it.

        Args:
            folder: Used for filestore style datasets (e.g. spark), build the catalogs folder structure.
            database: Used for database style datasets (e.g. bigquery, aws glue) builds the
                catalogs database structure.
            description: A description of this catalog.
            rich_description_path: Path to markdown file with richer descriptions of this catalog.
        """
        def wrap(cls: Catalog):
            if not getattr(cls, '_is_catalog', False):
                raise ValueError(f'Class {cls} must inherit Catalog class.')
            cls._folder = folder
            cls._database = database
            cls._description = description
            cls._rich_description_path = rich_description_path
            _enrich_datasets(cls, folder=folder, database=database)
            return cls

        # When called with parentheses "@catalog()"
        if cls is None:
            return wrap

        # When called without parentheses "@catalog"
        return wrap(cls)

    @classmethod
    def get_catalogs(cls, recursive: bool = True) -> Sequence[Catalog]:
        """Get a list of all sub-catalogs of this catalog, not including self.

        Args:
            recursive: Flag whether to search for sub-catalog in this catalog's sub-catalogs.

        Returns:
            A list of objects inheriting the Catalog class.
        """
        catalog_list = []
        for obj in cls.__dict__.values():
            catalog = is_catalog(obj)
            if catalog:
                catalog_list.append(catalog)
                if recursive:
                    catalog_list.extend(catalog.get_catalogs())
        return catalog_list

    @classmethod
    def get_datasets(cls, recursive: bool = True) -> Sequence[BaseDataset]:
        """Returns a list of all datasets within this catalog, recursively if flag is set.

        Args:
            recursive: When set to true this function will fetch datasets from sub-catalogs of this catalog.

        Returns:
            A list of dataset objects.
        """
        dataset_list = []
        for obj in cls.__dict__.values():
            dataset = is_dataset(obj)
            if dataset:
                dataset_list.append(obj)
            elif recursive:
                catalog = is_catalog(obj)
                if catalog:
                    dataset_list.extend(catalog.get_datasets())
        return dataset_list

    @classmethod
    def get_columns_datasets(cls, recursive: bool = True) -> Dict[_ColumnType, Sequence[BaseDataset]]:
        column_dict = defaultdict(list)
        for dataset in cls.get_datasets(recursive=recursive):
            if dataset.columns is None:
                continue
            for column in dataset.columns:
                column_dict[column].append(dataset)
        return column_dict

    @classmethod
    def show_datasets(cls, recursive: bool = True) -> DFDisplay:
        """Returns a DFDisplay with a description of all datasets in this catalog, one dataset per row.

        Args:
            recursive: Whether to show dataset recursively in subcatalogs.

        Returns:
            DFDisplay (pandas dataframe)
        """
        return DFDisplay([
            d._describe().__dict__
            for d in cls.get_datasets(recursive=recursive)
        ])

    @classmethod
    def show_columns(cls, recursive: bool = True) -> DFDisplay:
        """Returns a pandas dataframe with a description of all columns in this catalog, one column per row.

        Args:
            recursive: Whether to show dataset recursively in subcatalogs.

        Returns:
            DFDisplay (pandas dataframe)
        """
        df = DFDisplay([
            {
                **col.__dict__,
                'datasets': datasets
            }
            for col_obj, datasets in cls.get_columns_datasets(recursive=recursive).items()
            for col in col_obj._describe()
        ])
        return df.sort_values('name').reset_index(drop=True)

    @classmethod
    def _all_descriptions(cls, recursive=True) -> Sequence[ShortDescription]:
        """Collect descriptions for all Catalogs, Datasets and Columns.

        Args:
            recursive: Whether to search recursively through sub-catalogs.

        Returns:
            A sequence of ShortDescription objects
        """
        column_descriptions = []
        for c in cls.get_columns_datasets(recursive=recursive).keys():
            desc = c._desc_()
            # Columns will sometimes give a sequence if it is a nested column
            if isinstance(desc, Sequence):
                column_descriptions.extend(desc)
            else:
                column_descriptions.append(desc)
        return [
            *column_descriptions,
            *[d._desc_() for d in cls.get_datasets(recursive=recursive)],
            *[c._desc_() for c in cls.get_catalogs(recursive=recursive)],
            cls._desc_()
        ]

    @classmethod
    def search(cls, query_str: str, recursive: bool = True, max_hits: int = 5, min_relevance: float = .1) -> DFDisplay:
        """Searches across names and descriptions of Catalogs, Datasets and Columns.

        * Search on name is based on Levenshtein distance (fuzzy search).
        * Search on description is based on cosine similarity of TF-IDF matrix.

        Args:
            query_str: The string to base the search on.
            recursive: Whether to search recursively through sub-catalogs.
            max_hits: Maximum number of hits to return.
            min_relevance: Minimum required relevance score to return a hit.

        Returns:
            A DFDisplay (pandas) dataframe with hits sorted on relevance.
        """
        # Create dictionaries to collect into unique name/desc keys
        name_dict, desc_dict = defaultdict(list), defaultdict(list)
        for short_desc in cls._all_descriptions(recursive=recursive):
            name = short_desc.name
            if name is not None:
                name_dict[name].append(short_desc)
                # Take both original and split version if we're dealing with nested columns
                # E.g. ratings.taste
                if '.' in name:
                    name_dict[name.split('.')[-1]].append(short_desc)
            if short_desc.description is not None:
                desc_dict[short_desc.description].append(short_desc)

        l_search = levenshtein.sort(query_str, list(name_dict.keys()), min_similarity=.5)
        try:
            tf_idf_search = tf_idf.sort(query_str, list(desc_dict.keys()))
        except ValueError:
            # Raised when we get no vocabulary matches at all
            tf_idf_search = []

        relevance_col = 'relevance'
        hit_df = DFDisplay([
            *[
                {
                    **short_desc.__dict__,
                    relevance_col: hit.score,
                    'hit_on': 'name'
                }
                for hit in l_search
                for short_desc in name_dict[hit.match_string]
            ],
            *[
                {
                    **short_desc.__dict__,
                    relevance_col: hit.score,
                    'hit_on': 'description'
                }
                for hit in tf_idf_search
                for short_desc in desc_dict[hit.match_string]
            ]
        ])
        error_msg = f'No hits good enough found on query string: "{query_str}"'
        if len(hit_df) == 0:
            raise ValueError(error_msg)
        hit_df = hit_df[hit_df[relevance_col] > min_relevance]
        if len(hit_df) == 0:
            raise ValueError(error_msg)
        hit_df.loc[:, relevance_col] = hit_df[relevance_col].round(2)
        return (
            hit_df
            .sort_values(relevance_col, ascending=False)
            .drop_duplicates(subset=['name', 'type'])
            [:max_hits]
        )

    @classmethod
    def _desc_(self) -> ShortDescription:
        return ShortDescription(name=self.__name__, type=self._type, description=self._description)
Static methods
def get_catalogs(recursive: bool = True) ‑> Sequence[hela._catalog_class.Catalog]-
Get a list of all sub-catalogs of this catalog, not including self.
Args
recursive- Flag whether to search for sub-catalog in this catalog's sub-catalogs.
Returns
A list of objects inheriting the Catalog class.
@classmethod
def get_catalogs(cls, recursive: bool = True) -> Sequence[Catalog]:
    """Get a list of all sub-catalogs of this catalog, not including self.

    Args:
        recursive:  Flag whether to search for sub-catalog in this catalog's sub-catalogs.

    Returns:
        A list of objects inheriting the Catalog class.
    """
    catalog_list = []
    for obj in cls.__dict__.values():
        catalog = is_catalog(obj)
        if catalog:
            catalog_list.append(catalog)
            if recursive:
                catalog_list.extend(catalog.get_catalogs())
    return catalog_list
def get_columns_datasets(recursive: bool = True) ‑> Dict[hela._column_classes._ColumnType, Sequence[hela._base_dataset.BaseDataset]]-
@classmethod
def get_columns_datasets(cls, recursive: bool = True) -> Dict[_ColumnType, Sequence[BaseDataset]]:
    column_dict = defaultdict(list)
    for dataset in cls.get_datasets(recursive=recursive):
        if dataset.columns is None:
            continue
        for column in dataset.columns:
            column_dict[column].append(dataset)
    return column_dict
def get_datasets(recursive: bool = True) ‑> Sequence[hela._base_dataset.BaseDataset]-
Returns a list of all datasets within this catalog, recursively if flag is set.
Args
recursive- When set to true this function will fetch datasets from sub-catalogs of this catalog.
Returns
A list of dataset objects.
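The recursive lookup described above can be illustrated without installing hela. The following is a minimal sketch only: `Dataset` and `MiniCatalog` are hypothetical stand-ins (not hela classes) that mimic how a catalog walks its own class attributes and, when `recursive` is set, descends into nested catalog classes.

```python
class Dataset:
    """Hypothetical stand-in for a hela dataset; it only carries a name."""

    def __init__(self, name):
        self.name = name


class MiniCatalog:
    """Hypothetical stand-in for the Catalog base class."""

    _is_catalog = True

    @classmethod
    def get_datasets(cls, recursive=True):
        found = []
        # Only attributes defined directly on this class are inspected.
        for obj in cls.__dict__.values():
            if isinstance(obj, Dataset):
                found.append(obj)
            elif recursive and isinstance(obj, type) and getattr(obj, '_is_catalog', False):
                # A nested catalog class: collect its datasets as well.
                found.extend(obj.get_datasets())
        return found


class Sales(MiniCatalog):
    orders = Dataset('orders')

    class Archive(MiniCatalog):
        old_orders = Dataset('old_orders')


all_names = [d.name for d in Sales.get_datasets()]
direct_names = [d.name for d in Sales.get_datasets(recursive=False)]
print(all_names)
print(direct_names)
```

With `recursive=False` only the datasets declared directly on `Sales` are returned; the default also picks up the dataset inside the nested `Archive` catalog.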
@classmethod
def get_datasets(cls, recursive: bool = True) -> Sequence[BaseDataset]:
    """Returns a list of all datasets within this catalog, recursively if flag is set.

    Args:
        recursive:  When set to true this function will fetch datasets from sub-catalogs of this catalog.

    Returns:
        A list of dataset objects.
    """
    dataset_list = []
    for obj in cls.__dict__.values():
        dataset = is_dataset(obj)
        if dataset:
            dataset_list.append(obj)
        elif recursive:
            catalog = is_catalog(obj)
            if catalog:
                dataset_list.extend(catalog.get_datasets())
    return dataset_list
def search(query_str: str, recursive: bool = True, max_hits: int = 5, min_relevance: float = 0.1) ‑> DFDisplay-
Searches across names and descriptions of Catalogs, Datasets and Columns.
- Search on name is based on Levenshtein distance (fuzzy search).
- Search on description is based on cosine similarity of TF-IDF matrix.
Args
query_str- The string to base the search on.
recursive- Whether to search recursively through sub-catalogs.
max_hits- Maximum number of hits to return.
min_relevance- Minimum required relevance score to return a hit.
Returns
A DFDisplay (pandas) dataframe with hits sorted on relevance.
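To give a feel for the name half of the search, here is a rough sketch of fuzzy ranking with Levenshtein distance. It is not hela's internal `levenshtein.sort`; the function names, the normalisation by the longer string's length and the lower-casing are all assumptions made for illustration. Only the `0.5` similarity cut-off is taken from the source listing.

```python
def levenshtein_distance(a: str, b: str) -> int:
    """Classic dynamic-programming edit distance, one row at a time."""
    previous = list(range(len(b) + 1))
    for i, char_a in enumerate(a, start=1):
        current = [i]
        for j, char_b in enumerate(b, start=1):
            cost = 0 if char_a == char_b else 1
            current.append(min(
                previous[j] + 1,         # delete char_a
                current[j - 1] + 1,      # insert char_b
                previous[j - 1] + cost,  # substitute (or keep) the character
            ))
        previous = current
    return previous[-1]


def similarity(a: str, b: str) -> float:
    """Assumed normalisation: 1.0 for identical strings, 0.0 for nothing shared."""
    longest = max(len(a), len(b))
    if longest == 0:
        return 1.0
    return 1 - levenshtein_distance(a, b) / longest


def rank_names(query, names, min_similarity=0.5):
    """Return (name, score) pairs at or above the cut-off, best match first."""
    scored = [(name, similarity(query.lower(), name.lower())) for name in names]
    hits = [(name, score) for name, score in scored if score >= min_similarity]
    return sorted(hits, key=lambda hit: hit[1], reverse=True)


# A misspelled query still finds the closest column names.
ranked = rank_names('weekdy', ['weekday', 'weekend', 'price', 'taste'])
print(ranked)
```

Descriptions are matched separately in the library (TF-IDF with cosine similarity), which this sketch does not attempt to reproduce.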
@classmethod
def search(cls, query_str: str, recursive: bool = True, max_hits: int = 5, min_relevance: float = .1) -> DFDisplay:
    """Searches across names and descriptions of Catalogs, Datasets and Columns.

    * Search on name is based on Levenshtein distance (fuzzy search).
    * Search on description is based on cosine similarity of TF-IDF matrix.

    Args:
        query_str:  The string to base the search on.
        recursive:  Whether to search recursively through sub-catalogs.
        max_hits:  Maximum number of hits to return.
        min_relevance:  Minimum required relevance score to return a hit.

    Returns:
        A DFDisplay (pandas) dataframe with hits sorted on relevance.
    """
    # Create dictionaries to collect into unique name/desc keys
    name_dict, desc_dict = defaultdict(list), defaultdict(list)
    for short_desc in cls._all_descriptions(recursive=recursive):
        name = short_desc.name
        if name is not None:
            name_dict[name].append(short_desc)
            # Take both original and split version if we're dealing with nested columns
            # E.g. ratings.taste
            if '.' in name:
                name_dict[name.split('.')[-1]].append(short_desc)
        if short_desc.description is not None:
            desc_dict[short_desc.description].append(short_desc)

    l_search = levenshtein.sort(query_str, list(name_dict.keys()), min_similarity=.5)
    try:
        tf_idf_search = tf_idf.sort(query_str, list(desc_dict.keys()))
    except ValueError:
        # Raised when we get no vocabulary matches at all
        tf_idf_search = []

    relevance_col = 'relevance'
    hit_df = DFDisplay([
        *[
            {**short_desc.__dict__, relevance_col: hit.score, 'hit_on': 'name'}
            for hit in l_search
            for short_desc in name_dict[hit.match_string]
        ],
        *[
            {**short_desc.__dict__, relevance_col: hit.score, 'hit_on': 'description'}
            for hit in tf_idf_search
            for short_desc in desc_dict[hit.match_string]
        ]
    ])

    error_msg = f'No hits good enough found on query string: "{query_str}"'
    if len(hit_df) == 0:
        raise ValueError(error_msg)

    hit_df = hit_df[hit_df[relevance_col] > min_relevance]
    if len(hit_df) == 0:
        raise ValueError(error_msg)

    hit_df.loc[:, relevance_col] = hit_df[relevance_col].round(2)
    return (
        hit_df
        .sort_values(relevance_col, ascending=False)
        .drop_duplicates(subset=['name', 'type'])
        [:max_hits]
    )
def setup(cls: Catalog = None, folder: Optional[str] = None, database: Optional[str] = None, description: Optional[str] = None, rich_description_path: Optional[str] = None) ‑> hela._catalog_class.Catalog-
Decorator enriching the catalog with a description, and optionally binding a folder or database to all datasets within it.
Args
folder- Used for filestore style datasets (e.g. spark), build the catalogs folder structure.
database- Used for database style datasets (e.g. bigquery, aws glue) builds the catalogs database structure.
description- A description of this catalog.
rich_description_path- Path to markdown file with richer descriptions of this catalog.
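The decorator can be applied both bare and with arguments. The pattern behind that is sketched below in isolation; `mini_setup`, `Plain` and `Configured` are hypothetical names, and the sketch leaves out the catalog check and the dataset enrichment that the real decorator performs.

```python
from typing import Optional


def mini_setup(cls=None, folder: Optional[str] = None, description: Optional[str] = None):
    """Hypothetical decorator usable as @mini_setup or @mini_setup(...)."""

    def wrap(target):
        # Attach the settings to the decorated class and hand it back unchanged otherwise.
        target._folder = folder
        target._description = description
        return target

    # Called with parentheses: no class yet, so return the real decorator.
    if cls is None:
        return wrap
    # Called without parentheses: the class arrives as the first argument.
    return wrap(cls)


@mini_setup
class Plain:
    pass


@mini_setup(folder='warehouse', description='Sales data')
class Configured:
    pass


print(Plain._folder, Configured._folder, Configured._description)
```

Because the class is passed positionally only in the bare form, every option has to be given as a keyword argument when parentheses are used.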
@staticmethod
def setup(
    cls: Catalog = None,
    folder: Optional[str] = None,
    database: Optional[str] = None,
    description: Optional[str] = None,
    rich_description_path: Optional[str] = None
) -> Catalog:
    """Decorator enriching the catalog with a description, and optionally binding a folder or database
    to all datasets within it.

    Args:
        folder:  Used for filestore style datasets (e.g. spark), build the catalogs folder structure.
        database:  Used for database style datasets (e.g. bigquery, aws glue) builds the catalogs
            database structure.
        description:  A description of this catalog.
        rich_description_path:  Path to markdown file with richer descriptions of this catalog.
    """
    def wrap(cls: Catalog):
        if not getattr(cls, '_is_catalog', False):
            raise ValueError(f'Class {cls} must inherit Catalog class.')
        cls._folder = folder
        cls._database = database
        cls._description = description
        cls._rich_description_path = rich_description_path
        _enrich_datasets(cls, folder=folder, database=database)
        return cls

    # When called with parentheses "@catalog()"
    if cls is None:
        return wrap

    # When called without parentheses "@catalog"
    return wrap(cls)
def show_columns(recursive: bool = True) ‑> DFDisplay-
Returns a pandas dataframe with a description of all columns in this catalog, one column per row.
Args
recursive- Whether to show dataset recursively in subcatalogs.
Returns
DFDisplay (pandas dataframe)
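Each row of this table pairs a column with the datasets that use it, which comes down to inverting a dataset-to-columns mapping. A small sketch of that inversion follows, using plain strings instead of hela objects; the dataset and column names are made up for illustration.

```python
from collections import defaultdict

# Hypothetical catalog content: dataset name -> column names (None means no schema).
dataset_columns = {
    'orders': ['order_id', 'weekday'],
    'deliveries': ['delivery_id', 'weekday'],
    'unstructured_dump': None,
}


def columns_to_datasets(mapping):
    """Invert dataset -> columns into column -> list of datasets."""
    index = defaultdict(list)
    for dataset, columns in mapping.items():
        if columns is None:
            # Datasets without declared columns contribute nothing.
            continue
        for column in columns:
            index[column].append(dataset)
    return dict(index)


index = columns_to_datasets(dataset_columns)
# One row per column, sorted by name, similar in spirit to the table described above.
rows = sorted(
    ({'name': column, 'datasets': datasets} for column, datasets in index.items()),
    key=lambda row: row['name'],
)
for row in rows:
    print(row)
```

A column shared by several datasets, such as `weekday` here, appears once with all of its datasets listed.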
@classmethod
def show_columns(cls, recursive: bool = True) -> DFDisplay:
    """Returns a pandas dataframe with a description of all columns in this catalog, one column per row.

    Args:
        recursive:  Whether to show dataset recursively in subcatalogs.

    Returns:
        DFDisplay (pandas dataframe)
    """
    df = DFDisplay([
        {**col.__dict__, 'datasets': datasets}
        for col_obj, datasets in cls.get_columns_datasets(recursive=recursive).items()
        for col in col_obj._describe()
    ])
    return df.sort_values('name').reset_index(drop=True)
def show_datasets(recursive: bool = True) ‑> DFDisplay-
Returns a DFDisplay with a description of all datasets in this catalog, one dataset per row.
Args
recursive- Whether to show dataset recursively in subcatalogs.
Returns
DFDisplay (pandas dataframe)
@classmethod
def show_datasets(cls, recursive: bool = True) -> DFDisplay:
    """Returns a DFDisplay with a description of all datasets in this catalog, one dataset per row.

    Args:
        recursive:  Whether to show dataset recursively in subcatalogs.

    Returns:
        DFDisplay (pandas dataframe)
    """
    return DFDisplay([
        d._describe().__dict__
        for d in cls.get_datasets(recursive=recursive)
    ])
class Col (name: str, data_type: PrimitiveType, description: str = None)-
A basic column object, for nested columns see NestedCol.
This class is used to define columns within a BaseDataset or column_store(). Each defined column will be searchable, testable and referenceable.
If you want to give further functionality to your columns, please inherit from this class.
Attributes
name- The name of the column.
data_type- The data type of the column, should be one of the types found in hela.data_types.
description- A description of this column as a string, better descriptions yield a more secure catalog.
Examples:
>>> from hela import Col
>>> from hela.data_types import String
>>> my_col = Col('my_col', String(), 'This is an example column')
class Col(_ColumnType):
    """A basic column object, for nested columns see `hela.NestedCol`.

    This class is used to define columns within a `hela.BaseDataset` or `hela.column_store`.
    Each defined column will be searchable, testable and referenceable.

    If you want to give further functionality to your columns, please inherit from this class.

    Attributes:
        name:  The name of the column.
        data_type:  The data type of the column, should be one of types found in `hela.data_types`
        description:  A description of this column as a string, better descriptions yield a more secure catalog.

    Examples:
    >>> from hela import Col
    >>> from hela.data_types import String
    >>> my_col = Col('my_col', String(), 'This is an example column')
    """
    def __init__(
        self,
        name: str,
        data_type: PrimitiveType,
        description: str = None,
    ) -> None:
        super().__init__(name=name, data_type=data_type, description=description)

    def _describe(self) -> Sequence[_ColInfo]:
        return [_ColInfo(
            name=self.name,
            data_type=str(self.data_type),
            description=self.description,
            from_store=self.from_store
        )]

    def _spark_type(self):
        return StructField(name=self.name, dataType=self.data_type._spark_type())

    def _glue_type(self):
        return GlueColumn(name=self.name, type=self.data_type._glue_type(), comment=self.description)

    def _json_type(self):
        return {self.name: self.data_type._json_type()}

    def _bigquery_type(self):
        return bigquery.SchemaField(
            name=self.name,
            description=self.description,
            **self.data_type._bigquery_type().__dict__
        )

    def __str__(self) -> str:
        return f'Col(name="{self.name}", data_type={self.data_type}, description={self.description})'

    def __repr__(self) -> str:
        return self.__str__()
Ancestors
- hela._column_classes._ColumnType
- hela._base_data_type.BaseDataType
- abc.ABC
class NestedCol (name: str, columns: Sequence[_ColumnType])-
A nested style column object, should be instantiated with sub columns.
Most data stores support nested style columns, these can be built using this column class. These columns will be referenced with dot-notation when shown in the catalog. For dict/struct style columns see Struct.
Attributes
name- The name of the column.
columns- A sequence of columns nested within this column. Can be Col or NestedCol objects.
Examples:
>>> from hela import NestedCol, Col
>>> from hela.data_types import String, Int
>>> my_col = NestedCol('my_nested_col', [
...     Col('nested_string', String(), 'Nested string column'),
...     Col('nested_int', Int(), 'Nested int column')
... ])
class NestedCol(_ColumnType):
    """A nested style column object, should be instantiated with sub columns.

    Most data stores support nested style columns, these can be built using this column class.
    These columns will be referenced with dot-notation when shown in the **catalog**.
    For dict/struct style columns see `hela.data_types.Struct`.

    Attributes:
        name:  The name of the column.
        columns:  A sequence of columns nested within this column. Can be Col or NestedCol objects.

    Examples:
    >>> from hela import NestedCol, Col
    >>> from hela.data_types import String, Int
    >>> my_col = NestedCol('my_nested_col', [
    ...     Col('nested_string', String(), 'Nested string column'),
    ...     Col('nested_int', Int(), 'Nested int column')
    ... ])
    """
    def __init__(self, name: str, columns: Sequence[_ColumnType]) -> None:
        self.columns = columns
        data_type = Struct({c.name: c.data_type for c in columns})
        super().__init__(name=name, data_type=data_type, description='A subset of columns.')

    def _describe(self) -> Sequence[_ColInfo]:
        col_info_list = []
        for c in self.columns:
            for c_desc in c._describe():
                desc = copy(c_desc)
                desc.name = f'{self.name}.{desc.name}'
                col_info_list.append(desc)
        return col_info_list

    def _spark_type(self):
        return StructField(name=self.name, dataType=self.data_type._spark_type())

    def _glue_type(self):
        return GlueColumn(name=self.name, type=self.data_type._glue_type())

    def _json_type(self):
        return {self.name: self.data_type._json_type()}

    def _bigquery_type(self):
        return bigquery.SchemaField(
            name=self.name,
            field_type='RECORD',
            mode=BigqueryMode.NULLABLE,
            fields=[c._bigquery_type() for c in self.columns]
        )

    def __str__(self) -> str:
        subs = [str(c) for c in self.columns]
        return f'NestedCol(name="{self.name}", subcols={subs})'

    def _desc_(self) -> Sequence[ShortDescription]:
        desc_list = []
        for c_desc in self.columns:
            desc = copy(c_desc._desc_())
            desc.name = f'{self.name}.{c_desc.name}'
            desc_list.append(desc)
        return desc_list

    def __repr__(self) -> str:
        return self.__str__()
Ancestors
- hela._column_classes._ColumnType
- hela._base_data_type.BaseDataType
- abc.ABC
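The dot-notation that NestedCol uses when its sub-columns are listed in the catalog can be pictured with a small recursive flattening. This is a sketch only: `Leaf` and `Nested` are hypothetical stand-ins for Col and NestedCol and carry nothing but names.

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass
class Leaf:
    """Hypothetical stand-in for a plain column."""
    name: str


@dataclass
class Nested:
    """Hypothetical stand-in for a nested column holding sub-columns."""
    name: str
    columns: List[Union['Leaf', 'Nested']]


def dotted_names(column):
    """Return the fully qualified name of every leaf below `column`."""
    if isinstance(column, Leaf):
        return [column.name]
    names = []
    for sub in column.columns:
        # Prefix each name coming from below with this column's own name.
        for sub_name in dotted_names(sub):
            names.append(f'{column.name}.{sub_name}')
    return names


ratings = Nested('ratings', [
    Leaf('taste'),
    Nested('service', [Leaf('speed'), Leaf('kindness')]),
])
names = dotted_names(ratings)
print(names)
```

Each level of nesting adds one prefix, so a leaf two levels down is listed as `ratings.service.speed`.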