Skip to content

Commit

Permalink
driver-adapters: close tx on drop
Browse files Browse the repository at this point in the history
  • Loading branch information
aqrln committed Sep 26, 2023
1 parent 4bf3cce commit 895e2fe
Show file tree
Hide file tree
Showing 11 changed files with 99 additions and 19 deletions.
8 changes: 8 additions & 0 deletions query-engine/driver-adapters/js/adapter-libsql/src/libsql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ class LibSqlTransaction extends LibSqlQueryable<TransactionClient> implements Tr

return ok(undefined)
}

discard(): Result<void> {
if (!this.finished) {
this.finished = true
this.rollback().catch(console.error)
}
return ok(undefined)
}
}

export class PrismaLibSQL extends LibSqlQueryable<StdClient> implements DriverAdapter {
Expand Down
10 changes: 9 additions & 1 deletion query-engine/driver-adapters/js/adapter-neon/src/neon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ class NeonWsQueryable<ClientT extends neon.Pool | neon.PoolClient> extends NeonQ
}

class NeonTransaction extends NeonWsQueryable<neon.PoolClient> implements Transaction {
constructor(client: neon.PoolClient, readonly options: TransactionOptions) {
constructor(
client: neon.PoolClient,
readonly options: TransactionOptions,
) {
super(client)
}

Expand All @@ -97,6 +100,11 @@ class NeonTransaction extends NeonWsQueryable<neon.PoolClient> implements Transa
this.client.release()
return Promise.resolve(ok(undefined))
}

discard(): Result<void> {
this.client.release()
return ok(undefined)
}
}

export class PrismaNeon extends NeonWsQueryable<neon.Pool> implements DriverAdapter {
Expand Down
5 changes: 5 additions & 0 deletions query-engine/driver-adapters/js/adapter-pg/src/pg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ class PgTransaction extends PgQueryable<TransactionClient> implements Transactio
this.client.release()
return ok(undefined)
}

discard(): Result<void> {
this.client.release()
return ok(undefined)
}
}

export class PrismaPg extends PgQueryable<StdClient> implements DriverAdapter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ class PlanetScaleQueryable<ClientT extends planetScale.Connection | planetScale.
}

class PlanetScaleTransaction extends PlanetScaleQueryable<planetScale.Transaction> implements Transaction {
finished = false

constructor(
tx: planetScale.Transaction,
readonly options: TransactionOptions,
Expand All @@ -96,16 +98,25 @@ class PlanetScaleTransaction extends PlanetScaleQueryable<planetScale.Transactio
async commit(): Promise<Result<void>> {
debug(`[js::commit]`)

this.finished = true
this.txDeferred.resolve()
return Promise.resolve(ok(await this.txResultPromise))
}

async rollback(): Promise<Result<void>> {
debug(`[js::rollback]`)

this.finished = true
this.txDeferred.reject(new RollbackError())
return Promise.resolve(ok(await this.txResultPromise))
}

discard(): Result<void> {
if (!this.finished) {
this.rollback().catch(console.error)
}
return ok(undefined)
}
}

export class PrismaPlanetScale extends PlanetScaleQueryable<planetScale.Connection> implements DriverAdapter {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@prisma/driver-adapter-utils",
"version": "0.4.0",
"version": "0.5.0-pre.1",
"description": "Internal set of utilities and types for Prisma's driver adapters.",
"main": "dist/index.js",
"module": "dist/index.mjs",
Expand Down
15 changes: 15 additions & 0 deletions query-engine/driver-adapters/js/driver-adapter-utils/src/binder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const bindTransaction = (errorRegistry: ErrorRegistryInternal, transaction: Tran
executeRaw: wrapAsync(errorRegistry, transaction.executeRaw.bind(transaction)),
commit: wrapAsync(errorRegistry, transaction.commit.bind(transaction)),
rollback: wrapAsync(errorRegistry, transaction.rollback.bind(transaction)),
discard: wrapSync(errorRegistry, transaction.discard.bind(transaction)),
}
}

Expand All @@ -63,3 +64,17 @@ function wrapAsync<A extends unknown[], R>(
}
}
}

