diff --git a/.github/workflows/integration-tests-against-emulator.yaml b/.github/workflows/integration-tests-against-emulator.yaml index 19f49c5e4b..d74aa0fa00 100644 --- a/.github/workflows/integration-tests-against-emulator.yaml +++ b/.github/workflows/integration-tests-against-emulator.yaml @@ -10,7 +10,7 @@ jobs: services: emulator: - image: gcr.io/cloud-spanner-emulator/emulator:1.5.37 + image: gcr.io/cloud-spanner-emulator/emulator ports: - 9010:9010 - 9020:9020 diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 295222022b..5633cd4486 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -133,7 +133,11 @@ def _restart_on_unavailable( # Update the transaction from the response. if transaction is not None: transaction._update_for_result_set_pb(item) - if item.precommit_token is not None and transaction is not None: + if ( + item._pb is not None + and item._pb.HasField("precommit_token") + and transaction is not None + ): transaction._update_for_precommit_token_pb(item.precommit_token) if item.resume_token: @@ -1029,7 +1033,7 @@ def _update_for_transaction_pb(self, transaction_pb: Transaction) -> None: if self._transaction_id is None and transaction_pb.id: self._transaction_id = transaction_pb.id - if transaction_pb.precommit_token: + if transaction_pb._pb.HasField("precommit_token"): self._update_for_precommit_token_pb_unsafe(transaction_pb.precommit_token) def _update_for_precommit_token_pb( diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index 314c5d13a4..5db809f91c 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -328,14 +328,20 @@ def before_next_retry(nth_retry, delay_in_seconds): # successfully commit, and must be retried with the new precommit token. # The mutations should not be included in the new request, and no further # retries or exception handling should be performed. - if commit_response_pb.precommit_token: + if commit_response_pb._pb.HasField("precommit_token"): add_span_event(span, commit_retry_event_name) + nth_request = database._next_nth_request commit_response_pb = api.commit( request=CommitRequest( precommit_token=commit_response_pb.precommit_token, **common_commit_request_args, ), - metadata=metadata, + metadata=database.metadata_with_request_id( + nth_request, + 1, + metadata, + span, + ), ) add_span_event(span, "Commit Done") @@ -521,7 +527,7 @@ def wrapped_method(*args, **kwargs): if is_inline_begin: self._lock.release() - if result_set_pb.precommit_token is not None: + if result_set_pb._pb.HasField("precommit_token"): self._update_for_precommit_token_pb(result_set_pb.precommit_token) return result_set_pb.stats.row_count_exact diff --git a/tests/system/test_database_api.py b/tests/system/test_database_api.py index 57ce49c8a2..e3c18ece10 100644 --- a/tests/system/test_database_api.py +++ b/tests/system/test_database_api.py @@ -569,7 +569,10 @@ def test_db_run_in_transaction_then_snapshot_execute_sql(shared_database): batch.delete(sd.TABLE, sd.ALL) def _unit_of_work(transaction, test): - rows = list(transaction.read(test.TABLE, test.COLUMNS, sd.ALL)) + # TODO: Remove query and execute a read instead when the Emulator has been fixed + # and returns pre-commit tokens for streaming read results. + rows = list(transaction.execute_sql(sd.SQL)) + # rows = list(transaction.read(test.TABLE, test.COLUMNS, sd.ALL)) assert rows == [] transaction.insert_or_update(test.TABLE, test.COLUMNS, test.ROW_DATA) @@ -882,7 +885,10 @@ def test_db_run_in_transaction_w_max_commit_delay(shared_database): batch.delete(sd.TABLE, sd.ALL) def _unit_of_work(transaction, test): - rows = list(transaction.read(test.TABLE, test.COLUMNS, sd.ALL)) + # TODO: Remove query and execute a read instead when the Emulator has been fixed + # and returns pre-commit tokens for streaming read results. + rows = list(transaction.execute_sql(sd.SQL)) + # rows = list(transaction.read(test.TABLE, test.COLUMNS, sd.ALL)) assert rows == [] transaction.insert_or_update(test.TABLE, test.COLUMNS, test.ROW_DATA) diff --git a/tests/system/test_session_api.py b/tests/system/test_session_api.py index 4da4e2e0d1..04d8ad799a 100644 --- a/tests/system/test_session_api.py +++ b/tests/system/test_session_api.py @@ -932,6 +932,8 @@ def _transaction_read_then_raise(transaction): def test_transaction_read_and_insert_or_update_then_commit( sessions_database, sessions_to_delete, + # TODO: Re-enable when the emulator returns pre-commit tokens for reads. + not_emulator, ): # [START spanner_test_dml_read_your_writes] sd = _sample_data @@ -1586,7 +1588,11 @@ def _read_w_concurrent_update(transaction, pkey): transaction.update(COUNTERS_TABLE, COUNTERS_COLUMNS, [[pkey, value + 1]]) -def test_transaction_read_w_concurrent_updates(sessions_database): +def test_transaction_read_w_concurrent_updates( + sessions_database, + # TODO: Re-enable when the Emulator returns pre-commit tokens for streaming reads. + not_emulator, +): pkey = "read_w_concurrent_updates" _transaction_concurrency_helper(sessions_database, _read_w_concurrent_update, pkey) diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index e7cfce3761..5e60d71bd6 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -158,6 +158,7 @@ def _make_item(self, value, resume_token=b"", metadata=None): resume_token=resume_token, metadata=metadata, precommit_token=None, + _pb=None, spec=["value", "resume_token", "metadata", "precommit_token"], ) diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index 05bb25de6b..7a33372dae 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -533,7 +533,7 @@ def _commit_helper( ) commit.assert_any_call( request=expected_retry_request, - metadata=base_metadata, + metadata=expected_retry_metadata, ) if not HAS_OPENTELEMETRY_INSTALLED: