// This paste expires on 2023-04-27 04:22:06.708022. Repaste, or download this paste. Pasted through web.

use std::{fs::File, env, error::Error, fmt, io::{BufReader, BufRead},
    time::Instant, thread::spawn, sync::{mpsc::{SyncSender, sync_channel}}};
use flate2::read::GzDecoder;
use rusqlite::{Connection, Result};
//use csv::Reader;
//use time_fmt::parse;
/// Error for unusable command-line arguments, as returned from `main`.
///
/// Wraps a static description and is displayed as `Argument Error: <description>`.
#[derive(Debug)]
struct ArgErr(&'static str);

impl fmt::Display for ArgErr {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let ArgErr(msg) = self;
        f.write_str("Argument Error: ")?;
        f.write_str(msg)
    }
}

impl Error for ArgErr {}
/// Column layout of the input CSV, chosen by the FORMAT argument (`2017` or `2022`).
#[derive(Clone, Copy, PartialEq)]
enum Format {
    /// Fields: 0 = integer timestamp, 1 = user hash, 2 = x, 3 = y, 4 = color.
    TwentySeventeen,
    /// Fields: 0 = `%Y-%m-%d %H:%M:%S` timestamp (treated as UTC), 1 = user hash,
    /// 2 = color, 3 = comma-separated coordinate list.
    TwentyTwentyTwo,
}
/// One parsed input row, as handed from the reader thread to the database writer.
#[derive(Default)]
struct Record {
    /// Integer timestamp; see `reader` for how each `Format` produces it.
    ts: i64,
    /// User identifier string, copied verbatim from the CSV.
    userhash: String,
    /// Color string, copied verbatim from the CSV.
    color: String,
    /// Pixel coordinates: two values for a single placement, four for a rectangle.
    coords: Vec<u16>,
}
/// Number of records the reader accumulates before handing a batch to `main`,
/// which commits one transaction per batch.
const BATCH_SIZE: usize = 1_000_000;
fn reader(mut inp: csv::Reader<Box<dyn BufRead + Send>>, outp: SyncSender<Vec<Record>>, fmt: Format) {
    let mut batch = Vec::<Record>::new();
for line in inp.records() {
if line.is_err() { continue } // Bad line, see if iterator keeps going
let values = line.unwrap();
        let mut r: Record = Record::default();
        match fmt {
            Format::TwentyTwentyTwo => {
                r.ts = time_fmt::parse::parse_date_time_maybe_with_zone("%Y-%m-%d %H:%M:%S", &values[0]).expect("Bad timestamp").0.assume_utc().unix_timestamp();
                r.userhash = values[1].to_string();
                r.color = values[2].to_string();
                r.coords = values[3].split(",").map(|x| x.parse::<u16>().unwrap()).collect(); // Variable number of coordinates, split commas, for each section convert to u16, collect into vec
            } // match case 2022
            Format::TwentySeventeen => {
                r.ts = values[0].parse::<i64>().expect("Bad timestamp");
                r.userhash = values[1].to_string();
                r.color = values[4].to_string();
                r.coords = Vec::new(); // Only 2 coords.
                r.coords.push(values[2].parse::<u16>().expect("Bad coords"));
                r.coords.push(values[3].parse::<u16>().expect("Bad coords"));
            } // match case 2017
        } // match
batch.push(r);
if batch.len() >= BATCH_SIZE { // Ready to send this batch?
outp.send(batch).expect("Error sending batch to main");
batch = Vec::<Record>::new();
} // end of batch test
} // for each csv line
outp.send(batch).expect("Writer: error sending batch");
} // fn reader
/// Imports a placement-history CSV into an SQLite database.
///
/// Usage: `PROGRAM DATABASE INPUT FORMAT`, where FORMAT is `2017` or `2022`.
///
/// A background thread (`reader`) parses INPUT and streams batches of records
/// over a bounded channel. INPUT is gzip-decoded when its name contains ".gz".
/// This thread inserts each batch inside its own transaction:
/// - two-coordinate records go to `placements`;
/// - four-coordinate records go to `moderation`;
/// - anything else is reported and skipped.
///
/// # Errors
/// Returns an error for bad arguments, an unopenable input file, or any SQLite
/// failure. It also returns an error if the reader thread panicked; in that
/// case the batches committed so far are kept, but the import is incomplete.
fn main() -> Result<(), Box<dyn Error>> {
    let args: Vec<String> = env::args().collect();
    if args.len() < 4 {
        // args[0] is conventionally the program name, but argv may be empty.
        let prog = args.first().map_or("PROGRAM", String::as_str);
        println!("usage: {} DATABASE INPUT FORMAT", prog);
        return Err(Box::new(ArgErr("Not enough arguments")));
    }
    let file = &args[1]; // DB filename
    let infile = &args[2]; // Input filename
    // A non-numeric FORMAT is reported as an argument error, not a panic.
    let fmt = match args[3].parse::<usize>() {
        Ok(2017) => Format::TwentySeventeen,
        Ok(2022) => Format::TwentyTwentyTwo,
        _ => return Err(Box::new(ArgErr("FORMAT should be 2017 or 2022"))),
    };

    let input = File::open(infile).map_err(|e| format!("Couldn't open the input file. ({})", e))?;
    // `contains` (not `ends_with`) so that ".gzip" names are decompressed too.
    let bufrdr: Box<dyn BufRead + Send> = if infile.contains(".gz") {
        Box::new(BufReader::new(GzDecoder::new(input)))
    } else {
        Box::new(BufReader::new(input))
    };

    // Capacity 1: at most one finished batch waits while the next is being built.
    let (sender, receiver) = sync_channel::<Vec<Record>>(1);
    let reader_thread = spawn(move || reader(csv::Reader::from_reader(bufrdr), sender, fmt));

    let db = Connection::open(file)?;
    // Speed over crash-safety: in-memory journal, no fsync, exclusive lock.
    // "synchronous" was previously misspelled "synchronus"; SQLite silently ignores
    // unknown pragmas, so that setting never took effect.
    // VACUUM is what applies the new page_size to an already existing database.
    db.execute_batch(
        "PRAGMA journal_mode = MEMORY;
             PRAGMA synchronous = 0;
             PRAGMA locking_mode = EXCLUSIVE;
             PRAGMA page_size = 65536;
             VACUUM;
             CREATE TABLE IF NOT EXISTS placements (timestamp integer, userhash text, color text, x integer, y integer);
             CREATE TABLE IF NOT EXISTS moderation (timestamp integer, userhash text, color text, x1 integer, x2 integer, y1 integer, y2 integer);",
    )?;
    let mut istmt = db.prepare("INSERT INTO placements VALUES (?, ?, ?, ?, ?)")?;
    // The moderation table declares x1, x2, y1, y2. Rectangle coordinates are
    // x/y pairs, like the 2-coordinate case. A positional insert would store y1
    // in x2 and x2 in y1, so name the columns explicitly.
    // NOTE(review): assumes the 4-value field is `x1,y1,x2,y2` — confirm against the dataset.
    let mut modstmt = db.prepare(
        "INSERT INTO moderation (timestamp, userhash, color, x1, y1, x2, y2) VALUES (?, ?, ?, ?, ?, ?, ?)",
    )?;
    let mut beginstmt = db.prepare("BEGIN TRANSACTION")?;
    let mut comstmt = db.prepare("COMMIT")?;

    let startts = Instant::now(); // Start time
    let mut segts = startts; // Segment time, reset each batch (Instant is Copy).
    let mut i: usize = 0; // Number of records encountered
    beginstmt.execute([])?;
    for batch in receiver.iter() {
        for rcd in batch {
            i += 1;
            match rcd.coords[..] {
                // Normal event
                [x, y] => {
                    istmt.execute(rusqlite::params![rcd.ts, rcd.userhash, rcd.color, x, y])?;
                }
                // Moderation rectangle
                [x1, y1, x2, y2] => {
                    modstmt.execute(rusqlite::params![rcd.ts, rcd.userhash, rcd.color, x1, y1, x2, y2])?;
                }
                _ => println!("wtf? weird coords length {}", rcd.coords.len()),
            }
        }
        // Commit this batch and open the transaction for the next one.
        comstmt.execute([])?;
        beginstmt.execute([])?;
        println!("{} transactions completed in {:#?} (this batch: {:#?})", i, startts.elapsed(), segts.elapsed());
        segts = Instant::now();
    }
    comstmt.execute([])?; // Commit, report, bail.
    println!("{} transactions completed in {:#?} (this batch: {:#?})", i, startts.elapsed(), segts.elapsed());

    // The channel closes whenever the reader exits, including by panic. Join it
    // so that a crashed reader is not reported as a successful import.
    if reader_thread.join().is_err() {
        return Err("reader thread panicked; the import is incomplete".into());
    }
    Ok(())
}
// Filename: None. Size: 6kb. View raw, hex, or download this file.