forked from launchbadge/sqlx
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtransaction.rs
More file actions
110 lines (90 loc) · 3.09 KB
/
transaction.rs
File metadata and controls
110 lines (90 loc) · 3.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
use futures_core::future::BoxFuture;
use sqlx_core::database::Database;
use std::borrow::Cow;
use crate::error::Error;
use crate::executor::Executor;
use crate::{PgConnection, Postgres};
pub(crate) use sqlx_core::transaction::*;
/// Implementation of [`TransactionManager`] for PostgreSQL.
pub struct PgTransactionManager;
impl TransactionManager for PgTransactionManager {
type Database = Postgres;
fn begin<'conn>(
conn: &'conn mut PgConnection,
statement: Option<Cow<'static, str>>,
) -> BoxFuture<'conn, Result<(), Error>> {
Box::pin(async move {
let depth = conn.inner.transaction_depth;
let statement = match statement {
// custom `BEGIN` statements are not allowed if we're already in
// a transaction (we need to issue a `SAVEPOINT` instead)
Some(_) if depth > 0 => return Err(Error::InvalidSavePointStatement),
Some(statement) => statement,
None => begin_ansi_transaction_sql(depth),
};
let rollback = Rollback::new(conn);
rollback.conn.queue_simple_query(&statement)?;
rollback.conn.wait_until_ready().await?;
if !rollback.conn.in_transaction() {
return Err(Error::BeginFailed);
}
rollback.conn.inner.transaction_depth += 1;
rollback.defuse();
Ok(())
})
}
fn commit(conn: &mut PgConnection) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
if conn.inner.transaction_depth > 0 {
conn.execute(&*commit_ansi_transaction_sql(conn.inner.transaction_depth))
.await?;
conn.inner.transaction_depth -= 1;
}
Ok(())
})
}
fn rollback(conn: &mut PgConnection) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
if conn.inner.transaction_depth > 0 {
conn.execute(&*rollback_ansi_transaction_sql(
conn.inner.transaction_depth,
))
.await?;
conn.inner.transaction_depth -= 1;
}
Ok(())
})
}
fn start_rollback(conn: &mut PgConnection) {
if conn.inner.transaction_depth > 0 {
conn.queue_simple_query(&rollback_ansi_transaction_sql(conn.inner.transaction_depth))
.expect("BUG: Rollback query somehow too large for protocol");
conn.inner.transaction_depth -= 1;
}
}
fn get_transaction_depth(conn: &<Self::Database as Database>::Connection) -> usize {
conn.inner.transaction_depth
}
}
struct Rollback<'c> {
conn: &'c mut PgConnection,
defuse: bool,
}
impl Drop for Rollback<'_> {
fn drop(&mut self) {
if !self.defuse {
PgTransactionManager::start_rollback(self.conn)
}
}
}
impl<'c> Rollback<'c> {
fn new(conn: &'c mut PgConnection) -> Self {
Self {
conn,
defuse: false,
}
}
fn defuse(mut self) {
self.defuse = true;
}
}