'''
Provide a series of tools for ICD-9/10 codes.
(1) extract ICD codes given string terms.
(2) get parental and children nodes for ICD-9 codes.
'''
import requests
import os
import json
import wget
import pdb
import pandas as pd
import networkx as nx
# Search endpoints of the NLM Clinical Table Search Service. Every prefix ends
# with `terms=` so that a lookup function can append the query term directly.
NIH_API_PREFIX_ICD10 = 'https://clinicaltables.nlm.nih.gov/api/icd10cm/v3/search?sf=code,name&terms='
NIH_API_PREFIX_ICD9_DX = 'https://clinicaltables.nlm.nih.gov/api/icd9cm_dx/v3/search?sf=code,long_name&terms='
NIH_API_PREFIX_ICD9_SG = 'https://clinicaltables.nlm.nih.gov/api/icd9cm_sg/v3/search?sf=code,long_name&terms='
# NOTE(review): despite the "ICD9" in its name, this prefix points at the
# generic `conditions` endpoint and only asks for `primary_name`.
NIH_API_PREFIX_ICD9_CONDITION = 'https://clinicaltables.nlm.nih.gov/api/conditions/v3/search?df=primary_name&terms='
# NOTE(review): not referenced anywhere in the visible part of this file.
NIH_API_PREFIX_ICD10_CONDITION = 'https://clinicaltables.nlm.nih.gov/api/conditions/v3/search?df=term_icd10cm_codes,primary_name&terms='
# Spreadsheets that map ICD-9 procedure (SG) / diagnosis (DX) codes to their descriptions.
ICD9_SG_URL = 'https://github.com/RyanWangZf/PyTrial/raw/main/resources/CMS32_DESC_LONG_SHORT_SG.xlsx'
ICD9_DX_URL = 'https://github.com/RyanWangZf/PyTrial/raw/main/resources/CMS32_DESC_LONG_SHORT_DX.xlsx'
# JSON file holding the ICD-9 hierarchy that ICD9Graph loads.
ICD9_GRAPH_URL = 'https://storage.googleapis.com/pytrial/resources/icd-9-hierarchy.json'
def get_icd10_from_nih(term, timeout=30):
    '''
    Query related ICD-10 codes for input terms.

    Parameters
    ----------
    term: str or list[str]
        Disease names or a list of disease names.

    timeout: float
        Seconds to wait for the NIH service before giving up on a request.

    Returns
    -------
    outputs: list[str] or None, or a list of those
        For a single string input, the list of matched ICD-10 codes, or None
        when the service reports no match. For a list input, one such entry
        per input term, in the same order.

    Raises
    ------
    requests.RequestException
        If the request fails, times out, or returns an HTTP error status.
    '''
    from urllib.parse import quote

    terms = [term] if isinstance(term, str) else term
    outputs = []
    for term_ in terms:
        # Percent-encode the term so characters such as '&' or '#' cannot
        # change the meaning of the query string.
        url = NIH_API_PREFIX_ICD10 + quote(term_, safe='')
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        # The service answers with a JSON array whose second element is the
        # list of matched codes; decode it instead of slicing the raw text.
        codes = response.json()[1]
        outputs.append(list(codes) if codes else None)
    if isinstance(term, str):
        outputs = outputs[0]
    return outputs
def get_icd9dx_from_nih(term, timeout=30):
    '''
    Query related ICD-9-CM diagnosis codes for input terms.

    Parameters
    ----------
    term: str or list[str]
        Disease names or a list of disease names.

    timeout: float
        Seconds to wait for the NIH service before giving up on a request.

    Returns
    -------
    outputs: list[str] or None, or a list of those
        For a single string input, the list of matched ICD-9-CM diagnosis
        codes, or None when the service reports no match. For a list input,
        one such entry per input term, in the same order.

    Raises
    ------
    requests.RequestException
        If the request fails, times out, or returns an HTTP error status.
    '''
    from urllib.parse import quote

    terms = [term] if isinstance(term, str) else term
    outputs = []
    for term_ in terms:
        # Percent-encode the term so characters such as '&' or '#' cannot
        # change the meaning of the query string.
        url = NIH_API_PREFIX_ICD9_DX + quote(term_, safe='')
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        # The service answers with a JSON array whose second element is the
        # list of matched codes; decode it instead of slicing the raw text.
        codes = response.json()[1]
        outputs.append(list(codes) if codes else None)
    if isinstance(term, str):
        outputs = outputs[0]
    return outputs
