use std::collections::{HashMap, VecDeque}; use std::fs::{self, File}; use std::io::{BufRead, BufReader, Cursor, Read}; use std::path::PathBuf; use chrono::{NaiveDate, TimeZone, Utc}; use chrono_tz::Asia::Kolkata; use pyo3::prelude::*; use serde::Deserialize; use serde_json::{from_value, to_value}; use sqlx::PgPool; use crate::parser::data_struct::{FileProcessor, ColumnMap}; use crate::parser::FileParser; use validator::Validate; use sqlx::postgres::PgArguments; use sqlx::QueryBuilder; // async fn bulk_insert( // pool: &PgPool, // table: &str, // columns: &[String], // rows: &[Vec], // ) -> Result<(), sqlx::Error> { // let mut qb = QueryBuilder::new(format!("INSERT INTO {} (", table)); // qb.push(columns.join(", ")); // qb.push(") VALUES "); // let mut separated = qb.separated(", "); // for row in rows { // separated.push("("); // separated.push_bind_iter(row); // separated.push(")"); // } // let query = qb.build(); // query.execute(pool).await?; // Ok(()) // } pub struct NSDLParser { columns: ColumnMap, file: Cursor>, } impl FileParser for NSDLParser { fn parse(&self) -> PyResult { let mut reader = BufReader::new(&self.file); // now directly uses original file cursor let mut records: Vec<(Vec>, String, String)> = Vec::new(); let mut deque: VecDeque = VecDeque::new(); for line in reader.lines() { let line = line?; if line.starts_with("01") { if let Some(last) = deque.back() { if last.starts_with("01") { deque.pop_back(); } else if !deque.is_empty() { println!("Processing deque with {} lines", deque.len()); let (rows, isin, date) = self.parse_subset(&deque)?; records.push((rows, isin, date)); deque.clear(); } } } deque.push_back(line); } if !deque.is_empty() { let (rows, isin, date) = self.parse_subset(&deque)?; records.push((rows, isin, date)); } Ok(records.len()) } } impl NSDLParser { pub fn new(folder_path: PathBuf, columns_path: PathBuf) -> PyResult { let columns = ::get_col_obj(&columns_path)?; let file = ::get_file_obj(&folder_path)?; Ok(Self { columns, file }) } fn parse_subset( &self, deque: &VecDeque, ) -> PyResult<(Vec>, String, String)> { if deque.is_empty() { return Ok((vec![], "".to_string(), "".to_string())); } let header = &deque[0]; let parts: Vec<&str> = header.split("##").collect(); if parts.len() < 5 { return Err(pyo3::exceptions::PyValueError::new_err( "Invalid header line", )); } let isin = parts[2].trim().to_string(); let date = parts[3].trim().to_string(); let mut rows: Vec> = Vec::new(); for line in deque.iter().skip(1) { if line.starts_with("02") { rows.push(self.parse_02_line(line)); } else if line.starts_with("03") { rows.push(self.parse_03_line(line)); } } Ok((rows, isin, date)) } fn parse_02_line(&self, line: &str) -> Vec { line.split("##").map(|s| s.trim().to_string()).collect() } fn parse_03_line(&self, line: &str) -> Vec { line.split("##").map(|s| s.trim().to_string()).collect() } fn execute(&self, isin: String, date: String, rows: Vec>, row_type: &str) { let column_spec = self.columns.get(row_type).expect("Invalid row_type"); let ordered_columns = &column_spec.ordered; let mut valid_rows: Vec> = Vec::new(); for row in rows { let map: HashMap = ordered_columns .iter() .cloned() .zip(row.iter().cloned()) .collect(); let maybe_valid = to_value(&map) .ok() .and_then(|v| from_value::(v).ok()) .and_then(|fp| fp.validate().ok().map(|_| fp)); if let Some(_) = maybe_valid { let row_values = ordered_columns .iter() .map(|key| map.get(key).cloned().unwrap_or_default()) .collect::>(); valid_rows.push(row_values); } else { eprintln!("❌ Skipping invalid row: {:?}", row); } } for mut row in valid_rows { row.push(isin.clone()); row.push(date.clone()); let mut all_columns = ordered_columns.clone(); all_columns.push("ISIN".to_string()); all_columns.push("DATE".to_string()); let col_list = all_columns.join(", "); let val_list = row .iter() .map(|val| format!("'{}'", val.replace('\'', "''"))) .collect::>() .join(", "); let query = format!("INSERT INTO records ({}) VALUES ({});", col_list, val_list); println!("{}", query); } } } #[cfg(test)] mod tests { use super::*; #[test] fn test_nsdl_parser_reads_chunks_correctly() { pyo3::prepare_freethreaded_python(); let folder = PathBuf::from("/app/temp/c2925146-94ed-4778-a013-10dd547f1828"); let cols_path = PathBuf::from("/app/backend/database/backend/cols/nsdl_cols.json"); let nsdl = NSDLParser::new(folder, cols_path).expect("Failed to create NSDLParser"); let result = nsdl.parse().expect("Failed to parse file"); println!("Parsed {} record groups", result); assert!(result > 0); } }