use std::collections::{HashMap, VecDeque};
use std::io::{BufRead, BufReader, Cursor};
use std::path::PathBuf;
use pyo3::prelude::*;
use serde_json::{from_value, to_value};
use crate::parser::data_struct::{FileProcessor, ColumnMap};
use crate::parser::FileParser;
use validator::Validate;
// Disabled bulk-insert helper; re-enabling it also requires
// `use sqlx::{PgPool, QueryBuilder};` restored at the top of the file.
// async fn bulk_insert(
//     pool: &PgPool,
//     table: &str,
//     columns: &[String],
//     rows: &[Vec<String>],
// ) -> Result<(), sqlx::Error> {
//     let mut qb = QueryBuilder::new(format!("INSERT INTO {} (", table));
//     qb.push(columns.join(", "));
//     qb.push(") ");
//     // push_values emits `VALUES ($1, $2), ($3, $4), ...` with real bind
//     // parameters; the previous `push_bind_iter` does not exist on
//     // `Separated`, and pushing "(" / ")" through it would misplace commas.
//     qb.push_values(rows, |mut b, row| {
//         for val in row {
//             b.push_bind(val);
//         }
//     });
//     qb.build().execute(pool).await?;
//     Ok(())
// }
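// Hypothetical usage once re-enabled (the pool handle, table, and column
// names below are illustrative placeholders, not the project's real schema):
//
//     let rows = vec![vec!["INE123A01016".to_string(), "11-07-2025".to_string()]];
//     bulk_insert(&pool, "records", &["ISIN".to_string(), "DATE".to_string()], &rows).await?;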
pub struct NSDLParser {
    columns: ColumnMap,
    file: Cursor<Vec<u8>>,
}
impl FileParser for NSDLParser {
    fn parse(&self) -> PyResult<usize> {
        // `&Cursor<Vec<u8>>` does not implement `Read`, so borrow the
        // underlying bytes into a fresh cursor instead of the stored one.
        let reader = BufReader::new(Cursor::new(self.file.get_ref().as_slice()));
        let mut records: Vec<(Vec<Vec<String>>, String, String)> = Vec::new();
        let mut deque: VecDeque<String> = VecDeque::new();
        for line in reader.lines() {
            let line = line?;
            if line.starts_with("01") {
                if let Some(last) = deque.back() {
                    if last.starts_with("01") {
                        // Consecutive "01" headers: the previous header had no
                        // detail lines, so replace it with the newer one.
                        deque.pop_back();
                    } else {
                        // A new header closes the current chunk: parse it,
                        // record the result, and start collecting afresh.
                        println!("Processing deque with {} lines", deque.len());
                        let (rows, isin, date) = self.parse_subset(&deque)?;
                        records.push((rows, isin, date));
                        deque.clear();
                    }
                }
            }
            deque.push_back(line);
        }
        // Flush the final chunk, which has no trailing "01" header after it.
        if !deque.is_empty() {
            let (rows, isin, date) = self.parse_subset(&deque)?;
            records.push((rows, isin, date));
        }
        Ok(records.len())
    }
}
impl NSDLParser {
    pub fn new(folder_path: PathBuf, columns_path: PathBuf) -> PyResult<Self> {
        let columns = <Self as FileParser>::get_col_obj(&columns_path)?;
        let file = <Self as FileParser>::get_file_obj(&folder_path)?;
        Ok(Self { columns, file })
    }
    fn parse_subset(
        &self,
        deque: &VecDeque<String>,
    ) -> PyResult<(Vec<Vec<String>>, String, String)> {
        if deque.is_empty() {
            return Ok((vec![], "".to_string(), "".to_string()));
        }
        // The first line of a chunk is the "01" header; after splitting on
        // "##", field 2 carries the ISIN and field 3 the statement date.
        let header = &deque[0];
        let parts: Vec<&str> = header.split("##").collect();
        if parts.len() < 5 {
            return Err(pyo3::exceptions::PyValueError::new_err(
                "Invalid header line",
            ));
        }
        let isin = parts[2].trim().to_string();
        let date = parts[3].trim().to_string();
        let mut rows: Vec<Vec<String>> = Vec::new();
        for line in deque.iter().skip(1) {
            if line.starts_with("02") {
                rows.push(self.parse_02_line(line));
            } else if line.starts_with("03") {
                rows.push(self.parse_03_line(line));
            }
        }
        Ok((rows, isin, date))
    }
    // "02" and "03" detail lines are currently parsed identically (a plain
    // "##" split); they stay separate in case the two record types diverge.
    fn parse_02_line(&self, line: &str) -> Vec<String> {
        line.split("##").map(|s| s.trim().to_string()).collect()
    }
    fn parse_03_line(&self, line: &str) -> Vec<String> {
        line.split("##").map(|s| s.trim().to_string()).collect()
    }
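    // Hypothetical shape of one chunk, inferred from the logic above; every
    // field value other than the ISIN and date positions is invented:
    //
    //     01##<seq>##INE123A01016##11-07-2025##<more header fields>
    //     02##<"##"-separated fields of a type-02 record>
    //     03##<"##"-separated fields of a type-03 record>
    //
    // parse_subset on such a chunk returns the two detail rows together with
    // ("INE123A01016", "11-07-2025").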
    // Not yet wired into parse(); validates rows and, for now, only prints
    // the generated INSERT statements rather than executing them.
    fn execute(&self, isin: String, date: String, rows: Vec<Vec<String>>, row_type: &str) {
        let column_spec = self.columns.get(row_type).expect("Invalid row_type");
        let ordered_columns = &column_spec.ordered;
        let mut valid_rows: Vec<Vec<String>> = Vec::new();
        for row in rows {
            // Pair each configured column name with its positional value.
            let map: HashMap<String, String> = ordered_columns
                .iter()
                .cloned()
                .zip(row.iter().cloned())
                .collect();
            // Round-trip through serde_json into a FileProcessor and run its
            // validator; any failure along the way marks the row invalid.
            let is_valid = to_value(&map)
                .ok()
                .and_then(|v| from_value::<FileProcessor>(v).ok())
                .map_or(false, |fp| fp.validate().is_ok());
            if is_valid {
                let row_values = ordered_columns
                    .iter()
                    .map(|key| map.get(key).cloned().unwrap_or_default())
                    .collect::<Vec<String>>();
                valid_rows.push(row_values);
            } else {
                eprintln!("❌ Skipping invalid row: {:?}", row);
            }
        }
        // The column list is identical for every row, so build it once.
        let mut all_columns = ordered_columns.clone();
        all_columns.push("ISIN".to_string());
        all_columns.push("DATE".to_string());
        let col_list = all_columns.join(", ");
        for mut row in valid_rows {
            row.push(isin.clone());
            row.push(date.clone());
            // Doubling single quotes is minimal escaping; the parameterized
            // bulk_insert above is the safer path once re-enabled.
            let val_list = row
                .iter()
                .map(|val| format!("'{}'", val.replace('\'', "''")))
                .collect::<Vec<_>>()
                .join(", ");
            let query = format!("INSERT INTO records ({}) VALUES ({});", col_list, val_list);
            println!("{}", query);
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn test_nsdl_parser_reads_chunks_correctly() {
        pyo3::prepare_freethreaded_python();
        // Depends on local fixture files; only runs where these paths exist.
        let folder = PathBuf::from("/app/temp/c2925146-94ed-4778-a013-10dd547f1828");
        let cols_path = PathBuf::from("/app/backend/database/backend/cols/nsdl_cols.json");
        let nsdl = NSDLParser::new(folder, cols_path).expect("Failed to create NSDLParser");
        let result = nsdl.parse().expect("Failed to parse file");
        println!("Parsed {} record groups", result);
        assert!(result > 0);
    }
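
    // Self-contained check mirroring parse_subset's header handling on a
    // synthetic line; all field values here are invented for illustration.
    #[test]
    fn test_header_split_logic() {
        let header = "01##0001##INE123A01016##11-07-2025##F";
        let parts: Vec<&str> = header.split("##").collect();
        assert!(parts.len() >= 5);
        assert_eq!(parts[2].trim(), "INE123A01016");
        assert_eq!(parts[3].trim(), "11-07-2025");
    }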
}