Source code for fhirpack.load.base

import json
import logging
import os
from typing import Union

import numpy as np
from fhirpy.lib import SyncFHIRReference
from fhirpy.lib import SyncFHIRResource

import fhirpack.utils as utils

# LOGGER = CONFIG.getLogger(__name__)


class BaseLoaderMixin:
    """Mixin collecting operations that write resources, bytes, tables and DICOM data to files.

    The methods rely on attributes and helpers provided by the class this
    mixin is combined with: ``isFrame``, ``data``, ``path`` / ``paths``,
    ``prepareOperationInput``, ``prepareOutput``, ``resourceTypeIs`` and the
    pandas-style ``apply`` / ``to_csv``.

    Annotations that mention fhirpy types are quoted (forward-reference style)
    so they are not evaluated while the class body executes.
    """

    # TODO rename sendResourcesToJSON
    def sendResourcesToFiles(
        self,
        # TODO if None save to os.getcwd
        paths: list[str] = None,
        input: "Union[list[str], list[SyncFHIRReference], list[SyncFHIRResource]]" = None,
        combine: bool = False,
    ):
        """Serialize resources to JSON and write them to the given paths.

        Args:
            paths (list[str] | None): Target files. With ``combine`` exactly
                one path selects the combined output; otherwise one path per
                resource is required. When the object is a frame and no paths
                are given, ``self.paths`` is used.
            input: Resources to serialize. When empty and the object is a
                frame, ``self.data`` is used. When given and the object is not
                a frame, it is passed through ``self.prepareOperationInput``.
            combine (bool): If True and exactly one path is given, all
                resources are written to that file as one JSON list. With any
                other number of paths the flag has no effect and one file per
                resource is written.

        Raises:
            NotImplementedError: If ``input`` is given while the object is a frame.
            TypeError: If no input or no paths are available.
            Exception: If the number of paths and the number of resources differ.

        Returns:
            numpy.ndarray: One boolean per resource; False where the write failed.
        """
        if not input and self.isFrame:
            input = self.data
            if not paths:
                # NOTE(review): sibling methods read ``self.path``; confirm
                # that ``paths`` is the intended attribute here.
                paths = self.paths
        elif input and not self.isFrame:
            input = self.prepareOperationInput(input, SyncFHIRResource)
        elif input and self.isFrame:
            # TODO raise error references and isFrame not allowed
            raise NotImplementedError

        # Fail with a readable message instead of an opaque ``len(None)`` error.
        if input is None:
            raise TypeError("no input was given and the object is not a frame")
        if paths is None:
            raise TypeError("paths must be given when they cannot be taken from the frame")

        n = len(input)
        paths = [os.path.abspath(p) for p in paths]
        successes = np.full(n, True)

        if combine and len(paths) == 1:
            # Serialize before opening so a failing resource does not leave a
            # truncated file behind. Plain iteration works for lists as well
            # as pandas Series, matching the per-resource loop below.
            payload = json.dumps(
                [res.serialize() for res in input],
                indent=4,
                sort_keys=True,
            )
            with open(paths[0], "w", encoding="utf-8") as f:
                f.write(payload)
            return successes

        if len(paths) != n:
            raise Exception("number of paths and number of data blobs must be equal")

        for i, (res, path) in enumerate(zip(input, paths)):
            try:
                payload = json.dumps(res.serialize(), indent=4, sort_keys=True)
                # NOTE(review): append mode means that exporting to an existing
                # file adds a second JSON document to it; confirm this is intended.
                with open(path, "a+", encoding="utf-8") as f:
                    f.write(payload)
            except Exception as exc:
                # Best effort: report the failure through the result array and
                # the log, keep processing the remaining resources.
                logging.getLogger(__name__).warning(
                    "could not write resource to %s: %s", path, exc
                )
                successes[i] = False

        return successes

    def sendBytesToFile(
        self,
        input: list[bytearray] = None,
        paths: list[str] = None,
        guessExtension: bool = False,
        combine: bool = False,
        params: dict = None,
    ):
        """Write binary blobs to the given paths.

        Args:
            input (list[bytearray] | None): Blobs to write. When empty and the
                object is a frame, ``self.data.values`` is used.
            paths (list[str] | None): Target files, one per blob, or a single
                path together with ``combine``. When the object is a frame and
                no paths are given, ``self.path.values`` is used.
            guessExtension (bool): If True, ``"."`` plus the result of
                ``utils.guessBufferMIMEType`` on the first 50 bytes of the blob
                is appended to the path.
            combine (bool): If True and exactly one path is given, all blobs
                are written to that file, in order.
            params (dict | None): Currently unused; accepted for signature
                compatibility.

        Raises:
            NotImplementedError: If ``input`` is given while the object is a frame.
            TypeError: If no input or no paths are available.
            Exception: If the number of paths and the number of blobs differ.

        Returns:
            numpy.ndarray: One boolean per blob; False where the write failed.
        """
        if not input and self.isFrame:
            input = self.data.values
            # Explicitly given paths take precedence over the frame's own,
            # as in sendResourcesToFiles.
            if paths is None or len(paths) == 0:
                paths = self.path.values
        elif input and self.isFrame:
            raise NotImplementedError

        if input is None:
            raise TypeError("no input was given and the object is not a frame")
        if paths is None:
            raise TypeError("paths must be given when they cannot be taken from the frame")

        n = len(input)
        if combine and len(paths) == 1:
            paths = [paths[0]] * n
        elif len(paths) != n:
            raise Exception("number of paths and number of data blobs must be equal")

        successes = np.full(n, True)
        written = set()  # files already created during this call

        for i, (data, path) in enumerate(zip(input, paths)):
            try:
                abspath = os.path.abspath(path)
                if guessExtension:
                    # Only inspect the buffer when the extension is wanted, so
                    # a failing guess cannot block a plain write.
                    abspath += "." + utils.guessBufferMIMEType(bytes(data[:50]))
                # When combining, the first blob truncates the file and later
                # ones are appended; reopening with "wb" each time would keep
                # only the last blob.
                mode = "ab" if combine and abspath in written else "wb"
                with open(abspath, mode) as f:
                    f.write(data)
                written.add(abspath)
            except Exception as exc:
                # TODO raise or log?
                logging.getLogger(__name__).warning(
                    "could not write bytes to %s: %s", path, exc
                )
                successes[i] = False

        return successes

    def sendToTable(
        self,
        input: "Union[list[str], list[SyncFHIRReference], list[SyncFHIRResource]]" = None,
        # combine: bool = False,
        params: dict = None,
        ignoreFrame: bool = False,
        fileType: str = "csv",
        paths: list[str] = None,
    ):
        """Write the frame as a CSV table to ``<paths[0]>.<fileType>``.

        Args:
            input: Not supported yet; any non-empty value raises
                NotImplementedError.
            params (dict | None): Currently unused; accepted for signature
                compatibility.
            ignoreFrame (bool): Must be False; the only implemented source is
                the frame itself.
            fileType (str): ``"csv"`` or ``"xls"``; only ``"csv"`` is
                implemented. Defaults to ``"csv"``.
            paths (list[str] | None): Exactly one path without extension; the
                file type is appended as extension.

        Raises:
            NotImplementedError: If ``input`` is non-empty, if the object is
                not a frame or ``ignoreFrame`` is True, or if more than one
                path is given.
            ValueError: If ``fileType`` is not csv or xls, if no path is
                given, or if ``fileType`` is xls (not yet supported).

        Returns:
            The result of ``self.prepareOutput([True])``.
        """
        input = [] if input is None else input
        paths = [] if paths is None else paths

        if len(input):
            raise NotImplementedError
        if not self.isFrame or ignoreFrame:
            raise NotImplementedError
        if len(paths) > 1:
            raise NotImplementedError
        if fileType not in ["csv", "xls"]:
            raise ValueError(str(fileType) + " not supported. \nUse csv, xls.")
        if len(paths) == 0:
            raise ValueError("exactly one path is required")
        if fileType == "xls":
            # Checked before touching the file system so no directories are
            # created for an export that cannot succeed.
            # TODO: disabled until the env has an xls engine
            raise ValueError("XLS not yet supported.")

        path = f"{paths[0]}.{fileType}"
        directory = os.path.dirname(path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        else:
            path = f"./{path}"

        self.to_csv(path_or_buf=path, index=False)

        return self.prepareOutput([True])

    def sendDICOMToFiles(
        self,
        input: "Union[list[str], list[SyncFHIRReference], list[SyncFHIRResource]]" = None,
        combine: bool = False,
        params: dict = None,
        ignoreFrame: bool = False,
    ):
        """Save the DICOM objects held by an ImagingStudy frame to files.

        For every row the directory ``row.path`` is created and
        ``row.data.save_as`` is called with ``row.path`` joined with
        ``row.data.SOPInstanceUID``.

        Args:
            input: Not supported yet; any non-empty value raises
                NotImplementedError.
            combine (bool): Only relaxes the path-count check when the frame
                holds a single path.
            params (dict | None): Currently unused; accepted for signature
                compatibility.
            ignoreFrame (bool): Must be False; the only implemented source is
                the frame itself.

        Raises:
            NotImplementedError: If ``input`` is non-empty, if the object is
                not a frame or ``ignoreFrame`` is True, or if the resource
                type is not ImagingStudy.
            Exception: If the number of paths and the number of rows differ.

        Returns:
            The result of ``self.prepareOutput`` applied to the per-row
            ``save_as`` results.
        """
        input = [] if input is None else input

        if len(input):
            raise NotImplementedError
        if not self.isFrame or ignoreFrame:
            raise NotImplementedError

        paths = self.path.values
        if not (combine and len(paths) == 1) and len(paths) != len(self):
            raise Exception("number of paths and number of data blobs must be equal")

        if not self.resourceTypeIs("ImagingStudy"):
            raise NotImplementedError

        self.path.apply(lambda directory: os.makedirs(directory, exist_ok=True))
        result = self.apply(
            lambda row: row.data.save_as(
                os.path.join(row.path, row.data.SOPInstanceUID)
            ),
            axis=1,
        )

        return self.prepareOutput(result)