Module hela.schema_generators

Module used to translate from catalog schema to other schema types.

Expand source code
"""Module used to translate from catalog schema to other schema types."""
from hela._column_classes import _ColumnType
from hela import BaseDataset
from typing import Sequence, Union


def _get_columns(obj: Union[Sequence[_ColumnType], BaseDataset]) -> Sequence[_ColumnType]:
    if isinstance(obj, BaseDataset):
        obj = obj.columns
    if len(obj) == 0:
        raise ValueError('The column list is empty.')
    return obj


def spark_schema(obj: Union[Sequence[_ColumnType], BaseDataset]):
    col_list = _get_columns(obj)
    from pyspark.sql.types import StructType
    return StructType([c._spark_type() for c in col_list])


def aws_glue_schema(obj: Union[Sequence[_ColumnType], BaseDataset]):
    col_list = _get_columns(obj)
    return [c._glue_type() for c in col_list]


def bigquery_schema(obj: Union[Sequence[_ColumnType], BaseDataset]):
    col_list = _get_columns(obj)
    return [c._bigquery_type() for c in col_list]


def json_schema(obj: Union[Sequence[_ColumnType], BaseDataset]):
    col_list = _get_columns(obj)
    properties = {}
    for c in col_list:
        properties.update(c._json_type())
    return {
        '$schema': 'http://json-schema.org/draft-07/schema',
        'type': 'object',
        'properties': properties
    }

Functions

def aws_glue_schema(obj: Union[Sequence[hela._column_classes._ColumnType], hela._base_dataset.BaseDataset])
Expand source code
def aws_glue_schema(obj: Union[Sequence[_ColumnType], BaseDataset]):
    col_list = _get_columns(obj)
    return [c._glue_type() for c in col_list]
def bigquery_schema(obj: Union[Sequence[hela._column_classes._ColumnType], hela._base_dataset.BaseDataset])
Expand source code
def bigquery_schema(obj: Union[Sequence[_ColumnType], BaseDataset]):
    col_list = _get_columns(obj)
    return [c._bigquery_type() for c in col_list]
def json_schema(obj: Union[Sequence[hela._column_classes._ColumnType], hela._base_dataset.BaseDataset])
Expand source code
def json_schema(obj: Union[Sequence[_ColumnType], BaseDataset]):
    col_list = _get_columns(obj)
    properties = {}
    for c in col_list:
        properties.update(c._json_type())
    return {
        '$schema': 'http://json-schema.org/draft-07/schema',
        'type': 'object',
        'properties': properties
    }
def spark_schema(obj: Union[Sequence[hela._column_classes._ColumnType], hela._base_dataset.BaseDataset])
Expand source code
def spark_schema(obj: Union[Sequence[_ColumnType], BaseDataset]):
    col_list = _get_columns(obj)
    from pyspark.sql.types import StructType
    return StructType([c._spark_type() for c in col_list])