1
0

Concurrency (sorta)

This commit is contained in:
2024-04-19 01:09:48 +03:00
parent 9e76d112fc
commit 1c925d97e7
4 changed files with 137 additions and 63 deletions

View File

@@ -17,3 +17,4 @@ serde = { version = "1.0.97", features = ["derive"] }
serde_json = "1.0.115"
tokio = { version = "1.36.0", features = ["full"] }
toml = "0.8.12"
url = "2.5.0"

30
request.vkscript Normal file
View File

@@ -0,0 +1,30 @@
var offset = {1};
var peer_id = {2};
var hist = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 0}).items;
var hist_2 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 200}).items;
var hist_3 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 400}).items;
var hist_4 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 600}).items;
var hist_5 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 800}).items;
var hist_6 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 1000}).items;
var hist_7 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 1200}).items;
var hist_8 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 1400}).items;
var hist_9 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 1600}).items;
var hist_10 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 1800}).items;
var hist_11 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 2000}).items;
var hist_12 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 2200}).items;
var hist_13 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 2400}).items;
var hist_14 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 2600}).items;
var hist_15 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 2800}).items;
var hist_16 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 3000}).items;
var hist_17 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 3200}).items;
var hist_18 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 3400}).items;
var hist_19 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 3600}).items;
var hist_20 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 3800}).items;
var hist_21 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 4000}).items;
var hist_22 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 4200}).items;
var hist_23 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 4400}).items;
var hist_24 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 4600}).items;
var hist_25 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 4800}).items;
return hist + hist_2 + hist_3 + hist_4 + hist_5 + hist_6 + hist_7 + hist_8 + hist_9 + hist_10 + hist_11 + hist_12 + hist_13 + hist_14 + hist_15 + hist_16 + hist_17 + hist_18 + hist_19 + hist_20 + hist_21 + hist_22 + hist_23 + hist_24 + hist_25;

View File

