vutran1710 of gapo.tinh-te
2/15/2020 - 9:39 AM

MySQL Client

use crate::structs::types::PostMeta;
use mysql::{from_row, Conn, Opts};
use std::{thread, time};

/// MySQL client that owns a single connection and a pre-built post query.
pub struct Mysql {
    /// Connection URI, kept so the connection can be re-created when it drops.
    uri: String,
    /// The current database connection.
    conn: Conn,
    /// Query string used to fetch posts; built once at construction time.
    qs: String,
}

/// Operations offered by the MySQL-backed post source.
pub trait MysqlService {
    /// Creates the service from a connection URI and the maximum number of
    /// posts a single query should return.
    fn new(mysql_uri: String, max_post_query: u32) -> Self;

    /// Ensures the connection is usable, reconnecting if needed.
    ///
    /// Returns `Err` with a message when the connection cannot be restored.
    fn retry_connection(&mut self) -> Result<(), String>;

    /// Fetches posts and hands each one to `callback`.
    ///
    /// Returns `Some(())` on success and `None` when the connection could
    /// not be established.
    fn get_posts(
        &mut self,
        callback: impl FnMut(PostMeta) -> ()
    ) -> Option<()>;
}

impl MysqlService for Mysql {
    fn new(mysql_uri: String, max_post_query: u32) -> Mysql {
        let uri = Opts::from_url(&mysql_uri).unwrap();
        let conn = Conn::new(uri).unwrap();
        let qs = format!("SELECT * FROM post DESC LIMIT {}", max_ppost_query);

        Mysql {
            conn,
            qs,
            uri: mysql_uri.clone(),
        }
    }

    fn retry_connection(&mut self) -> Result<(), String> {
        match self.conn.ping() {
            true => return Ok(()),
            false => (),
        }

        let mut counter = 0;

        let wait = time::Duration::from_millis(1000);

        loop {
            if counter > 5 {
                error!("Too many retries...");
                return Err("Failed to connect".to_string());
            }

            warn!("Reseting connection! {}", counter);

            match Conn::new(&self.uri) {
                Ok(new_conn) => {
                    self.conn = new_conn;
                    return Ok(());
                }
                Err(_) => thread::sleep(wait),
            };

            counter += 1;
        }
    }

    fn get_posts(
        &mut self,
        mut callback: impl FnMut(PostMeta) -> (),
    ) -> Option<()> {
        match self.retry_connection() {
            Ok(_) => (),
            Err(_) => return None,
        }

        let make_post = |row| {
            let (id, react_count, comment_count, share_count, create_time, update_time) =
                // NOTE: directly casting type for each fields
                from_row::<(String, Option<u32>, Option<u32>, Option<u32>, u64, Option<u64>)>(row);
            PostMeta {
                id,
                react_count: react_count.unwrap_or(0),
                comment_count: comment_count.unwrap_or(0),
                share_count: share_count.unwrap_or(0),
                create_time,
                update_time: update_time.unwrap_or(create_time),
            }
        };

        let query = self.conn.prep_exec(&self.qs, ()).map(|result| {
            result
                .map(|x| x.unwrap())
                .map(|row| from_row::<String>(row))
                .collect()
        });

        let posts: Vec<PostMeta> = self
            .conn
            .prep_exec(&self.qs, ())
            .map(|result| result.map(|x| x.unwrap()).map(make_post).collect())
            .unwrap();

        posts
            .into_iter()
            .for_each(|post| callback(post));

        Some(())
    }
}