warning: variable does not need to be mutable
   --> src/parser/nsdl.rs:206:17
    |
206 |             let mut bind_values = values.iter().map(|v| v.to_string()).collect::<Vec<String>>();
    |                 ----^^^^^^^^^^^
    |                 |
    |                 help: remove this `mut`

error[E0382]: use of moved value: `tx`
   --> src/parser/nsdl.rs:217:13
    |
136 |         let mut tx = self
    |             ------ move occurs because `tx` has type `Transaction<'_, Postgres>`, which does not implement the `Copy` trait
...
142 |         for line in deque.iter().skip(1) {
    |         -------------------------------- inside of this loop
...
217 |             tx.commit()
    |             ^^ -------- `tx` moved due to this method call, in previous iteration of loop
    |
note: `Transaction::<'c, DB>::commit` takes ownership of the receiver `self`, which moves `tx`
   --> /root/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/sqlx-core-0.7.4/src/transaction.rs:82:29
    |
82  |     pub async fn commit(mut self) -> Result<(), Error> {
    |                         ^^^^

warning: unused import: `Row`
 --> src/parser/algorithms.rs:5:20
  |
5 | use sqlx::{PgPool, Row};
  |                    ^^^

warning: unused import: `Row`
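The E0382 is the real blocker: `Transaction::<'c, DB>::commit` takes `self` by
value, so calling it inside the per-line loop moves `tx` on the first iteration
and leaves nothing to commit afterwards. A minimal sketch of the fix (the names
mirror the code below, but this is the shape, not a drop-in patch):

    let mut tx = pool.begin().await?;
    for row in rows {
        // each statement only borrows the transaction mutably...
        sqlx::query("INSERT ...").execute(&mut *tx).await?;
    }
    // ...and commit consumes it exactly once, after the loop
    tx.commit().await?;

The corrected `parse_subset` below applies this pattern, and also drops the
unused `mut` and the unused `Row` import flagged above.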
------------------------------------------------------

use sqlx::Executor;
use std::collections::{HashMap, VecDeque};
use std::fs::{self, File};
use std::io::{BufRead, BufReader, Cursor, Read};
use std::path::{Path, PathBuf};

use crate::parser::algorithms::{
    build_pg_connection_url, create_nsdl_staging_table, get_col_obj, get_file_obj,
};
use crate::parser::chunk_streamer::LogicalLineReader;
use chrono::{NaiveDate, TimeZone, Utc};
use chrono_tz::Asia::Kolkata;
use pyo3::exceptions::PyRuntimeError;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use serde::Deserialize;
use serde_json::{from_value, to_value};
use sqlx::query;
use sqlx::PgPool; // `Row` dropped: it was the unused import flagged above
use sqlx::{Postgres, QueryBuilder};
use std::sync::Arc;
use std::fmt;

use crate::parser::data_struct::{date_format, ColumnMap, FileProcessor};
use crate::parser::FileParser;
use validator::Validate;

use async_trait::async_trait;
use sqlx::postgres::PgArguments;
use sqlx::postgres::PgPoolOptions;

pub struct NSDLParser {
    columns: ColumnMap,
    file: Option<LogicalLineReader>,
    pool: PgPool,
}

#[async_trait]
impl FileParser for NSDLParser {
    async fn parse(&mut self) -> PyResult<usize> {
        let mut deque: VecDeque<String> = VecDeque::new();
        let reader = self.file.take().expect("file already taken");

        create_nsdl_staging_table(&self.pool, "nsdl_staging_table")
            .await
            .map_err(|e| PyRuntimeError::new_err(format!("DB error: {}", e)))?;

        let mut merged_total: HashMap<(String, String), i32> = HashMap::new();

        for line_result in reader {
            let line = line_result?;
            if line.starts_with("01") {
                if let Some(last) = deque.back() {
                    if last.starts_with("01") {
                        // Two consecutive "01" headers: the earlier one has no
                        // body, so drop it.
                        deque.pop_back();
                    } else if !deque.is_empty() {
                        eprintln!("Processing deque with {} lines", deque.len());
                        // A new "01" header closes the previous block. The old
                        // code spawned this onto a `JoinSet`, but a spawned task
                        // must be `'static` and cannot capture `&mut self`; the
                        // block is processed inline instead (see the Arc-based
                        // sketch after this impl for a concurrent variant).
                        let submap = self.parse_subset(&deque).await?;
                        for (k, v) in submap {
                            *merged_total.entry(k).or_insert(0) += v;
                        }
                        deque.clear();
                    }
                }
            }
            deque.push_back(line);
        }

        // Process the final block, if any.
        if !deque.is_empty() {
            let submap = self.parse_subset(&deque).await?;
            for (k, v) in submap {
                *merged_total.entry(k).or_insert(0) += v;
            }
        }

        Ok(merged_total.len())
    }
}

impl NSDLParser {
    pub fn new(folder_path: &Path, columns_path: &Path, pool: PgPool) -> PyResult<Self> {
        let columns = get_col_obj(columns_path)?;
        let file = Some(LogicalLineReader::new(folder_path)?);
        Ok(Self { columns, file, pool })
    }

    fn parse_02_line(&self, line: &str) -> HashMap<String, String> {
        self.columns["02"]
            .ordered
            .iter()
            .zip(line.split("##").skip(2).map(|s| s.trim().to_string()))
            .filter(|(_, val)| !val.is_empty())
            .map(|(col, val)| (col.clone(), val))
            .collect()
    }

    fn parse_03_line(&self, line: &str) -> HashMap<String, String> {
        self.columns["03"]
            .ordered
            .iter()
            .zip(line.split("##").skip(2).map(|s| s.trim().to_string()))
            .filter(|(_, val)| !val.is_empty())
            .map(|(col, val)| (col.clone(), val))
            .collect()
    }

    pub async fn parse_subset(
        &self,
        deque: &VecDeque<String>,
    ) -> PyResult<HashMap<(String, String), i32>> {
        // NOTE: nothing below populates `total_shares` yet, so callers
        // currently receive an empty map.
        let mut total_shares: HashMap<(String, String), i32> = HashMap::new();
        if deque.is_empty() {
            return Ok(total_shares);
        }

        let header = &deque[0];
        let parts: Vec<&str> = header.split("##").collect();
        if parts.len() < 5 {
            return Err(PyValueError::new_err("Invalid header line"));
        }
        let isin = parts[2].trim().to_string();
        let date_str = parts[3].trim();

        let mut tx = self
            .pool
            .begin()
            .await
            .map_err(|e| PyRuntimeError::new_err(format!("Transaction begin failed: {e}")))?;

        for line in deque.iter().skip(1) {
            let mut kv_pairs = match line.get(..2) {
                Some("02") => self.parse_02_line(line),
                Some("03") => self.parse_03_line(line),
                _ => continue,
            };
            kv_pairs.insert("ISIN".to_string(), isin.clone());
            kv_pairs.insert("DATE".to_string(), date_str.to_string());

            let json_value = to_value(&kv_pairs).map_err(|_| {
                PyRuntimeError::new_err(format!("Failed to serialize row: {:?}", kv_pairs))
            })?;

            let mut processor: FileProcessor = match from_value(json_value) {
                Ok(fp) => fp,
                Err(err) => {
                    eprintln!("⚠️ Failed to parse FileProcessor: {}", err);
                    continue;
                }
            };

            if let Err(validation_err) = processor.validate() {
                eprintln!("⚠️ Validation failed: {}", validation_err);
                continue;
            }

            if let Err(e) = processor.finalize(&self.pool).await {
                eprintln!("⚠️ Finalize failed: {}", e);
                continue;
            }

            // Build the insert with QueryBuilder for type-safe binding.
            let mut qb: QueryBuilder<Postgres> =
                QueryBuilder::new("INSERT INTO nsdl_staging_table (");

            let mut columns = Vec::new();
            let mut values = Vec::new();
            if let Ok(val) = to_value(&processor) {
                if let Some(obj) = val.as_object() {
                    for (key, value) in obj.iter() {
                        columns.push(key.clone());
                        values.push(value.clone());
                    }
                }
            }

            qb.push(
                columns
                    .iter()
                    .map(|col| format!("\"{}\"", col))
                    .collect::<Vec<String>>()
                    .join(", "),
            );
            qb.push(") VALUES (");
            let mut separated = qb.separated(", ");
            for value in &values {
                separated.push_bind(value);
            }
            qb.push(")");

            // Debug output; `bind_values` no longer needs to be `mut`.
            let bind_values = values.iter().map(|v| v.to_string()).collect::<Vec<String>>();
            println!("SQL: {}", qb.sql());
            println!("Bound Values: {:?}", bind_values);

            // ✅ build must come BEFORE execution
            let query = qb.build();
            if let Err(e) = query.execute(&mut *tx).await {
                return Err(PyRuntimeError::new_err(format!("⚠️ Insert failed: {}", e)));
            }
        }

        // `Transaction::commit` consumes `tx`, so it runs exactly once, after
        // the loop; this resolves the E0382 above.
        tx.commit()
            .await
            .map_err(|e| PyRuntimeError::new_err(format!("Commit failed: {e}")))?;

        Ok(total_shares)
    }
}
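// A sketch of one way to get the original per-block concurrency back: a task
// handed to `tokio::task::JoinSet::spawn` must be `'static`, which a future
// borrowing `&mut self` can never be, so share the parser behind an `Arc` and
// let each task own its own handle. This is illustrative only and assumes
// `NSDLParser` is `Send + Sync` and that the `parse_subset` future is `Send`.
#[allow(dead_code)]
async fn parse_blocks_concurrently(
    parser: Arc<NSDLParser>,
    blocks: Vec<VecDeque<String>>,
) -> PyResult<HashMap<(String, String), i32>> {
    let mut join_set = tokio::task::JoinSet::new();
    for block in blocks {
        let parser = Arc::clone(&parser);
        // The task owns `parser` and `block`, so it satisfies `'static`.
        join_set.spawn(async move { parser.parse_subset(&block).await });
    }
    let mut merged: HashMap<(String, String), i32> = HashMap::new();
    while let Some(res) = join_set.join_next().await {
        let submap = res.map_err(|e| PyRuntimeError::new_err(format!("Join error: {e}")))??;
        for (k, v) in submap {
            *merged.entry(k).or_insert(0) += v;
        }
    }
    Ok(merged)
}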
println!("Parsed {} record groups", result); assert!(result > 0); } }