| use std::collections::{HashMap, VecDeque};
|
| use std::fs::{self, File};
|
| use std::io::{BufRead, BufReader, Cursor, Read};
|
| use std::path::PathBuf;
|
|
|
| use chrono::{NaiveDate, TimeZone, Utc};
|
| use chrono_tz::Asia::Kolkata;
|
| use pyo3::prelude::*;
|
|
|
| use serde::Deserialize;
|
| use serde_json::{from_value, to_value};
|
|
|
| use sqlx::PgPool;
|
|
|
| use crate::parser::data_struct::{FileProcessor, ColumnMap};
|
| use crate::parser::FileParser;
|
| use validator::Validate;
|
|
|
| use sqlx::postgres::PgArguments;
|
| use sqlx::QueryBuilder;
|
|
|
| // async fn bulk_insert(
|
| // pool: &PgPool,
|
| // table: &str,
|
| // columns: &[String],
|
| // rows: &[Vec<String>],
|
| // ) -> Result<(), sqlx::Error> {
|
| // let mut qb = QueryBuilder::new(format!("INSERT INTO {} (", table));
|
| // qb.push(columns.join(", "));
|
| // qb.push(") VALUES ");
|
|
|
| // let mut separated = qb.separated(", ");
|
| // for row in rows {
|
| // separated.push("(");
|
| // separated.push_bind_iter(row);
|
| // separated.push(")");
|
| // }
|
|
|
| // let query = qb.build();
|
| // query.execute(pool).await?;
|
| // Ok(())
|
| // }
|
|
|
/// Parser for NSDL "##"-delimited record files.
///
/// Holds the column specification used to map positional fields to named
/// columns, plus the raw file contents buffered in memory.
pub struct NSDLParser {
    // Column specifications keyed by row type (loaded from a JSON spec file).
    columns: ColumnMap,
    // Entire input file held in memory; all parsing reads from this buffer.
    file: Cursor<Vec<u8>>,
}
|
| impl FileParser for NSDLParser {
|
| fn parse(&self) -> PyResult<usize> {
|
| let mut reader = BufReader::new(&self.file); // now directly uses original file cursor
|
|
|
| let mut records: Vec<(Vec<Vec<String>>, String, String)> = Vec::new();
|
| let mut deque: VecDeque<String> = VecDeque::new();
|
|
|
| for line in reader.lines() {
|
| let line = line?;
|
|
|
| if line.starts_with("01") {
|
| if let Some(last) = deque.back() {
|
| if last.starts_with("01") {
|
| deque.pop_back();
|
| } else if !deque.is_empty() {
|
| println!("Processing deque with {} lines", deque.len());
|
| let (rows, isin, date) = self.parse_subset(&deque)?;
|
| records.push((rows, isin, date));
|
| deque.clear();
|
| }
|
| }
|
| }
|
|
|
| deque.push_back(line);
|
| }
|
|
|
| if !deque.is_empty() {
|
| let (rows, isin, date) = self.parse_subset(&deque)?;
|
| records.push((rows, isin, date));
|
| }
|
|
|
| Ok(records.len())
|
| }
|
| }
|
|
|
| impl NSDLParser {
|
| pub fn new(folder_path: PathBuf, columns_path: PathBuf) -> PyResult<Self> {
|
| let columns = <Self as FileParser>::get_col_obj(&columns_path)?;
|
| let file = <Self as FileParser>::get_file_obj(&folder_path)?;
|
|
|
| Ok(Self { columns, file })
|
| }
|
| fn parse_subset(
|
| &self,
|
| deque: &VecDeque<String>,
|
| ) -> PyResult<(Vec<Vec<String>>, String, String)> {
|
| if deque.is_empty() {
|
| return Ok((vec![], "".to_string(), "".to_string()));
|
| }
|
|
|
| let header = &deque[0];
|
| let parts: Vec<&str> = header.split("##").collect();
|
|
|
| if parts.len() < 5 {
|
| return Err(pyo3::exceptions::PyValueError::new_err(
|
| "Invalid header line",
|
| ));
|
| }
|
|
|
| let isin = parts[2].trim().to_string();
|
| let date = parts[3].trim().to_string();
|
| let mut rows: Vec<Vec<String>> = Vec::new();
|
|
|
| for line in deque.iter().skip(1) {
|
| if line.starts_with("02") {
|
| rows.push(self.parse_02_line(line));
|
| } else if line.starts_with("03") {
|
| rows.push(self.parse_03_line(line));
|
| }
|
| }
|
|
|
| Ok((rows, isin, date))
|
| }
|
|
|
| fn parse_02_line(&self, line: &str) -> Vec<String> {
|
| line.split("##").map(|s| s.trim().to_string()).collect()
|
| }
|
|
|
| fn parse_03_line(&self, line: &str) -> Vec<String> {
|
| line.split("##").map(|s| s.trim().to_string()).collect()
|
| }
|
|
|
| fn execute(&self, isin: String, date: String, rows: Vec<Vec<String>>, row_type: &str) {
|
| let column_spec = self.columns.get(row_type).expect("Invalid row_type");
|
| let ordered_columns = &column_spec.ordered;
|
| let mut valid_rows: Vec<Vec<String>> = Vec::new();
|
|
|
| for row in rows {
|
| let map: HashMap<String, String> = ordered_columns
|
| .iter()
|
| .cloned()
|
| .zip(row.iter().cloned())
|
| .collect();
|
|
|
| let maybe_valid = to_value(&map)
|
| .ok()
|
| .and_then(|v| from_value::<FileProcessor>(v).ok())
|
| .and_then(|fp| fp.validate().ok().map(|_| fp));
|
|
|
| if let Some(_) = maybe_valid {
|
| let row_values = ordered_columns
|
| .iter()
|
| .map(|key| map.get(key).cloned().unwrap_or_default())
|
| .collect::<Vec<String>>();
|
|
|
| valid_rows.push(row_values);
|
| } else {
|
| eprintln!("❌ Skipping invalid row: {:?}", row);
|
| }
|
| }
|
|
|
| for mut row in valid_rows {
|
| row.push(isin.clone());
|
| row.push(date.clone());
|
|
|
| let mut all_columns = ordered_columns.clone();
|
| all_columns.push("ISIN".to_string());
|
| all_columns.push("DATE".to_string());
|
|
|
| let col_list = all_columns.join(", ");
|
| let val_list = row
|
| .iter()
|
| .map(|val| format!("'{}'", val.replace('\'', "''")))
|
| .collect::<Vec<_>>()
|
| .join(", ");
|
|
|
| let query = format!("INSERT INTO records ({}) VALUES ({});", col_list, val_list);
|
| println!("{}", query);
|
| }
|
| }
|
| }
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: parsing the fixture folder must yield at least one record
    /// group. Relies on absolute fixture paths, so it only passes inside the
    /// development container.
    #[test]
    fn test_nsdl_parser_reads_chunks_correctly() {
        pyo3::prepare_freethreaded_python();

        let data_dir = PathBuf::from("/app/temp/c2925146-94ed-4778-a013-10dd547f1828");
        let spec_path = PathBuf::from("/app/backend/database/backend/cols/nsdl_cols.json");

        let parser = NSDLParser::new(data_dir, spec_path).expect("Failed to create NSDLParser");
        let group_count = parser.parse().expect("Failed to parse file");

        println!("Parsed {} record groups", group_count);
        assert!(group_count > 0);
    }
}
|