function wrapSync<A extends unknown[], R>(
registry: ErrorRegistryInternal,
fn: (...args: A) => Result<R>,
): (...args: A) => Result<R> {
return (...args) => {
try {
return fn(...args)
} catch (error) {
const id = registry.registerNewError(error)
return err({ kind: 'GenericJsError', id })
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ export interface Transaction extends Queryable {
* Rolls back the transaction.
*/
rollback(): Promise<Result<void>>
/**
* Discards and closes the transaction which may or may not have been committed or rolled back.
* This operation must be synchronous. If the implementation requires calling creating new
* asynchronous tasks on the event loop, the driver is responsible for handling the errors
* appropriately to ensure they don't crash the application.
*/
discard(): Result<void>
}

export interface ErrorCapturingDriverAdapter extends DriverAdapter {
Expand Down
30 changes: 15 additions & 15 deletions query-engine/driver-adapters/js/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions query-engine/driver-adapters/js/smoke-test-js/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
"@prisma/adapter-neon": "workspace:*",
"@prisma/adapter-pg": "workspace:*",
"@prisma/adapter-planetscale": "workspace:*",
"@prisma/client": "5.4.0-integration-libsql-adapter.7",
"@prisma/client": "5.4.0-integration-dispose-tx.1",
"@prisma/driver-adapter-utils": "workspace:*",
"pg": "^8.11.3",
"superjson": "^1.13.1",
Expand All @@ -61,7 +61,7 @@
"@types/node": "^20.5.1",
"@types/pg": "^8.10.2",
"cross-env": "^7.0.3",
"prisma": "5.4.0-integration-libsql-adapter.7",
"prisma": "5.4.0-integration-dispose-tx.1",
"tsx": "^3.12.7"
}
}
20 changes: 20 additions & 0 deletions query-engine/driver-adapters/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::async_js_function::AsyncJsFunction;
use crate::conversion::JSArg;
use crate::transaction::JsTransaction;
use napi::bindgen_prelude::{FromNapiValue, ToNapiValue};
use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction};
use napi::{JsObject, JsString};
use napi_derive::napi;
use quaint::connector::ResultSet as QuaintResultSet;
Expand Down Expand Up @@ -46,6 +47,9 @@ pub(crate) struct TransactionProxy {

/// rollback transaction
rollback: AsyncJsFunction<(), ()>,

/// discard transaction
discard: ThreadsafeFunction<(), ErrorStrategy::Fatal>,
}

/// This result set is more convenient to be manipulated from both Rust and NodeJS.
Expand Down Expand Up @@ -387,11 +391,13 @@ impl TransactionProxy {
pub fn new(js_transaction: &JsObject) -> napi::Result<Self> {
let commit = js_transaction.get_named_property("commit")?;
let rollback = js_transaction.get_named_property("rollback")?;
let discard = js_transaction.get_named_property("discard")?;
let options = js_transaction.get_named_property("options")?;

Ok(Self {
commit,
rollback,
discard,
options,
})
}
Expand All @@ -403,9 +409,23 @@ impl TransactionProxy {
pub async fn commit(&self) -> quaint::Result<()> {
self.commit.call(()).await
}

pub async fn rollback(&self) -> quaint::Result<()> {
self.rollback.call(()).await
}

pub fn discard(&self) -> quaint::Result<()> {
match self
.discard
.call((), napi::threadsafe_function::ThreadsafeFunctionCallMode::NonBlocking)
{
napi::Status::Ok => Ok(()),
err => Err(quaint::error::Error::raw_connector_error(
err.to_string(),
"error in TransactionProxy::discard".to_owned(),
)),
}
}
}

/// Coerce a `f64` to a `f32`, asserting that the conversion is lossless.
Expand Down
6 changes: 6 additions & 0 deletions query-engine/driver-adapters/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ impl JsTransaction {
}
}

impl Drop for JsTransaction {
fn drop(&mut self) {
_ = self.tx_proxy.discard();
}
}

#[async_trait]
impl QuaintTransaction for JsTransaction {
async fn commit(&self) -> quaint::Result<()> {
Expand Down

0 comments on commit 895e2fe

Please sign in to comment.