Source code for fhirpack.extraction.base
import chunk
import math
import json
from typing import Union
import time
import requests
from tqdm import tqdm
from typing import Tuple
from dicomweb_client.api import DICOMwebClient
from fhirpy.lib import SyncFHIRResource
from fhirpy.lib import SyncFHIRReference
import fhirpack
from fhirpack.constants import CONFIG
# TODO build dinamically from metadata/capability statement
SEARCH_PARAMS = {
"Condition": [ # https://www.hl7.org/fhir/condition.html
"_content",
"_id",
"_sort",
"_include",
"code",
"identifier",
"patient",
"subject",
"recorded-date__eq",
"recorded-date__lt",
"recorded-date__gt",
"recorded-date__ge",
],
"EpisodeOfCare": [
"_id",
"_content",
"_sort",
"_include",
"code",
"identifier",
"patient",
],
"Encounter": [
"_id",
"_content",
"_sort",
"_include",
"code",
"identifier",
"subject",
],
"DiagnosticReport": [
"_id",
"_content",
"_sort",
"_include",
"code",
"identifier",
"subject",
"issued",
"category",
"encounter",
"issued__lt",
"issued__gt",
"issued__ge",
"date__lt",
"date__gt",
"date__ge",
],
"DocumentReference": [
"_id",
"_content",
"_sort",
"_include",
"code",
"identifier",
"subject",
"category",
"date",
"contenttype",
"description",
"location",
"patient",
"related",
"relatesto",
"status",
"type",
],
"FamilyMemberHistory": [
"_content",
"_id",
"_sort",
"_include",
"code",
"identifier",
"patient",
"_content",
],
"MedicationAdministration": [
"_content",
"_id",
"_sort",
"_include",
"code",
"identifier",
"subject",
],
"MedicationRequest": [
"_id",
"_content",
"_sort",
"_include",
"code",
"identifier",
"subject",
],
"Observation": [
"_content",
"_id",
"_sort",
"_include",
"code",
"_count",
"__count",
"count",
"identifier",
"patient",
],
"Patient": [
"_content",
"_id",
"_sort",
"_include",
"code",
"identifier",
"given",
"family",
"name",
"link",
"link:missing",
],
"ImagingStudy": [
"_content",
"_id",
"_sort",
"_include",
"code",
"identifier",
"subject",
"endpoint:missing",
"shipProcedureCode",
],
"Procedure": [
"_id",
"_content",
"_sort",
"_include",
"code",
"identifier",
"subject",
],
"List": ["_id", "_content", "_sort", "_include", "code", "identifier"],
}
META_RESOURCE_TYPES = {"RootPatient": "Patient", "LinkedPatient": "Patient"}
# first key is source resourcetype
# second key is destination resourcetype
# field is by what to search in destination resourcetype
# path is where to find the values to search in field
# path: None means id
SEARCH_ATTRIBUTES = {
"Patient": {
"Condition": {"field": "subject", "path": None},
"DiagnosticReport": {"field": "subject", "path": None},
"EpisodeOfCare": {"field": "patient", "path": None},
"Encounter": {"field": "patient", "path": None},
"FamilyMemberHistory": {"field": "_content", "path": None},
"ImagingStudy": {"field": "subject", "path": None},
"List": {"field": "", "path": None},
"MedicationAdministration": {"field": "subject", "path": None},
"MedicationRequest": {"field": "subject", "path": None},
"Observation": {"field": "patient", "path": None},
"Procedure": {"field": "patient", "path": None},
"RootPatient": {"field": "link", "path": None},
"LinkedPatient": {"field": "_id", "path": None},
},
"RootPatient": {
"Patient": {"field": "_id", "path": "link.other"},
"LinkedPatient": {"field": "_id", "path": "link.other"},
# "LinkedPatient": {"field": "_id", "path": None}
},
"LinkedPatient": {
# "Patient": {"field": "_id", "path": "link.other"}
# "RootPatient": {"field": "_id", "path": None},
"Patient": {"field": "_id", "path": "id"}
},
"ImagingStudy": {"Patient": {"field": "_id", "path": "subject"}},
"Condition": {
"Patient": {"field": "_id", "path": "subject"},
"LinkedPatient": {"field": "_id", "path": "subject"},
"RootPatient": {"field": "_id", "path": "subject"},
"Encounter": {"field": "_id", "path": "encounter"},
},
"Observation": {
"Patient": {"field": "_id", "path": "subject"},
"RootPatient": {"field": "_id", "path": "subject"},
"LinkedPatient": {"field": "_id", "path": "subject"},
"Encounter": {"field": "_id", "path": "encounter"},
},
"MedicationAdministration": {
"Patient": {"field": "_id", "path": "subject"},
"RootPatient": {"field": "_id", "path": "subject"},
"LinkedPatient": {"field": "_id", "path": "subject"},
},
"DiagnosticReport": {
"Patient": {"field": "_id", "path": "subject"},
"RootPatient": {"field": "_id", "path": "subject"},
"LinkedPatient": {"field": "_id", "path": "subject"},
},
"AllergyIntolerance": {"Patient": {"field": "_id", "path": "patient"}},
"CarePlan": {
"Patient": {"field": "_id", "path": "subject"},
"RootPatient": {"field": "_id", "path": "subject"},
"LinkedPatient": {"field": "_id", "path": "subject"},
},
"CarePlan": {
"Patient": {"field": "_id", "path": "subject"},
"RootPatient": {"field": "_id", "path": "subject"},
"LinkedPatient": {"field": "_id", "path": "subject"},
},
"Claim": {
"Patient": {"field": "_id", "path": "subject"},
"RootPatient": {"field": "_id", "path": "subject"},
"LinkedPatient": {"field": "_id", "path": "subject"},
},
"Encounter": {
"Patient": {"field": "_id", "path": "subject"},
"RootPatient": {"field": "_id", "path": "subject"},
"LinkedPatient": {"field": "_id", "path": "subject"},
},
"EpisodeOfCare": {
"Patient": {"field": "_id", "path": "subject"},
"RootPatient": {"field": "_id", "path": "subject"},
"LinkedPatient": {"field": "_id", "path": "subject"},
},
"Goal": {
"Patient": {"field": "_id", "path": "subject"},
"RootPatient": {"field": "_id", "path": "subject"},
"LinkedPatient": {"field": "_id", "path": "subject"},
},
"Immunization": {
"Patient": {"field": "_id", "path": "patient"},
"RootPatient": {"field": "_id", "path": "subject"},
"LinkedPatient": {"field": "_id", "path": "subject"},
},
"Procedure": {
"Patient": {"field": "_id", "path": "subject"},
"RootPatient": {"field": "_id", "path": "subject"},
"LinkedPatient": {"field": "_id", "path": "subject"},
},
}
[docs]class BaseExtractorMixin:
[docs] def getReferences(
self,
input: Union[
list[str],
list[SyncFHIRReference],
list[SyncFHIRResource],
] = None,
params: dict = None,
ignoreFrame: bool = False,
raw: bool = False,
):
"""Gets all the References found in the input Resources
Args:
input (Union[list[str], list[SyncFHIRReference], list[SyncFHIRResource], None]): Input resources. Defaults to None.
params (Union[dict, None]): Additional parameters. Defaults to None.
ignoreFrame (Union[bool, None]): Whether to ignore the frame. Defaults to False.
raw (Union[bool, None]): If True, the method will return the raw result. Defaults to False.
"""
params = {} if params is None else params
if not input and self.isFrame:
input = self.data
pass
elif input and not self.isFrame:
input = self.prepareOperationInput(input, SyncFHIRReference)
pass
elif input and self.isFrame:
# TODO raise error references and isFrame not allowed
# TODO raise in other similar methods
raise NotImplementedError
if not raw:
result = self.prepareOutput(input, resourceType="Reference")
return result
[docs] def getResources(
self,
input: Union[
list[str],
list[SyncFHIRReference],
list[SyncFHIRResource],
] = None,
searchParams: dict = None,
params: dict = None,
resourceType: str = None,
metaResourceType: str = None,
ignoreFrame: bool = False,
raw: bool = False,
progressSuffix: str = "",
):
"""This method retrieves FHIR resources based on the provided resource type.
Args:
input (Union[list[str], list[SyncFHIRReference], list[SyncFHIRResource], None]): IDs, references or resources of the desired FHIR resources.
searchParams (dict): FHIR search parameters to execute a search. Only valid FHIR parameters for the specified resource type can be used. This can not be combined with input.
params (Union[dict, None]): Additional parameters.
resourceType (Union[str, None]): Type of the desired FHIR resources.
metaResourceType (Union[str, None]): Used to avoid conflicts when non-standard fhir resources are used.
ignoreFrame (Union[str, None]): True when data inside the Frame object should be ignored.
raw (Union[str, None]): True when the raw output should be returned.
Returns:
Frame: Frame object containing the desired FHIR resources.
"""
if metaResourceType is None:
metaResourceType = resourceType
searchActive = False if searchParams is None else True
searchParams = {} if searchParams is None else searchParams
params = {} if params is None else params
input = [] if input is None else input
searchValues = []
result = []
if len(input):
inputReprs = set()
uniqueInput = []
for i in input:
r = repr(i)
if r not in inputReprs:
uniqueInput.append(i)
inputReprs.update([r])
input = uniqueInput
for element in tqdm(
input, desc=f"GET[{metaResourceType}]{progressSuffix}> ", leave=False
):
element = self.castOperand(element, SyncFHIRResource, resourceType)
result.extend(element)
elif self.isFrame and not ignoreFrame:
# utils.validateFrame(self)
input = self.data
# source Type is the type of resources contained in a frame
sourceType = self.resourceType
# the target type is the desired resource type
# getPatients().getConditions() -> "Patient" source, "Condition" target
targetType = resourceType
# handles
# pack.getReferences().getResources
if targetType is None:
if "/" not in self.data.values[0] and resourceType is None:
resourceType = self.resourceType
return self.getResources(self.data.values, resourceType=resourceType)
field, basePath = self.getConversionPath(
sourceType=sourceType, targetType=metaResourceType
)
path = "id" if basePath is None else f"{basePath}.id"
searchValues = self.gatherSimplePaths(
[path], columns=["searchValue"]
).dropna()
if not searchValues.size:
path = f"{basePath}.reference"
searchValues = self.gatherSimplePaths([path], columns=["searchValue"])
if (
searchValues["searchValue"].apply(type).astype(str) == "<class 'list'>"
).any():
searchValues = searchValues.explode("searchValue")
searchValues = searchValues.dropna()
if "reference" in path:
searchValues = searchValues["searchValue"].str.split("/").str[-1]
else:
searchValues = searchValues["searchValue"].values
elif searchActive:
# getResources is for getting/searching known resources
# delegate to search for special handling
return self.searchResources(
input=input,
searchParams=searchParams,
params=params,
ignoreFrame=ignoreFrame,
resourceType=resourceType,
)
n = len(searchValues)
chunkSize = 100
nChunks = math.ceil(n / chunkSize)
i, j = 0, 0
total = []
while j < n:
j = j + chunkSize if j + chunkSize < n else n
searchValuesChunk = searchValues[i:j]
searchValuesChunk = ",".join(searchValuesChunk)
searchParams.update({field: searchValuesChunk})
result += self.searchResources(
searchParams=searchParams,
resourceType=resourceType,
raw=True,
progressSuffix=f"({math.ceil(j/chunkSize)}/{nChunks})",
)
i = i + chunkSize
if not raw:
indexList = []
result = self.prepareOutput(result, resourceType=resourceType)
input, result = self.attachOperandIds(self, result, metaResourceType)
return result
[docs] def searchResources(
self,
input: Union[
list[str],
list[SyncFHIRReference],
list[SyncFHIRResource],
] = None,
searchParams: dict = None,
params: dict = None,
resourceType: str = None,
metaResourceType: str = None,
ignoreFrame: bool = True,
raw: bool = False,
progressSuffix: str = "",
):
"""Execute a FHIR search based on the provided resource type and search parameters.
Args:
input (Union[ list[str], list[SyncFHIRReference], list[SyncFHIRResource], ): Not implemented.
searchParams (Union[dict, None]): FHIR search parameters to execute a search.
params (Union[dict, None]): Additional parameters.
resourceType (Union[str, None]): Type of the desired FHIR resource.
metaResourceType (Union[str, None]): Used to avoid conflicts when non-standard fhir resources are used.
ignoreFrame (Union[str, None]): True when data inside the Frame object should be ignored.
raw (Union[str, None]): True when the raw output should be returned.
Returns:
Frame: Frame object containing the desired FHIR resources.
"""
self.authenticate()
if metaResourceType is None:
metaResourceType = resourceType
searchActive = False if searchParams is None else True
searchParams = {} if searchParams is None else searchParams
params = {} if params is None else params
input = [] if input is None else input
# TODO: evaluate the usefulness of search parameter restrictions
# if searchParams:
# invalidsearchParams = set(searchParams.keys()) - set(
# SEARCH_PARAMS[resourceType]
# )
# if invalidsearchParams:
# raise Exception(f"non allowed search parameters {invalidsearchParams}")
if len(input):
raise NotImplementedError
elif self.isFrame and not ignoreFrame:
raise NotImplementedError
elif searchActive:
pass
resourcePageSize = 1000
search = (
self.client.resources(resourceType)
.limit(resourcePageSize)
.search(**searchParams)
)
result = []
resourceCount = 0
nonEmptyBundle = bool(len(search.limit(1).fetch()))
if nonEmptyBundle:
try:
resourceCount = search.limit(1).fetch_raw().get("total", None)
except:
# server doesn't support _total parameter nor returns total
# element in each request https://build.fhir.org/bundle.html#searchset
pass
if not resourceCount:
resourceCount = search.count()
for element in tqdm(
search,
desc=f"SEARCH[{metaResourceType}]{progressSuffix}> ",
total=resourceCount,
leave=False,
):
result.append(element)
# unlike iterating over search,
# fetch returns a bundle (page)
# result.extend(search.fetch())
if not raw:
result = self.prepareOutput(result, resourceType)
input, result = self.attachOperandIds(self, result, metaResourceType)
return result
[docs] def getConversionPath(self, sourceType: str, targetType: str) -> Tuple[str, str]:
"""Retrieve the needed FHIR searchParams (field) and the
respective path for a {sourceType:targetType} pair from the handler ditcionary
Args:
sourceType (str): Resource type the method is operating on.
targetType (str): Desired Resource type.
Returns:
Tuple(str, str): Field and path in the handler dictionary.
"""
sourceDict = SEARCH_ATTRIBUTES.get(sourceType, {})
targetDict = sourceDict.get(targetType, {})
field, path = targetDict.get("field"), targetDict.get("path")
if field: # and path:
return field, path
else:
aliasSourceDict = SEARCH_ATTRIBUTES.get(META_RESOURCE_TYPES[sourceType], {})
aliasTargetDict = aliasSourceDict.get(targetType, {})
aliasField, aliasPath = aliasTargetDict.get("field"), targetDict.get("path")
if aliasField:
return aliasField, aliasPath
raise RuntimeError(
f"No handler for source {sourceType} and target {targetType}"
)
[docs] def getAbsolutePaths(
self,
paths: list[str],
input: Union[
list[str],
list[SyncFHIRReference],
list[SyncFHIRResource],
] = None,
searchParams: dict = None,
params: dict = None,
):
searchActive = False if searchParams is None else True
searchParams = {} if searchParams is None else searchParams
params = {} if params is None else params
input = [] if input is None else input
# invalidsearchParams = None
# if searchParams:
# invalidsearchParams = set(searchParams.keys()) - set(
# base.SEARCH_PARAMS["MedicationAdministration"]
# )
# if invalidsearchParams:
# raise Exception(f"non allowed search parameters {invalidsearchParams}")
if not input and self.isFrame:
input = self.data
elif input and not self.isFrame:
raise NotImplementedError
elif input and self.isFrame:
raise NotImplementedError
if self.resourceTypeIs("patient"):
searchParams["subject"] = ",".join([e.id for e in self.data])
else:
raise NotImplementedError
finalResults = {}
# TODO move allowed absolute paths allowed for patients somewhere else
# these relative paths are allowed because they reference a subject or patient
relativePaths = {
"Appointment": [],
"CarePlan": [],
"ClinicalImpression": [],
"Condition": [],
"DiagnosticReport": [],
"DocumentReference": [],
"Encounter": [],
"EpisodeOfCare": [],
"ImagingStudy": [],
"Immunization": [],
"List": [],
"MedicationRequest": [],
"MedicationStatement": [],
"Observation": [],
"Procedure": [],
"QuestionnaireResponse": [],
"ServiceRequest": [],
# 'BiologicallyDerivedProduct':[],
# 'DocumentReference':[],
# 'FamilyMemberHistory':[],
# 'Media':[],
# 'Medication':[],
# 'MedicationAdministration':[],
# 'Organization':[],
# 'Patient':[],
# 'Practitioner':[],
# 'Specimen':[],
# 'Substance':[]
}
paths = [e.split(".") for e in sorted(paths)]
for absp in paths:
relativePaths[absp[0]].append(absp[1:])
resourceType = self.resourceType
for resourceType, relpaths in relativePaths.items():
if not relpaths:
continue
result = self.searchResources(
resourceType=resourceType, searchParams=searchParams
)
n = len(result)
filteredResults = []
# TODO handle multiple filters
# filter = filter.popitem()
filteredResults = result.gatherSimplePaths([".".join(e) for e in relpaths])
filteredResults.columns = [
resourceType + "." + key for key in filteredResults.columns
]
# filteredRecord.update({f"{resourceType}.{filter[0]}": filter[1]})
# filteredResults.append(filteredRecord)
finalResults[resourceType] = filteredResults
return finalResults
[docs] def getURLBytes(
self,
input: list[str] = None,
operateOnCol: str = "data",
resultInCol: str = None,
params: dict = {},
):
params = {} if params is None else params
input = [] if input is None else input
if not input and self.isFrame:
if operateOnCol:
input = self[operateOnCol].values
elif self.resourceTypeIs("DiagnosticReport"):
input = self.gatherSimplePaths(["presentedForm.url"])
else:
raise NotImplementedError
elif input and not self.isFrame:
raise NotImplementedError
elif input and self.isFrame:
raise NotImplementedError
results = []
for i, url in zip(range(len(input)), input):
response = requests.get(
url,
headers=self.client._build_request_headers(),
stream=True,
)
data = bytearray()
if not response.ok:
# TODO log to fhirpack.log
data = None
# raise Exception(f"{response}")
else:
for block in response.iter_content(1024):
data.extend(block)
if not block:
break
time.sleep(0.5)
results.append(data)
if resultInCol:
result = self.assign(**{resultInCol: results})
else:
result = self.prepareOutput(results, "Binary")
return result
[docs] def getFromFiles(self, input: list[str]):
"""Generate a Frame object from FHIR resources stored in json files.
Args:
input: List of files containing json fhir resources.
Returns:
Frame: Frame object storing the FHIR resources.
"""
pathsData = []
for iPath in input:
with open(iPath, "r") as f:
rawJson = json.load(f)
fileData = []
if isinstance(rawJson, list):
fileData.extend(rawJson)
elif rawJson["resourceType"] == "Bundle":
for r in rawJson["entry"]:
fileData.append(r["resource"])
else:
fileData.append(rawJson)
pathsData.extend(fileData)
for element in pathsData:
if element["resourceType"] != pathsData[0]["resourceType"]:
raise TypeError("All resources have to be of the same type.")
result = [
SyncFHIRResource(self.client, e["resourceType"], **e) for e in pathsData
]
result = self.prepareOutput(result)
return result
[docs] def getDICOMInstances(
self,
input: list[str] = None,
operateOnCol: str = "data",
resultInCol: str = None,
params: dict = None,
inPlace: dict = False,
):
params = {} if params is None else params
input = [] if input is None else input
if not input and self.isFrame:
if self.resourceTypeIs("ImagingStudy"):
input = self
else:
raise NotImplementedError
elif input and not self.isFrame:
raise NotImplementedError
elif input and self.isFrame:
raise NotImplementedError
result = []
for i, series, study, endpoint in input[
["series", "study", "endpoint"]
].itertuples():
client = DICOMwebClient(
endpoint,
headers={
"Authorization": f"Bearer {CONFIG.get('EXTRACTION_BASE_TOKEN_DICOM')}"
},
)
instances = list(client.iter_series(study, series))
result.append(instances)
if inPlace:
self.data = result
result = self
else:
result = self.prepareOutput(result)
return result