1
0

Initial commit

This commit is contained in:
2024-04-16 13:36:18 +03:00
commit 9e76d112fc
4 changed files with 302 additions and 0 deletions

22
.gitignore vendored Executable file
View File

@@ -0,0 +1,22 @@
# Created by https://www.toptal.com/developers/gitignore/api/rust
# Edit at https://www.toptal.com/developers/gitignore?templates=rust
### Rust ###
# Generated by Cargo
# will have compiled files and executables
debug/
target/
# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
Cargo.lock
# These are backup files generated by rustfmt
**/*.rs.bk
# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb
# End of https://www.toptal.com/developers/gitignore/api/rust
config.toml

19
Cargo.toml Normal file
View File

@@ -0,0 +1,19 @@
[package]
name = "rust_vk_stats"
version = "0.1.0"
edition = "2021"
[profile.release]
debug = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
clap = { version = "4.5.4", features = ["derive"] }
colored = "2.1.0"
mysql_async = { version = "0.34.1", features = ["chrono"]}
reqwest = {version = "0.12.2", features = ["json"]}
serde = { version = "1.0.97", features = ["derive"] }
serde_json = "1.0.115"
tokio = { version = "1.36.0", features = ["full"] }
toml = "0.8.12"

203
src/main.rs Executable file
View File

@@ -0,0 +1,203 @@
use colored::Colorize;
use mysql_async::{prelude::*, Pool, Transaction};
use std::time::Instant;
use std::{error::Error, fs};
use structs::*;
pub mod structs;
// Construct URL for API access
fn api_url(method: &str, params: Vec<&str>, config: &Config) -> String {
let mut url: String = "https://api.vk.com/method/".to_string() + method + "?";
for param in params {
url = url + "&" + param;
}
url + "&v=" + &config.vk_version + "&access_token=" + &config.access_token
}
// Parse config from config.toml
fn parse_config() -> Result<Config, String> {
let config_contents =
fs::read_to_string("config.toml").expect("Couldn't read config file. Aborting! :(");
let deserialized = toml::from_str::<TomlFile>(&config_contents);
if deserialized.is_err() {
return Err("wrong_config".to_string());
}
let yaml: TomlFile = deserialized.unwrap();
let config = yaml.config;
Ok(config)
}
// Connect to database
async fn db_connect(config: &Config) -> Result<Pool, Box<dyn Error>> {
let database_url = "mysql://".to_string()
+ &config.database_username
+ ":"
+ &config.database_password
+ "@"
+ &config.database_hostname
+ ":"
+ &config.database_port
+ "/"
+ &config.database_name;
let pool = mysql_async::Pool::new(database_url.as_str());
Ok(pool)
}
// Write array to database
async fn write_arr_to_db(transaction: &mut Transaction<'_>, arr: &Vec<VkMessage>) -> Result<(), Box<dyn Error>> {
let start = Instant::now();
transaction.exec_batch(
r"INSERT INTO messages (time, id, sender, message, sticker) VALUES (:time, :id, :sender, :message, :sticker)",
arr.iter().map(|p| {
params! {
"time" => p.date,
"id" => p.id,
"sender" => p.from_id,
"message" => p.text.clone(),
"sticker" => {
if p.attachments.len() > 0 && p.attachments[0].sticker.is_some() {
p.attachments[0].sticker.as_ref().unwrap().sticker_id
} else {
0
}
},
}
}),
)
.await?;
let duration = start.elapsed();
println!("Time elapsed in write_arr_to_db() is: {:?}", duration);
println!("{}", "Inserted into DB!".green());
Ok(())
}
// Clear database and start anew
async fn rebuild(pool: &mut Pool, config: &Config) -> Result<(), Box<dyn Error>> {
let mut conn = pool.get_conn().await?;
let result = conn.query_drop("DROP TABLE messages").await;
if result.is_err() {
println!(
"{}",
"Couldn't delete table. Doesn't exist? Proceeding..."
.yellow()
.bold()
);
} else {
println!("{}", "Dropped messages table.".red().bold());
}
conn.query_drop(
"CREATE TABLE IF NOT EXISTS messages
(n MEDIUMINT NOT NULL AUTO_INCREMENT,
time INT, id INT, sender INT, message TEXT,
sticker INT,
sticker_url TEXT,
CONSTRAINT n PRIMARY KEY (n))",
)
.await?;
println!("{}", "Recreated table.".green());
drop(conn);
let count: i64;
loop {
let result = reqwest::get(api_url(
"messages.getHistory",
vec!["count=200", &("peer_id=".to_owned() + &config.peer_id)],
config,
))
.await?
.json::<VkResponse>()
.await;
if result.is_ok() {
count = result.unwrap().response.count;
break;
}
tokio::time::sleep(tokio::time::Duration::from_millis(340)).await;
}
println!("{}", count);
let conn = &mut pool.get_conn().await?;
let mut transaction = conn.start_transaction(mysql_async::TxOpts::new()).await?;
for offset in (0..count).step_by(200) {
let start = Instant::now();
let unwrapped_result: Option<VkResponse>;
loop {
let result = reqwest::get(api_url(
"messages.getHistory",
vec![
"count=200",
&format!("offset={}", offset),
&("peer_id=".to_owned() + &config.peer_id),
"rev=1",
],
config,
))
.await?
.json::<VkResponse>()
.await;
if result.is_err() {
continue;
}
println!("{}", format!("Processed {}", offset).green());
unwrapped_result = Some(result.unwrap());
break;
}
println!("Writing to DB");
write_arr_to_db(
&mut transaction,
&unwrapped_result.unwrap().response.items,
)
.await?;
let duration = start.elapsed();
print!("Time elapsed in main FOR is: {:?}", duration);
if duration.as_millis() < 340 {
let new_dur = tokio::time::Duration::from_millis(340) - duration;
println!(", so sleeping for {}ms", new_dur.as_millis());
tokio::time::sleep(new_dur).await;
} else {print!("\n");}
}
transaction.commit().await?;
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let cli = clap::Args::parse();
println!(
"{}",
"VK dialogue statistics collector initializing..."
.yellow()
.bold()
);
println!("{}", "Parsing config from config.yml.".yellow().bold());
let config = parse_config();
if config.is_err() {
println!(
"{}",
"Error parsing config. Please check if it's correct! Shutting down :("
.red()
.bold()
);
return Ok(());
}
let config = config.unwrap();
let pool = db_connect(&config).await;
let mut pool = pool.expect("Error while connecting to DB. Shutting down :(");
println!(
"{}",
"Connected to MYSQL database successfully!".green().bold()
);
if cli.rebuild > 0 {
println!("{}", "Rebuilding mode.".yellow());
rebuild(&mut pool, &config).await?;
} else {
println!(
"{}",
"No argument specified. Running in usual mode.".yellow()
);
}
pool.disconnect().await?;
Ok(())
}

58
src/structs.rs Normal file
View File

@@ -0,0 +1,58 @@
use serde::{Deserialize, Serialize};
use clap::{command, Parser};
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct TomlFile {
pub config: Config,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct Config {
pub peer_id: String,
pub access_token: String,
pub vk_version: String,
pub database_hostname: String,
pub database_port: String,
pub database_username: String,
pub database_password: String,
pub database_password_root: String,
pub database_name: String,
}
#[derive(Serialize, Deserialize)]
pub struct VkResponse {
pub response: VkResp,
}
#[derive(Serialize, Deserialize)]
pub struct VkResp {
pub count: i64,
pub items: Vec<VkMessage>,
}
#[derive(Serialize, Deserialize)]
pub struct VkMessage {
pub date: i64,
pub from_id: i64,
pub id: i64,
pub text: String,
pub peer_id: i64,
pub attachments: Vec<Attachment>,
}
#[derive(Serialize, Deserialize)]
pub struct Attachment {
pub sticker: Option<Sticker>,
}
#[derive(Serialize, Deserialize)]
pub struct Sticker {
pub sticker_id: i64,
}
#[derive(Parser)]
#[command(version, about, long_about = None)]
pub struct Args {
#[arg(short, long, action = clap::ArgAction::Count)]
pub rebuild: u8,
}