Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix: Lazy creation of session dataset
With this change, BigFrames no longer creates a dataset up front at session
creation; instead, each component that needs the dataset is responsible for creating it.
  • Loading branch information
shobsi committed Oct 6, 2023
commit 2bcf14432ef54f19ddafd664a0cf7b5518b99094
37 changes: 26 additions & 11 deletions bigframes/remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,22 @@ def create_bq_remote_function(
OPTIONS (
endpoint = "{endpoint}"
)"""

logger.info(f"Creating BQ remote function: {create_function_ddl}")

# Make sure the dataset exists
dataset = bigquery.Dataset(
bigquery.DatasetReference.from_string(
self._bq_dataset, default_project=self._gcp_project_id
)
)
dataset.location = self._bq_location
self._bq_client.create_dataset(dataset, exists_ok=True)

# TODO: Use session._start_query() so we get progress bar
query_job = self._bq_client.query(create_function_ddl) # Make an API request.
query_job.result() # Wait for the job to complete.

logger.info(f"Created remote function {query_job.ddl_target_routine}")

def get_cloud_function_fully_qualified_parent(self):
Expand Down Expand Up @@ -465,17 +477,20 @@ def get_remote_function_specs(self, remote_function_name):
routines = self._bq_client.list_routines(
f"{self._gcp_project_id}.{self._bq_dataset}"
)
for routine in routines:
if routine.reference.routine_id == remote_function_name:
# TODO(shobs): Use first class properties when they are available
# https://github.com/googleapis/python-bigquery/issues/1552
rf_options = routine._properties.get("remoteFunctionOptions")
if rf_options:
http_endpoint = rf_options.get("endpoint")
bq_connection = rf_options.get("connection")
if bq_connection:
bq_connection = os.path.basename(bq_connection)
break
try:
for routine in routines:
if routine.reference.routine_id == remote_function_name:
# TODO(shobs): Use first class properties when they are available
# https://github.com/googleapis/python-bigquery/issues/1552
rf_options = routine._properties.get("remoteFunctionOptions")
if rf_options:
http_endpoint = rf_options.get("endpoint")
bq_connection = rf_options.get("connection")
if bq_connection:
bq_connection = os.path.basename(bq_connection)
break
except google.api_core.exceptions.NotFound:
pass
return (http_endpoint, bq_connection)


Expand Down
7 changes: 1 addition & 6 deletions bigframes/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,17 +377,12 @@ def _create_and_bind_bq_session(self):
]
)

# Dataset for storing BQML models and remote functions, which don't yet
# Dataset for storing remote functions, which don't yet
# support proper session temporary storage
self._session_dataset = bigquery.Dataset(
f"{self.bqclient.project}.bigframes_temp_{self._location.lower().replace('-', '_')}"
)
self._session_dataset.location = self._location
self._session_dataset.default_table_expiration_ms = 24 * 60 * 60 * 1000

# TODO: handle case when the dataset does not exist and the user does
# not have permission to create one (bigquery.datasets.create IAM)
self.bqclient.create_dataset(self._session_dataset, exists_ok=True)

def close(self):
"""Terminate the BQ session; otherwise the session will be terminated automatically after
Expand Down
5 changes: 0 additions & 5 deletions tests/system/small/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -894,11 +894,6 @@ def test_session_id(session):
# TODO(chelsealin): Verify the session id can be bound to a load job.


def test_session_dataset_exists_and_configured(session: bigframes.Session):
    """Check the session dataset is fetchable and has a 24-hour default table expiration.

    The dataset is looked up through the session's BigQuery client using
    ``session._session_dataset_id``; the lookup itself fails the test if the
    dataset cannot be retrieved.
    """
    # 24 hours expressed in milliseconds.
    one_day_ms = 24 * 60 * 60 * 1000
    session_dataset = session.bqclient.get_dataset(session._session_dataset_id)
    assert session_dataset.default_table_expiration_ms == one_day_ms


@pytest.mark.flaky(retries=2)
def test_to_close_session():
session = bigframes.Session()
Expand Down