warning: variable does not need to be mutable
   --> src/parser/nsdl.rs:206:17
    |
206 |             let mut bind_values = values.iter().map(|v| v.to_string()).collect::<Vec<_>>();
    |                 ----^^^^^^^^^^^
    |                 |
    |                 help: remove this `mut`
error[E0382]: use of moved value: `tx`
   --> src/parser/nsdl.rs:217:13
    |
136 |         let mut tx = self
    |             ------ move occurs because `tx` has type `Transaction<'_, Postgres>`, which does not implement the `Copy` trait
...
142 |         for line in deque.iter().skip(1) {
    |         -------------------------------- inside of this loop
...
217 |             tx.commit()
    |             ^^ -------- `tx` moved due to this method call, in previous iteration of loop
    |
note: `Transaction::<'c, DB>::commit` takes ownership of the receiver `self`, which moves `tx`
   --> /root/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/sqlx-core-0.7.4/src/transaction.rs:82:29
    |
82  |     pub async fn commit(mut self) -> Result<(), Error> {
    |                             ^^^^
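The fix, sketched minimally (the pool, rows, and table name below are illustrative, not from the file): since `Transaction::commit` takes `self` by value, run statements through a reborrow of the transaction inside the loop and commit exactly once after it.

    // Hypothetical setup: `pool: sqlx::PgPool`, `rows: Vec<String>`, table `t`.
    let mut tx = pool.begin().await?;
    for row in &rows {
        sqlx::query("INSERT INTO t (v) VALUES ($1)")
            .bind(row)
            .execute(&mut *tx) // reborrows the connection; `tx` is not moved
            .await?;
    }
    tx.commit().await?; // consumes `tx` exactly once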
warning: unused import: `Row`
--> src/parser/algorithms.rs:5:20
  |
5 | use sqlx::{PgPool, Row};
  |                    ^^^
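If nothing in algorithms.rs actually calls `Row` methods, the warning is cleared by trimming that import (assuming `PgPool` is still used there):

    use sqlx::PgPool; // `Row` removed; re-add it only where rows are read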
------------------------------------------------------
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::fs::{self, File};
use std::io::{BufRead, BufReader, Cursor, Read};
use std::path::{Path, PathBuf};
use std::sync::Arc;

use async_trait::async_trait;
use chrono::{NaiveDate, TimeZone, Utc};
use chrono_tz::Asia::Kolkata;
use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use serde::Deserialize;
use serde_json::{from_value, to_value};
use sqlx::postgres::{PgArguments, PgPoolOptions};
use sqlx::{query, Executor, PgPool, Postgres, QueryBuilder, Row};
use validator::Validate;

use crate::parser::algorithms::{
    build_pg_connection_url, create_nsdl_staging_table, get_col_obj, get_file_obj,
};
use crate::parser::chunk_streamer::LogicalLineReader;
use crate::parser::data_struct::{date_format, ColumnMap, FileProcessor};
use crate::parser::FileParser;
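/// Streaming parser for NSDL flat files: reads logical lines from a folder,
/// groups them into chunks delimited by "01" header records, and stages the
/// parsed rows into a Postgres table.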
pub struct NSDLParser {
    columns: ColumnMap,
    file: Option<LogicalLineReader>,
    pool: PgPool,
}
#[async_trait]
impl FileParser for NSDLParser {
    async fn parse(&mut self) -> PyResult<usize> {
        let mut deque: VecDeque<String> = VecDeque::new();
        let reader = self.file.take().expect("file already taken");
        create_nsdl_staging_table(&self.pool, "nsdl_staging_table")
            .await
            .map_err(|e| PyRuntimeError::new_err(format!("DB error: {}", e)))?;
        // Group logical lines into chunks delimited by "01" header records.
        // Chunks are collected first and processed afterwards: spawning the
        // work onto a JoinSet would require `'static` data, which a borrow
        // of `self` cannot provide.
        let mut chunks: Vec<VecDeque<String>> = Vec::new();
        for line_result in reader {
            let line = line_result?;
            if line.starts_with("01") {
                if let Some(last) = deque.back() {
                    if last.starts_with("01") {
                        // Two consecutive headers: the previous one had no body.
                        deque.pop_back();
                    } else {
                        dbg!(deque.len());
                        chunks.push(std::mem::take(&mut deque));
                    }
                }
            }
            deque.push_back(line);
        }
        if !deque.is_empty() {
            chunks.push(deque);
        }
        let mut merged_total: HashMap<(String, String), i32> = HashMap::new();
        for chunk in &chunks {
            let submap = self.parse_subset(chunk).await?;
            for (k, v) in submap {
                *merged_total.entry(k).or_insert(0) += v;
            }
        }
        Ok(merged_total.len())
    }
}
impl NSDLParser {
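    /// Build a parser from a data folder, a column-map JSON file, and an
    /// existing Postgres connection pool.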
    pub fn new(folder_path: &Path, columns_path: &Path, pool: PgPool) -> PyResult<Self> {
        let columns = get_col_obj(columns_path)?;
        let file = Some(LogicalLineReader::new(folder_path)?);
        Ok(Self {
            columns,
            file,
            pool,
        })
    }
    /// Zip the configured column names for a record type ("02" or "03")
    /// with the `##`-separated fields of the line, skipping the two
    /// leading identifier fields and dropping empty values.
    fn parse_record_line(&self, record_type: &str, line: &str) -> HashMap<String, String> {
        self.columns[record_type]
            .ordered
            .iter()
            .zip(line.split("##").skip(2).map(|s| s.trim().to_string()))
            .filter(|(_, val)| !val.is_empty())
            .map(|(col, val)| (col.clone(), val))
            .collect()
    }
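    /// Process one header-delimited chunk: the first line is the "01" header
    /// carrying the ISIN and date; each following "02"/"03" record is
    /// validated, finalized, and inserted inside a single transaction.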
    pub async fn parse_subset(
        &self,
        deque: &VecDeque<String>,
    ) -> PyResult<HashMap<(String, String), i32>> {
        let mut total_shares: HashMap<(String, String), i32> = HashMap::new();
        if deque.is_empty() {
            return Ok(total_shares);
        }
        let header = &deque[0];
        let parts: Vec<&str> = header.split("##").collect();
        if parts.len() < 5 {
            return Err(PyValueError::new_err("Invalid header line"));
        }
        let isin = parts[2].trim().to_string();
        let date_str = parts[3].trim();
        let mut tx = self
            .pool
            .begin()
            .await
            .map_err(|e| PyRuntimeError::new_err(format!("Transaction begin failed: {e}")))?;
        for line in deque.iter().skip(1) {
            let mut kv_pairs = match line.get(..2) {
                Some(rt @ ("02" | "03")) => self.parse_record_line(rt, line),
                _ => continue,
            };
            kv_pairs.insert("ISIN".to_string(), isin.clone());
            kv_pairs.insert("DATE".to_string(), date_str.to_string());
            let json_value = to_value(&kv_pairs).map_err(|_| {
                PyRuntimeError::new_err(format!("Failed to serialize row: {:?}", kv_pairs))
            })?;
            let mut processor: FileProcessor = match from_value(json_value) {
                Ok(fp) => fp,
                Err(err) => {
                    eprintln!("⚠️ Failed to parse FileProcessor: {}", err);
                    continue;
                }
            };
            if let Err(validation_err) = processor.validate() {
                eprintln!("⚠️ Validation failed: {}", validation_err);
                continue;
            }
            if let Err(e) = processor.finalize(&self.pool).await {
                eprintln!("⚠️ Finalize failed: {}", e);
                continue;
            }
            // Build the INSERT with QueryBuilder so every value is bound
            // as a parameter rather than interpolated into the SQL string.
            let mut columns = Vec::new();
            let mut values = Vec::new();
            if let Ok(val) = to_value(&processor) {
                if let Some(obj) = val.as_object() {
                    for (key, value) in obj {
                        columns.push(key.clone());
                        values.push(value.clone());
                    }
                }
            }
            if columns.is_empty() {
                // Nothing serializable on this row; skip it.
                continue;
            }
            let mut qb: QueryBuilder<Postgres> =
                QueryBuilder::new("INSERT INTO nsdl_staging_table (");
            qb.push(
                columns
                    .iter()
                    .map(|col| format!("\"{}\"", col))
                    .collect::<Vec<_>>()
                    .join(", "),
            );
            qb.push(") VALUES (");
            let mut separated = qb.separated(", ");
            for value in &values {
                // `serde_json::Value` binds as JSON/JSONB in sqlx-postgres.
                separated.push_bind(value);
            }
            qb.push(")");
            println!("SQL: {}", qb.sql());
            println!("Bound values: {:?}", values);
            let query = qb.build(); // `build` borrows qb, so it must precede execution
            if let Err(e) = query.execute(&mut *tx).await {
                return Err(PyRuntimeError::new_err(format!("Insert failed: {e}")));
            }
        }
        // Commit once, after the loop: `commit` takes `self` by value, so
        // calling it per iteration moves `tx` on the first pass (the E0382 above).
        tx.commit()
            .await
            .map_err(|e| PyRuntimeError::new_err(format!("Commit failed: {e}")))?;
        Ok(total_shares)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use sqlx::postgres::PgPoolOptions;
    use std::path::Path;
    #[tokio::test]
    async fn test_nsdl_parser_reads_chunks_correctly() {
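        // pyo3 requires the Python interpreter to be initialized before any
        // Python interaction when running from a pure-Rust test binary.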
        pyo3::prepare_freethreaded_python();
        let folder = Path::new("/app/temp/aa30b366-2eb7-48e6-a029-d63a05e0686f");
        let cols_path = Path::new("/app/database/backend/cols/nsdl_cols.json");
        let database_url = build_pg_connection_url();
        let pool = PgPoolOptions::new()
            .max_connections(5)
            .connect(&database_url)
            .await
            .expect("Failed to connect to DB");
        let mut nsdl =
            NSDLParser::new(folder, cols_path, pool).expect("Failed to create NSDLParser");
        let result = nsdl.parse().await.expect("Failed to parse file");
        println!("Parsed {} record groups", result);
        assert!(result > 0);
    }
}