diff --git a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py
index 16da677ef5..0b6fd18561 100644
--- a/bigframes/streaming/__init__.py
+++ b/bigframes/streaming/__init__.py
@@ -16,6 +16,7 @@
 
 import json
 from typing import Optional
+import warnings
 
 from google.cloud import bigquery
 
@@ -24,9 +25,11 @@
 
 def to_bigtable(
     query: str,
+    *,
     instance: str,
     table: str,
-    bq_client: Optional[bigquery.Client] = None,
+    service_account_email: Optional[str] = None,
+    session: Optional[bigframes.Session] = None,
     app_profile: Optional[str] = None,
     truncate: bool = False,
     overwrite: bool = False,
@@ -53,10 +56,15 @@
             The name of the bigtable instance to export to.
         table (str):
             The name of the bigtable table to export to.
-        bq_client (str, default None):
-            The Client object to use for the query. This determines
+        service_account_email (str, default None):
+            Full name of the service account to run the continuous query.
+            Example: accountname@projectname.iam.gserviceaccount.com
+            If not provided, the user account will be used, but this
+            limits the lifetime of the continuous query.
+        session (bigframes.Session, default None):
+            The session object to use for the query. This determines
             the project id and location of the query. If None, will
-            default to the bigframes global session default client.
+            default to the bigframes global session.
         app_profile (str, default None):
             The bigtable app profile to export to. If None, no app
             profile will be used.
@@ -90,9 +98,16 @@
         For example, the job can be cancelled or its error status
         can be examined.
     """
+    warnings.warn(
+        "The bigframes.streaming module is a preview feature, and subject to change.",
+        stacklevel=1,
+        category=bigframes.exceptions.PreviewWarning,
+    )
+
     # get default client if not passed
-    if bq_client is None:
-        bq_client = bigframes.get_global_session().bqclient
+    if session is None:
+        session = bigframes.get_global_session()
+    bq_client = session.bqclient
 
     # build export string from parameters
     project = bq_client.project
@@ -123,7 +138,117 @@
 
     # override continuous http parameter
     job_config = bigquery.job.QueryJobConfig()
-    job_config_filled = job_config.from_api_repr({"query": {"continuous": True}})
+
+    job_config_dict: dict = {"query": {"continuous": True}}
+    if service_account_email is not None:
+        job_config_dict["query"]["connectionProperties"] = {
+            "key": "service_account",
+            "value": service_account_email,
+        }
+    job_config_filled = job_config.from_api_repr(job_config_dict)
+    job_config_filled.labels = {"bigframes-api": "streaming_to_bigtable"}
+
+    # begin the query job
+    query_job = bq_client.query(
+        sql,
+        job_config=job_config_filled,  # type:ignore
+        # typing error above is in bq client library
+        # (should accept abstract job_config, only takes concrete)
+        job_id=job_id,
+        job_id_prefix=job_id_prefix,
+    )
+
+    # return the query job to the user for lifetime management
+    return query_job
+
+
+def to_pubsub(
+    query: str,
+    *,
+    topic: str,
+    service_account_email: str,
+    session: Optional[bigframes.Session] = None,
+    job_id: Optional[str] = None,
+    job_id_prefix: Optional[str] = None,
+) -> bigquery.QueryJob:
+    """Launches a BigQuery continuous query and returns a
+    QueryJob object that can be used to manage the running query.
+
+    This method requires an existing pubsub topic. For instructions
+    on creating a pubsub topic, see
+    https://cloud.google.com/pubsub/docs/samples/pubsub-quickstart-create-topic?hl=en
+
+    Note that a service account is required for continuous queries
+    exporting to pubsub.
+
+    Args:
+        query (str):
+            The SQL statement to execute as a continuous query.
+            For example: "SELECT * FROM dataset.table"
+            This will be wrapped in an EXPORT DATA statement to
+            launch a continuous query writing to pubsub.
+        topic (str):
+            The name of the pubsub topic to export to.
+            For example: "taxi-rides"
+        service_account_email (str):
+            Full name of the service account to run the continuous query.
+            Example: accountname@projectname.iam.gserviceaccount.com
+        session (bigframes.Session, default None):
+            The session object to use for the query. This determines
+            the project id and location of the query. If None, will
+            default to the bigframes global session.
+        job_id (str, default None):
+            If specified, replace the default job id for the query,
+            see job_id parameter of
+            https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query
+        job_id_prefix (str, default None):
+            If specified, a job id prefix for the query, see
+            job_id_prefix parameter of
+            https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query
+
+    Returns:
+        google.cloud.bigquery.QueryJob:
+            See https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob
+            The ongoing query job can be managed using this object.
+            For example, the job can be cancelled or its error status
+            can be examined.
+    """
+    warnings.warn(
+        "The bigframes.streaming module is a preview feature, and subject to change.",
+        stacklevel=1,
+        category=bigframes.exceptions.PreviewWarning,
+    )
+
+    # get default client if not passed
+    if session is None:
+        session = bigframes.get_global_session()
+    bq_client = session.bqclient
+
+    # build export string from parameters
+    sql = (
+        "EXPORT DATA\n"
+        "OPTIONS (\n"
+        "format = 'CLOUD_PUBSUB',\n"
+        f'uri = "https://pubsub.googleapis.com/projects/{bq_client.project}/topics/{topic}"\n'
+        ")\n"
+        "AS (\n"
+        f"{query});"
+    )
+
+    # override continuous http parameter
+    job_config = bigquery.job.QueryJobConfig()
+    job_config_filled = job_config.from_api_repr(
+        {
+            "query": {
+                "continuous": True,
+                "connectionProperties": {
+                    "key": "service_account",
+                    "value": service_account_email,
+                },
+            }
+        }
+    )
+    job_config_filled.labels = {"bigframes-api": "streaming_to_pubsub"}
 
     # begin the query job
     query_job = bq_client.query(
diff --git a/scripts/create_bigtable.py b/scripts/create_bigtable.py
index 655e4b31ab..f81bb8a013 100644
--- a/scripts/create_bigtable.py
+++ b/scripts/create_bigtable.py
@@ -16,13 +16,10 @@
 # bigframes.streaming testing if they don't already exist
 
 import os
-import pathlib
 import sys
 
 import google.cloud.bigtable as bigtable
 
-REPO_ROOT = pathlib.Path(__file__).parent.parent
-
 PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT")
 
 if not PROJECT_ID:
diff --git a/scripts/create_pubsub.py b/scripts/create_pubsub.py
new file mode 100644
index 0000000000..5d25398983
--- /dev/null
+++ b/scripts/create_pubsub.py
@@ -0,0 +1,49 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This script creates the Pub/Sub resources required for
+# bigframes.streaming testing if they don't already exist
+
+import os
+import sys
+
+from google.cloud import pubsub_v1
+
+PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT")
+
+if not PROJECT_ID:
+    print(
+        "Please set GOOGLE_CLOUD_PROJECT environment variable before running.",
+        file=sys.stderr,
+    )
+    sys.exit(1)
+
+
+def create_topic(topic_id):
+    # based on
+    # https://cloud.google.com/pubsub/docs/samples/pubsub-quickstart-create-topic?hl=en
+
+    publisher = pubsub_v1.PublisherClient()
+    topic_path = publisher.topic_path(PROJECT_ID, topic_id)
+
+    topic = publisher.create_topic(request={"name": topic_path})
+    print(f"Created topic: {topic.name}")
+
+
+def main():
+    create_topic("penguins")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/setup.py b/setup.py
index dbd9ce5fc2..79baf1fb23 100644
--- a/setup.py
+++ b/setup.py
@@ -40,6 +40,7 @@
     "geopandas >=0.12.2",
     "google-auth >=2.15.0,<3.0dev",
     "google-cloud-bigtable >=2.24.0",
+    "google-cloud-pubsub >=2.21.4",
     "google-cloud-bigquery[bqstorage,pandas] >=3.16.0",
     "google-cloud-functions >=1.12.0",
     "google-cloud-bigquery-connection >=1.12.0",
diff --git a/testing/constraints-3.9.txt b/testing/constraints-3.9.txt
index bbd7bf0069..5a76698576 100644
--- a/testing/constraints-3.9.txt
+++ b/testing/constraints-3.9.txt
@@ -5,6 +5,7 @@ gcsfs==2023.3.0
 geopandas==0.12.2
 google-auth==2.15.0
 google-cloud-bigtable==2.24.0
+google-cloud-pubsub==2.21.4
 google-cloud-bigquery==3.16.0
 google-cloud-functions==1.12.0
 google-cloud-bigquery-connection==1.12.0
diff --git a/tests/system/large/test_streaming.py b/tests/system/large/test_streaming.py
index 48db61e5bf..c125fde15a 100644
--- a/tests/system/large/test_streaming.py
+++ b/tests/system/large/test_streaming.py
@@ -22,11 +22,12 @@
     job_id_prefix = "test_streaming_"
     sql = """SELECT
     body_mass_g, island as rowkey
-    FROM birds.penguins"""
+    FROM birds.penguins_bigtable_streaming"""
     query_job = bigframes.streaming.to_bigtable(
         sql,
-        "streaming-testing-instance",
-        "table-testing",
+        instance="streaming-testing-instance",
+        table="table-testing",
+        service_account_email="streaming-testing@bigframes-load-testing.iam.gserviceaccount.com",
         app_profile=None,
         truncate=True,
         overwrite=True,
@@ -46,3 +47,29 @@
         assert str(query_job.job_id).startswith(job_id_prefix)
     finally:
         query_job.cancel()
+
+
+def test_streaming_to_pubsub():
+    # launch a continuous query
+    job_id_prefix = "test_streaming_pubsub_"
+    sql = """SELECT
+    island
+    FROM birds.penguins_pubsub_streaming"""
+    query_job = bigframes.streaming.to_pubsub(
+        sql,
+        topic="penguins",
+        service_account_email="streaming-testing@bigframes-load-testing.iam.gserviceaccount.com",
+        job_id=None,
+        job_id_prefix=job_id_prefix,
+    )
+
+    try:
+        # wait 100 seconds in order to ensure the query doesn't stop
+        # (i.e. it is continuous)
+        time.sleep(100)
+        assert query_job.error_result is None
+        assert query_job.errors is None
+        assert query_job.running()
+        assert str(query_job.job_id).startswith(job_id_prefix)
+    finally:
+        query_job.cancel()
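Reviewer note: a minimal usage sketch of the two entry points this patch touches, based only on the signatures shown above. It is not part of the patch; all resource names (instance, table, topic, service account, dataset) are hypothetical placeholders, and the backing Bigtable and Pub/Sub resources are assumed to already exist (scripts/create_bigtable.py and scripts/create_pubsub.py set them up for testing).

    # usage_sketch.py (hypothetical, not part of this change)
    import bigframes
    import bigframes.streaming

    # Continuous export to Bigtable. service_account_email is optional here,
    # but without it the query runs under the user credential, which limits
    # its lifetime. Both calls emit a PreviewWarning.
    bigtable_job = bigframes.streaming.to_bigtable(
        "SELECT body_mass_g, island as rowkey FROM birds.penguins",
        instance="my-instance",  # placeholder
        table="my-table",  # placeholder
        service_account_email="my-sa@my-project.iam.gserviceaccount.com",  # placeholder
        truncate=True,
        overwrite=True,
    )

    # Continuous export to Pub/Sub. The service account is required here,
    # and the topic must already exist.
    pubsub_job = bigframes.streaming.to_pubsub(
        "SELECT island FROM birds.penguins",
        topic="my-topic",  # placeholder
        service_account_email="my-sa@my-project.iam.gserviceaccount.com",  # placeholder
    )

    # Both calls return google.cloud.bigquery.QueryJob objects; the caller
    # owns the lifetime of the continuous queries.
    for job in (bigtable_job, pubsub_job):
        print(job.job_id, job.running())
        job.cancel()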