# Source code for pytrial.model_utils.drug
'''
Provide a series of tools for getting drug mappings, e.g., name to SMILES string,
drug-DDI matrix, drug ndc - rxcui - atc4 codes, drug cid-atc codes.
'''
import json
import wget
import os
import requests
import re
import pdb
import zipfile
import gzip
import networkx as nx
from networkx.readwrite import json_graph
import pandas as pd
from tqdm import tqdm
from ..utils.tabular_utils import read_csv_to_df, read_txt_to_df, read_excel_to_df
from ..utils.check import make_dir_if_not_exist
# Remote locations of the data files fetched by this module.
# The commented-out `uofi.box.com` lines are earlier links for the same
# constants, kept for reference only; they are not used by any code here.

# Drug-drug interaction table, fetched by `download_drug_ddi`.
# DRUG_DDI_URL = 'https://uofi.box.com/shared/static/xdfgrnhzotz6ktyrdsrikrnz1th97fic.csv'
DRUG_DDI_URL = 'https://storage.googleapis.com/pytrial/drug-DDI.csv'
# DrugBank information table, fetched by `download_drug_bank`.
# DRUG_BANK_URL = 'https://uofi.box.com/shared/static/4f3g4dvdfyz5goubazeqfzdi0abcgzwf.csv'
DRUG_BANK_URL = 'https://storage.googleapis.com/pytrial/drugbank_drugs_info.csv'
# Mapping tables loaded by `DrugTransformer` when it is constructed.
RXCUI_ATC4_NDC11_URL = 'https://github.com/RyanWangZf/PyTrial/raw/main/resources/rxcui_atc4_ndc11.zip'
NDC_NAME_URL = 'https://github.com/RyanWangZf/PyTrial/raw/main/resources/ndc_name.csv'
NAME_SMILES_URL = 'https://github.com/RyanWangZf/PyTrial/raw/main/resources/drug_smiles.csv'
NAME_ATC_URL ='https://github.com/RyanWangZf/PyTrial/raw/main/resources/atc_drug.csv'
# Raw archives downloaded by `DrugGraph.preprocess` to build the drug hierarchy.
# ATC5_NDC_URL = 'https://uofi.box.com/shared/static/dk07wip4l4hkbolp09e3rz3un4hokocb.zip'
ATC5_NDC_URL = 'https://storage.googleapis.com/pytrial/atc5_ndc.zip'
# ATC_DEF_URL = 'https://uofi.box.com/shared/static/tdz6glo9waf353mwxvqw44l6r43vrfqm.zip'
ATC_DEF_URL = 'https://storage.googleapis.com/pytrial/ATC.csv.zip'
# FDA_NDC_NAME_URL = 'https://uofi.box.com/shared/static/ah6gk3ljaj0uz0cr2yoecmd3so2onih7.zip'
FDA_NDC_NAME_URL = 'https://storage.googleapis.com/pytrial/fda_ndc.zip'
class DrugTransformer:
    '''
    Provide a series of drug-related functions for
    (1) drug name to atc / atc to drug name
    (2) drug name to ndc-11 / ndc-11 to drug name
    (3) drug name to smiles (molecule structure)
    (4) atc to smiles
    (5) ndc-11 to smiles
    (6) ndc-11 to atc / atc to ndc
    To convert ndc-10 digits to ndc-11, use `convert_ndc10_ndc11`.

    Every public lookup accepts either one string or a list of strings.
    For one string the result is the list of matches, or ``None`` when
    nothing matches. For a list the result has one such entry per query,
    in the same order.
    '''
    def __init__(self):
        # initialize using several preprocessed files; each builder downloads
        # its file into ./resources when it is not there yet.
        self._build_ndc_rxcui_atc_map()
        self._build_ndc_name_map()
        self._build_name_smiles()
        self._build_atc_name()
        # must run last: it joins the tables loaded by the builders above.
        self._build_atc_ndc_smiles()

    @staticmethod
    def _lookup(df, key_col, value_col, query, lowercase=False):
        '''
        Shared implementation of all the public ``x2y`` lookups.

        Parameters
        ----------
        df: pd.DataFrame
            The mapping table to search.
        key_col: str
            Column compared against each query.
        value_col: str
            Column whose values are returned for the matching rows.
        query: str or list[str]
            One query or a list of queries.
        lowercase: bool
            Lower-case each query before matching (for tables whose keys were
            lower-cased when loaded).

        Returns
        -------
        list or None for a single query; list[list or None] for a list.
        '''
        single_input = isinstance(query, str)
        queries = [query] if single_input else query
        keys = df[key_col]
        outputs = []
        for q in queries:
            if lowercase:
                q = q.lower()
            matches = df.loc[keys == q, value_col].tolist()
            # keep the historical contract: no match is reported as None,
            # not as an empty list.
            outputs.append(matches if matches else None)
        return outputs[0] if single_input else outputs

    @staticmethod
    def _lowercase_strings(df):
        '''
        Return a copy of `df` with every string cell lower-cased.
        Non-string cells (e.g. NaN from empty CSV fields) are left untouched
        instead of raising ``AttributeError``.
        '''
        return df.apply(
            lambda col: col.map(lambda v: v.lower() if isinstance(v, str) else v)
        )

    def ndc2atc(self, code):
        '''
        Parameters
        ----------
        code: str or list[str]
            NDC-11 codes.
        Returns
        -------
        atc codes: list[str] or None, or a list of those for list input.
        '''
        return self._lookup(self.rxcui_atc4_ndc11, 'ndc11', 'atc4', code)

    def atc2ndc(self, code):
        '''
        Parameters
        ----------
        code: str or list[str]
            ATC-4 codes.
        Returns
        -------
        NDC-11 codes: list[str] or None, or a list of those for list input.
        '''
        return self._lookup(self.rxcui_atc4_ndc11, 'atc4', 'ndc11', code)

    def name2ndc(self, name):
        '''
        Parameters
        ----------
        name: str or list[str]
            Drug names (matched case-insensitively).
        Returns
        -------
        ndc codes: list[str] or None, or a list of those for list input.
        '''
        return self._lookup(self.ndc11_name, 'drug', 'ndc11', name, lowercase=True)

    def ndc2name(self, code):
        '''
        Parameters
        ----------
        code: str or list[str]
            NDC-11 codes.
        Returns
        -------
        name: list[str] or None, or a list of those for list input.
            Drug names (lower-cased).
        '''
        # the whole ndc11_name table is lower-cased on load, so the query is
        # lower-cased as well.
        return self._lookup(self.ndc11_name, 'ndc11', 'drug', code, lowercase=True)

    def atc2name(self, code):
        '''
        Parameters
        ----------
        code: str or list[str]
            ATC4 codes.
        Returns
        -------
        names: list[str] or None, or a list of those for list input.
            Drug names.
        '''
        return self._lookup(self.name_atc, 'atc4', 'drug', code)

    def name2atc(self, name):
        '''
        Parameters
        ----------
        name: str or list[str]
            Drug names (matched case-insensitively).
        Returns
        -------
        codes: list[str] or None, or a list of those for list input.
            ATC4 codes.
        '''
        return self._lookup(self.name_atc, 'drug', 'atc4', name, lowercase=True)

    def name2smiles(self, name):
        '''
        Parameters
        ----------
        name: str or list[str]
            Drug names (matched case-insensitively).
        Returns
        -------
        smiles: list[str] or None, or a list of those for list input.
            Drug molecule structures represented by SMILES.
        '''
        return self._lookup(self.name_smiles, 'drug', 'moldb_smiles', name, lowercase=True)

    def atc2smiles(self, code):
        '''
        Parameters
        ----------
        code: str or list[str]
            ATC4 codes.
        Returns
        -------
        smiles: list[str] or None, or a list of those for list input.
            Drug molecule structures represented by SMILES.
        '''
        return self._lookup(self.atc_smiles, 'atc4', 'moldb_smiles', code)

    def ndc2smiles(self, code):
        '''
        Parameters
        ----------
        code: str or list[str]
            NDC-11 codes.
        Returns
        -------
        smiles: list[str] or None, or a list of those for list input.
            Drug molecule structures represented by SMILES.
        '''
        return self._lookup(self.ndc_smiles, 'ndc11', 'moldb_smiles', code)

    def _build_ndc_rxcui_atc_map(self):
        '''Load the RXCUI / ATC4 / NDC-11 table, downloading and unzipping it if absent.'''
        filename = './resources/rxcui_atc4_ndc11.csv'
        zipname = filename.replace('.csv', '.zip')
        make_dir_if_not_exist('resources')
        if not os.path.exists(filename):
            wget.download(RXCUI_ATC4_NDC11_URL, out=zipname)
            # context manager closes the archive even when extraction fails
            with zipfile.ZipFile(zipname, 'r') as archive:
                archive.extractall('./resources')
            print(f'Download RXCUI-NDC-ATC4 mapping file to {filename}.')
        self.rxcui_atc4_ndc11 = read_csv_to_df(filename, dtype={'ndc11':str, 'rxcui':str}, low_memory=False)

    def _build_ndc_name_map(self):
        '''Load the NDC-11 / drug-name table with every string cell lower-cased.'''
        filename = './resources/ndc_name.csv'
        make_dir_if_not_exist('resources')
        if not os.path.exists(filename):
            wget.download(NDC_NAME_URL, out=filename)
            print(f'Download NDC-DrugName mapping file to {filename}.')
        # NOTE(review): dtype names 'DRUG' while the lookups read 'drug';
        # presumably read_csv_to_df normalises column names -- confirm.
        table = read_csv_to_df(filename, dtype={'ndc11':str, 'DRUG':str}, low_memory=False)
        self.ndc11_name = self._lowercase_strings(table)

    def _build_name_smiles(self):
        '''Load the drug-name / SMILES table, lower-case names, drop incomplete rows.'''
        filename = './resources/drug_smiles.csv'
        make_dir_if_not_exist('resources')
        if not os.path.exists(filename):
            wget.download(NAME_SMILES_URL, out=filename)
            print(f'Download DrugName-SMILES mapping file to {filename}.')
        table = read_csv_to_df(filename, dtype={'DRUG':str, 'moldb_smiles':str}, low_memory=False)
        # .str.lower() propagates missing names as NaN instead of raising;
        # those rows are then removed by dropna().
        table['drug'] = table['drug'].str.lower()
        self.name_smiles = table.dropna()

    def _build_atc_name(self):
        '''Load the drug-name / ATC4 table with lower-cased drug names.'''
        filename = './resources/atc_drug.csv'
        make_dir_if_not_exist('resources')
        if not os.path.exists(filename):
            wget.download(NAME_ATC_URL, out=filename)
            print(f'Download DrugName-ATC4 mapping file to {filename}.')
        self.name_atc = read_csv_to_df(filename, dtype={'DRUG':str, 'atc4':str}, low_memory=False)
        # .str.lower() tolerates missing names (they stay NaN).
        self.name_atc['drug'] = self.name_atc['drug'].str.lower()

    def _build_atc_ndc_smiles(self):
        '''Derive the ATC4 -> SMILES and NDC-11 -> SMILES tables by joining on drug name.'''
        df_name_atc = self.name_atc
        df_name_smiles = self.name_smiles
        df = df_name_atc.merge(df_name_smiles, on='drug')[['atc4','moldb_smiles']]
        self.atc_smiles = df.drop_duplicates().reset_index(drop=True)
        df_ndc_name = self.ndc11_name
        df = df_ndc_name.merge(df_name_smiles, on='drug').drop_duplicates().reset_index(drop=True)
        self.ndc_smiles = df[['ndc11','moldb_smiles']]
