Module hela.datasets.bigquery_dataset
Expand source code
import os
import pandas as pd
from typing import Optional, Sequence
from hela import BaseDataset
from hela._column_classes import _ColumnType
from hela.schema_generators import bigquery_schema
from hela._utils.dict_utils import flatten_dict
try:
from google.cloud import bigquery
except ModuleNotFoundError:
pass
class BigqueryDataset(BaseDataset):
    """Dataset whose data lives in a BigQuery table.

    Wraps a ``bigquery.Client`` with helpers to create, query, write to and
    delete the table addressed by ``table_id``. The project part of that id is
    read from the ``CLOUDSDK_CORE_PROJECT`` environment variable.

    Attributes:
        client: The ``bigquery.Client`` created in ``__init__`` and used by
            every method that talks to BigQuery.
    """

    def __init__(
        self,
        name: str,
        database: Optional[str] = None,
        description: Optional[str] = None,
        rich_description_path: Optional[str] = None,
        partition_cols: Optional[Sequence[str]] = None,
        columns: Optional[Sequence[_ColumnType]] = None
    ) -> None:
        """Initialise the dataset and open a BigQuery client.

        All arguments are forwarded to ``BaseDataset.__init__`` together with
        ``data_type='bigquery'`` and ``folder=None``.

        Args:
            name: The name of the dataset; used as the table part of
                ``table_id``.
            database: The database holding the table; used as the middle part
                of ``table_id`` and by ``create_database``.
            description: A description of the dataset as a string.
            rich_description_path: A path to a markdown file with a longer
                description.
            partition_cols: Column names used for partitioning.
            columns: Column definitions of the dataset.

        Raises:
            ModuleNotFoundError: If the optional ``google.cloud.bigquery``
                dependency is not installed.
        """
        super().__init__(
            name,
            data_type='bigquery',
            folder=None,
            database=database,
            description=description,
            rich_description_path=rich_description_path,
            partition_cols=partition_cols,
            columns=columns
        )
        # The module-level import of ``bigquery`` is optional and silently
        # skipped when the package is missing, which would otherwise surface
        # here as an unhelpful NameError.
        try:
            client_factory = bigquery.Client
        except NameError as err:
            raise ModuleNotFoundError(
                "BigqueryDataset requires the 'google-cloud-bigquery' package "
                "(could not import 'google.cloud.bigquery')"
            ) from err
        self.client = client_factory()

    @property
    def project_id(self) -> str:
        """Returns the bigquery project id.

        Assumes project_id exists under env variable "CLOUDSDK_CORE_PROJECT".

        Raises:
            KeyError: If the environment variable is not set.
        """
        return os.environ['CLOUDSDK_CORE_PROJECT']

    @property
    def table_id(self) -> str:
        """Returns the full table id in style of: "project_id.database_id.table_id".

        Assumes project_id exists under env variable "CLOUDSDK_CORE_PROJECT".
        """
        return f'{self.project_id}.{self.database}.{self.name}'

    def write_json(self, json_rows: Sequence[dict]) -> None:
        """Insert rows into this table through ``client.insert_rows_json``.

        Args:
            json_rows: The rows to insert, one dict per row.

        Raises:
            RuntimeError: If BigQuery reports insert errors for any row.
        """
        # insert_rows_json reports per-row failures through its return value
        # rather than by raising, so an ignored result means lost rows.
        errors = self.client.insert_rows_json(self.table_id, json_rows)
        if errors:
            raise RuntimeError(
                f'Failed to insert rows into {self.table_id}: {errors}'
            )

    def query_table(self, query_string: str) -> pd.DataFrame:
        """Query this dataset in bigquery. In the FROM part simply write TABLE.

        >>> SELECT * FROM TABLE

        instead of

        >>> SELECT * FROM <some_table_id>

        The substitution is a literal, case-sensitive replacement of the text
        ``FROM TABLE`` with ``FROM <table_id>``.

        Args:
            query_string: A query for this dataset/table

        Returns:
            The query result converted to a dataframe.
        """
        query_string = query_string.replace('FROM TABLE', f'FROM {self.table_id}')
        return self.client.query(query_string).to_dataframe()

    def create_table(self) -> None:
        """Create a bq table given this dataset's column schema."""
        table = bigquery.Table(
            self.table_id,
            schema=bigquery_schema(self.columns)
        )
        self.client.create_table(table)

    def create_database(self) -> None:
        """Create a bq database given this dataset's database (or its catalog's database).

        Raises:
            ValueError: If ``self.database`` is None.
        """
        if self.database is None:
            raise ValueError('Database is None')
        self.client.create_dataset(f'{self.project_id}.{self.database}')

    def delete_table(self) -> None:
        """Delete this table from bigquery"""
        self.client.delete_table(self.table_id)

    def get_samples(self) -> Optional[dict]:
        """Fetch one row of the table as sample values per column.

        Returns:
            A dict combining the row's original keys with the keys produced by
            ``flatten_dict``, or None when the query returns no rows.
        """
        df = self.query_table('SELECT * FROM TABLE LIMIT 1')
        records = df.to_dict(orient='records')
        if not records:
            # An empty table has no row to sample; indexing [0] would raise.
            return None
        sample_dict = records[0]
        # Return both original and flattened keys to make sure we have both
        #   my_column {a: 1, b: 2}
        # and
        #   my_column.a
        #   my_column.b
        return {
            **sample_dict,
            **flatten_dict(sample_dict)
        }
