Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:

services:
emulator:
image: gcr.io/cloud-spanner-emulator/emulator:1.5.37
image: gcr.io/cloud-spanner-emulator/emulator
ports:
- 9010:9010
- 9020:9020
Expand Down
8 changes: 6 additions & 2 deletions google/cloud/spanner_v1/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,11 @@ def _restart_on_unavailable(
# Update the transaction from the response.
if transaction is not None:
transaction._update_for_result_set_pb(item)
if item.precommit_token is not None and transaction is not None:
if (
item._pb is not None
and item._pb.HasField("precommit_token")
and transaction is not None
):
transaction._update_for_precommit_token_pb(item.precommit_token)

if item.resume_token:
Expand Down Expand Up @@ -1029,7 +1033,7 @@ def _update_for_transaction_pb(self, transaction_pb: Transaction) -> None:
if self._transaction_id is None and transaction_pb.id:
self._transaction_id = transaction_pb.id

if transaction_pb.precommit_token:
if transaction_pb._pb.HasField("precommit_token"):
self._update_for_precommit_token_pb_unsafe(transaction_pb.precommit_token)

def _update_for_precommit_token_pb(
Expand Down
12 changes: 9 additions & 3 deletions google/cloud/spanner_v1/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,14 +328,20 @@ def before_next_retry(nth_retry, delay_in_seconds):
# successfully commit, and must be retried with the new precommit token.
# The mutations should not be included in the new request, and no further
# retries or exception handling should be performed.
if commit_response_pb.precommit_token:
if commit_response_pb._pb.HasField("precommit_token"):
add_span_event(span, commit_retry_event_name)
nth_request = database._next_nth_request
commit_response_pb = api.commit(
request=CommitRequest(
precommit_token=commit_response_pb.precommit_token,
**common_commit_request_args,
),
metadata=metadata,
metadata=database.metadata_with_request_id(
nth_request,
1,
metadata,
span,
),
)

add_span_event(span, "Commit Done")
Expand Down Expand Up @@ -521,7 +527,7 @@ def wrapped_method(*args, **kwargs):
if is_inline_begin:
self._lock.release()

if result_set_pb.precommit_token is not None:
if result_set_pb._pb.HasField("precommit_token"):
self._update_for_precommit_token_pb(result_set_pb.precommit_token)

return result_set_pb.stats.row_count_exact
Expand Down
10 changes: 8 additions & 2 deletions tests/system/test_database_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,10 @@ def test_db_run_in_transaction_then_snapshot_execute_sql(shared_database):
batch.delete(sd.TABLE, sd.ALL)

def _unit_of_work(transaction, test):
rows = list(transaction.read(test.TABLE, test.COLUMNS, sd.ALL))
# TODO: Remove query and execute a read instead when the Emulator has been fixed
# and returns pre-commit tokens for streaming read results.
rows = list(transaction.execute_sql(sd.SQL))
# rows = list(transaction.read(test.TABLE, test.COLUMNS, sd.ALL))
assert rows == []

transaction.insert_or_update(test.TABLE, test.COLUMNS, test.ROW_DATA)
Expand Down Expand Up @@ -882,7 +885,10 @@ def test_db_run_in_transaction_w_max_commit_delay(shared_database):
batch.delete(sd.TABLE, sd.ALL)

def _unit_of_work(transaction, test):
rows = list(transaction.read(test.TABLE, test.COLUMNS, sd.ALL))
# TODO: Remove query and execute a read instead when the Emulator has been fixed
# and returns pre-commit tokens for streaming read results.
rows = list(transaction.execute_sql(sd.SQL))
# rows = list(transaction.read(test.TABLE, test.COLUMNS, sd.ALL))
assert rows == []

transaction.insert_or_update(test.TABLE, test.COLUMNS, test.ROW_DATA)
Expand Down
8 changes: 7 additions & 1 deletion tests/system/test_session_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,8 @@ def _transaction_read_then_raise(transaction):
def test_transaction_read_and_insert_or_update_then_commit(
sessions_database,
sessions_to_delete,
# TODO: Re-enable when the emulator returns pre-commit tokens for reads.
not_emulator,
):
# [START spanner_test_dml_read_your_writes]
sd = _sample_data
Expand Down Expand Up @@ -1586,7 +1588,11 @@ def _read_w_concurrent_update(transaction, pkey):
transaction.update(COUNTERS_TABLE, COUNTERS_COLUMNS, [[pkey, value + 1]])


def test_transaction_read_w_concurrent_updates(sessions_database):
def test_transaction_read_w_concurrent_updates(
sessions_database,
# TODO: Re-enable when the Emulator returns pre-commit tokens for streaming reads.
not_emulator,
):
pkey = "read_w_concurrent_updates"
_transaction_concurrency_helper(sessions_database, _read_w_concurrent_update, pkey)

Expand Down
1 change: 1 addition & 0 deletions tests/unit/test_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ def _make_item(self, value, resume_token=b"", metadata=None):
resume_token=resume_token,
metadata=metadata,
precommit_token=None,
_pb=None,
spec=["value", "resume_token", "metadata", "precommit_token"],
)

Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ def _commit_helper(
)
commit.assert_any_call(
request=expected_retry_request,
metadata=base_metadata,
metadata=expected_retry_metadata,
)

if not HAS_OPENTELEMETRY_INSTALLED:
Expand Down