Potentially solve EI-CoreBioinformatics#177: now ORFs will be serialised in parallel.
lucventurini committed Sep 8, 2019
1 parent 58e553c commit 335e6a4
Showing 3 changed files with 298 additions and 52 deletions.
50 changes: 37 additions & 13 deletions Mikado/parsers/__init__.py
@@ -11,6 +11,7 @@
import bz2
from functools import partial
import magic
import multiprocessing as mp


class HeaderError(Exception):
@@ -27,17 +28,21 @@ class Parser(metaclass=abc.ABCMeta):

def __init__(self, handle):
self.__closed = False
self.__from_queue = False
if not isinstance(handle, io.IOBase):
if handle.endswith(".gz") or self.wizard.from_file(handle) == b"application/gzip":
opener = gzip.open
elif handle.endswith(".bz2") or self.wizard.from_file(handle) == b"application/x-bzip2":
opener = bz2.open
if isinstance(handle, mp.queues.Queue):
self.__from_queue = True
else:
opener = partial(open, **{"buffering": 1})
try:
handle = opener(handle, "rt")
except FileNotFoundError:
raise FileNotFoundError("File not found: {0}".format(handle))
if handle.endswith(".gz") or self.wizard.from_file(handle) == b"application/gzip":
opener = gzip.open
elif handle.endswith(".bz2") or self.wizard.from_file(handle) == b"application/x-bzip2":
opener = bz2.open
else:
opener = partial(open, **{"buffering": 1})
try:
handle = opener(handle, "rt")
except FileNotFoundError:
raise FileNotFoundError("File not found: {0}".format(handle))

self._handle = handle
self.closed = False
@@ -46,8 +51,20 @@ def __iter__(self):
return self

def __next__(self):
line = self._handle.readline()
return line

if self.__from_queue:
line = self._handle.get_nowait()
if isinstance(line, bytes):
line = line.decode()
if line in ("EXIT", b"EXIT"):
self.close()
else:
try:
line = self._handle.readline()
except StopIteration:
self.close()
raise StopIteration
return line

def __enter__(self):
if self.closed is True:
@@ -56,7 +73,10 @@ def __enter__(self):

def __exit__(self, *args):
_ = args
self._handle.close()
if self.__from_queue is False:
self._handle.close()
else:
self._handle.join()
self.closed = True

def close(self):
@@ -70,7 +90,10 @@ def name(self):
"""
Return the filename.
"""
return self._handle.name
if self.__from_queue:
return ""
else:
return self._handle.name

@property
def closed(self):
@@ -103,6 +126,7 @@ def closed(self, *args):
from . import blast_utils
from . import bam_parser


def to_gff(string, input_format=None):
"""
Function to recognize the input file type (GFF or GTF).
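To illustrate the new queue-driven mode of Parser, here is a minimal, self-contained sketch of the pattern this commit relies on: a producer process feeds raw lines into a multiprocessing.Queue and closes it with an EXIT sentinel, while the consumer reads until it sees the sentinel. The example lines and the producer function are invented for illustration and are not part of the commit.

import multiprocessing as mp


def producer(lines, queue):
    """Feed raw text lines to the queue, then signal completion with the EXIT sentinel."""
    for line in lines:
        queue.put(line)
    queue.put(b"EXIT")


if __name__ == "__main__":
    queue = mp.Queue()
    example_lines = ["chr1\t100\t200\tfirst\n", "chr1\t300\t400\tsecond\n"]  # made-up BED lines
    proc = mp.Process(target=producer, args=(example_lines, queue))
    proc.start()
    while True:
        line = queue.get()
        if line in ("EXIT", b"EXIT"):
            break
        print(line.strip())
    proc.join()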
148 changes: 144 additions & 4 deletions Mikado/parsers/bed12.py
@@ -25,6 +25,9 @@
from Bio.Data.IUPACData import ambiguous_rna_letters as _ambiguous_rna_letters
from Bio.Data import CodonTable
import multiprocessing as mp
import msgpack
import logging
import logging.handlers as logging_handlers


backup_valid_letters = set(_ambiguous_dna_letters.upper() + _ambiguous_rna_letters.upper())
@@ -125,7 +128,9 @@ def _translate_str(sequence, table, stop_symbol="*", to_stop=False, cds=False, p
"""

if cds and len(sequence) % 3 != 0:
raise CodonTable.TranslationError("Sequence length {0} is not a multiple of three".format(n))
raise CodonTable.TranslationError("Sequence length {0} is not a multiple of three".format(
len(sequence)
))
elif gap is not None and (not isinstance(gap, str) or len(gap) > 1):
raise TypeError("Gap character should be a single character "
"string.")
@@ -719,6 +724,25 @@ def copy(self):

return copy.deepcopy(self)

def as_simple_dict(self):

return {
"chrom": self.chrom,
"id": self.id,
"start": self.start,
"end": self.end,
"name": self.name,
"strand": self.strand,
"thick_start": self.thick_start,
"thick_end": self.thick_end,
"score": self.score,
"has_start_codon": self.has_start_codon,
"has_stop_codon": self.has_stop_codon,
"cds_len": self.cds_len,
"phase": self.phase,
"transcriptomic": self.transcriptomic,
}
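As a side note, the dictionary returned by as_simple_dict() is what gets serialised with msgpack further down in this commit; a hedged sketch of that round trip, with invented values, could look like this:

import msgpack

# Hypothetical output of BED12.as_simple_dict() for one ORF; all values are invented.
orf = {"chrom": "transcript_1", "id": "transcript_1.orf1", "start": 1, "end": 900,
       "name": "transcript_1.orf1", "strand": "+", "thick_start": 1, "thick_end": 900,
       "score": 0, "has_start_codon": True, "has_stop_codon": True,
       "cds_len": 900, "phase": 0, "transcriptomic": True}

packed = msgpack.dumps(orf)                  # compact bytes, safe to put on a multiprocessing queue
restored = msgpack.loads(packed, raw=False)  # raw=False keeps the keys as str
assert restored["cds_len"] == 900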

@property
def strand(self):
"""
@@ -1138,7 +1162,6 @@ def __init__(self, handle,
max_regression=0,
is_gff=False,
coding=False,
procs=None,
table=0):
"""
Constructor method.
@@ -1167,11 +1190,13 @@
fasta_index[numpy.random.choice(fasta_index.keys(), 1)],
Bio.SeqRecord.SeqRecord)
elif fasta_index is not None:
if isinstance(fasta_index, str):
if isinstance(fasta_index, (str, bytes)):
if isinstance(fasta_index, bytes):
fasta_index = fasta_index.decode()
assert os.path.exists(fasta_index)
fasta_index = pysam.FastaFile(fasta_index)
else:
assert isinstance(fasta_index, pysam.FastaFile)
assert isinstance(fasta_index, pysam.FastaFile), type(fasta_index)

self.fasta_index = fasta_index
self.__closed = False
@@ -1267,3 +1292,118 @@ def coding(self, coding):
if coding not in (False, True):
raise ValueError(coding)
self.__coding = coding


class Bed12ParseWrapper(mp.Process):

def __init__(self,
rec_queue=None,
return_queue=None,
log_queue=None, level="DEBUG",
fasta_index=None,
transcriptomic=False,
max_regression=0,
is_gff=False,
coding=False,
table=0):

"""
:param send_queue:
:type send_queue: mp.Queue
:param return_queue:
:type send_queue: mp.Queue
:param kwargs:
"""

super().__init__()
self.rec_queue = rec_queue
self.return_queue = return_queue
self.logging_queue = log_queue
self.handler = logging_handlers.QueueHandler(self.logging_queue)
self.logger = logging.getLogger(self.name)
self.logger.addHandler(self.handler)
self.logger.setLevel(level)
self.logger.propagate = False
self.transcriptomic = transcriptomic
self.__max_regression = 0
self._max_regression = max_regression
self.coding = coding

if isinstance(fasta_index, dict):
# check that this is a bona fide dictionary ...
assert isinstance(
fasta_index[numpy.random.choice(fasta_index.keys(), 1)],
Bio.SeqRecord.SeqRecord)
elif fasta_index is not None:
if isinstance(fasta_index, (str, bytes)):
if isinstance(fasta_index, bytes):
fasta_index = fasta_index.decode()
assert os.path.exists(fasta_index)
fasta_index = pysam.FastaFile(fasta_index)
else:
assert isinstance(fasta_index, pysam.FastaFile), type(fasta_index)

self.fasta_index = fasta_index
self.__closed = False
self.header = False
self.__table = table
self._is_bed12 = (not is_gff)

def bed_next(self, line):
"""
:return:
"""

bed12 = BED12(line,
fasta_index=self.fasta_index,
transcriptomic=self.transcriptomic,
max_regression=self._max_regression,
coding=self.coding,
table=self.__table)
return bed12

def gff_next(self, line):
"""
:return:
"""

line = GffLine(line)

if line.feature != "CDS":
return None
# Compatibility with BED12
bed12 = BED12(line,
fasta_index=self.fasta_index,
transcriptomic=self.transcriptomic,
max_regression=self._max_regression,
table=self.__table)
# raise NotImplementedError("Still working on this!")
return bed12

def run(self, *args, **kwargs):
while True:
line = self.rec_queue.get()
if line in ("EXIT", b"EXIT"):
self.rec_queue.put(b"EXIT")
break
try:
line = line.decode()
except AttributeError:
pass
if not self._is_bed12:
row = self.gff_next(line)
else:
row = self.bed_next(line)

if not row or row.header is True:
continue
if row.invalid is True:
                self.logger.warning("Invalid entry, reason: %s\n%s",
row.invalid_reason,
row)
continue
self.return_queue.put(msgpack.dumps(row.as_simple_dict()))

# self.join()
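For orientation, a hedged sketch of how a caller might drive Bed12ParseWrapper: feed BED12 lines into rec_queue, terminate with the EXIT sentinel (which each worker re-enqueues before exiting, so its siblings also see it), and unpack the msgpack payloads from return_queue. The file names, worker count and logging setup are assumptions made for the example, not part of this commit.

import logging
import logging.handlers as logging_handlers
import multiprocessing as mp
import queue as queue_lib

import msgpack

from Mikado.parsers.bed12 import Bed12ParseWrapper

if __name__ == "__main__":
    rec_queue, return_queue, log_queue = mp.Queue(), mp.Queue(), mp.Queue()
    # Drain the workers' log records in the parent process.
    listener = logging_handlers.QueueListener(log_queue, logging.StreamHandler())
    listener.start()

    workers = [Bed12ParseWrapper(rec_queue=rec_queue, return_queue=return_queue,
                                 log_queue=log_queue,
                                 fasta_index="transcripts.fa",  # hypothetical FASTA of the transcripts
                                 transcriptomic=True, coding=True)
               for _ in range(4)]
    for worker in workers:
        worker.start()

    with open("orfs.bed12") as bed:  # hypothetical BED12 file of candidate ORFs
        for line in bed:
            rec_queue.put(line)
    rec_queue.put(b"EXIT")  # one sentinel is enough: each worker puts it back before exiting

    # Collect serialised ORFs while the workers are still alive, so the return queue never fills up.
    orfs = []
    while any(worker.is_alive() for worker in workers) or not return_queue.empty():
        try:
            orfs.append(msgpack.loads(return_queue.get(timeout=0.1), raw=False))
        except queue_lib.Empty:
            continue

    for worker in workers:
        worker.join()
    listener.stop()
    print("Recovered {0} ORFs".format(len(orfs)))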

0 comments on commit 335e6a4
