Module hela.infer
Includes functions to infer Catalog schemas on various data structures.
Expand source code
"""Includes functions to infer Catalog schemas on various data structures."""
from json.decoder import JSONDecodeError
import warnings
import pandas as pd
import json
from typing import List, Union
from pathlib import Path
from collections import defaultdict
from hela.data_types import PrimitiveType, String, Struct, Array
from hela._utils.maps import python_to_data_type_map
from hela import Col
from hela._column_classes import _ColumnType
from hela.errors import InferralError
from datetime import date
def _deep_string_inferral(s: pd.Series) -> PrimitiveType:
    """Look for a richer type than ``String`` behind a column of strings.

    The column is converted with each candidate parser in turn -- ISO date,
    datetime, then JSON -- and the converted values are handed back to
    ``_infer_pandas_column_type``. The first attempt that does not raise one
    of its expected errors decides the result.

    Args:
        s: Series to probe; the caller passes columns whose values are ``str``.

    Returns:
        The type inferred from the first successful conversion, or
        ``String()`` when every attempt fails.
    """
    def _as_date(value):
        return date.fromisoformat(value)

    def _as_json(value):
        # Single quotes are swapped for double quotes before parsing, so
        # Python-repr style text such as "{'a': 1}" is accepted as JSON.
        return json.loads(value.replace("'", '"'))

    # (converter, exceptions meaning "this interpretation does not fit")
    attempts = (
        (lambda column: column.apply(_as_date), (ValueError,)),
        (pd.to_datetime, (ValueError,)),
        (lambda column: column.apply(_as_json), (JSONDecodeError, TypeError)),
    )
    for convert, not_applicable in attempts:
        try:
            return _infer_pandas_column_type(convert(s))
        except not_applicable:
            continue
    return String()
def _infer_pandas_column_type(s: pd.Series, deep: bool = True) -> PrimitiveType:
    """Infer the data type of a single pandas column.

    Null values are dropped first. The remaining values must all share one
    Python type, which is then resolved in this order: deep string inferral
    (only when ``deep`` is set and the type is ``str``), a lookup in
    ``python_to_data_type_map``, and finally recursive inference for ``dict``
    (-> ``Struct``) and ``list`` (-> ``Array``) values.

    Args:
        s: The column to inspect.
        deep: If True, string columns are passed to ``_deep_string_inferral``.
            The flag is forwarded to nested struct/array inference.

    Returns:
        The inferred data type.

    Raises:
        InferralError: If the column has no non-null values, contains more
            than one Python type, or contains a type that cannot be mapped.
    """
    s = s.dropna()
    type_set = {type(x) for x in s}
    if not type_set:
        # Nothing to look at (empty or all-null column, or only empty lists
        # one level up). Raise the module's own error so that callers using
        # raise_infer_errors=False can fall back instead of crashing.
        raise InferralError(
            f'Could not infer data type for column "{s.name}", no non-null values found.'
        )
    if len(type_set) > 1:
        raise InferralError(f'Could not infer data type for column {s.name}, multiple types found: {type_set}.')
    (type_,) = type_set

    if deep and type_ is str:
        return _deep_string_inferral(s)

    mapped_type = python_to_data_type_map.get(type_, None)
    if mapped_type is not None:
        return mapped_type

    if type_ is dict:
        # Gather the values seen for every key across all rows, then infer
        # each key as if it were its own column.
        combined_dicts = defaultdict(list)
        for d in s:
            for k, v in d.items():
                combined_dicts[k].append(v)
        return Struct({
            k: _infer_pandas_column_type(pd.Series(v, name=k), deep=deep)
            for k, v in combined_dicts.items()
        })

    if type_ is list:
        # The element type must hold for the elements of *every* list in the
        # column, so flatten all of them before inferring.
        all_vals = [element for sub_list in s for element in sub_list]
        return Array(_infer_pandas_column_type(pd.Series(all_vals, name=s.name), deep=deep))

    raise InferralError(f'Could not infer data type for column "{s.name}".')
def infer_schema_pandas(df: pd.DataFrame, raise_infer_errors=True, deep=True,
                        sample_size=10**6
                        ) -> List[_ColumnType]:
    """Attempts to infer the types of all columns in a pandas dataframe.

    Can handle nested (dict) columns.

    Args:
        df: A pandas dataframe
        raise_infer_errors: If raise_infer_errors is False, will default to String() and issue a warning
        deep: Forwarded to the per-column inference; when True, string columns
            are probed for richer types instead of being reported as strings
        sample_size: Max number of rows to use for sampling (default 10**6)

    Returns:
        A list of Col objects, one per dataframe column

    Raises:
        InferralError: When a column or subcolumn could not be inferred
    """
    # Only draw a random sample when the frame exceeds the row budget.
    # DataFrame.sample returns the rows in random order, which is wasted work
    # for small frames and can make row-order-dependent results differ
    # between runs on identical input.
    if len(df) > sample_size:
        sample_df = df.sample(sample_size)
    else:
        sample_df = df

    cols = []
    for col, vals in sample_df.items():
        try:
            cols.append(Col(name=col, data_type=_infer_pandas_column_type(vals.copy(), deep=deep)))
        except InferralError as e:
            if raise_infer_errors:
                raise
            # Best-effort mode: report the problem and fall back to String().
            warnings.warn(str(e) + ' Defaulting to String()')
            cols.append(Col(name=col, data_type=String()))
    return cols
