use actix::{Handler, Message}; use chrono::NaiveDateTime; use diesel::{BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, RunQueryDsl}; use diesel::{Connection, MysqlConnection}; use crate::error::ServiceError; use crate::model::{DbExecutor, NewTransaction, User}; use crate::schema::transactions::{columns as transaction_columns, dsl::transactions}; use crate::schema::users::{columns as user_columns, dsl::users}; #[derive(Debug, Clone, Queryable)] struct TempTransaction { pub id: u64, pub date: NaiveDateTime, pub sender: String, pub receiver: u64, pub amount: u64, pub sender_balance: u64, pub receiver_balance: u64, pub purpose: String, } #[derive(Debug, Clone)] pub struct Transaction { pub id: u64, pub date: NaiveDateTime, pub sender: String, pub receiver: String, pub amount: u64, pub sender_balance: u64, pub receiver_balance: u64, pub purpose: String, } pub struct TransactionsRequest { id: u64, } impl TransactionsRequest { pub fn new(id: u64) -> Self { Self { id } } } impl Message for TransactionsRequest { type Result = Result, ServiceError>; } impl Handler for DbExecutor { type Result = Result, ServiceError>; fn handle(&mut self, req: TransactionsRequest, _: &mut Self::Context) -> Self::Result { let conn: &MysqlConnection = &self.0.get().unwrap(); use transaction_columns as t_cols; let temp = transactions .filter( transaction_columns::sender .eq(req.id) .or(transaction_columns::receiver.eq(req.id)), ) .inner_join(users.on(transaction_columns::sender.eq(user_columns::id))) .select(( t_cols::id, t_cols::date, user_columns::name, t_cols::receiver, t_cols::amount, t_cols::sender_balance, t_cols::receiver_balance, t_cols::purpose, )) .order(transaction_columns::date.desc()) .load::(conn) .map_err(|_| ServiceError::NotFound)?; let recv_ids = temp.iter().map(|t| t.receiver).collect::>(); let receivers = users .select((user_columns::id, user_columns::name)) .filter(user_columns::id.eq_any(recv_ids)) .load::<(u64, String)>(conn)?; let mut output = Vec::with_capacity(temp.len()); for t in temp { let (_, name) = receivers .iter() .find(|(id, _)| id == &t.receiver) .expect("oh no"); let t = Transaction { id: t.id, date: t.date, sender: t.sender, receiver: name.to_owned(), amount: t.amount, sender_balance: t.sender_balance, receiver_balance: t.receiver_balance, purpose: t.purpose, }; output.push(t); } Ok(output) } } pub struct TransferRequest { pub from: String, pub to: String, pub amount: u64, pub purpose: String, } impl Message for TransferRequest { type Result = Result<(), ServiceError>; } impl Handler for DbExecutor { type Result = Result<(), ServiceError>; fn handle(&mut self, req: TransferRequest, _: &mut Self::Context) -> Self::Result { let conn: &MysqlConnection = &self.0.get().unwrap(); Ok(conn.transaction::<_, diesel::result::Error, _>(|| { // Lock the user record to avoid modification by other threads users .filter( user_columns::name .eq(&req.from) .or(user_columns::name.eq(&req.to)), ) .for_update() .execute(conn)?; let sender = users .filter(user_columns::name.eq(&req.from)) .first::(conn)?; let new_sender_balance = sender .balance .checked_sub(req.amount) .ok_or(diesel::result::Error::RollbackTransaction)?; diesel::update(users.find(sender.id)) .set(user_columns::balance.eq(new_sender_balance)) .execute(conn)?; let receiver = users .filter(user_columns::name.eq(&req.to)) .first::(conn)?; let new_receiver_balance = receiver .balance .checked_add(req.amount) .ok_or(diesel::result::Error::RollbackTransaction)?; diesel::update(users.find(receiver.id)) .set(user_columns::balance.eq(new_receiver_balance)) .execute(conn)?; let t = NewTransaction { sender: sender.id, receiver: receiver.id, amount: req.amount, sender_balance: new_sender_balance, receiver_balance: new_receiver_balance, purpose: &req.purpose, }; diesel::insert_into(transactions).values(&t).execute(conn)?; Ok(()) })?) } }