def get_icd9sg_from_nih(term, timeout=30):
    '''
    Query related ICD-9-CM procedure codes for input terms.

    Parameters
    ----------
    term: str or list[str]
        Procedure names or a list of procedure names.

    timeout: float
        Seconds to wait for the NIH service before giving up on a request.

    Returns
    -------
    outputs: list[str] or None, or a list of those
        For a single string input, the list of matched ICD-9-CM procedure
        codes, or None when the service reports no match. For a list input,
        one such entry per input term, in the same order.

    Raises
    ------
    requests.RequestException
        If the request fails, times out, or returns an HTTP error status.
    '''
    from urllib.parse import quote

    terms = [term] if isinstance(term, str) else term
    outputs = []
    for term_ in terms:
        # Percent-encode the term so characters such as '&' or '#' cannot
        # change the meaning of the query string.
        url = NIH_API_PREFIX_ICD9_SG + quote(term_, safe='')
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        # The service answers with a JSON array whose second element is the
        # list of matched codes; decode it instead of slicing the raw text.
        codes = response.json()[1]
        outputs.append(list(codes) if codes else None)
    if isinstance(term, str):
        outputs = outputs[0]
    return outputs
def get_condition_synonym_from_nih(term, timeout=30):
    '''
    Query relevant medical conditions taking input symptoms/diseases using API: https://clinicaltables.nlm.nih.gov/apidoc/conditions/v3/doc.html

    Parameters
    ----------
    term: str or list[str]
        Disease names or a list of disease names.

    timeout: float
        Seconds to wait for the NIH service before giving up on a request.

    Returns
    -------
    outputs: list[str] or None, or a list of those
        For a single string input, the lower-cased names of the matched
        conditions, or None when the service reports no match. For a list
        input, one such entry per input term, in the same order.

    Raises
    ------
    requests.RequestException
        If the request fails, times out, or returns an HTTP error status.
    '''
    from urllib.parse import quote

    terms = [term] if isinstance(term, str) else term
    outputs = []
    for term_ in terms:
        # Percent-encode the term so characters such as '&' or '#' cannot
        # change the meaning of the query string.
        url = NIH_API_PREFIX_ICD9_CONDITION + quote(term_, safe='')
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        # The fourth element of the JSON answer holds one row per match; with
        # `df=primary_name` each row carries a single field, the name.
        # Decoding the JSON keeps names that contain commas in one piece,
        # which splitting the raw text on ',' did not.
        rows = response.json()[3]
        if not rows:
            outputs.append(None)
            continue
        outputs.append([row[0].lower() for row in rows])
    if isinstance(term, str):
        outputs = outputs[0]
    return outputs
class ICDGraphBase:
    '''
    The base class for ICD9/10 graph.

    Loads a hierarchy JSON file into a networkx directed tree and offers
    lookups of a code's description, parent, children and siblings. Dots in
    the codes passed to the lookup methods are ignored ('A.1' == 'A1').

    Parameters
    ----------
    filename: str
        Path of the JSON file; its 'tree' entry holds the nested hierarchy.
    '''
    def __init__(self, filename):
        # The context manager closes the handle, which the previous
        # `open(...).read()` never did.
        with open(filename, 'r', encoding='utf-8') as f:
            hierarchy = json.load(f)
        # 'id' and 'children' are tree_graph's default keys, so nothing has to
        # be passed explicitly. The old `attrs=` keyword no longer exists in
        # networkx 3.x; every other key of a node (e.g. 'description') is
        # stored as node data either way.
        self.graph = nx.readwrite.json_graph.tree_graph(hierarchy['tree'])

    def children(self, code):
        '''Return children nodes of code.

        Returns
        -------
        children: list[dict]
            The children codes and their descriptions.
        '''
        code = code.replace('.', '')
        return [self[node] for node in self.graph.successors(code)]

    def parent(self, code):
        '''Return the parent node of code.

        Returns
        -------
        parent: dict or None
            The parent code and its descriptions; None when the code has no
            predecessor (i.e. it is the root of the tree).
        '''
        code = code.replace('.', '')
        predecessors = list(self.graph.predecessors(code))
        if not predecessors:
            return None
        return self[predecessors[0]]

    def siblings(self, code):
        '''Return sibling nodes of code.

        Returns
        -------
        siblings: list[dict]
            All children of the code's parent (the code itself included) with
            their descriptions; an empty list when the code has no parent.
        '''
        parentnode = self.parent(code)
        if parentnode is None:
            return []
        return self.children(parentnode['code'])

    @property
    def nodes(self):
        '''The node view of the underlying graph.'''
        return self.graph.nodes

    @property
    def edges(self):
        '''The edge view of the underlying graph.'''
        return self.graph.edges

    @property
    def nxgraph(self):
        '''The underlying networkx graph object.'''
        return self.graph

    def __getitem__(self, code):
        '''Return the description dict of the codes.

        The result is a copy of the node's data with an extra 'code' entry, so
        the data stored in the graph itself is left untouched.
        '''
        code = code.replace('.', '')
        return_dict = dict(self.graph.nodes[code])
        return_dict['code'] = code
        return return_dict
class ICD9Graph(ICDGraphBase):
    '''
    Get an ICD-9 knowledge graph to query parental and children nodes for each code.

    Parameters
    ----------
    input_dir: str
        The dir that stores the hierarchy file. Defaults to './resources/icd9'.
        The file is downloaded into it when it is not there yet.

    Attributes
    ----------
    self.graph: nx.DiGraph
        The hierarchy of ICD codes stored as graph in networkx.
    '''
    def __init__(self, input_dir=None):
        if input_dir is None:
            input_dir = './resources/icd9'

        filename = os.path.join(input_dir, 'icd-9-hierarchy.json')
        # Decide on the file itself rather than on the directory: a directory
        # that already exists (e.g. a shared './resources') may still lack
        # the hierarchy file.
        if not os.path.exists(filename):
            if input_dir:
                os.makedirs(input_dir, exist_ok=True)
            # download the ICD9 hierarchy
            wget.download(ICD9_GRAPH_URL, out=filename)
        super().__init__(filename)
class ICD10Graph(ICDGraphBase):
    '''
    Get an ICD-10 knowledge graph to query parental and children nodes for each code.

    Parameters
    ----------
    input_dir: str
        The dir that stores the hierarchy files. Defaults to
        './resources/icd10'. The file of the requested version is downloaded
        into it when it is not there yet.

    version: {'2022', '2021','2020','2019'}
        The version of ICD-10 codes.

    Attributes
    ----------
    self.graph: nx.DiGraph
        The hierarchy of ICD codes stored as graph in networkx.
    '''
    def __init__(self, input_dir=None, version='2021'):
        if input_dir is None:
            input_dir = './resources/icd10'

        filename = os.path.join(input_dir, f'icd-10-{version}-hierarchy.json')
        # Decide on the file itself rather than on the directory: the
        # directory may exist without the file, e.g. when only another
        # version has been fetched so far.
        if not os.path.exists(filename):
            if input_dir:
                os.makedirs(input_dir, exist_ok=True)
            # download the ICD10 hierarchy
            url = f'https://github.com/icd-codex/icd-codex/raw/dev/icdcodex/data/icd-10-{version}-hierarchy.json'
            wget.download(url, out=filename)
        super().__init__(filename)
