From 9e76d112fc04d360d75b543348f975737a405e01 Mon Sep 17 00:00:00 2001 From: danshat Date: Tue, 16 Apr 2024 13:36:18 +0300 Subject: [PATCH] Initial commit --- .gitignore | 22 ++++++ Cargo.toml | 19 +++++ src/main.rs | 203 +++++++++++++++++++++++++++++++++++++++++++++++++ src/structs.rs | 58 ++++++++++++++ 4 files changed, 302 insertions(+) create mode 100755 .gitignore create mode 100644 Cargo.toml create mode 100755 src/main.rs create mode 100644 src/structs.rs diff --git a/.gitignore b/.gitignore new file mode 100755 index 0000000..9681487 --- /dev/null +++ b/.gitignore @@ -0,0 +1,22 @@ +# Created by https://www.toptal.com/developers/gitignore/api/rust +# Edit at https://www.toptal.com/developers/gitignore?templates=rust + +### Rust ### +# Generated by Cargo +# will have compiled files and executables +debug/ +target/ + +# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries +# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html +Cargo.lock + +# These are backup files generated by rustfmt +**/*.rs.bk + +# MSVC Windows builds of rustc generate these, which store debugging information +*.pdb + +# End of https://www.toptal.com/developers/gitignore/api/rust + +config.toml diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..0ab804e --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "rust_vk_stats" +version = "0.1.0" +edition = "2021" + +[profile.release] +debug = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +clap = { version = "4.5.4", features = ["derive"] } +colored = "2.1.0" +mysql_async = { version = "0.34.1", features = ["chrono"]} +reqwest = {version = "0.12.2", features = ["json"]} +serde = { version = "1.0.97", features = ["derive"] } +serde_json = "1.0.115" +tokio = { version = "1.36.0", features = ["full"] } +toml = "0.8.12" diff --git a/src/main.rs b/src/main.rs new file mode 100755 index 0000000..b6198f0 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,203 @@ +use colored::Colorize; +use mysql_async::{prelude::*, Pool, Transaction}; +use std::time::Instant; +use std::{error::Error, fs}; + +use structs::*; +pub mod structs; + +// Construct URL for API access +fn api_url(method: &str, params: Vec<&str>, config: &Config) -> String { + let mut url: String = "https://api.vk.com/method/".to_string() + method + "?"; + for param in params { + url = url + "&" + param; + } + url + "&v=" + &config.vk_version + "&access_token=" + &config.access_token +} + +// Parse config from config.toml +fn parse_config() -> Result { + let config_contents = + fs::read_to_string("config.toml").expect("Couldn't read config file. Aborting! :("); + let deserialized = toml::from_str::(&config_contents); + if deserialized.is_err() { + return Err("wrong_config".to_string()); + } + let yaml: TomlFile = deserialized.unwrap(); + let config = yaml.config; + Ok(config) +} + +// Connect to database +async fn db_connect(config: &Config) -> Result> { + let database_url = "mysql://".to_string() + + &config.database_username + + ":" + + &config.database_password + + "@" + + &config.database_hostname + + ":" + + &config.database_port + + "/" + + &config.database_name; + let pool = mysql_async::Pool::new(database_url.as_str()); + Ok(pool) +} + +// Write array to database +async fn write_arr_to_db(transaction: &mut Transaction<'_>, arr: &Vec) -> Result<(), Box> { + let start = Instant::now(); + transaction.exec_batch( + r"INSERT INTO messages (time, id, sender, message, sticker) VALUES (:time, :id, :sender, :message, :sticker)", + arr.iter().map(|p| { + params! { + "time" => p.date, + "id" => p.id, + "sender" => p.from_id, + "message" => p.text.clone(), + "sticker" => { + if p.attachments.len() > 0 && p.attachments[0].sticker.is_some() { + p.attachments[0].sticker.as_ref().unwrap().sticker_id + } else { + 0 + } + }, + } + }), + ) + .await?; + let duration = start.elapsed(); + println!("Time elapsed in write_arr_to_db() is: {:?}", duration); + println!("{}", "Inserted into DB!".green()); + Ok(()) +} + +// Clear database and start anew +async fn rebuild(pool: &mut Pool, config: &Config) -> Result<(), Box> { + let mut conn = pool.get_conn().await?; + let result = conn.query_drop("DROP TABLE messages").await; + if result.is_err() { + println!( + "{}", + "Couldn't delete table. Doesn't exist? Proceeding..." + .yellow() + .bold() + ); + } else { + println!("{}", "Dropped messages table.".red().bold()); + } + + conn.query_drop( + "CREATE TABLE IF NOT EXISTS messages + (n MEDIUMINT NOT NULL AUTO_INCREMENT, + time INT, id INT, sender INT, message TEXT, + sticker INT, + sticker_url TEXT, + CONSTRAINT n PRIMARY KEY (n))", + ) + .await?; + println!("{}", "Recreated table.".green()); + drop(conn); + let count: i64; + loop { + let result = reqwest::get(api_url( + "messages.getHistory", + vec!["count=200", &("peer_id=".to_owned() + &config.peer_id)], + config, + )) + .await? + .json::() + .await; + if result.is_ok() { + count = result.unwrap().response.count; + break; + } + tokio::time::sleep(tokio::time::Duration::from_millis(340)).await; + } + println!("{}", count); + let conn = &mut pool.get_conn().await?; + let mut transaction = conn.start_transaction(mysql_async::TxOpts::new()).await?; + for offset in (0..count).step_by(200) { + let start = Instant::now(); + let unwrapped_result: Option; + loop { + let result = reqwest::get(api_url( + "messages.getHistory", + vec![ + "count=200", + &format!("offset={}", offset), + &("peer_id=".to_owned() + &config.peer_id), + "rev=1", + ], + config, + )) + .await? + .json::() + .await; + if result.is_err() { + continue; + } + println!("{}", format!("Processed {}", offset).green()); + unwrapped_result = Some(result.unwrap()); + break; + } + println!("Writing to DB"); + + write_arr_to_db( + &mut transaction, + &unwrapped_result.unwrap().response.items, + ) + .await?; + let duration = start.elapsed(); + print!("Time elapsed in main FOR is: {:?}", duration); + if duration.as_millis() < 340 { + let new_dur = tokio::time::Duration::from_millis(340) - duration; + println!(", so sleeping for {}ms", new_dur.as_millis()); + tokio::time::sleep(new_dur).await; + } else {print!("\n");} + } + transaction.commit().await?; + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let cli = clap::Args::parse(); + println!( + "{}", + "VK dialogue statistics collector initializing..." + .yellow() + .bold() + ); + println!("{}", "Parsing config from config.yml.".yellow().bold()); + let config = parse_config(); + if config.is_err() { + println!( + "{}", + "Error parsing config. Please check if it's correct! Shutting down :(" + .red() + .bold() + ); + return Ok(()); + } + let config = config.unwrap(); + let pool = db_connect(&config).await; + let mut pool = pool.expect("Error while connecting to DB. Shutting down :("); + println!( + "{}", + "Connected to MYSQL database successfully!".green().bold() + ); + + if cli.rebuild > 0 { + println!("{}", "Rebuilding mode.".yellow()); + rebuild(&mut pool, &config).await?; + } else { + println!( + "{}", + "No argument specified. Running in usual mode.".yellow() + ); + } + + pool.disconnect().await?; + Ok(()) +} diff --git a/src/structs.rs b/src/structs.rs new file mode 100644 index 0000000..f233250 --- /dev/null +++ b/src/structs.rs @@ -0,0 +1,58 @@ +use serde::{Deserialize, Serialize}; +use clap::{command, Parser}; + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct TomlFile { + pub config: Config, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct Config { + pub peer_id: String, + pub access_token: String, + pub vk_version: String, + pub database_hostname: String, + pub database_port: String, + pub database_username: String, + pub database_password: String, + pub database_password_root: String, + pub database_name: String, +} + +#[derive(Serialize, Deserialize)] +pub struct VkResponse { + pub response: VkResp, +} + +#[derive(Serialize, Deserialize)] +pub struct VkResp { + pub count: i64, + pub items: Vec, +} + +#[derive(Serialize, Deserialize)] +pub struct VkMessage { + pub date: i64, + pub from_id: i64, + pub id: i64, + pub text: String, + pub peer_id: i64, + pub attachments: Vec, +} + +#[derive(Serialize, Deserialize)] +pub struct Attachment { + pub sticker: Option, +} + +#[derive(Serialize, Deserialize)] +pub struct Sticker { + pub sticker_id: i64, +} + +#[derive(Parser)] +#[command(version, about, long_about = None)] +pub struct Args { + #[arg(short, long, action = clap::ArgAction::Count)] + pub rebuild: u8, +} \ No newline at end of file