@@ -1,7 +1,11 @@
use clap::Parser;
use colored::Colorize;
use mysql_async::{prelude::*, Pool, Transaction};
use mysql_async::{prelude::*, Conn, Pool};
use std::time::Instant;
use std::{error::Error, fs};
use tokio::join;
use tokio::task::JoinHandle;
use url::form_urlencoded::byte_serialize;
use structs::*;
pub mod structs;
@@ -28,9 +32,9 @@ fn parse_config() -> Result<Config, String> {
Ok(config)
}
// Connect to database
// Connect to database and return connection pool
async fn db_connect(config: &Config) -> Result<Pool, Box<dyn Error>> {
let database_url = "mysql://".to_string()
let database_url = "mysql://".to_string() // JDBC connection url
+ &config.database_username
+ ":"
+ &config.database_password
@@ -45,9 +49,9 @@ async fn db_connect(config: &Config) -> Result<Pool, Box<dyn Error>> {
}
// Write array to database
async fn write_arr_to_db(transaction: &mut Transaction<'_>, arr: &Vec<VkMessage>) -> Result<(), Box<dyn Error>> {
async fn write_arr_to_db(mut conn: Conn, arr: Vec<VkMessage>) {
let start = Instant::now();
transaction.exec_batch(
conn.exec_batch( // Might be faster to do in a single transaction, but unsure how to do this
r"INSERT INTO messages (time, id, sender, message, sticker) VALUES (:time, :id, :sender, :message, :sticker)",
arr.iter().map(|p| {
params! {
@@ -65,17 +69,84 @@ async fn write_arr_to_db(transaction: &mut Transaction<'_>, arr: &Vec<VkMessage>
}
}),
)
.await?;
.await.expect("Failed to execute request to DB");
let duration = start.elapsed();
println!("Time elapsed in write_arr_to_db() is: {:?}", duration);
println!(
"Time elapsed in write_arr_to_db() is: {:.2}s",
duration.as_secs_f32()
);
println!("{}", "Inserted into DB!".green());
Ok(())
}
async fn get_from_file_vkscript() -> String {
let file_string = fs::read_to_string("request.vkscript").expect("Reading VKScript file");
file_string
}
// This thread handles full database rebuild
async fn thread_requests<'a>(count: i64, code: String, config: Config, pool: Pool) -> Config {
let mut handles: Vec<JoinHandle<()>> = vec![]; // Vector for joining database writing threads later
for offset in (0..count).step_by(5000) {
// Get messages in batches of 5k because that is the maximum that /method/execute allows
let start = Instant::now();
let unwrapped_result: Option<VkRespCode>;
loop {
// Just in case something fails -- retry
let replaced_code = code
.replace("{1}", &offset.to_string())
.replace("{2}", &config.peer_id); // Put in peer_id and offsets
let urlencoded: String = byte_serialize(replaced_code.as_bytes()).collect(); // Encode string for request to VK
let result = reqwest::get(api_url( // Send GET request to VK API
"execute",
vec![&("code=".to_string() + &urlencoded)],
&config,
))
.await;
if result.is_err() {
println!("{}", "VK request failed".red().bold());
continue;
}
let result_text = result.unwrap().text().await.expect("Request to VK failed");
let result_2 = serde_json::from_str::<VkRespCode>(&result_text);
if result_2.is_err() {
println!("{}", "Error while deserializing request".red().bold());
continue;
}
println!("{}", format!("Processed {}", offset).green());
unwrapped_result = Some(result_2.unwrap());
break;
}
// Spawn a separate thread for concurrent writes to database (they are not really concurrent) and dump data there
println!("Writing to DB");
let conn = pool
.get_conn()
.await
.expect("Failed to get connection from DB");
let handle = tokio::spawn(write_arr_to_db(
conn,
unwrapped_result.unwrap().clone().response,
));
handles.push(handle); // Store for joining later
let duration = start.elapsed();
println!(
"Time elapsed in main FOR is: {:.2}s",
duration.as_secs_f32()
);
}
println!("{}", "Done! Awaiting for DB writes...".yellow());
for task in handles {
task.await.expect("Failed to await DB writing task");
}
config
}
// Clear database and start anew
async fn rebuild(pool: &mut Pool, config: &Config) -> Result<(), Box<dyn Error>> {
async fn rebuild(pool: &mut Pool, config: Config) -> Result<Config, Box<dyn Error>> {
let mut conn = pool.get_conn().await?;
let result = conn.query_drop("DROP TABLE messages").await;
let result = conn.query_drop("DROP TABLE messages").await; // Drop old table
if result.is_err() {
println!(
"{}",
@@ -88,6 +159,7 @@ async fn rebuild(pool: &mut Pool, config: &Config) -> Result<(), Box<dyn Error>>
}
conn.query_drop(
// Create new table
"CREATE TABLE IF NOT EXISTS messages
(n MEDIUMINT NOT NULL AUTO_INCREMENT,
time INT, id INT, sender INT, message TEXT,
@@ -99,11 +171,12 @@ async fn rebuild(pool: &mut Pool, config: &Config) -> Result<(), Box<dyn Error>>
println!("{}", "Recreated table.".green());
drop(conn);
let count: i64;
// Get message count
loop {
let result = reqwest::get(api_url(
"messages.getHistory",
vec!["count=200", &("peer_id=".to_owned() + &config.peer_id)],
config,
&config,
))
.await?
.json::<VkResponse>()
@@ -115,54 +188,20 @@ async fn rebuild(pool: &mut Pool, config: &Config) -> Result<(), Box<dyn Error>>
tokio::time::sleep(tokio::time::Duration::from_millis(340)).await;
}
println!("{}", count);
let conn = &mut pool.get_conn().await?;
let mut transaction = conn.start_transaction(mysql_async::TxOpts::new()).await?;
for offset in (0..count).step_by(200) {
let start = Instant::now();
let unwrapped_result: Option<VkResponse>;
loop {
let result = reqwest::get(api_url(
"messages.getHistory",
vec![
"count=200",
&format!("offset={}", offset),
&("peer_id=".to_owned() + &config.peer_id),
"rev=1",
],
config,
))
.await?
.json::<VkResponse>()
.await;
if result.is_err() {
continue;
}
println!("{}", format!("Processed {}", offset).green());
unwrapped_result = Some(result.unwrap());
break;
}
println!("Writing to DB");
write_arr_to_db(
&mut transaction,
&unwrapped_result.unwrap().response.items,
)
.await?;
let duration = start.elapsed();
print!("Time elapsed in main FOR is: {:?}", duration);
if duration.as_millis() < 340 {
let new_dur = tokio::time::Duration::from_millis(340) - duration;
println!(", so sleeping for {}ms", new_dur.as_millis());
tokio::time::sleep(new_dur).await;
} else {print!("\n");}
}
transaction.commit().await?;
Ok(())
// TODO: get it from config file
let code = get_from_file_vkscript().await;
// Rebuild in separate thread
let main_handle = tokio::spawn(thread_requests(count, code, config, pool.clone()));
let config = join!(main_handle)
.0
.expect("Couldn't join main rebuilding thread");
Ok(config)
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let cli = clap::Args::parse();
let cli = Args::parse();
println!(
"{}",
"VK dialogue statistics collector initializing..."
@@ -190,7 +229,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
if cli.rebuild > 0 {
println!("{}", "Rebuilding mode.".yellow());
rebuild(&mut pool, &config).await?;
rebuild(&mut pool, config).await?;
} else {
println!(
"{}",

View File

@@ -1,5 +1,5 @@
use serde::{Deserialize, Serialize};
use clap::{command, Parser};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct TomlFile {
@@ -19,18 +19,22 @@ pub struct Config {
pub database_name: String,
}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct VkResponse {
pub response: VkResp,
}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct VkResp {
pub count: i64,
pub items: Vec<VkMessage>,
}
#[derive(Serialize, Deserialize, Clone)]
pub struct VkRespCode {
pub response: Vec<VkMessage>,
}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct VkMessage {
pub date: i64,
pub from_id: i64,
@@ -40,12 +44,12 @@ pub struct VkMessage {
pub attachments: Vec<Attachment>,
}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct Attachment {
pub sticker: Option<Sticker>,
}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct Sticker {
pub sticker_id: i64,
}
@@ -55,4 +59,4 @@ pub struct Sticker {
pub struct Args {
#[arg(short, long, action = clap::ArgAction::Count)]
pub rebuild: u8,
}
}