Compare commits
2 Commits
5ae6292e23
...
107662e649
| Author | SHA1 | Date | |
|---|---|---|---|
| 107662e649 | |||
| 1c925d97e7 |
@@ -17,3 +17,4 @@ serde = { version = "1.0.97", features = ["derive"] }
|
||||
serde_json = "1.0.115"
|
||||
tokio = { version = "1.36.0", features = ["full"] }
|
||||
toml = "0.8.12"
|
||||
url = "2.5.0"
|
||||
|
||||
30
request.vkscript
Normal file
30
request.vkscript
Normal file
@@ -0,0 +1,30 @@
|
||||
var offset = {1};
|
||||
var peer_id = {2};
|
||||
|
||||
var hist = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 0}).items;
|
||||
var hist_2 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 200}).items;
|
||||
var hist_3 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 400}).items;
|
||||
var hist_4 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 600}).items;
|
||||
var hist_5 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 800}).items;
|
||||
var hist_6 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 1000}).items;
|
||||
var hist_7 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 1200}).items;
|
||||
var hist_8 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 1400}).items;
|
||||
var hist_9 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 1600}).items;
|
||||
var hist_10 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 1800}).items;
|
||||
var hist_11 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 2000}).items;
|
||||
var hist_12 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 2200}).items;
|
||||
var hist_13 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 2400}).items;
|
||||
var hist_14 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 2600}).items;
|
||||
var hist_15 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 2800}).items;
|
||||
var hist_16 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 3000}).items;
|
||||
var hist_17 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 3200}).items;
|
||||
var hist_18 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 3400}).items;
|
||||
var hist_19 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 3600}).items;
|
||||
var hist_20 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 3800}).items;
|
||||
var hist_21 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 4000}).items;
|
||||
var hist_22 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 4200}).items;
|
||||
var hist_23 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 4400}).items;
|
||||
var hist_24 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 4600}).items;
|
||||
var hist_25 = API.messages.getHistory({peer_id: peer_id, count: 200, offset: offset + 4800}).items;
|
||||
|
||||
return hist + hist_2 + hist_3 + hist_4 + hist_5 + hist_6 + hist_7 + hist_8 + hist_9 + hist_10 + hist_11 + hist_12 + hist_13 + hist_14 + hist_15 + hist_16 + hist_17 + hist_18 + hist_19 + hist_20 + hist_21 + hist_22 + hist_23 + hist_24 + hist_25;
|
||||
151
src/main.rs
151
src/main.rs
@@ -1,7 +1,11 @@
|
||||
use clap::Parser;
|
||||
use colored::Colorize;
|
||||
use mysql_async::{prelude::*, Pool, Transaction};
|
||||
use mysql_async::{prelude::*, Conn, Pool};
|
||||
use std::time::Instant;
|
||||
use std::{error::Error, fs};
|
||||
use tokio::join;
|
||||
use tokio::task::JoinHandle;
|
||||
use url::form_urlencoded::byte_serialize;
|
||||
|
||||
use structs::*;
|
||||
pub mod structs;
|
||||
@@ -28,9 +32,9 @@ fn parse_config() -> Result<Config, String> {
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
// Connect to database
|
||||
// Connect to database and return connection pool
|
||||
async fn db_connect(config: &Config) -> Result<Pool, Box<dyn Error>> {
|
||||
let database_url = "mysql://".to_string()
|
||||
let database_url = "mysql://".to_string() // JDBC connection url
|
||||
+ &config.database_username
|
||||
+ ":"
|
||||
+ &config.database_password
|
||||
@@ -45,9 +49,9 @@ async fn db_connect(config: &Config) -> Result<Pool, Box<dyn Error>> {
|
||||
}
|
||||
|
||||
// Write array to database
|
||||
async fn write_arr_to_db(transaction: &mut Transaction<'_>, arr: &Vec<VkMessage>) -> Result<(), Box<dyn Error>> {
|
||||
async fn write_arr_to_db(mut conn: Conn, arr: Vec<VkMessage>) {
|
||||
let start = Instant::now();
|
||||
transaction.exec_batch(
|
||||
conn.exec_batch( // Might be faster to do in a single transaction, but unsure how to do this
|
||||
r"INSERT INTO messages (time, id, sender, message, sticker) VALUES (:time, :id, :sender, :message, :sticker)",
|
||||
arr.iter().map(|p| {
|
||||
params! {
|
||||
@@ -65,17 +69,84 @@ async fn write_arr_to_db(transaction: &mut Transaction<'_>, arr: &Vec<VkMessage>
|
||||
}
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
.await.expect("Failed to execute request to DB");
|
||||
let duration = start.elapsed();
|
||||
println!("Time elapsed in write_arr_to_db() is: {:?}", duration);
|
||||
println!(
|
||||
"Time elapsed in write_arr_to_db() is: {:.2}s",
|
||||
duration.as_secs_f32()
|
||||
);
|
||||
println!("{}", "Inserted into DB!".green());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_from_file_vkscript() -> String {
|
||||
let file_string = fs::read_to_string("request.vkscript").expect("Reading VKScript file");
|
||||
file_string
|
||||
}
|
||||
|
||||
// This thread handles full database rebuild
|
||||
async fn thread_requests<'a>(count: i64, code: String, config: Config, pool: Pool) -> Config {
|
||||
let mut handles: Vec<JoinHandle<()>> = vec![]; // Vector for joining database writing threads later
|
||||
for offset in (0..count).step_by(5000) {
|
||||
// Get messages in batches of 5k because that is the maximum that /method/execute allows
|
||||
let start = Instant::now();
|
||||
let unwrapped_result: Option<VkRespCode>;
|
||||
loop {
|
||||
// Just in case something fails -- retry
|
||||
let replaced_code = code
|
||||
.replace("{1}", &offset.to_string())
|
||||
.replace("{2}", &config.peer_id); // Put in peer_id and offsets
|
||||
let urlencoded: String = byte_serialize(replaced_code.as_bytes()).collect(); // Encode string for request to VK
|
||||
|
||||
let result = reqwest::get(api_url( // Send GET request to VK API
|
||||
"execute",
|
||||
vec![&("code=".to_string() + &urlencoded)],
|
||||
&config,
|
||||
))
|
||||
.await;
|
||||
if result.is_err() {
|
||||
println!("{}", "VK request failed".red().bold());
|
||||
continue;
|
||||
}
|
||||
let result_text = result.unwrap().text().await.expect("Request to VK failed");
|
||||
let result_2 = serde_json::from_str::<VkRespCode>(&result_text);
|
||||
if result_2.is_err() {
|
||||
println!("{}", "Error while deserializing request".red().bold());
|
||||
continue;
|
||||
}
|
||||
println!("{}", format!("Processed {}", offset).green());
|
||||
unwrapped_result = Some(result_2.unwrap());
|
||||
break;
|
||||
}
|
||||
// Spawn a separate thread for concurrent writes to database (they are not really concurrent) and dump data there
|
||||
println!("Writing to DB");
|
||||
let conn = pool
|
||||
.get_conn()
|
||||
.await
|
||||
.expect("Failed to get connection from DB");
|
||||
|
||||
let handle = tokio::spawn(write_arr_to_db(
|
||||
conn,
|
||||
unwrapped_result.unwrap().clone().response,
|
||||
));
|
||||
handles.push(handle); // Store for joining later
|
||||
let duration = start.elapsed();
|
||||
println!(
|
||||
"Time elapsed in main FOR is: {:.2}s",
|
||||
duration.as_secs_f32()
|
||||
);
|
||||
}
|
||||
println!("{}", "Done! Awaiting for DB writes...".yellow());
|
||||
for task in handles {
|
||||
task.await.expect("Failed to await DB writing task");
|
||||
}
|
||||
|
||||
config
|
||||
}
|
||||
|
||||
// Clear database and start anew
|
||||
async fn rebuild(pool: &mut Pool, config: &Config) -> Result<(), Box<dyn Error>> {
|
||||
async fn rebuild(pool: &mut Pool, config: Config) -> Result<Config, Box<dyn Error>> {
|
||||
let mut conn = pool.get_conn().await?;
|
||||
let result = conn.query_drop("DROP TABLE messages").await;
|
||||
let result = conn.query_drop("DROP TABLE messages").await; // Drop old table
|
||||
if result.is_err() {
|
||||
println!(
|
||||
"{}",
|
||||
@@ -88,6 +159,7 @@ async fn rebuild(pool: &mut Pool, config: &Config) -> Result<(), Box<dyn Error>>
|
||||
}
|
||||
|
||||
conn.query_drop(
|
||||
// Create new table
|
||||
"CREATE TABLE IF NOT EXISTS messages
|
||||
(n MEDIUMINT NOT NULL AUTO_INCREMENT,
|
||||
time INT, id INT, sender INT, message TEXT,
|
||||
@@ -99,11 +171,12 @@ async fn rebuild(pool: &mut Pool, config: &Config) -> Result<(), Box<dyn Error>>
|
||||
println!("{}", "Recreated table.".green());
|
||||
drop(conn);
|
||||
let count: i64;
|
||||
// Get message count
|
||||
loop {
|
||||
let result = reqwest::get(api_url(
|
||||
"messages.getHistory",
|
||||
vec!["count=200", &("peer_id=".to_owned() + &config.peer_id)],
|
||||
config,
|
||||
&config,
|
||||
))
|
||||
.await?
|
||||
.json::<VkResponse>()
|
||||
@@ -115,54 +188,20 @@ async fn rebuild(pool: &mut Pool, config: &Config) -> Result<(), Box<dyn Error>>
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(340)).await;
|
||||
}
|
||||
println!("{}", count);
|
||||
let conn = &mut pool.get_conn().await?;
|
||||
let mut transaction = conn.start_transaction(mysql_async::TxOpts::new()).await?;
|
||||
for offset in (0..count).step_by(200) {
|
||||
let start = Instant::now();
|
||||
let unwrapped_result: Option<VkResponse>;
|
||||
loop {
|
||||
let result = reqwest::get(api_url(
|
||||
"messages.getHistory",
|
||||
vec![
|
||||
"count=200",
|
||||
&format!("offset={}", offset),
|
||||
&("peer_id=".to_owned() + &config.peer_id),
|
||||
"rev=1",
|
||||
],
|
||||
config,
|
||||
))
|
||||
.await?
|
||||
.json::<VkResponse>()
|
||||
.await;
|
||||
if result.is_err() {
|
||||
continue;
|
||||
}
|
||||
println!("{}", format!("Processed {}", offset).green());
|
||||
unwrapped_result = Some(result.unwrap());
|
||||
break;
|
||||
}
|
||||
println!("Writing to DB");
|
||||
|
||||
write_arr_to_db(
|
||||
&mut transaction,
|
||||
&unwrapped_result.unwrap().response.items,
|
||||
)
|
||||
.await?;
|
||||
let duration = start.elapsed();
|
||||
print!("Time elapsed in main FOR is: {:?}", duration);
|
||||
if duration.as_millis() < 340 {
|
||||
let new_dur = tokio::time::Duration::from_millis(340) - duration;
|
||||
println!(", so sleeping for {}ms", new_dur.as_millis());
|
||||
tokio::time::sleep(new_dur).await;
|
||||
} else {print!("\n");}
|
||||
}
|
||||
transaction.commit().await?;
|
||||
Ok(())
|
||||
// TODO: get it from config file
|
||||
let code = get_from_file_vkscript().await;
|
||||
|
||||
// Rebuild in separate thread
|
||||
let main_handle = tokio::spawn(thread_requests(count, code, config, pool.clone()));
|
||||
let config = join!(main_handle)
|
||||
.0
|
||||
.expect("Couldn't join main rebuilding thread");
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn Error>> {
|
||||
let cli = clap::Args::parse();
|
||||
let cli = Args::parse();
|
||||
println!(
|
||||
"{}",
|
||||
"VK dialogue statistics collector initializing..."
|
||||
@@ -190,7 +229,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
||||
|
||||
if cli.rebuild > 0 {
|
||||
println!("{}", "Rebuilding mode.".yellow());
|
||||
rebuild(&mut pool, &config).await?;
|
||||
rebuild(&mut pool, config).await?;
|
||||
} else {
|
||||
println!(
|
||||
"{}",
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use clap::{command, Parser};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||
pub struct TomlFile {
|
||||
@@ -19,18 +19,22 @@ pub struct Config {
|
||||
pub database_name: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct VkResponse {
|
||||
pub response: VkResp,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct VkResp {
|
||||
pub count: i64,
|
||||
pub items: Vec<VkMessage>,
|
||||
}
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct VkRespCode {
|
||||
pub response: Vec<VkMessage>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct VkMessage {
|
||||
pub date: i64,
|
||||
pub from_id: i64,
|
||||
@@ -40,12 +44,12 @@ pub struct VkMessage {
|
||||
pub attachments: Vec<Attachment>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct Attachment {
|
||||
pub sticker: Option<Sticker>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct Sticker {
|
||||
pub sticker_id: i64,
|
||||
}
|
||||
@@ -55,4 +59,4 @@ pub struct Sticker {
|
||||
pub struct Args {
|
||||
#[arg(short, long, action = clap::ArgAction::Count)]
|
||||
pub rebuild: u8,
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user