    |                 |
    |                 help: remove this `mut`

warning: variable does not need to be mutable
   --> src/parser/nsdl.rs:206:17
    |
206 |             let mut bind_values = values.iter().map(|v| v.to_string()).collect::<Vec<_>>();
    |                 ----^^^^^^^^^^^
    |                 |
    |                 help: remove this `mut`
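
As the help text says, the fix is simply to drop the `mut`; `bind_values` is
never reassigned or mutably borrowed (applied in the source below):

    let bind_values = values.iter().map(|v| v.to_string()).collect::<Vec<_>>();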

error[E0382]: use of moved value: `tx`
   --> src/parser/nsdl.rs:217:13
    |
136 |         let mut tx = self
    |             ------ move occurs because `tx` has type `Transaction<'_, Postgres>`, which does not implement the `Copy` trait
...
142 |         for line in deque.iter().skip(1) {
    |         -------------------------------- inside of this loop
...
217 |             tx.commit()
    |             ^^ -------- `tx` moved due to this method call, in previous iteration of loop
    |
note: `Transaction::<'c, DB>::commit` takes ownership of the receiver `self`, which moves `tx`
   --> /root/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/sqlx-core-0.7.4/src/transaction.rs:82:29
    |
82  |     pub async fn commit(mut self) -> Result<(), Error> {
    |                         ^^^^
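
`commit(mut self)` consumes the transaction, so the first loop iteration
moves `tx` and later iterations have nothing left to call it on. The fix is
to execute each statement against a reborrow (`&mut *tx`) inside the loop
and commit once after it. A minimal sketch of the pattern, with `pool`,
`rows`, and `insert_row` as placeholders rather than real items from this
codebase:

    let mut tx = pool.begin().await?;
    for row in rows {
        // Each statement only borrows the transaction mutably.
        insert_row(&mut *tx, row).await?;
    }
    tx.commit().await?; // moves `tx` exactly once, after the loop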

warning: unused import: `Row`
 --> src/parser/algorithms.rs:5:20
  |
5 | use sqlx::{PgPool, Row};
  |                    ^^^

warning: unused import: `Row`
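
Both `Row` warnings have the same fix: drop `Row` from the `use sqlx::{...}`
list, leaving only what is actually referenced. The import block in the
source below had the same unused `Row`; it is removed there.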
------------------------------------------------------

use std::collections::{HashMap, VecDeque};
use std::path::Path;

use crate::parser::algorithms::{build_pg_connection_url, create_nsdl_staging_table, get_col_obj};
use crate::parser::chunk_streamer::LogicalLineReader;
use crate::parser::data_struct::{ColumnMap, FileProcessor};
use crate::parser::FileParser;

use async_trait::async_trait;
use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use serde_json::{from_value, to_value};
use sqlx::{PgPool, Postgres, QueryBuilder};
use validator::Validate;

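// Streams NSDL flat files from a folder: `columns` maps record types
// ("02"/"03") to their ordered column names, `file` yields logical lines,
// and `pool` is the Postgres connection used for staging inserts.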
pub struct NSDLParser {
    columns: ColumnMap,
    file: Option<LogicalLineReader>,
    pool: PgPool,
}

#[async_trait]
impl FileParser for NSDLParser {
    async fn parse(&mut self) -> PyResult<usize> {
        let mut deque: VecDeque<String> = VecDeque::new();
        let reader = self.file.take().expect("file already taken");
        // Running totals merged across every record group.
        let mut merged_total: HashMap<(String, String), i32> = HashMap::new();

        create_nsdl_staging_table(&self.pool, "nsdl_staging_table")
            .await
            .map_err(|e| PyRuntimeError::new_err(format!("DB error: {}", e)))?;

        for line_result in reader {
            let line = line_result?;
            // A "01" line opens a new record group; everything up to the
            // next "01" belongs to the group opened by the previous header.
            if line.starts_with("01") {
                if let Some(last) = deque.back() {
                    if last.starts_with("01") {
                        // Two consecutive headers: the earlier one carried
                        // no body lines, so keep only the newer header.
                        deque.pop_back();
                    } else {
                        eprintln!("Processing deque with {} lines", deque.len());
                        // `parse_subset` borrows `self`, so the group is
                        // processed inline here. Spawning it on a JoinSet
                        // would require a 'static future; see the Arc-based
                        // sketch after this impl block.
                        let submap = self.parse_subset(&deque).await?;
                        for (k, v) in submap {
                            *merged_total.entry(k).or_insert(0) += v;
                        }
                        deque.clear();
                    }
                }
            }

            deque.push_back(line);
        }

        // Flush the final group.
        if !deque.is_empty() {
            let submap = self.parse_subset(&deque).await?;
            for (k, v) in submap {
                *merged_total.entry(k).or_insert(0) += v;
            }
        }

        Ok(merged_total.len())
    }
}
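
// If the per-group work should run concurrently again, one option is to
// share the parser behind an Arc so each spawned task owns its handle,
// which satisfies the 'static bound on `tokio::task::JoinSet::spawn`.
// Sketch only (not wired in): it assumes `NSDLParser: Send + Sync`, and
// `parse_groups_concurrently` is a hypothetical free function.
//
// async fn parse_groups_concurrently(
//     parser: std::sync::Arc<NSDLParser>,
//     groups: Vec<VecDeque<String>>,
// ) -> PyResult<HashMap<(String, String), i32>> {
//     let mut join_set = tokio::task::JoinSet::new();
//     for group in groups {
//         let parser = std::sync::Arc::clone(&parser);
//         // The future owns `parser` and `group`, so it is 'static.
//         join_set.spawn(async move { parser.parse_subset(&group).await });
//     }
//     let mut merged: HashMap<(String, String), i32> = HashMap::new();
//     while let Some(res) = join_set.join_next().await {
//         let submap =
//             res.map_err(|e| PyRuntimeError::new_err(format!("Join error: {e}")))??;
//         for (k, v) in submap {
//             *merged.entry(k).or_insert(0) += v;
//         }
//     }
//     Ok(merged)
// }
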
impl NSDLParser {
    pub fn new(folder_path: &Path, columns_path: &Path, pool: PgPool) -> PyResult<Self> {
        let columns = get_col_obj(columns_path)?;
        let file = Some(LogicalLineReader::new(folder_path)?);
        Ok(Self {
            columns,
            file,
            pool,
        })
    }

    // "02" and "03" record lines share the same ##-delimited layout; zip
    // the fields with the configured column order for the given record
    // type and drop empty values.
    fn parse_record_line(&self, record_type: &str, line: &str) -> HashMap<String, String> {
        self.columns[record_type]
            .ordered
            .iter()
            .zip(line.split("##").skip(2).map(|s| s.trim().to_string()))
            .filter(|(_, val)| !val.is_empty())
            .map(|(col, val)| (col.clone(), val))
            .collect()
    }

    pub async fn parse_subset(
        &self,
        deque: &VecDeque<String>,
    ) -> PyResult<HashMap<(String, String), i32>> {
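        // NOTE: nothing below currently populates `total_shares`, so callers
        // receive an empty aggregate; the per-(ISIN, DATE) tally still needs
        // to be wired up.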
        let mut total_shares: HashMap<(String, String), i32> = HashMap::new();

        if deque.is_empty() {
            return Ok(total_shares);
        }

        let header = &deque[0];
        let parts: Vec<&str> = header.split("##").collect();

        if parts.len() < 5 {
            return Err(PyValueError::new_err("Invalid header line"));
        }

        let isin = parts[2].trim().to_string();
        let date_str = parts[3].trim();

        let mut tx = self
            .pool
            .begin()
            .await
            .map_err(|e| PyRuntimeError::new_err(format!("Transaction begin failed: {e}")))?;

        for line in deque.iter().skip(1) {
            // "02" and "03" both go through the shared parse_record_line
            // helper; any other record type is skipped.
            let mut kv_pairs = match line.get(..2) {
                Some("02") => self.parse_record_line("02", line),
                Some("03") => self.parse_record_line("03", line),
                _ => continue,
            };

| kv_pairs.insert("ISIN".to_string(), isin.clone());
|
| kv_pairs.insert("DATE".to_string(), date_str.to_string());
|
|
|
| let json_value = to_value(&kv_pairs).map_err(|_| {
|
| PyRuntimeError::new_err(format!("Failed to serialize row: {:?}", kv_pairs))
|
| })?;
|
|
|
| let mut processor: FileProcessor = match from_value(json_value) {
|
| Ok(fp) => fp,
|
| Err(err) => {
|
| eprintln!("⚠️ Failed to parse FileProcessor: {}", err);
|
| continue;
|
| }
|
| };
|
|
|
| if let Err(validation_err) = processor.validate() {
|
| eprintln!("⚠️ Validation failed: {}", validation_err);
|
| continue;
|
| }
|
|
|
| if let Err(e) = processor.finalize(&self.pool).await {
|
| eprintln!("⚠️ Finalize failed: {}", e);
|
| continue;
|
| }
|
|
|
| // Build insert query
|
| let mut qb: QueryBuilder<Postgres> =
|
| QueryBuilder::new("INSERT INTO nsdl_staging_table (");
|
|
|
| let mut columns = Vec::new();
|
| let mut values = Vec::new();
|
|
|
| if let Ok(val) = to_value(&processor) {
|
| if let Some(obj) = val.as_object() {
|
| for (key, value) in obj.iter() {
|
| columns.push(key.clone());
|
| values.push(value.clone());
|
| }
|
| }
|
| }
|
|
|
| qb.push(
|
| columns
|
| .iter()
|
| .map(|col| format!("\"{}\"", col))
|
| .collect::<Vec<_>>()
|
| .join(", "),
|
| );
|
| qb.push(") VALUES (");
|
|
|
| let mut separated = qb.separated(", ");
|
| for value in &values {
|
| separated.push_bind(value);
|
| }
|
| qb.push(")");
|
|
|
| let mut bind_values = values.iter().map(|v| v.to_string()).collect::<Vec<_>>();
|
| println!("SQL: {}", qb.sql());
|
| println!("Bound Values: {:?}", bind_values);
|
|
|
            let query = qb.build(); // must be built before it can be executed

            // Execute against `&mut *tx`: a reborrow, so the transaction is
            // only borrowed per statement and is still owned afterwards.
            if let Err(e) = query.execute(&mut *tx).await {
                return Err(PyRuntimeError::new_err(format!("⚠️ Insert failed: {}", e)));
            }
        }

        // `commit(self)` consumes the transaction (the E0382 above), so it
        // runs exactly once, after all rows in the group are inserted.
        tx.commit()
            .await
            .map_err(|e| PyRuntimeError::new_err(format!("Commit failed: {e}")))?;

        Ok(total_shares)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use sqlx::postgres::PgPoolOptions;
    use std::path::Path;

    #[tokio::test]
    async fn test_nsdl_parser_reads_chunks_correctly() {
        pyo3::prepare_freethreaded_python();

        let folder = Path::new("/app/temp/aa30b366-2eb7-48e6-a029-d63a05e0686f");
        let cols_path = Path::new("/app/database/backend/cols/nsdl_cols.json");
        let database_url = build_pg_connection_url();
        let pool = PgPoolOptions::new()
            .max_connections(5)
            .connect(&database_url)
            .await
            .expect("Failed to connect to DB");

        let mut nsdl =
            NSDLParser::new(folder, cols_path, pool).expect("Failed to create NSDLParser");

        let result = nsdl.parse().await.expect("Failed to parse file");

        println!("Parsed {} record groups", result);
        assert!(result > 0);
    }
}