Classes
class BigqueryDataset (name: str, database: Optional[str] = None, description: Optional[str] = None, rich_description_path: Optional[str] = None, partition_cols: Optional[Sequence[str]] = None, columns: Optional[Sequence[hela._column_classes._ColumnType]] = None)-
Abstract dataset class to be used when building your own datasets.
If you choose to build data interactivity through the data catalog, it is within your own dataset classes you would build authentication and connection logic.
For full usage of the available catalog features implement the functions
BaseDataset.get_samples and BaseDataset.get_dates.
Attributes
name- The name of the dataset
data_type- The data type of the dataset, e.g. "parquet" or "bigquery"
description- A description of the dataset as a string
partition_cols- A list of column names to be used for partitioning as strings
rich_description_path- A path to a markdown file with possibilities for longer, more detailed descriptions. Primarily used for generated catalog web page.
columns- A list of class ColumnType objects defining the columns of the dataset
path- The path to the dataset (combination of folder and name)
Expand source code
class BigqueryDataset(BaseDataset): def __init__( self, name: str, database: Optional[str] = None, description: Optional[str] = None, rich_description_path: Optional[str] = None, partition_cols: Optional[Sequence[str]] = None, columns: Optional[Sequence[_ColumnType]] = None ) -> None: super().__init__( name, data_type='bigquery', folder=None, database=database, description=description, rich_description_path=rich_description_path, partition_cols=partition_cols, columns=columns ) self.client = bigquery.Client() @property def project_id(self) -> str: """Returns the bigquery project id, assumes project_id exists under env variable "CLOUDSDK_CORE_PROJECT""" return os.environ['CLOUDSDK_CORE_PROJECT'] @property def table_id(self) -> str: """Returns the full table id in style of: "project_id.database_id.table_id" Assumes project_id exists under env variable "CLOUDSDK_CORE_PROJECT" """ return f'{self.project_id}.{self.database}.{self.name}' def write_json(self, json_rows: Sequence[dict]) -> None: self.client.insert_rows_json(self.table_id, json_rows) def query_table(self, query_string: str) -> pd.DataFrame: """Query this dataset in bigquery. In the FROM part simply write TABLE. 
>>> SELECT * FROM TABLE instead of >>> SELECT * FROM <some_table_id> Args: query_string: A query for this dataset/table """ query_string = query_string.replace('FROM TABLE', f'FROM {self.table_id}') return self.client.query(query_string).to_dataframe() def create_table(self) -> None: """Create a bq table given this datasets column schema.""" table = bigquery.Table( self.table_id, schema=bigquery_schema(self.columns) ) self.client.create_table(table) def create_database(self) -> None: """Create a bq database given this datasets database (or its catalog's database)""" if self.database is None: raise ValueError('Database is None') self.client.create_dataset(f'{self.project_id}.{self.database}') def delete_table(self) -> None: """Delete this table from bigquery""" self.client.delete_table(self.table_id) def get_samples(self) -> dict: df = self.query_table('SELECT * FROM TABLE LIMIT 1') sample_dict = df.to_dict(orient='records')[0] # Return both original and flattened keys to make sure we have both # my_column {a: 1, b: 2} # and # my_column.a # my_column.b return { **sample_dict, **flatten_dict(sample_dict) }Ancestors
- hela._base_dataset.BaseDataset
- abc.ABC
Instance variables
var project_id : str-
Returns the bigquery project id, assumes project_id exists under env variable "CLOUDSDK_CORE_PROJECT".
Expand source code
@property def project_id(self) -> str: """Returns the bigquery project id, assumes project_id exists under env variable "CLOUDSDK_CORE_PROJECT""" return os.environ['CLOUDSDK_CORE_PROJECT'] var table_id : str-
Returns the full table id in style of: "project_id.database_id.table_id". Assumes project_id exists under env variable "CLOUDSDK_CORE_PROJECT".
Expand source code
@property
def table_id(self) -> str:
    """Fully qualified table id, formatted as "project_id.database_id.table_id".

    The project part comes from ``project_id``, which assumes the project id
    exists under env variable "CLOUDSDK_CORE_PROJECT".
    """
    project, database, table = self.project_id, self.database, self.name
    return '{}.{}.{}'.format(project, database, table)