def convert_ndc10_ndc11(code):
    '''
    Convert a hyphenated 10-digit NDC into the 11-digit NDC without hyphens.

    Each of the three hyphen-separated segments is left-padded with zeros to
    the 5-4-2 layout and the segments are then concatenated.

    Examples
    --------
    Reference: https://health.maryland.gov/phpa/OIDEOR/IMMUN/Shared%20Documents/Handout%203%20-%20NDC%20conversion%20to%2011%20digits.pdf

    10-digit format   10-digit example   11-digit (5-4-2) result
    4-4-2             0002-7597-01       00002-7597-01
    5-3-2             50242-040-62       50242-0040-62
    5-4-1             60574-4114-1       60574-4114-01
    '''
    segments = code.split('-')
    target_widths = (5, 4, 2)
    # rjust only pads; a segment already at (or beyond) its width is kept as is.
    return ''.join(
        segments[pos].rjust(width, '0')
        for pos, width in enumerate(target_widths)
    )
def download_drug_ddi(output_dir='./datasets'):
    '''
    Fetch the drug-DDI table from `DRUG_DDI_URL` and store it on disk
    as `drug-DDI.csv` inside `output_dir`.

    Parameters
    ----------
    output_dir: str
        Directory that receives the file; created when it does not exist.
    '''
    filename = os.path.join(output_dir, 'drug-DDI.csv')
    make_dir_if_not_exist(output_dir)
    wget.download(DRUG_DDI_URL, out=filename)
    print(f'Save drug DDI file to {filename}.')
def download_drug_bank(output_dir='./datasets'):
    '''
    Fetch the drug-bank table (drug names and SMILES molecule strings) from
    `DRUG_BANK_URL` and store it on disk as `drugbank.csv` inside `output_dir`.

    Parameters
    ----------
    output_dir: str
        Directory that receives the file; created when it does not exist.
    '''
    filename = os.path.join(output_dir, 'drugbank.csv')
    make_dir_if_not_exist(output_dir)
    wget.download(DRUG_BANK_URL, out=filename)
    print(f'Save drug bank file to {filename}.')
class DrugGraph:
    '''
    Provide tools to get hierarchy of drug by ATC codes.
    From ATC2 - ATC4 (dont include ATC5)

    The graph is a directed tree stored in `self.graph`:
    'root' -> ATC codes of length 1 -> length 3 -> length 4 -> length 5,
    with NDC-11 package codes attached below their `atc4` code.
    '''
    def __init__(self, input_dir='./resources'):
        '''
        Load `drug_hierarchy.json` from `input_dir`, building it first with
        `preprocess` when the file does not exist.
        '''
        filename = os.path.join(input_dir, 'drug_hierarchy.json')
        if not os.path.exists(filename):
            self.preprocess(input_dir)
        with open(filename, 'r') as f:
            hierarchy = json.load(f)
        self.graph = json_graph.adjacency_graph(hierarchy)
        print('load drug graph from', filename)

    def preprocess(self, dir='./resources'):
        '''
        Process raw data and deposit the graph data to the local disk.
        Search `ndc_atc.csv`, `atc_def.csv` and `fda_ndc_name.csv` under the
        given directory `dir`; any file that is missing is downloaded and
        unzipped there. The result is written to `drug_hierarchy.json` in `dir`.

        Parameters
        ----------
        dir: str
            Working directory for the raw files and the output JSON.
        '''
        make_dir_if_not_exist(dir)
        atctree_file = os.path.join(dir, 'ndc_atc.csv')
        if not os.path.exists(atctree_file):
            self._download_ndc_atc_map(dir, atctree_file)
        atcdef_file = os.path.join(dir, 'atc_def.csv')
        if not os.path.exists(atcdef_file):
            self._download_atc_def(dir, atcdef_file)
        fdandc_file = os.path.join(dir, 'fda_ndc_name.csv')
        if not os.path.exists(fdandc_file):
            self._download_fda_ndc_name(dir, fdandc_file)

        # NDC -> ATC table: normalise NDC to 11 digits, keep rows with an atc4.
        df = read_csv_to_df(atctree_file)
        df['ndc'] = df['ndc'].apply(convert_ndc10_ndc11)
        df = df.dropna(subset=['atc4']).reset_index(drop=True)

        # ATC definitions: the code is the last path component of 'class id'.
        df_atcdef = read_csv_to_df(atcdef_file)
        df_atcdef['atc'] = df_atcdef['class id'].apply(lambda x: x.split('/')[-1])

        # TODO: attach ndc code to its name
        df_ndc = read_csv_to_df(fdandc_file)
        df_ndc['ndc'] = df_ndc['ndcpackagecode'].apply(convert_ndc10_ndc11)
        nodes_list = [
            (ndc, {'description': label})
            for ndc, label in zip(df_ndc['ndc'], df_ndc['proprietaryname'])
        ]

        # add ndc code to the leaf code of G
        G = nx.DiGraph()
        G.add_nodes_from(nodes_list)
        for code in tqdm(df['atc4'].unique()):
            # some drug dont have atc5 code
            ndc_codes = df[df['atc4'] == code]['ndc'].unique()
            G.add_edges_from((code, ndc) for ndc in ndc_codes)

        # build atc code graph
        G_atc = self._build_atc_tree(df_atcdef)
        G = nx.compose(G, G_atc)
        # NOTE(review): 'key' only matters for multigraphs in networkx's
        # adjacency format; kept exactly as before -- confirm intent.
        res = nx.adjacency_data(G, attrs={'key':'description', 'id':'id'})
        out_filename = os.path.join(dir, 'drug_hierarchy.json')
        with open(out_filename, 'w') as f:
            f.write(json.dumps(res))
        print('done, save the drug hierarchy to', out_filename)

    @staticmethod
    def _download_and_extract(url, zip_path, dir):
        '''Download `url` to `zip_path`, unpack it into `dir`, then delete the archive.'''
        # drop any stale archive first so the download lands at exactly zip_path
        if os.path.exists(zip_path):
            os.remove(zip_path)
        wget.download(url, out=zip_path)
        # context manager closes the archive even when extraction fails
        with zipfile.ZipFile(zip_path, 'r') as archive:
            archive.extractall(dir)
        os.remove(zip_path)

    def _download_fda_ndc_name(self, dir, fdandc_file):
        '''Download the FDA NDC archive and save package/product joined on `productid` to `fdandc_file`.'''
        self._download_and_extract(FDA_NDC_NAME_URL, os.path.join(dir, 'fda_ndc.zip'), dir)
        temp_textname = os.path.join(dir, 'package.txt')
        temp_textname2 = os.path.join(dir, 'product.txt')
        df_pack = read_txt_to_df(temp_textname, encoding='cp1252')
        df_prod = read_txt_to_df(temp_textname2, encoding='cp1252')
        df = df_pack.merge(df_prod, on='productid')
        df.to_csv(fdandc_file, index=False)
        # the extracted text files are only intermediates
        os.remove(temp_textname)
        os.remove(temp_textname2)
        print("\n Download raw file to", fdandc_file)

    def _download_ndc_atc_map(self, dir, atctree_file):
        '''Download the NDC-ATC archive and move the extracted CSV to `atctree_file`.'''
        self._download_and_extract(ATC5_NDC_URL, os.path.join(dir, 'ndc_atc.zip'), dir)
        temp_csvfilename = os.path.join(dir, 'ndc_map 2020_06_17 (atc5 atc4 ingredients).csv')
        os.rename(temp_csvfilename, atctree_file)
        print("Download raw file to", atctree_file)

    def _download_atc_def(self, dir, atcdef_file):
        '''Download the ATC definition archive and move the extracted CSV to `atcdef_file`.'''
        self._download_and_extract(ATC_DEF_URL, os.path.join(dir, 'atc_def.csv.zip'), dir)
        temp_csvfilename = os.path.join(dir, 'ATC.csv')
        os.rename(temp_csvfilename, atcdef_file)
        print("\n Download raw file to", atcdef_file)

    def _build_atc_tree(self, df_atcdef):
        '''
        Build the ATC part of the hierarchy as a directed graph.

        Parameters
        ----------
        df_atcdef: pd.DataFrame
            Must provide the columns 'atc' (code) and 'preferred label'.

        Returns
        -------
        networkx.DiGraph with a 'root' node, one described node per ATC code
        shorter than 7 characters, and parent -> child edges between codes of
        length 1, 3, 4 and 5 whenever the parent is a prefix of the child.
        '''
        G = nx.DiGraph()
        df_atc_label = df_atcdef[['atc','preferred label']]
        df_atc_label = df_atc_label[df_atc_label['atc'].map(len) < 7]
        nodes_list = [
            (atc, {'description': label})
            for atc, label in zip(df_atc_label['atc'], df_atc_label['preferred label'])
        ]
        nodes_list.append(('root', {'description':'Anatomical Therapeutic Chemical (ATC) Classification'}))
        G.add_nodes_from(nodes_list)

        # bucket the distinct codes by length; the level is encoded in the length
        codes = df_atcdef['atc'].unique().tolist()
        atc4 = [c for c in codes if len(c) == 5]
        atc3 = [c for c in codes if len(c) == 4]
        atc2 = [c for c in codes if len(c) == 3]
        atc1 = [c for c in codes if len(c) == 1]

        # plain lists replace Series.iteritems(), which was removed in pandas 2.0;
        # edge insertion order (atc3->atc4, atc2->atc3, atc1->atc2, root) is unchanged.
        for parents, children in ((atc3, atc4), (atc2, atc3), (atc1, atc2)):
            for code in tqdm(parents):
                prefix_len = len(code)
                G.add_edges_from(
                    (code, child) for child in children if child[:prefix_len] == code
                )
        G.add_edges_from(('root', code) for code in atc1)
        return G
# if __name__ == '__main__':
# download_drug_ddi()
# download_drug_bank()