From c9894e1223f19c57b5058c664ebd1b1501076935 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Wed, 13 Aug 2025 16:34:24 +0200 Subject: [PATCH] chore: make precommit token check emulator-proof The Emulator returns an empty pre-commit token when a commit is attempted without a pre-commit token. This is different from not returning any pre-commit token at all. The check for 'did the Commit return a pre-commit token?' did not take this into account, which caused commits on the Emulator that needed to be retried, not to be retried. This again caused multiple test errors when running on the Emulator, as this would keep a transaction present on the test database on the Emulator, and the Emulator only supports one transaction at a time. These test failures went unnoticed, because the test configuration for the Emulator had pinned the Emulator version to 1.5.37, which did not support multiplexed sessions. This again caused the tests to fall back to using regular sessions. This change fixes the check for whether a pre-commit token was returned by a Commit. It also unpins the Emulator version for the system tests using default settings. This ensures that the tests actually use multiplexed sessions. --- .../integration-tests-against-emulator.yaml | 2 +- google/cloud/spanner_v1/snapshot.py | 8 ++++++-- google/cloud/spanner_v1/transaction.py | 12 +++++++++--- tests/system/test_database_api.py | 10 ++++++++-- tests/system/test_session_api.py | 8 +++++++- tests/unit/test_snapshot.py | 1 + tests/unit/test_transaction.py | 2 +- 7 files changed, 33 insertions(+), 10 deletions(-) diff --git a/.github/workflows/integration-tests-against-emulator.yaml b/.github/workflows/integration-tests-against-emulator.yaml index 19f49c5e4b..d74aa0fa00 100644 --- a/.github/workflows/integration-tests-against-emulator.yaml +++ b/.github/workflows/integration-tests-against-emulator.yaml @@ -10,7 +10,7 @@ jobs: services: emulator: - image: gcr.io/cloud-spanner-emulator/emulator:1.5.37 + image: gcr.io/cloud-spanner-emulator/emulator ports: - 9010:9010 - 9020:9020 diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 295222022b..5633cd4486 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -133,7 +133,11 @@ def _restart_on_unavailable( # Update the transaction from the response. if transaction is not None: transaction._update_for_result_set_pb(item) - if item.precommit_token is not None and transaction is not None: + if ( + item._pb is not None + and item._pb.HasField("precommit_token") + and transaction is not None + ): transaction._update_for_precommit_token_pb(item.precommit_token) if item.resume_token: @@ -1029,7 +1033,7 @@ def _update_for_transaction_pb(self, transaction_pb: Transaction) -> None: if self._transaction_id is None and transaction_pb.id: self._transaction_id = transaction_pb.id - if transaction_pb.precommit_token: + if transaction_pb._pb.HasField("precommit_token"): self._update_for_precommit_token_pb_unsafe(transaction_pb.precommit_token) def _update_for_precommit_token_pb( diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index 314c5d13a4..5db809f91c 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -328,14 +328,20 @@ def before_next_retry(nth_retry, delay_in_seconds): # successfully commit, and must be retried with the new precommit token. # The mutations should not be included in the new request, and no further # retries or exception handling should be performed. - if commit_response_pb.precommit_token: + if commit_response_pb._pb.HasField("precommit_token"): add_span_event(span, commit_retry_event_name) + nth_request = database._next_nth_request commit_response_pb = api.commit( request=CommitRequest( precommit_token=commit_response_pb.precommit_token, **common_commit_request_args, ), - metadata=metadata, + metadata=database.metadata_with_request_id( + nth_request, + 1, + metadata, + span, + ), ) add_span_event(span, "Commit Done") @@ -521,7 +527,7 @@ def wrapped_method(*args, **kwargs): if is_inline_begin: self._lock.release() - if result_set_pb.precommit_token is not None: + if result_set_pb._pb.HasField("precommit_token"): self._update_for_precommit_token_pb(result_set_pb.precommit_token) return result_set_pb.stats.row_count_exact diff --git a/tests/system/test_database_api.py b/tests/system/test_database_api.py index 57ce49c8a2..e3c18ece10 100644 --- a/tests/system/test_database_api.py +++ b/tests/system/test_database_api.py @@ -569,7 +569,10 @@ def test_db_run_in_transaction_then_snapshot_execute_sql(shared_database): batch.delete(sd.TABLE, sd.ALL) def _unit_of_work(transaction, test): - rows = list(transaction.read(test.TABLE, test.COLUMNS, sd.ALL)) + # TODO: Remove query and execute a read instead when the Emulator has been fixed + # and returns pre-commit tokens for streaming read results. + rows = list(transaction.execute_sql(sd.SQL)) + # rows = list(transaction.read(test.TABLE, test.COLUMNS, sd.ALL)) assert rows == [] transaction.insert_or_update(test.TABLE, test.COLUMNS, test.ROW_DATA) @@ -882,7 +885,10 @@ def test_db_run_in_transaction_w_max_commit_delay(shared_database): batch.delete(sd.TABLE, sd.ALL) def _unit_of_work(transaction, test): - rows = list(transaction.read(test.TABLE, test.COLUMNS, sd.ALL)) + # TODO: Remove query and execute a read instead when the Emulator has been fixed + # and returns pre-commit tokens for streaming read results. + rows = list(transaction.execute_sql(sd.SQL)) + # rows = list(transaction.read(test.TABLE, test.COLUMNS, sd.ALL)) assert rows == [] transaction.insert_or_update(test.TABLE, test.COLUMNS, test.ROW_DATA) diff --git a/tests/system/test_session_api.py b/tests/system/test_session_api.py index 4da4e2e0d1..04d8ad799a 100644 --- a/tests/system/test_session_api.py +++ b/tests/system/test_session_api.py @@ -932,6 +932,8 @@ def _transaction_read_then_raise(transaction): def test_transaction_read_and_insert_or_update_then_commit( sessions_database, sessions_to_delete, + # TODO: Re-enable when the emulator returns pre-commit tokens for reads. + not_emulator, ): # [START spanner_test_dml_read_your_writes] sd = _sample_data @@ -1586,7 +1588,11 @@ def _read_w_concurrent_update(transaction, pkey): transaction.update(COUNTERS_TABLE, COUNTERS_COLUMNS, [[pkey, value + 1]]) -def test_transaction_read_w_concurrent_updates(sessions_database): +def test_transaction_read_w_concurrent_updates( + sessions_database, + # TODO: Re-enable when the Emulator returns pre-commit tokens for streaming reads. + not_emulator, +): pkey = "read_w_concurrent_updates" _transaction_concurrency_helper(sessions_database, _read_w_concurrent_update, pkey) diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index e7cfce3761..5e60d71bd6 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -158,6 +158,7 @@ def _make_item(self, value, resume_token=b"", metadata=None): resume_token=resume_token, metadata=metadata, precommit_token=None, + _pb=None, spec=["value", "resume_token", "metadata", "precommit_token"], ) diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index 05bb25de6b..7a33372dae 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -533,7 +533,7 @@ def _commit_helper( ) commit.assert_any_call( request=expected_retry_request, - metadata=base_metadata, + metadata=expected_retry_metadata, ) if not HAS_OPENTELEMETRY_INSTALLED: