# -*- coding: utf-8 -*-
"""PyHGNC loads HGNC contant into a relational database and provides a RESTFull API."""
import os
import re
import sys
import time
import logging
import json
import pandas
import numpy
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session, load_only
from tqdm import tqdm
if sys.version_info[0] == 3:
from urllib.request import urlopen
else:
from urllib import urlopen
from datetime import datetime
from configparser import RawConfigParser, ConfigParser
from pyhgnc import constants
from . import models
from . import defaults
from ..constants import PYHGNC_LOG_DIR, HGNC_JSON
# Logger for the 'pyhgnc' name; used throughout this module for status messages.
log = logging.getLogger('pyhgnc')
# Log file path inside PYHGNC_LOG_DIR; the file name carries the local time at which this module is imported.
fh_path = os.path.join(PYHGNC_LOG_DIR, time.strftime('pyhgnc_database_%Y_%m_%d_%H_%M_%S.txt'))
fh = logging.FileHandler(fh_path)
# The handler accepts records of level DEBUG and above; the logger's own level is not set in this section.
fh.setLevel(logging.DEBUG)
log.addHandler(fh)
class BaseDbManager(object):
    """Create a database connection and a persistent session using SQLAlchemy."""

    def __init__(self, connection=None, echo=False):
        """Set up the engine, the session factory and a scoped session.

        Any error raised during setup is reported as a warning instead of being
        propagated. In that case the attributes that were not yet assigned
        (``engine``, ``sessionmaker``, ``session``) do not exist on the instance.

        :param str connection: SQLAlchemy connection string; when empty, the
            string is taken from (or written to) the configuration file
        :param bool echo: True or False for SQL output of SQLAlchemy engine
        """
        try:
            self.connection = self.get_connection_string(connection)
            self.engine = create_engine(self.connection, echo=echo)
            self.sessionmaker = sessionmaker(bind=self.engine, autoflush=False, expire_on_commit=False)
            self.session = scoped_session(self.sessionmaker)
        except Exception:
            # Best-effort on purpose: construction must not fail when no database
            # is configured yet.  ``Exception`` (instead of a bare ``except``) lets
            # KeyboardInterrupt/SystemExit through, and ``exc_info`` keeps the
            # actual cause in the log.
            log.warning('No valid database connection. Execute `pyhgnc connection` on command line',
                        exc_info=True)

    def _create_tables(self, checkfirst=True):
        """Create all tables defined by the models in the database.

        :param bool checkfirst: forwarded to ``metadata.create_all``; True or
            False check if tables already exist
        :return: None
        """
        log.info('create tables in {}'.format(self.engine.url))
        models.Base.metadata.create_all(self.engine, checkfirst=checkfirst)

    def _drop_tables(self):
        """Drop all tables defined by the models from the database.

        :return: None
        """
        log.info('drop tables in {}'.format(self.engine.url))
        models.Base.metadata.drop_all(self.engine)

    @staticmethod
    def get_connection_string(connection=None):
        """Return the SQLAlchemy connection string to use.

        A non-empty ``connection`` is returned unchanged.  Otherwise the string
        is read from the configuration file at ``defaults.config_file_path``;
        when that file does not exist it is created with the default
        connection string, which is then returned.

        :param str connection: SQLAlchemy connection string or None
        :return: SQLAlchemy connection string
        :rtype: str
        """
        if not connection:
            # RawConfigParser, matching ``set_connection``: connection strings may
            # contain '%' (e.g. URL-encoded passwords), which an interpolating
            # ConfigParser rejects on read and on write.
            config = RawConfigParser()
            cfp = defaults.config_file_path

            if os.path.exists(cfp):
                log.info('fetch database configuration from {}'.format(cfp))
                config.read(cfp)
                connection = config['database']['sqlalchemy_connection_string']
                # NOTE(review): this writes the full connection string, including
                # any password it contains, to the log.
                log.info('load connection string from {}: {}'.format(cfp, connection))
            else:
                with open(cfp, 'w') as config_file:
                    connection = defaults.sqlalchemy_connection_string_default
                    config['database'] = {'sqlalchemy_connection_string': connection}
                    config.write(config_file)
                    log.info('create configuration file {}'.format(cfp))

        return connection
class DbManager(BaseDbManager):
    """Load HGNC and HCOP (orthology prediction) data into the database.

    The ``get_*`` methods translate one HGNC record (a dict) into model
    instances.  For several model types an instance is created once per key and
    reused for later records; those instances are kept in the cache dicts below.
    """

    # Class-level defaults are kept so that ``DbManager.enzymes`` etc. remain
    # accessible; ``__init__`` gives every instance its own dicts so that model
    # objects are not shared between managers (and therefore between sessions).
    enzymes = {}
    gene_families = {}
    refseqs = {}
    mgds = {}
    uniprots = {}
    pubmeds = {}
    enas = {}
    rgds = {}

    # Compiled once; extracts the numeric part of an identifier such as "MGI:123".
    _MGD_ID_PATTERN = re.compile(r"MGI:(?P<mgdid>\d+)")

    def __init__(self, connection=None):
        """The DbManager implements all functions to upload HGNC data into the database.

        Preferred SQLAlchemy database is MySQL with pymysql.

        :param connection: custom database connection SQLAlchemy string
        :type connection: str
        """
        super(DbManager, self).__init__(connection=connection)
        # Per-instance caches (see the class-level comment).
        self.enzymes = {}
        self.gene_families = {}
        self.refseqs = {}
        self.mgds = {}
        self.uniprots = {}
        self.pubmeds = {}
        self.enas = {}
        self.rgds = {}

    def db_import(self, silent=False, hgnc_file_path=None, hcop_file_path=None, low_memory=False):
        """Drop and recreate all tables, then import HGNC and HCOP data.

        :param bool silent: suppress progress bar and console messages
        :param str hgnc_file_path: HGNC JSON file; when empty the data is
            downloaded from ``HGNC_JSON``
        :param str hcop_file_path: HCOP file; when empty ``constants.HCOP_GZIP`` is used
        :param bool low_memory: flush the session after every HGNC record
        """
        self._drop_tables()
        self._create_tables()
        json_data = DbManager.load_hgnc_json(hgnc_file_path=hgnc_file_path)
        self.insert_hgnc(hgnc_dict=json_data, silent=silent, low_memory=low_memory)
        self.insert_hcop(silent=silent, hcop_file_path=hcop_file_path)

    @classmethod
    def get_date(cls, hgnc, key):
        """Return ``hgnc[key]`` parsed as a ``YYYY-MM-DD`` date.

        :param dict hgnc: one HGNC record
        :param str key: name of the date entry
        :return: the parsed date, or None when the entry is missing or empty
        :rtype: datetime.date or None
        """
        date_value = hgnc.get(key)
        if not date_value:
            return None
        return datetime.strptime(date_value, "%Y-%m-%d").date()

    @classmethod
    def get_alias_symbols(cls, hgnc):
        """Return AliasSymbol models for ``alias_symbol`` and ``prev_symbol`` entries.

        Entries from ``prev_symbol`` are flagged with ``is_previous_symbol=True``.

        :param dict hgnc: one HGNC record
        :rtype: list
        """
        current = [models.AliasSymbol(alias_symbol=alias) for alias in hgnc.get('alias_symbol', [])]
        previous = [models.AliasSymbol(alias_symbol=prev, is_previous_symbol=True)
                    for prev in hgnc.get('prev_symbol', [])]
        return current + previous

    @classmethod
    def get_alias_names(cls, hgnc):
        """Return AliasName models for ``alias_name`` and ``prev_name`` entries.

        Entries from ``prev_name`` are flagged with ``is_previous_name=True``.

        :param dict hgnc: one HGNC record
        :rtype: list
        """
        current = [models.AliasName(alias_name=alias) for alias in hgnc.get('alias_name', [])]
        previous = [models.AliasName(alias_name=prev, is_previous_name=True)
                    for prev in hgnc.get('prev_name', [])]
        return current + previous

    def get_gene_families(self, hgnc):
        """Return GeneFamily models for the record, one cached instance per family id.

        ``gene_family`` and ``gene_family_id`` are read position by position.

        :param dict hgnc: one HGNC record
        :rtype: list
        """
        gene_families = []
        for position, family_name in enumerate(hgnc.get('gene_family', [])):
            family_identifier = hgnc['gene_family_id'][position]
            if family_identifier not in self.gene_families:
                self.gene_families[family_identifier] = models.GeneFamily(
                    family_identifier=family_identifier,
                    family_name=family_name
                )
            gene_families.append(self.gene_families[family_identifier])
        return gene_families

    def get_refseq(self, hgnc):
        """Return RefSeq models for ``refseq_accession``, one cached instance per accession.

        :param dict hgnc: one HGNC record
        :rtype: list
        """
        refseqs = []
        for accession in hgnc.get('refseq_accession', []):
            if accession not in self.refseqs:
                self.refseqs[accession] = models.RefSeq(accession=accession)
            refseqs.append(self.refseqs[accession])
        return refseqs

    def get_mgds(self, hgnc):
        """Return MGD models for ``mgd_id`` entries matching ``MGI:<digits>``.

        Entries that do not match the pattern are skipped.

        :param dict hgnc: one HGNC record
        :rtype: list
        """
        mgds = []
        for mgd in hgnc.get('mgd_id', []):
            mgdid_found = self._MGD_ID_PATTERN.search(mgd)
            if not mgdid_found:
                continue
            if mgd not in self.mgds:
                self.mgds[mgd] = models.MGD(mgdid=int(mgdid_found.group('mgdid')))
            mgds.append(self.mgds[mgd])
        return mgds

    def get_rgds(self, hgnc):
        """Return RGD models for ``rgd_id``, one cached instance per entry.

        The integer after the last ``:`` of each entry is stored as ``rgdid``.

        :param dict hgnc: one HGNC record
        :rtype: list
        """
        rgds = []
        for rgd in hgnc.get('rgd_id', []):
            if rgd not in self.rgds:
                self.rgds[rgd] = models.RGD(rgdid=int(rgd.split(':')[-1]))
            rgds.append(self.rgds[rgd])
        return rgds

    def get_omims(self, hgnc):
        """Return one new OMIM model per ``omim_id`` entry (not cached).

        :param dict hgnc: one HGNC record
        :rtype: list
        """
        return [models.OMIM(omimid=omim) for omim in hgnc.get('omim_id', [])]

    def get_uniprots(self, hgnc):
        """Return UniProt models for ``uniprot_ids``, one cached instance per id.

        :param dict hgnc: one HGNC record
        :rtype: list
        """
        uniprots = []
        for uniprot in hgnc.get('uniprot_ids', []):
            if uniprot not in self.uniprots:
                self.uniprots[uniprot] = models.UniProt(uniprotid=uniprot)
            uniprots.append(self.uniprots[uniprot])
        return uniprots

    def get_ccds(self, hgnc):
        """Return one new CCDS model per ``ccds_id`` entry (not cached).

        :param dict hgnc: one HGNC record
        :rtype: list
        """
        return [models.CCDS(ccdsid=ccdsid) for ccdsid in hgnc.get('ccds_id', [])]

    def get_pubmeds(self, hgnc):
        """Return PubMed models for ``pubmed_id``, one cached instance per entry.

        :param dict hgnc: one HGNC record
        :rtype: list
        """
        pubmeds = []
        for pubmed in hgnc.get('pubmed_id', []):
            if pubmed not in self.pubmeds:
                self.pubmeds[pubmed] = models.PubMed(pubmedid=int(pubmed))
            pubmeds.append(self.pubmeds[pubmed])
        return pubmeds

    def get_enas(self, hgnc):
        """Return ENA models for ``ena``, one cached instance per entry.

        :param dict hgnc: one HGNC record
        :rtype: list
        """
        enas = []
        for ena in hgnc.get('ena', []):
            if ena not in self.enas:
                self.enas[ena] = models.ENA(enaid=ena)
            enas.append(self.enas[ena])
        return enas

    def get_lsdbs(self, hgnc):
        """Return one new LSDB model per ``lsdb`` entry of the form ``<name>|<url>``.

        :param dict hgnc: one HGNC record
        :raises ValueError: when an entry contains no ``|``
        :rtype: list
        """
        lsdbs = []
        for lsdb_url in hgnc.get('lsdb', []):
            # Split on the first '|' only, so a '|' inside the URL part does not
            # break the two-value unpacking.
            lsdb, url = lsdb_url.split('|', 1)
            lsdbs.append(models.LSDB(lsdb=lsdb, url=url))
        return lsdbs

    def get_enzymes(self, hgnc):
        """Return Enzyme models for ``enzyme_id``, one cached instance per EC number.

        :param dict hgnc: one HGNC record
        :rtype: list
        """
        enzymes = []
        for ec_number in hgnc.get('enzyme_id', []):
            if ec_number not in self.enzymes:
                self.enzymes[ec_number] = models.Enzyme(ec_number=ec_number)
            enzymes.append(self.enzymes[ec_number])
        return enzymes

    def insert_hgnc(self, hgnc_dict, silent=False, low_memory=False):
        """Add one HGNC model per record in ``hgnc_dict['docs']`` and commit.

        :param dict hgnc_dict: dict with a ``docs`` list of HGNC records
        :param bool silent: disable the progress bar and the console message
        :param bool low_memory: flush the session after every record
        """
        log.info('low_memory set to {}'.format(low_memory))

        for hgnc_data in tqdm(hgnc_dict['docs'], disable=silent):
            hgnc_table = {
                'symbol': hgnc_data['symbol'],
                # the integer after the last ':' of hgnc_id
                'identifier': int(hgnc_data['hgnc_id'].split(':')[-1]),
                'name': hgnc_data['name'],
                'status': hgnc_data['status'],
                'orphanet': hgnc_data.get('orphanet'),
                'uuid': hgnc_data['uuid'],
                'locus_group': hgnc_data['locus_group'],
                'locus_type': hgnc_data['locus_type'],
                'ensembl_gene': hgnc_data.get('ensembl_gene_id'),
                'horde': hgnc_data.get('horde_id'),
                'vega': hgnc_data.get('vega_id'),
                'lncrnadb': hgnc_data.get('lncrnadb'),
                'entrez': hgnc_data.get('entrez_id'),
                'mirbase': hgnc_data.get('mirbase'),
                'iuphar': hgnc_data.get('iuphar'),
                'ucsc': hgnc_data.get('ucsc_id'),
                'snornabase': hgnc_data.get('snornabase'),
                'pseudogeneorg': hgnc_data.get('pseudogene.org'),
                'bioparadigmsslc': hgnc_data.get('bioparadigms_slc'),
                'locationsortable': hgnc_data.get('location_sortable'),
                'merops': hgnc_data.get('merops'),
                'location': hgnc_data.get('location'),
                'cosmic': hgnc_data.get('cosmic'),
                'imgt': hgnc_data.get('imgt'),
                'date_name_changed': self.get_date(hgnc_data, 'date_name_changed'),
                'date_modified': self.get_date(hgnc_data, 'date_modified'),
                'date_symbol_changed': self.get_date(hgnc_data, 'date_symbol_changed'),
                'date_approved_reserved': self.get_date(hgnc_data, 'date_approved_reserved'),
                'alias_symbols': self.get_alias_symbols(hgnc_data),
                'alias_names': self.get_alias_names(hgnc_data),
                'gene_families': self.get_gene_families(hgnc_data),
                'refseqs': self.get_refseq(hgnc_data),
                'mgds': self.get_mgds(hgnc_data),
                'rgds': self.get_rgds(hgnc_data),
                'omims': self.get_omims(hgnc_data),
                'uniprots': self.get_uniprots(hgnc_data),
                'ccdss': self.get_ccds(hgnc_data),
                'pubmeds': self.get_pubmeds(hgnc_data),
                'enas': self.get_enas(hgnc_data),
                'lsdbs': self.get_lsdbs(hgnc_data),
                'enzymes': self.get_enzymes(hgnc_data)
            }
            self.session.add(models.HGNC(**hgnc_table))

            if low_memory:
                self.session.flush()

        if not silent:
            print('Insert HGNC data into database')

        self.session.commit()

    def insert_hcop(self, silent=False, hcop_file_path=None):
        """Load the HCOP table, link it to the stored HGNC rows and append it to the database.

        The file's ``hgnc_id`` column is reduced to its integer part and joined
        against ``HGNC.identifier``; the joined frame gets the HGNC primary key
        as ``hgnc_id`` and a 1-based ``id`` index.

        :param bool silent: suppress console messages
        :param str hcop_file_path: HCOP file; when empty ``constants.HCOP_GZIP`` is used
        """
        hcop_source = hcop_file_path or constants.HCOP_GZIP

        log_text = 'Load OrthologyPrediction data from {}'.format(hcop_source)
        log.info(log_text)
        if not silent:
            print(log_text)

        df_hcop = pandas.read_table(hcop_source, low_memory=False)
        # '-' entries are treated as missing values.  ``numpy.nan`` is the spelling
        # available in every NumPy version (``numpy.NaN`` was removed in NumPy 2.0).
        df_hcop.replace('-', numpy.nan, inplace=True)
        df_hcop.replace(to_replace={'hgnc_id': 'HGNC:'}, value='', regex=True, inplace=True)
        # rows without an HGNC id get -1 so the column can be cast to int
        df_hcop.hgnc_id = df_hcop.hgnc_id.fillna(-1).astype(int)
        df_hcop.rename(columns={'hgnc_id': 'identifier'}, inplace=True)
        df_hcop.set_index('identifier', inplace=True)

        log_text = 'Join HCOP with HGNC'
        log.info(log_text)
        if not silent:
            print(log_text)

        hgnc_rows = self.session.query(models.HGNC).options(load_only(models.HGNC.id, models.HGNC.identifier))
        hgnc_keys = [{'hgnc_id': row.id, 'identifier': row.identifier} for row in hgnc_rows]
        df_hgnc = pandas.DataFrame(hgnc_keys)
        df_hgnc.set_index('identifier', inplace=True)

        df_hcnp4db = df_hcop.join(df_hgnc)
        df_hcnp4db.reset_index(inplace=True)
        # 1-based index that is written to the table as the 'id' column
        df_hcnp4db.index += 1
        df_hcnp4db.drop('identifier', axis=1, inplace=True)
        df_hcnp4db.index.rename('id', inplace=True)

        log_text = 'Load OrthologyPrediction data in database'
        log.info(log_text)
        if not silent:
            print(log_text)

        # Write through the engine this manager already owns instead of handing
        # pandas the connection string: with an in-memory SQLite URL a second
        # engine would point at a different, empty database.
        df_hcnp4db.to_sql(name=models.OrthologyPrediction.__tablename__, con=self.engine, if_exists='append')

    @staticmethod
    def load_hgnc_json(hgnc_file_path=None):
        """Return the ``response`` part of the HGNC JSON document.

        :param str hgnc_file_path: local JSON file; when empty the document is
            downloaded from ``HGNC_JSON``
        :return: value stored under the ``response`` key
        """
        if hgnc_file_path:
            with open(hgnc_file_path) as json_file:
                log.info('loading json data from {}'.format(hgnc_file_path))
                hgnc_dict = json.load(json_file)
        else:
            response = urlopen(HGNC_JSON)
            # try/finally rather than ``with``: the Python 2 ``urllib.urlopen``
            # result is not a context manager.
            try:
                hgnc_dict = json.loads(response.read().decode())
            finally:
                response.close()

        return hgnc_dict['response']
def update(connection=None, silent=False, hgnc_file_path=None, hcop_file_path=None, low_memory=False):
    """Update the database with current version of HGNC

    Creates a ``DbManager``, runs its ``db_import`` and closes the session
    afterwards, also when the import raises.

    :param str connection: connection string
    :param bool silent: silent while import
    :param str hgnc_file_path: import from path HGNC
    :param str hcop_file_path: import from path HCOP (orthologs)
    :param bool low_memory: set to `True` if you have low memory
    :return: None
    """
    database = DbManager(connection)
    try:
        database.db_import(
            silent=silent,
            hgnc_file_path=hgnc_file_path,
            hcop_file_path=hcop_file_path,
            low_memory=low_memory
        )
    finally:
        # The manager only warns when no connection could be set up, so the
        # session attribute may be missing; do not mask the import error then.
        session = getattr(database, 'session', None)
        if session is not None:
            session.close()
def set_connection(connection=defaults.sqlalchemy_connection_string_default):
    r"""Set the connection string for sqlalchemy and write it to the config file.

    .. code-block:: python

        import pyhgnc
        pyhgnc.set_connection('mysql+pymysql://{user}:{passwd}@{host}/{db}?charset={charset}')

    .. hint::

        valid connection strings

        - mysql+pymysql://user:passwd@localhost/database?charset=utf8
        - postgresql://scott:tiger@localhost/mydatabase
        - mssql+pyodbc://user:passwd@database
        - oracle://user:passwd@127.0.0.1:1521/database
        - Linux: sqlite:////absolute/path/to/database.db
        - Windows: sqlite:///C:\path\to\database.db

    The value is stored as ``sqlalchemy_connection_string`` in the
    ``[database]`` section of the file at ``defaults.config_file_path``.  Other
    content of an existing file is read first and written back.

    :param str connection: sqlalchemy connection string
    """
    config_path = defaults.config_file_path
    config = RawConfigParser()

    is_new_file = not os.path.exists(config_path)
    if not is_new_file:
        config.read(config_path)

    # An existing file may lack the section; ``set`` would raise NoSectionError.
    if not config.has_section('database'):
        config.add_section('database')
    config.set('database', 'sqlalchemy_connection_string', connection)

    with open(config_path, 'w') as config_file:
        config.write(config_file)

    if is_new_file:
        log.info('create configuration file {}'.format(config_path))
def set_mysql_connection(host='localhost', user='pyhgnc_user', passwd='pyhgnc_passwd', db='pyhgnc',
                         charset='utf8'):
    """Method to set a MySQL connection

    Builds a ``mysql+pymysql://`` connection string from the arguments and
    passes it to ``set_connection``.  The values are inserted as given (no
    URL escaping is applied).

    :param str host: MySQL database host
    :param str user: MySQL database user
    :param str passwd: MySQL database password
    :param str db: MySQL database name
    :param str charset: MySQL database character set
    :return: connection string
    :rtype: str
    """
    template = 'mysql+pymysql://{user}:{passwd}@{host}/{db}?charset={charset}'
    connection_string = template.format(host=host, user=user, passwd=passwd, db=db, charset=charset)
    set_connection(connection_string)
    return connection_string