Skip to content
Merged

185 #192

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,9 @@ persisted row. `action = "nothing"` keeps the existing row (`DO NOTHING`)
and returns `Option<Entity>` — `None` when a conflicting row already
existed. Requires `returning = "full"` (the default). With `streams`
enabled, upsert publishes a `Created` notification for every row it
returns.
returns. With `transactions`, the `{Entity}TransactionRepo` adapter
gains the same `upsert` so it can share atomicity with adjacent
statements (release-then-upsert login flows and the like).

### Transactions

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl Context<'_> {
}

/// Assemble the complete upsert SQL string at expansion time.
fn upsert_sql(&self) -> String {
pub(crate) fn upsert_sql(&self) -> String {
let upsert = self
.entity
.upsert
Expand Down
114 changes: 114 additions & 0 deletions crates/entity-derive-impl/src/entity/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,53 @@ fn generate_repo_adapter(entity: &EntityDef) -> TokenStream {
}
};

let upsert_span = instrument(&entity_name_str, "tx.upsert");
let upsert_method = match &entity.upsert {
Some(upsert_def) if !entity.create_fields().is_empty() => {
let sql = ctx.upsert_sql();
match upsert_def.action {
crate::entity::parse::UpsertAction::Update => quote! {
/// Insert or update the conflicting row within the transaction.
///
/// Same semantics as the pool-backed `upsert`, executed on
/// the transaction handle so it can share atomicity with
/// adjacent statements.
#upsert_span
pub async fn upsert(
&mut self,
dto: #create_dto
) -> Result<#entity_name, sqlx::Error> {
let entity = #entity_name::from(dto);
let insertable = #insertable_name::from(&entity);
let row: #row_name = sqlx::query_as(#sql)
#(#bindings)*
.fetch_one(&mut **self.tx).await?;
Ok(#entity_name::from(row))
}
},
crate::entity::parse::UpsertAction::Nothing => quote! {
/// Insert the entity or keep the conflicting row, within
/// the transaction.
///
/// Returns `None` when a conflicting row already existed.
#upsert_span
pub async fn upsert(
&mut self,
dto: #create_dto
) -> Result<Option<#entity_name>, sqlx::Error> {
let entity = #entity_name::from(dto);
let insertable = #insertable_name::from(&entity);
let row: Option<#row_name> = sqlx::query_as(#sql)
#(#bindings)*
.fetch_optional(&mut **self.tx).await?;
Ok(row.map(#entity_name::from))
}
}
}
}
_ => TokenStream::new()
};

let update_span = instrument(&entity_name_str, "tx.update");
let update_method = if entity.update_fields().is_empty() {
TokenStream::new()
Expand Down Expand Up @@ -191,6 +238,8 @@ fn generate_repo_adapter(entity: &EntityDef) -> TokenStream {

#create_method

#upsert_method

/// Find an entity by ID within the transaction.
#find_span
pub async fn find_by_id(
Expand Down Expand Up @@ -500,3 +549,68 @@ mod tests {
assert_eq!(pluralize("manager"), "managers");
}
}

#[cfg(all(test, feature = "postgres", feature = "transactions"))]
mod tx_upsert_tests {
use quote::quote;
use syn::DeriveInput;

use super::*;

fn parse_entity(tokens: proc_macro2::TokenStream) -> EntityDef {
let input: DeriveInput = syn::parse2(tokens).expect("test entity must parse");
EntityDef::from_derive_input(&input).expect("test entity must be valid")
}

#[test]
fn adapter_gains_upsert_with_both_attributes() {
let entity = parse_entity(quote! {
#[entity(table = "users", transactions, upsert(conflict = "email"))]
pub struct User {
#[id]
pub id: uuid::Uuid,
#[field(create, response)]
#[column(unique)]
pub email: String,
#[field(create, update, response)]
pub name: String,
}
});
let code = generate(&entity).to_string();
assert!(code.contains("pub async fn upsert"));
assert!(code.contains("ON CONFLICT (email) DO UPDATE"));
assert!(code.contains("self . tx"));
}

#[test]
fn adapter_upsert_nothing_returns_option() {
let entity = parse_entity(quote! {
#[entity(table = "subs", transactions, upsert(conflict = "email", action = "nothing"))]
pub struct Sub {
#[id]
pub id: uuid::Uuid,
#[field(create, response)]
#[column(unique)]
pub email: String,
}
});
let code = generate(&entity).to_string();
assert!(code.contains("Result < Option < Sub > , sqlx :: Error >"));
assert!(code.contains("fetch_optional"));
}

#[test]
fn adapter_without_upsert_attribute_has_no_method() {
let entity = parse_entity(quote! {
#[entity(table = "users", transactions)]
pub struct User {
#[id]
pub id: uuid::Uuid,
#[field(create, update, response)]
pub name: String,
}
});
let code = generate(&entity).to_string();
assert!(!code.contains("pub async fn upsert"));
}
}
11 changes: 10 additions & 1 deletion crates/entity-derive/tests/cases/pass/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use entity_derive::Entity;
use uuid::Uuid;

#[derive(Entity)]
#[entity(table = "users", upsert(conflict = "email"))]
#[entity(table = "users", transactions, upsert(conflict = "email"))]
pub struct User {
#[id]
pub id: Uuid,
Expand Down Expand Up @@ -55,6 +55,15 @@ fn assert_upsert_signatures() {
takes_nothing_style::<sqlx::PgPool>();
}

async fn exercise_tx_upsert(pool: sqlx::PgPool, dto: CreateUserRequest) -> Result<(), sqlx::Error> {
let mut tx = pool.begin().await?;
let mut repo = UserTransactionRepo::new(&mut tx);
let _user: User = repo.upsert(dto).await?;
tx.commit().await?;
Ok(())
}

fn main() {
assert_upsert_signatures();
let _ = exercise_tx_upsert;
}
11 changes: 11 additions & 0 deletions wiki/Atributos.md
Original file line number Diff line number Diff line change
Expand Up @@ -737,3 +737,14 @@ Declara constraints que la macro no puede inferir — claves foráneas sobre cla
constraint(name = "orders_window_check", kind = "check"),
)]
```

### Upsert transaccional

Con `transactions` y `upsert(...)` habilitados, el adaptador `{Entity}TransactionRepo` expone `upsert` con la misma semántica SQL que el método del pool, ejecutado sobre el handle de la transacción — para flujos donde el upsert debe compartir atomicidad con sentencias adyacentes.

```rust
let mut tx = pool.begin().await?;
sqlx::query("UPDATE users SET username = NULL WHERE ...").execute(&mut *tx).await?;
let user = UserTransactionRepo::new(&mut tx).upsert(dto).await?;
tx.commit().await?;
```
11 changes: 11 additions & 0 deletions wiki/Attributes-en.md
Original file line number Diff line number Diff line change
Expand Up @@ -797,3 +797,14 @@ Declare constraints the macro cannot infer — foreign keys over natural keys, c
constraint(name = "orders_window_check", kind = "check"),
)]
```

### Transactional upsert

With both `transactions` and `upsert(...)`, the `{Entity}TransactionRepo` adapter exposes `upsert` with the same SQL and action semantics as the pool method, executed on the transaction handle — for flows where the upsert must share atomicity with adjacent statements.

```rust
let mut tx = pool.begin().await?;
sqlx::query("UPDATE users SET username = NULL WHERE ...").execute(&mut *tx).await?;
let user = UserTransactionRepo::new(&mut tx).upsert(dto).await?;
tx.commit().await?;
```
11 changes: 11 additions & 0 deletions wiki/Атрибуты.md
Original file line number Diff line number Diff line change
Expand Up @@ -797,3 +797,14 @@ garde::Validate::validate(&dto)?;
constraint(name = "orders_window_check", kind = "check"),
)]
```

### Транзакционный upsert

При включённых `transactions` и `upsert(...)` адаптер `{Entity}TransactionRepo` получает `upsert` с той же SQL-семантикой, что и метод пула, но на хендле транзакции — для потоков, где upsert должен быть атомарен с соседними стейтментами.

```rust
let mut tx = pool.begin().await?;
sqlx::query("UPDATE users SET username = NULL WHERE ...").execute(&mut *tx).await?;
let user = UserTransactionRepo::new(&mut tx).upsert(dto).await?;
tx.commit().await?;
```
11 changes: 11 additions & 0 deletions wiki/属性.md
Original file line number Diff line number Diff line change
Expand Up @@ -737,3 +737,14 @@ garde::Validate::validate(&dto)?;
constraint(name = "orders_window_check", kind = "check"),
)]
```

### 事务性 upsert

同时启用 `transactions` 和 `upsert(...)` 时,`{Entity}TransactionRepo` 适配器提供与池方法相同 SQL 语义的 `upsert`,在事务句柄上执行——适用于 upsert 必须与相邻语句共享原子性的流程。

```rust
let mut tx = pool.begin().await?;
sqlx::query("UPDATE users SET username = NULL WHERE ...").execute(&mut *tx).await?;
let user = UserTransactionRepo::new(&mut tx).upsert(dto).await?;
tx.commit().await?;
```
11 changes: 11 additions & 0 deletions wiki/속성.md
Original file line number Diff line number Diff line change
Expand Up @@ -737,3 +737,14 @@ garde::Validate::validate(&dto)?;
constraint(name = "orders_window_check", kind = "check"),
)]
```

### 트랜잭션 upsert

`transactions`와 `upsert(...)`가 모두 켜져 있으면 `{Entity}TransactionRepo` 어댑터가 풀 메서드와 동일한 SQL 시맨틱의 `upsert`를 트랜잭션 핸들에서 제공합니다 — upsert가 인접 문장과 원자성을 공유해야 하는 플로우용입니다.

```rust
let mut tx = pool.begin().await?;
sqlx::query("UPDATE users SET username = NULL WHERE ...").execute(&mut *tx).await?;
let user = UserTransactionRepo::new(&mut tx).upsert(dto).await?;
tx.commit().await?;
```
Loading