def infer_schema_json(path_or_str: Union[Path, str],
                      raise_infer_errors=True, deep=True) -> List[_ColumnType]:
    """Attempts to infer the types of all objects in a json

    Args:
        path_or_str: JSON as string, JSON lines (one document per line), Path to JSON,
            or string path ending in ".json"
        raise_infer_errors: If raise_infer_errors is False, will default to String() and issue a warning
        deep: Forwarded to infer_schema_pandas

    Returns:
        A list of Col objects

    Raises:
        InferralError: When a column or subcolumn could not be inferred
        JSONDecodeError: When the input is neither a JSON document nor JSON lines
    """
    # Resolve file inputs to their text content.
    if isinstance(path_or_str, Path):
        path_or_str = path_or_str.read_text()
    elif isinstance(path_or_str, str) and path_or_str.endswith('.json'):
        path_or_str = Path(path_or_str).read_text()

    try:
        dict_list = json.loads(path_or_str)
    except JSONDecodeError:
        # Not a single JSON document: treat the text as JSON lines. Blank
        # lines (most commonly the one produced by a trailing newline) carry
        # no record and must not be fed to json.loads.
        lines = [line for line in path_or_str.split('\n') if line.strip()]
        if not lines:
            # Nothing parseable at all; surface the original decode error.
            raise
        dict_list = [json.loads(line) for line in lines]

    try:
        df = pd.DataFrame(dict_list)
    except ValueError:
        # A single flat object cannot be turned into a frame directly;
        # wrap it so it becomes one row.
        df = pd.DataFrame([dict_list])
    return infer_schema_pandas(df, raise_infer_errors=raise_infer_errors, deep=deep)
Functions
def infer_schema_json(path_or_str: Union[pathlib.Path, str], raise_infer_errors=True, deep=True) ‑> List[hela._column_classes._ColumnType]-
Attempts to infer the types of all objects in a json
Args
path_or_str- JSON as a string, JSON Lines (one JSON document per line), a Path to a JSON file, or a string path ending in ".json"
raise_infer_errors- If raise_infer_errors is False, will default to String() and issue a warning
deep- If True, string values are additionally parsed as dates, datetimes and JSON before defaulting to String()
Returns
A list of Col objects
Raises
InferralError- When a column or subcolumn could not be inferred
Expand source code
def infer_schema_json(path_or_str: Union[Path, str], raise_infer_errors=True, deep=True) -> List[_ColumnType]: """Attempts to infer the types of all objects in a json Args: path_or_str: JSON as string, JSON line (newline separator), Path to JSON, or string path to JSON raise_infer_errors: If raise_infer_errors is False, will default to String() and issue a warning deep: Will attempt to json load string types Returns: A list of Col objects Raises: InferralError: When a column or subcolumn could not be inferred """ if isinstance(path_or_str, str) and path_or_str[-5:] == '.json': path_or_str = Path(path_or_str).read_text() elif isinstance(path_or_str, Path): path_or_str = path_or_str.read_text() try: dict_list = json.loads(path_or_str) except JSONDecodeError: dict_list = [json.loads(x) for x in path_or_str.split('\n')] try: df = pd.DataFrame(dict_list) except ValueError: df = pd.DataFrame([dict_list]) return infer_schema_pandas(df, raise_infer_errors=raise_infer_errors, deep=deep) def infer_schema_pandas(df: pandas.core.frame.DataFrame, raise_infer_errors=True, deep=True, sample_size=1000000) ‑> List[hela._column_classes._ColumnType]-
Attempts to infer the types of all columns in a pandas dataframe.
Can handle nested (dict) columns.
Args
df- A pandas dataframe
raise_infer_errors- If raise_infer_errors is False, will default to String() and issue a warning
deep- If True, string values are additionally parsed as dates, datetimes and JSON before defaulting to String()
sample_size- Max number of rows to use for sampling (default 10**6)
Returns
A list of Col objects
Raises
InferralError- When a column or subcolumn could not be inferred
Expand source code
def infer_schema_pandas(df: pd.DataFrame, raise_infer_errors=True, deep=True,
                        sample_size=10**6
                        ) -> List[_ColumnType]:
    """Attempts to infer the types of all columns in a pandas dataframe.

    Can handle nested (dict) columns.

    Args:
        df: A pandas dataframe
        raise_infer_errors: If raise_infer_errors is False, will default to String() and issue a warning
        deep: Will attempt to json load string types
        sample_size: Max number of rows to use for sampling (default 10**6)

    Returns:
        A list of Col objects

    Raises:
        InferralError: When a column or subcolumn could not be inferred
    """
    # Never ask for more rows than the frame holds.
    n_rows = min(len(df), sample_size)
    sample_df = df.sample(n_rows)

    inferred = []
    for col_name, values in sample_df.items():
        try:
            data_type = _infer_pandas_column_type(values.copy(), deep=deep)
            inferred.append(Col(name=col_name, data_type=data_type))
        except InferralError as err:
            if raise_infer_errors:
                raise err
            # Best-effort mode: warn and fall back to a string column.
            warnings.warn(str(err) + ' Defaulting to String()')
            inferred.append(Col(name=col_name, data_type=String()))
    return inferred