| @staticmethod
|
| def spot_caller(image_: ImageFile, **kwargs) -> Tuple[Any]:
|
| """ Reads the image and normalizes project array of the image.
|
| Creates processors to call the spot calculation methods
|
| separately. Svg is not recommended for linux machines.
|
|
|
| Args:
|
|
|
| image_ (ImageFile): ImageFile instance.
|
| **kwargs: Keyword variables.
|
|
|
| Returns:
|
|
|
| spots (DataFrame): The spots in the image.
|
| concat_df (DataFrame): Pandas objects along a particular axis.
|
| projected_norm (Array): Normalized project image.
|
|
|
| Raises:
|
|
|
| CountAgenError: Raised when the processes are facing difficulties.
|
| """
|
| output_folder: str = kwargs.get('output_folder')
|
| extension_format: str = kwargs.get('extension_format', 'png')
|
| pic = tff.imread(image_.previewUrl)
|
|
|
| if kwargs.get('z_index') is not None:
|
| image_ = SpotDetection.image_modifier(
|
| image_=image_, z_stack=kwargs.get('z_index'))
|
|
|
| projected = np.max(pic, axis=image_.z_stack)
|
| projected_norm = np.zeros(projected.shape)
|
| spots = []
|
| previewUrl = []
|
|
|
| SpotDetection.update_status(
|
| image_.name, projected.shape[0], "Initiated")
|
|
|
| with ProcessPoolExecutor(max_workers=4) as executors:
|
| futures = {executors.submit(
|
| SpotDetection._wrapper, index, projected[index], **kwargs):
|
| index for index in range(projected.shape[0])}
|
| for future in as_completed(futures):
|
| try:
|
| index, image, image_norm, spots_df = future.result()
|
| SpotDetection.update_status(
|
| image_.name, projected.shape[0], "Calculation Done", index=index)
|
| projected_norm[index] = image_norm
|
| spots_df['channel'] = index
|
| spots.append(spots_df)
|
| if kwargs.get('plot_blob', False):
|
|
|
| SpotDetection.update_status(
|
| image_.name, projected.shape[0], "Drawing graph", index=index)
|
|
|
| url = SpotDetection.plot_results(
|
| index, image, image_name=image_.name,
|
| spots_=spots_df, output_folder=output_folder,
|
| extension_format=extension_format)
|
|
|
| SpotDetection.update_status(
|
| image_.name, projected.shape[0], "Graph drown", index=index)
|
|
|
| previewUrl.append(url)
|
| except Exception as exc:
|
| raise CountAgenError(
|
| f"Could not run all the operations for channel {index} ") from exc
|
|
|
| concat_df = pd.concat(spots)
|
| return spots, concat_df, projected_norm, previewUrl
|
| """
|
| Using fastapis to call the core script for producing and
|
|
|
| """
|
| from dataclasses import asdict
|
| from typing import List
|
| from fastapi import FastAPI, HTTPException, Path
|
| from pydantic import BaseModel
|
| import uvicorn
|
| from src.spot_detection import SpotDetection, CountAgenCreationError, ImageFile
|
|
|
|
|
| class Item(BaseModel):
|
| """
|
| Base model for JSON request.
|
| """
|
| name: str
|
| channels: int
|
| z_stack: int
|
| x_dimension: int
|
| y_dimension: int
|
| previewUrl: str
|
|
|
|
|
| app = FastAPI()
|
| spot_detection = SpotDetection(None, None)
|
|
|
|
|
| @app.post("/files/run/")
|
| async def run_img_spotter(item: Item, threshold: float = 1.4):
|
| """
|
| Return the image created for the counted number of spots.
|
|
|
| Args:
|
|
|
| item (Item) : Item object containing
|
| - name: str
|
| - channels: int
|
| - z_stack: int
|
| - x_dimension: int
|
| - y_dimension: int
|
| - previewUrl: str
|
|
|
| threshold (float): Value of threshold. Default : 1.4
|
|
|
| Returns:
|
|
|
| dictionary containing image properties and link to generated graphs.
|
| """
|
| spot_detection.threshold = threshold
|
| image_object = ImageFile(name=item.name, channels=item.channels, x_dimension=item.x_dimension,
|
| y_dimension=item.y_dimension, z_stack=item.z_stack, previewUrl=item.previewUrl)
|
| kwargs = {
|
| "output_folder": spot_detection.output_img_dir,
|
| "plot_blob": True
|
| }
|
| try:
|
| *_, prev = spot_detection.spot_caller(image_object, **kwargs)
|
| except Exception as exc:
|
| raise HTTPException(
|
| status_code=500, detail=str(exc)) from exc
|
| return {"items": [{**asdict(image_object), "graphs": prev}]}
|