From 1c925d97e7295babff538abd2fa4dbb2f5b74be6 Mon Sep 17 00:00:00 2001 From: danshat Date: Fri, 19 Apr 2024 01:09:48 +0300 Subject: [PATCH] Concurrency (sorta) --- Cargo.toml | 1 + request.vkscript | 30 ++++++++++ src/main.rs | 151 +++++++++++++++++++++++++++++------------------ src/structs.rs | 18 +++--- 4 files changed, 137 insertions(+), 63 deletions(-) create mode 100644 request.vkscript diff --git a/Cargo.toml b/Cargo.toml index 0ab804e..85ca3d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,3 +17,4 @@ serde = { version = "1.0.97", features = ["derive"] } serde_json = "1.0.115" tokio = { version = "1.36.0", features = ["full"] } toml = "0.8.12" +url = "2.5.0" diff --git a/request.vkscript b/request.vkscript new file mode 100644 index 0000000..44ce3f0 --- /dev/null +++ b/request.vkscript @@ -0,0 +1,30 @@ +var offset = {1}; +var peer_id = {2}; + +var hist = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 0}).items; +var hist_2 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 200}).items; +var hist_3 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 400}).items; +var hist_4 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 600}).items; +var hist_5 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 800}).items; +var hist_6 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 1000}).items; +var hist_7 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 1200}).items; +var hist_8 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 1400}).items; +var hist_9 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 1600}).items; +var hist_10 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 1800}).items; +var hist_11 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 2000}).items; +var hist_12 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 2200}).items; +var hist_13 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 2400}).items; +var hist_14 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 2600}).items; +var hist_15 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 2800}).items; +var hist_16 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 3000}).items; +var hist_17 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 3200}).items; +var hist_18 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 3400}).items; +var hist_19 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 3600}).items; +var hist_20 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 3800}).items; +var hist_21 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 4000}).items; +var hist_22 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 4200}).items; +var hist_23 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 4400}).items; +var hist_24 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 4600}).items; +var hist_25 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 4800}).items; + +return hist + hist_2 + hist_3 + hist_4 + hist_5 + hist_6 + hist_7 + hist_8 + hist_9 + hist_10 + hist_11 + hist_12 + hist_13 + hist_14 + hist_15 + hist_16 + hist_17 + hist_18 + hist_19 + hist_20 + hist_21 + hist_22 + hist_23 + hist_24 + hist_25; \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index b6198f0..8b02ab1 100755 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,11 @@ +use clap::Parser; use colored::Colorize; -use mysql_async::{prelude::*, Pool, Transaction}; +use mysql_async::{prelude::*, Conn, Pool}; use std::time::Instant; use std::{error::Error, fs}; +use tokio::join; +use tokio::task::JoinHandle; +use url::form_urlencoded::byte_serialize; use structs::*; pub mod structs; @@ -28,9 +32,9 @@ fn parse_config() -> Result { Ok(config) } -// Connect to database +// Connect to database and return connection pool async fn db_connect(config: &Config) -> Result> { - let database_url = "mysql://".to_string() + let database_url = "mysql://".to_string() // JDBC connection url + &config.database_username + ":" + &config.database_password @@ -45,9 +49,9 @@ async fn db_connect(config: &Config) -> Result> { } // Write array to database -async fn write_arr_to_db(transaction: &mut Transaction<'_>, arr: &Vec) -> Result<(), Box> { +async fn write_arr_to_db(mut conn: Conn, arr: Vec) { let start = Instant::now(); - transaction.exec_batch( + conn.exec_batch( // Might be faster to do in a single transaction, but unsure how to do this r"INSERT INTO messages (time, id, sender, message, sticker) VALUES (:time, :id, :sender, :message, :sticker)", arr.iter().map(|p| { params! { @@ -65,17 +69,84 @@ async fn write_arr_to_db(transaction: &mut Transaction<'_>, arr: &Vec } }), ) - .await?; + .await.expect("Failed to execute request to DB"); let duration = start.elapsed(); - println!("Time elapsed in write_arr_to_db() is: {:?}", duration); + println!( + "Time elapsed in write_arr_to_db() is: {:.2}s", + duration.as_secs_f32() + ); println!("{}", "Inserted into DB!".green()); - Ok(()) +} + +async fn get_from_file_vkscript() -> String { + let file_string = fs::read_to_string("request.vkscript").expect("Reading VKScript file"); + file_string +} + +// This thread handles full database rebuild +async fn thread_requests<'a>(count: i64, code: String, config: Config, pool: Pool) -> Config { + let mut handles: Vec> = vec![]; // Vector for joining database writing threads later + for offset in (0..count).step_by(5000) { + // Get messages in batches of 5k because that is the maximum that /method/execute allows + let start = Instant::now(); + let unwrapped_result: Option; + loop { + // Just in case something fails -- retry + let replaced_code = code + .replace("{1}", &offset.to_string()) + .replace("{2}", &config.peer_id); // Put in peer_id and offsets + let urlencoded: String = byte_serialize(replaced_code.as_bytes()).collect(); // Encode string for request to VK + + let result = reqwest::get(api_url( // Send GET request to VK API + "execute", + vec![&("code=".to_string() + &urlencoded)], + &config, + )) + .await; + if result.is_err() { + println!("{}", "VK request failed".red().bold()); + continue; + } + let result_text = result.unwrap().text().await.expect("Request to VK failed"); + let result_2 = serde_json::from_str::(&result_text); + if result_2.is_err() { + println!("{}", "Error while deserializing request".red().bold()); + continue; + } + println!("{}", format!("Processed {}", offset).green()); + unwrapped_result = Some(result_2.unwrap()); + break; + } + // Spawn a separate thread for concurrent writes to database (they are not really concurrent) and dump data there + println!("Writing to DB"); + let conn = pool + .get_conn() + .await + .expect("Failed to get connection from DB"); + + let handle = tokio::spawn(write_arr_to_db( + conn, + unwrapped_result.unwrap().clone().response, + )); + handles.push(handle); // Store for joining later + let duration = start.elapsed(); + println!( + "Time elapsed in main FOR is: {:.2}s", + duration.as_secs_f32() + ); + } + println!("{}", "Done! Awaiting for DB writes...".yellow()); + for task in handles { + task.await.expect("Failed to await DB writing task"); + } + + config } // Clear database and start anew -async fn rebuild(pool: &mut Pool, config: &Config) -> Result<(), Box> { +async fn rebuild(pool: &mut Pool, config: Config) -> Result> { let mut conn = pool.get_conn().await?; - let result = conn.query_drop("DROP TABLE messages").await; + let result = conn.query_drop("DROP TABLE messages").await; // Drop old table if result.is_err() { println!( "{}", @@ -88,6 +159,7 @@ async fn rebuild(pool: &mut Pool, config: &Config) -> Result<(), Box> } conn.query_drop( + // Create new table "CREATE TABLE IF NOT EXISTS messages (n MEDIUMINT NOT NULL AUTO_INCREMENT, time INT, id INT, sender INT, message TEXT, @@ -99,11 +171,12 @@ async fn rebuild(pool: &mut Pool, config: &Config) -> Result<(), Box> println!("{}", "Recreated table.".green()); drop(conn); let count: i64; + // Get message count loop { let result = reqwest::get(api_url( "messages.getHistory", vec!["count=200", &("peer_id=".to_owned() + &config.peer_id)], - config, + &config, )) .await? .json::() @@ -115,54 +188,20 @@ async fn rebuild(pool: &mut Pool, config: &Config) -> Result<(), Box> tokio::time::sleep(tokio::time::Duration::from_millis(340)).await; } println!("{}", count); - let conn = &mut pool.get_conn().await?; - let mut transaction = conn.start_transaction(mysql_async::TxOpts::new()).await?; - for offset in (0..count).step_by(200) { - let start = Instant::now(); - let unwrapped_result: Option; - loop { - let result = reqwest::get(api_url( - "messages.getHistory", - vec![ - "count=200", - &format!("offset={}", offset), - &("peer_id=".to_owned() + &config.peer_id), - "rev=1", - ], - config, - )) - .await? - .json::() - .await; - if result.is_err() { - continue; - } - println!("{}", format!("Processed {}", offset).green()); - unwrapped_result = Some(result.unwrap()); - break; - } - println!("Writing to DB"); - - write_arr_to_db( - &mut transaction, - &unwrapped_result.unwrap().response.items, - ) - .await?; - let duration = start.elapsed(); - print!("Time elapsed in main FOR is: {:?}", duration); - if duration.as_millis() < 340 { - let new_dur = tokio::time::Duration::from_millis(340) - duration; - println!(", so sleeping for {}ms", new_dur.as_millis()); - tokio::time::sleep(new_dur).await; - } else {print!("\n");} - } - transaction.commit().await?; - Ok(()) + // TODO: get it from config file + let code = get_from_file_vkscript().await; + + // Rebuild in separate thread + let main_handle = tokio::spawn(thread_requests(count, code, config, pool.clone())); + let config = join!(main_handle) + .0 + .expect("Couldn't join main rebuilding thread"); + Ok(config) } #[tokio::main] async fn main() -> Result<(), Box> { - let cli = clap::Args::parse(); + let cli = Args::parse(); println!( "{}", "VK dialogue statistics collector initializing..." @@ -190,7 +229,7 @@ async fn main() -> Result<(), Box> { if cli.rebuild > 0 { println!("{}", "Rebuilding mode.".yellow()); - rebuild(&mut pool, &config).await?; + rebuild(&mut pool, config).await?; } else { println!( "{}", diff --git a/src/structs.rs b/src/structs.rs index f233250..f150b2a 100644 --- a/src/structs.rs +++ b/src/structs.rs @@ -1,5 +1,5 @@ -use serde::{Deserialize, Serialize}; use clap::{command, Parser}; +use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub struct TomlFile { @@ -19,18 +19,22 @@ pub struct Config { pub database_name: String, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub struct VkResponse { pub response: VkResp, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub struct VkResp { pub count: i64, pub items: Vec, } +#[derive(Serialize, Deserialize, Clone)] +pub struct VkRespCode { + pub response: Vec, +} -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub struct VkMessage { pub date: i64, pub from_id: i64, @@ -40,12 +44,12 @@ pub struct VkMessage { pub attachments: Vec, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub struct Attachment { pub sticker: Option, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub struct Sticker { pub sticker_id: i64, } @@ -55,4 +59,4 @@ pub struct Sticker { pub struct Args { #[arg(short, long, action = clap::ArgAction::Count)] pub rebuild: u8, -} \ No newline at end of file +}