diff options
Diffstat (limited to 'src/user/transactions.rs')
| -rw-r--r-- | src/user/transactions.rs | 175 |
1 files changed, 175 insertions, 0 deletions
diff --git a/src/user/transactions.rs b/src/user/transactions.rs new file mode 100644 index 0000000..197b2ec --- /dev/null +++ b/src/user/transactions.rs @@ -0,0 +1,175 @@ +use actix::{Handler, Message}; +use chrono::NaiveDateTime; +use diesel::{BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, RunQueryDsl}; +use diesel::{Connection, MysqlConnection}; + +use crate::error::ServiceError; +use crate::model::{DbExecutor, NewTransaction, User}; +use crate::schema::transactions::{columns as transaction_columns, dsl::transactions}; +use crate::schema::users::{columns as user_columns, dsl::users}; + +#[derive(Debug, Clone, Queryable)] +struct TempTransaction { + pub id: u64, + pub date: NaiveDateTime, + pub sender: String, + pub receiver: u64, + pub amount: u64, + pub sender_balance: u64, + pub receiver_balance: u64, + pub purpose: String, +} + +#[derive(Debug, Clone)] +pub struct Transaction { + pub id: u64, + pub date: NaiveDateTime, + pub sender: String, + pub receiver: String, + pub amount: u64, + pub sender_balance: u64, + pub receiver_balance: u64, + pub purpose: String, +} + +pub struct TransactionsRequest { + id: u64, +} + +impl TransactionsRequest { + pub fn new(id: u64) -> Self { + Self { id } + } +} + +impl Message for TransactionsRequest { + type Result = Result<Vec<Transaction>, ServiceError>; +} + +impl Handler<TransactionsRequest> for DbExecutor { + type Result = Result<Vec<Transaction>, ServiceError>; + + fn handle(&mut self, req: TransactionsRequest, _: &mut Self::Context) -> Self::Result { + let conn: &MysqlConnection = &self.0.get().unwrap(); + + use transaction_columns as t_cols; + let temp = transactions + .filter( + transaction_columns::sender + .eq(req.id) + .or(transaction_columns::receiver.eq(req.id)), + ) + .inner_join(users.on(transaction_columns::sender.eq(user_columns::id))) + .select(( + t_cols::id, + t_cols::date, + user_columns::name, + t_cols::receiver, + t_cols::amount, + t_cols::sender_balance, + t_cols::receiver_balance, + t_cols::purpose, + )) + .order(transaction_columns::date.desc()) + .load::<TempTransaction>(conn) + .map_err(|_| ServiceError::NotFound)?; + + let recv_ids = temp.iter().map(|t| t.receiver).collect::<Vec<_>>(); + + let receivers = users + .select((user_columns::id, user_columns::name)) + .filter(user_columns::id.eq_any(recv_ids)) + .load::<(u64, String)>(conn)?; + + let mut output = Vec::with_capacity(temp.len()); + for t in temp { + let (_, name) = receivers + .iter() + .find(|(id, _)| id == &t.receiver) + .expect("oh no"); + + let t = Transaction { + id: t.id, + date: t.date, + sender: t.sender, + receiver: name.to_owned(), + amount: t.amount, + sender_balance: t.sender_balance, + receiver_balance: t.receiver_balance, + purpose: t.purpose, + }; + output.push(t); + } + + Ok(output) + } +} + +pub struct TransferRequest { + pub from: String, + pub to: String, + pub amount: u64, + pub purpose: String, +} + +impl Message for TransferRequest { + type Result = Result<(), ServiceError>; +} + +impl Handler<TransferRequest> for DbExecutor { + type Result = Result<(), ServiceError>; + + fn handle(&mut self, req: TransferRequest, _: &mut Self::Context) -> Self::Result { + let conn: &MysqlConnection = &self.0.get().unwrap(); + + Ok(conn.transaction::<_, diesel::result::Error, _>(|| { + // Lock the user record to avoid modification by other threads + users + .filter( + user_columns::name + .eq(&req.from) + .or(user_columns::name.eq(&req.to)), + ) + .for_update() + .execute(conn)?; + + let sender = users + .filter(user_columns::name.eq(&req.from)) + .first::<User>(conn)?; + + let new_sender_balance = sender + .balance + .checked_sub(req.amount) + .ok_or(diesel::result::Error::RollbackTransaction)?; + + diesel::update(users.find(sender.id)) + .set(user_columns::balance.eq(new_sender_balance)) + .execute(conn)?; + + let receiver = users + .filter(user_columns::name.eq(&req.to)) + .first::<User>(conn)?; + + let new_receiver_balance = receiver + .balance + .checked_add(req.amount) + .ok_or(diesel::result::Error::RollbackTransaction)?; + + diesel::update(users.find(receiver.id)) + .set(user_columns::balance.eq(new_receiver_balance)) + .execute(conn)?; + + let t = NewTransaction { + sender: sender.id, + receiver: receiver.id, + amount: req.amount, + sender_balance: new_sender_balance, + receiver_balance: new_receiver_balance, + purpose: &req.purpose, + }; + diesel::insert_into(transactions).values(&t).execute(conn)?; + + Ok(()) + })?) + } +} |