Methods
def create_database(self) ‑> None-
Create a bq database given this dataset's database (or its catalog's database).
Expand source code
def create_database(self) -> None: """Create a bq database given this datasets database (or its catalog's database)""" if self.database is None: raise ValueError('Database is None') self.client.create_dataset(f'{self.project_id}.{self.database}') def create_table(self) ‑> None-
Create a bq table given this dataset's column schema.
Expand source code
def create_table(self) -> None: """Create a bq table given this datasets column schema.""" table = bigquery.Table( self.table_id, schema=bigquery_schema(self.columns) ) self.client.create_table(table) def delete_table(self) ‑> None-
Delete this table from bigquery
Expand source code
def delete_table(self) -> None: """Delete this table from bigquery""" self.client.delete_table(self.table_id) def get_samples(self) ‑> dict-
Implement this function for sample inspection functionality used in e.g. BaseDataset.show_columns.
Should return a dictionary of string keys for column names with samples:
>>> {'my_column': 123}
Nested columns should return names with dot-notation:
>>> {'parent_column.my_column': 123}
Or None if samples could not be fetched:
>>> None
Expand source code
def get_samples(self) -> dict: df = self.query_table('SELECT * FROM TABLE LIMIT 1') sample_dict = df.to_dict(orient='records')[0] # Return both original and flattened keys to make sure we have both # my_column {a: 1, b: 2} # and # my_column.a # my_column.b return { **sample_dict, **flatten_dict(sample_dict) } def query_table(self, query_string: str) ‑> pandas.core.frame.DataFrame-
Query this dataset in bigquery. In the FROM part simply write TABLE.
>>> SELECT * FROM TABLE
instead of
>>> SELECT * FROM <some_table_id>
Args
query_string- A query for this dataset/table
Expand source code
def query_table(self, query_string: str) -> pd.DataFrame: """Query this dataset in bigquery. In the FROM part simply write TABLE. >>> SELECT * FROM TABLE instead of >>> SELECT * FROM <some_table_id> Args: query_string: A query for this dataset/table """ query_string = query_string.replace('FROM TABLE', f'FROM {self.table_id}') return self.client.query(query_string).to_dataframe() def write_json(self, json_rows: Sequence[dict]) ‑> None-
Expand source code
def write_json(self, json_rows: Sequence[dict]) -> None:
    """Insert rows into this table through ``client.insert_rows_json``.

    Args:
        json_rows: The rows to insert, one dict per row.

    Raises:
        RuntimeError: If BigQuery reports insert errors for any row.
    """
    # insert_rows_json reports per-row failures through its return value
    # rather than by raising, so an ignored result means lost rows.
    errors = self.client.insert_rows_json(self.table_id, json_rows)
    if errors:
        raise RuntimeError(
            f'Failed to insert rows into {self.table_id}: {errors}'
        )