class ICD9_DX_VOC:
    '''
    Get a vocabulary containing the mapping of ICD9 Diagnosis code and its names.

    Parameters
    ----------
    input_dir: str
        The dir that stores ICD9-dx file. The file is downloaded into it
        (creating the dir if needed) when it is not there yet.
    '''
    def __init__(self, input_dir='./resources') -> None:
        url = ICD9_DX_URL
        filename = os.path.join(input_dir, 'CMS32_DESC_LONG_SHORT_DX.xlsx')
        if not os.path.exists(filename):
            # The target directory must exist before the file can be written.
            if input_dir:
                os.makedirs(input_dir, exist_ok=True)
            # download to disk
            wget.download(url, out=filename)
        # Read the codes as strings so leading zeros are preserved.
        self.df = pd.read_excel(filename, dtype={'DIAGNOSIS CODE': str})

    def __getitem__(self, code):
        '''Return the long description of `code`, or None if it is unknown.'''
        index = self.df['DIAGNOSIS CODE'].isin([code])
        if index.sum() == 0:
            return None
        return self.df[index]['LONG DESCRIPTION'].tolist()[0]

    def code2desc(self, code):
        '''
        Get description of codes.

        Parameters
        ----------
        code: str or List[str]
            The input icd code or list of codes.

        Returns
        -------
        desc: str, None or list
            The description (None for an unknown code) when exactly one code
            was looked up - this includes a list holding a single code;
            otherwise a list with one entry per input code.
        '''
        if isinstance(code, str):
            code = [code]
        res = [self[code_] for code_ in code]
        if len(res) == 1:
            return res[0]
        return res
class ICD9_SG_VOC:
    '''
    Get a vocabulary containing the mapping of ICD9 procedure code and its names.

    Parameters
    ----------
    input_dir: str
        The dir that stores ICD9-sg file. The file is downloaded into it
        (creating the dir if needed) when it is not there yet.
    '''
    def __init__(self, input_dir='./resources') -> None:
        url = ICD9_SG_URL
        filename = os.path.join(input_dir, 'CMS32_DESC_LONG_SHORT_SG.xlsx')
        if not os.path.exists(filename):
            # The target directory must exist before the file can be written.
            if input_dir:
                os.makedirs(input_dir, exist_ok=True)
            # download to disk
            wget.download(url, out=filename)
        # Read the codes as strings so leading zeros are preserved.
        self.df = pd.read_excel(filename, dtype={'PROCEDURE CODE': str})

    def __getitem__(self, code):
        '''Return the long description of `code`, or None if it is unknown.'''
        index = self.df['PROCEDURE CODE'].isin([code])
        if index.sum() == 0:
            return None
        return self.df[index]['LONG DESCRIPTION'].tolist()[0]

    def code2desc(self, code):
        '''
        Get description of codes.

        Parameters
        ----------
        code: str or List[str]
            The input icd code or list of codes.

        Returns
        -------
        desc: str, None or list
            The description (None for an unknown code) when exactly one code
            was looked up - this includes a list holding a single code;
            otherwise a list with one entry per input code.
        '''
        if isinstance(code, str):
            code = [code]
        # __getitem__ takes the code only; the extra 'LONG DESCRIPTION'
        # argument passed here before made every call raise a TypeError.
        res = [self[code_] for code_ in code]
        if len(res) == 1:
            return res[0]
        return res
if __name__ == '__main__':
    # Manual smoke tests, kept commented out; running the module as a script
    # currently does nothing. Uncomment a line to try it.
    # graph = ICD9Graph('./resources')
    # graph = ICD10Graph('./resources')
    # print(get_icd10_from_nih(["lung neoplasm", "breast"]))
    # print(get_icd9dx_from_nih(["lung neoplasm", "breast"]))
    # print(get_icd9sg_from_nih(["lung neoplasm", "breast"]))
    # print(get_condition_synonym_from_nih(['gastroenteri','salmonella']))
    pass