| use std::{fs::File, env, error::Error, fmt, io::{BufReader, BufRead},
|
| time::Instant, thread::spawn, sync::{mpsc::{SyncSender, sync_channel}}};
|
| use flate2::read::GzDecoder;
|
| use rusqlite::{Connection, Result};
|
| //use csv::Reader;
|
| //use time_fmt::parse;
|
|
|
/// Error describing a problem with the command-line arguments.
///
/// Carries a static, human-readable explanation that is shown after the
/// `Argument Error: ` prefix when the value is displayed.
#[derive(Debug)]
struct ArgErr(&'static str);

impl fmt::Display for ArgErr {
    /// Renders the error as `Argument Error: <explanation>`.
    fn fmt(&self, out: &mut fmt::Formatter) -> fmt::Result {
        let Self(reason) = self;
        write!(out, "Argument Error: {}", reason)
    }
}

// No source error to expose, so the default `Error` methods are enough.
impl Error for ArgErr {}
|
|
|
/// Layout of the input CSV, selected by the FORMAT command-line argument.
#[derive(Clone, Copy, PartialEq)]
enum Format {
    /// Chosen when FORMAT is `2017`.
    TwentySeventeen,
    /// Chosen when FORMAT is `2022`.
    TwentyTwentyTwo,
}
|
|
|
/// One parsed input row, ready to be inserted into the database.
#[derive(Default)]
struct Record {
    /// Event timestamp as an integer.
    ts: i64,
    /// User hash column, stored verbatim.
    userhash: String,
    /// Color column, stored verbatim.
    color: String,
    /// Coordinates in the order they appear in the input; the length varies
    /// (two for a normal placement, four for a moderation rectangle).
    coords: Vec<u16>,
}
|
|
|
/// Number of records the reader collects before handing a batch to the
/// database writer; each batch is committed as one transaction.
const BATCH_SIZE: usize = 1_000_000;
|
|
|
| fn reader(mut inp: csv::Reader<Box<dyn BufRead + Send>>, outp: SyncSender<Vec<Record>>, fmt: Format) {
|
| let mut batch = Vec::<Record>::new();
|
| for line in inp.records() {
|
| if line.is_err() { continue } // Bad line, see if iterator keeps going
|
| let values = line.unwrap();
|
| let mut r: Record = Record::default();
|
| match fmt {
|
| Format::TwentyTwentyTwo => {
|
| r.ts = time_fmt::parse::parse_date_time_maybe_with_zone("%Y-%m-%d %H:%M:%S", &values[0]).expect("Bad timestamp").0.assume_utc().unix_timestamp();
|
| r.userhash = values[1].to_string();
|
| r.color = values[2].to_string();
|
| r.coords = values[3].split(",").map(|x| x.parse::<u16>().unwrap()).collect(); // Variable number of coordinates, split commas, for each section convert to u16, collect into vec
|
| } // match case 2022
|
| Format::TwentySeventeen => {
|
| r.ts = values[0].parse::<i64>().expect("Bad timestamp");
|
| r.userhash = values[1].to_string();
|
| r.color = values[4].to_string();
|
| r.coords = Vec::new(); // Only 2 coords.
|
| r.coords.push(values[2].parse::<u16>().expect("Bad coords"));
|
| r.coords.push(values[3].parse::<u16>().expect("Bad coords"));
|
| } // match case 2017
|
| } // match
|
| batch.push(r);
|
| if batch.len() >= BATCH_SIZE { // Ready to send this batch?
|
| outp.send(batch).expect("Error sending batch to main");
|
| batch = Vec::<Record>::new();
|
| } // end of batch test
|
| } // for each csv line
|
| outp.send(batch).expect("Writer: error sending batch");
|
| } // fn reader
|
|
|
| fn main() -> Result<(), Box<dyn Error>> {
|
| let args: Vec<String> = env::args().collect();
|
| if args.len() < 4 {
|
| println!("usage: {} DATABASE INPUT FORMAT", args[0]);
|
| return Err(Box::new(ArgErr("Not enough arguments")));
|
| }
|
| let file = &args[1]; // DB filename
|
| let infile = args[2].clone(); // Input filename
|
| let fmt = match args[3].parse::<usize>().expect("FORMAT should be 2017 or 2022") {
|
| 2017 => Format::TwentySeventeen,
|
| 2022 => Format::TwentyTwentyTwo,
|
| _ => {return Err(Box::new(ArgErr("FORMAT should be 2017 or 2022")))}
|
| };
|
| let (sender, receiver) = sync_channel::<Vec<Record>>(1); // Pipe from reader to main
|
| let bufrdr: Box<dyn BufRead + Send>; // Input stream
|
| if infile.contains(".gz") { // Compressed input?
|
| bufrdr = Box::new(BufReader::new(GzDecoder::new(File::open(infile).expect("Couldn't open the input file."))));
|
| }
|
| else { // PLain input.
|
| bufrdr = Box::new(BufReader::new(File::open(infile).expect("Couldn't open the input file.")));
|
| }
|
| spawn(move || {reader(csv::Reader::from_reader(bufrdr), sender, fmt)}); // Start the reader
|
| let db = Connection::open(file)?;
|
| db.execute_batch( // Prepare the database
|
| "PRAGMA journal_mode = MEMORY;
|
| PRAGMA synchronus = 0;
|
| PRAGMA locking_mode = EXCLUSIVE;
|
| PRAGMA page_size = 65536;
|
| VACUUM;
|
| CREATE TABLE IF NOT EXISTS placements (timestamp integer, userhash text, color text, x integer, y integer);
|
| CREATE TABLE IF NOT EXISTS moderation (timestamp integer, userhash text, color text, x1 integer, x2 integer, y1 integer, y2 integer);"
|
| )?;
|
| let mut istmt = db.prepare(format!("INSERT INTO placements VALUES (?, ?, ?, ?, ?)").as_str())?;
|
| let mut modstmt = db.prepare(format!("INSERT INTO moderation VALUES (?, ?, ?, ?, ?, ?, ?)").as_str())?;
|
| let mut beginstmt = db.prepare("BEGIN TRANSACTION")?;
|
| let mut comstmt = db.prepare("COMMIT")?;
|
| let startts = Instant::now(); // Start time
|
| let mut segts = startts.clone(); // Segment time, reset each batch.
|
| let mut i: usize = 0; // Number of records encountered
|
| beginstmt.execute([])?;
|
| for batch in receiver.iter() {
|
| for rcd in batch {
|
| i += 1;
|
| match rcd.coords.len() {
|
| 2 => {istmt.execute(rusqlite::params![rcd.ts, rcd.userhash, rcd.color, rcd.coords[0], rcd.coords[1]])?;} // Normal event
|
| 4 => {modstmt.execute(rusqlite::params![rcd.ts, rcd.userhash, rcd.color, rcd.coords[0], rcd.coords[1], rcd.coords[2], rcd.coords[3]])?;} // Moderation rectangle
|
| x => {
|
| println!("wtf? weird coords length {}", x);
|
| continue;
|
| } // match case x
|
| } // match
|
| } // for each record
|
| comstmt.execute([])?; // Next transaction
|
| beginstmt.execute([])?;
|
| println!("{} transactions completed in {:#?} (this batch: {:#?})", i, Instant::now() - startts, Instant::now() - segts);
|
| segts = Instant::now();
|
| } // for each batch
|
| comstmt.execute([])?; // Commit, report, bail.
|
| println!("{} transactions completed in {:#?} (this batch: {:#?})", i, Instant::now() - startts, Instant::now() - segts);
|
| Ok(())
|
| } // fn main()
|