diff --git a/reform.py b/reform.py index d164c5a..6b97b7c 100755 --- a/reform.py +++ b/reform.py @@ -6,17 +6,27 @@ from Bio import SeqIO from Bio.Seq import Seq from Bio.SeqRecord import SeqRecord + +## Importing gzip or pgzip module for file compression +print("------------------------------------------") +print(f"Compression Library Use:") try: - import pgzip as gzip_module - print("\nUsing pgzip for gzip operations.\n") + import pgzip as gzip_module + print(f"Using pgzip for gzip operations.") except ImportError: - import gzip as gzip_module - print("\npgzip not found, falling back to gzip.\n") + import gzip as gzip_module + print(f"pgzip not found, falling back to gzip.") def main(): ## Retrieve command line arguments and number of iterations in_arg, iterations = get_input_args() + + ## Print reference file paths at start of process + print("------------------------------------------") + print(f"Path to Reference Files:") + print(f"Reference FASTA: {os.path.realpath(in_arg.ref_fasta)}") + print(f"Reference Annotation: {os.path.realpath(in_arg.ref_gff)}") ## List for previous postion and modification length prev_modifications = [] @@ -28,114 +38,204 @@ def main(): ## Sequential processing for index in range(iterations): # Start interation - print("-------------------------------------------") - print(f"Begin modification from in{index+1}.fa") - print("-------------------------------------------") - ## Read the new fasta (to be inserted into the ref genome) - try: - filename_fa = in_arg.in_fasta[index] - if not os.path.exists(filename_fa): - raise FileNotFoundError(f"Error: File {filename_fa} does not exist.") - real_path_fa = os.path.realpath(filename_fa) - record = list(SeqIO.parse(in_arg.in_fasta[index], "fasta"))[0] - except IndexError: - raise ValueError(f"Error: {filename_fa} is not a valid FASTA file.") - except Exception as e: - raise ValueError(f"Error parsing FASTA file: {str(e)}") - print("Preparing to create new FASTA file") - print(f"Original FASTA: {real_path_fa}") - filename_gff = in_arg.in_gff[index] - if not os.path.exists(filename_gff): - raise FileNotFoundError(f"Error: File {filename_gff} does not exist.") - real_path_gff = os.path.realpath(filename_gff) - print("Preparing to create new annotation file") - print(f"Original Annotation: {real_path_gff}") - print() ### print new line - - ## Generate index of sequences from ref reference fasta - if prev_fasta_path: - chrom_seqs = index_fasta(prev_fasta_path) - os.remove(prev_fasta_path) - else: - chrom_seqs = index_fasta(in_arg.ref_fasta) - - ## Obtain the sequence of the chromosome we want to modify - seq = chrom_seqs[in_arg.chrom] - seq_str = str(seq.seq) - - ## Get the position to insert the new sequence - positions = get_position(index, in_arg.position, in_arg.upstream_fasta, in_arg.downstream_fasta, in_arg.chrom, seq_str, prev_modifications) - position = positions['position'] - down_position = positions['down_position'] - - ## Save current modification which include position(index) and length changed. - if position == down_position: - length_changed = len(str(record.seq)) + if hasattr(in_arg, 'chrom') and in_arg.chrom is not None: + ## Modify existing chrom seq + print("-------------------------------------------") + print(f"Begin modification from {in_arg.in_fasta[index]}") + print("-------------------------------------------") + new_fasta, annotation_ext, new_gff_path, prev_fasta_path, prev_gff_path = \ + modify_existing_chrom_seq(in_arg, index, prev_fasta_path, prev_modifications, \ + iterations, prev_gff_path) else: - length_changed = len(str(record.seq)) - (down_position - position - 1) - prev_modifications.append((position,length_changed)) - - if position != down_position: - print("Removing nucleotides from position {} - {}".format(position, down_position)) - print("Proceeding to insert sequence '{}' from {} at position {} on chromsome {}" - .format(record.description, in_arg.in_fasta[index], position, in_arg.chrom)) - - ## Build the new chromosome sequence with the inserted_seq - ## If the chromosome sequence length is in the header, replace it with new length - new_seq = seq_str[:position] + str(record.seq) + seq_str[down_position:] - chrom_length = str(len(seq_str)) - new_length = str(len(new_seq)) - new_record = SeqRecord( - Seq(new_seq), - id=seq.id, - description=seq.description.replace(chrom_length, new_length) + ## Add new chrom seq + print("-------------------------------------------") + print(f"Begin adding a new chromosome from {in_arg.in_fasta[index]}") + print("-------------------------------------------") + new_fasta, annotation_ext, new_gff_path, prev_fasta_path, prev_gff_path = \ + add_new_chrom_seq(in_arg, index, prev_fasta_path, prev_gff_path, iterations) + + print("------------------------------------------") + print(f"Reform Complete") + print(f"New .fa file created: {os.path.realpath(new_fasta)}") + print(f"New {annotation_ext} file created: {os.path.realpath(new_gff_path)}") + print("------------------------------------------") + +def modify_existing_chrom_seq(in_arg, index, prev_fasta_path, prev_modifications, iterations, prev_gff_path): + """ + Modifies a specified and existing chromosome sequence by inserting/replacing a new sequence at a given + position and updates the corresponding FASTA and GFF files. + """ + ## Read FASTA + record, chrom_seqs= read_fasta(in_arg, index, prev_fasta_path) + ## Read annotation (gff/gtf) + check_gff(in_arg, index) + ## Obtain the sequence of the chromosome we want to modify + existing_seq = chrom_seqs[in_arg.chrom] + existing_seq_str = str(existing_seq.seq) + + ## Get the position to insert the new sequence + positions = get_position(index, in_arg.position, in_arg.upstream_fasta, in_arg.downstream_fasta, in_arg.chrom, existing_seq_str, prev_modifications) + position = positions['position'] + down_position = positions['down_position'] + ## Save current modification which include position(index) and length changed. + if position == down_position: + length_changed = len(str(record.seq)) + else: + length_changed = len(str(record.seq)) - (down_position - position - 1) + prev_modifications.append((position,length_changed)) + if position != down_position: + print(f"Removing nucleotides from position {position} - {down_position}") + print(f"Proceeding to insert sequence '{record.description.strip()}' from {in_arg.in_fasta[index]} at position {position} on chromosome {in_arg.chrom}") + ## Build the new chromosome sequence with the inserted_seq + ## If the chromosome sequence length is in the header, replace it with new length + new_seq = existing_seq_str[:position] + str(record.seq) + existing_seq_str[down_position:] + chrom_length = str(len(existing_seq_str)) + new_length = str(len(new_seq)) + new_record = SeqRecord( + Seq(new_seq), + id=existing_seq.id, + description=existing_seq.description.replace(chrom_length, new_length) ) - - ## Create new fasta file with modified chromosome - if index < iterations - 1: - new_fasta_file = tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.fa') - new_fasta_file.close() - new_fasta = new_fasta_file.name - prev_fasta_path = new_fasta - else: - ref_basename, _ = get_ref_basename(in_arg.ref_fasta) - ref_name = ref_basename - new_fasta = ref_name + '_reformed.fa' - with open(new_fasta, "w") as f: - for s in chrom_seqs: - if s == seq.id: - SeqIO.write([new_record], f, "fasta") - else: - SeqIO.write([chrom_seqs[s]], f, "fasta") - - ## Read in new GFF features from in_gff - in_gff_lines = get_in_gff_lines(in_arg.in_gff[index]) - - ## Create a temp file for gff, if index is not equal to last iteration - annotation_name, annotation_ext = get_ref_basename(in_arg.ref_gff) - if index < iterations - 1: - temp_gff = tempfile.NamedTemporaryFile(delete=False, mode='w', suffix=annotation_ext) - temp_gff_name = temp_gff.name - temp_gff.close() - if prev_gff_path: - new_gff_path = create_new_gff(temp_gff_name, prev_gff_path, in_gff_lines, position, down_position, seq.id, len(str(record.seq))) - os.remove(prev_gff_path) + ## Create new fasta file with modified chromosome + if index < iterations - 1: + new_fasta_file = tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.fa') + new_fasta_file.close() + new_fasta = new_fasta_file.name + prev_fasta_path = new_fasta + else: + ref_basename, _ = get_ref_basename(in_arg.ref_fasta) + ref_name = ref_basename + new_fasta = ref_name + '_reformed.fa' + with open(new_fasta, "w") as f: + for s in chrom_seqs: + if s == existing_seq.id: + SeqIO.write([new_record], f, "fasta") else: - new_gff_path = create_new_gff(temp_gff_name, in_arg.ref_gff, in_gff_lines, position, down_position, seq.id, len(str(record.seq))) + SeqIO.write([chrom_seqs[s]], f, "fasta") + ## Read in new GFF features from in_gff, False means modify existing chrom + in_gff_lines = get_in_gff_lines(in_gff=in_arg.in_gff[index], existing_chrom=in_arg.chrom, new_chrom=None) + ## Create a temp file for gff, if index is not equal to last iteration + annotation_name, annotation_ext = get_ref_basename(in_arg.ref_gff) + if index < iterations - 1: + temp_gff = tempfile.NamedTemporaryFile(delete=False, mode='w', suffix=annotation_ext) + temp_gff_name = temp_gff.name + temp_gff.close() + if prev_gff_path: + new_gff_path = create_new_gff(temp_gff_name, prev_gff_path, in_gff_lines, position, down_position, existing_seq.id, len(str(record.seq))) + os.remove(prev_gff_path) else: - new_gff_name = annotation_name + '_reformed' + annotation_ext - if prev_gff_path: - new_gff_path = create_new_gff(new_gff_name, prev_gff_path, in_gff_lines, position, down_position, seq.id, len(str(record.seq))) - else: - new_gff_path = create_new_gff(new_gff_name, in_arg.ref_gff, in_gff_lines, position, down_position, seq.id, len(str(record.seq))) - prev_gff_path = new_gff_path - if (index == iterations-1): - print("------------------------------------------") - print("Reform Complete") - print("------------------------------------------") - print(f"New .fa file created: {os.path.realpath(new_fasta)}") - print(f"New {annotation_ext} file created: {os.path.realpath(new_gff_path)}") + new_gff_path = create_new_gff(temp_gff_name, in_arg.ref_gff, in_gff_lines, position, down_position, existing_seq.id, len(str(record.seq))) + else: + new_gff_name = annotation_name + '_reformed' + annotation_ext + if prev_gff_path: + new_gff_path = create_new_gff(new_gff_name, prev_gff_path, in_gff_lines, position, down_position, existing_seq.id, len(str(record.seq))) + else: + new_gff_path = create_new_gff(new_gff_name, in_arg.ref_gff, in_gff_lines, position, down_position, existing_seq.id, len(str(record.seq))) + + prev_gff_path = new_gff_path + + return new_fasta, annotation_ext, new_gff_path, prev_fasta_path, prev_gff_path +def add_new_chrom_seq(in_arg, index, prev_fasta_path, prev_gff_path, iterations): + ## Read FASTA + record, chrom_seqs= read_fasta(in_arg, index, prev_fasta_path) + ## Read annotation (gff/gtf) + check_gff(in_arg, index) + ## Check if new chromosome existed in sequence + if in_arg.new_chrom[index] in chrom_seqs: + raise ValueError(f"Chromosome {in_arg.new_chrom[index]} already exists in the FASTA file.") + ## Build the new chromosome sequence by append new sequence below + ## Using new_chrom as the id of the new chromosome + new_seq = str(record.seq) + new_seq_length = len(new_seq) + new_record = SeqRecord( + Seq(new_seq), + id=in_arg.new_chrom[index], # Use new_chrom + description=f"{in_arg.new_chrom[index]}" + ) + ## Create new fasta file with modified chromosome + if index < iterations - 1: + new_fasta_file = tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.fa') + new_fasta_file.close() + new_fasta = new_fasta_file.name + prev_fasta_path = new_fasta + else: + ref_basename, _ = get_ref_basename(in_arg.ref_fasta) + ref_name = ref_basename + new_fasta = ref_name + '_reformed.fa' + ## Write all the original chromosomes first, then new chromosomes. + with open(new_fasta, "w") as f: + for s in chrom_seqs: + SeqIO.write([chrom_seqs[s]], f, "fasta") + SeqIO.write([new_record], f, "fasta") + ## Read in new GFF features from in_gff + ## Pass the new_chrom name from command line and the length of the new sequence to correct ##sequence-region line + in_gff_lines = get_in_gff_lines(in_gff=in_arg.in_gff[index], new_chrom=in_arg.new_chrom[index], sequence_length=new_seq_length) + ## Create a temp file for gff, if index is not equal to last iteration + annotation_name, annotation_ext = get_ref_basename(in_arg.ref_gff) + if index < iterations - 1: + temp_gff = tempfile.NamedTemporaryFile(delete=False, mode='w', suffix=annotation_ext) + temp_gff_name = temp_gff.name + temp_gff.close() + if prev_gff_path: + new_gff_path = create_new_gff_for_existing_gff(temp_gff_name, prev_gff_path, in_gff_lines, in_arg.new_chrom[index], new_seq_length) + os.remove(prev_gff_path) + else: + new_gff_path = create_new_gff_for_existing_gff(temp_gff_name, in_arg.ref_gff, in_gff_lines, in_arg.new_chrom[index], new_seq_length) + else: + new_gff_name = annotation_name + '_reformed' + annotation_ext + if prev_gff_path: + new_gff_path = create_new_gff_for_existing_gff(new_gff_name, prev_gff_path, in_gff_lines, in_arg.new_chrom[index], new_seq_length) + else: + new_gff_path = create_new_gff_for_existing_gff(new_gff_name, in_arg.ref_gff, in_gff_lines, in_arg.new_chrom[index], new_seq_length) + prev_gff_path = new_gff_path + return new_fasta, annotation_ext, new_gff_path, prev_fasta_path, prev_gff_path + +def read_fasta(in_arg, index, prev_fasta_path): + """ + Reads the FASTA file, extracts the sequence to be inserted, + and indexes the reference genome chromosome sequences. + """ + ## Read the new fasta (to be inserted into the ref genome) + try: + filename_fa = in_arg.in_fasta[index] + if not os.path.exists(filename_fa): + raise FileNotFoundError(f"Error: File {filename_fa} does not exist.") + real_path_fa = os.path.realpath(filename_fa) + record = list(SeqIO.parse(in_arg.in_fasta[index], "fasta"))[0] + # Check for mismatch between FASTA record ID and command line chromosome name + if hasattr(in_arg, 'new_chrom') and in_arg.new_chrom is not None: + if record.id != in_arg.new_chrom[index]: + print(f"** WARNING: Mismatch detected between chromosome name in input FASTA ({record.id}) " + f"and command line parameter ({in_arg.new_chrom[index]}).") + print(f"Using command line chromosome name: {in_arg.new_chrom[index]}") + # The actual override happens in add_new_chrom_seq where a new SeqRecord is created + + except IndexError: + raise ValueError(f"Error: {filename_fa} is not a valid FASTA file.") + except Exception as e: + raise ValueError(f"Error parsing FASTA file: {str(e)}") + print(f"Preparing to create new FASTA file") + print(f"Input FASTA: {real_path_fa}") + ## Generate index of sequences from ref reference fasta + if prev_fasta_path: + chrom_seqs = index_fasta(prev_fasta_path) + os.remove(prev_fasta_path) + else: + chrom_seqs = index_fasta(in_arg.ref_fasta) + return record, chrom_seqs + +def check_gff(in_arg, index): + """ + Reads the GFF file, verifies its existence, and prints its file path information. + """ + filename_gff = in_arg.in_gff[index] + if not os.path.exists(filename_gff): + raise FileNotFoundError(f"Error: File {filename_gff} does not exist.") + real_path_gff = os.path.realpath(filename_gff) + print("Preparing to create new annotation file") + print(f"Input Annotation: {real_path_gff}") + print() ### print new line def index_fasta(fasta_path): ''' @@ -168,7 +268,6 @@ def get_ref_basename(filepath): name, ext = os.path.splitext(base) return name, ext - def modify_gff_line(elements, start=None, end=None, comment=None): ''' Modifies an existing GFF line and returns the modified line. Currently, you can @@ -191,8 +290,27 @@ def modify_gff_line(elements, start=None, end=None, comment=None): ## Return the modified line return("{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}".format(elements[0], elements[1], elements[2], start, end, elements[5], elements[6], elements[7], comment)) - -def get_in_gff_lines(in_gff): + +def valid_gff_line(line_elements): + ''' + Checks if the splited line is a valid GFF line. + Returns True if valid, False otherwise. + ''' + if not line_elements[0].startswith("##sequence-region"): + if len(line_elements) != 9: + print(f"** ERROR: in_gff file does not have 9 columns, it has {len(line_elements)}") + print(line_elements) + return False + else: + ## Check if ##sequence-region line has 4 columns, the reason why use 5 here is because last element is + ## spliting format indicator. + if len(line_elements) != 5: + print(f"** ERROR: ##sequence-region line does not have 4 columns, it has {len(line_elements) - 1}") + print(line_elements) + return False + return True + +def get_in_gff_lines(in_gff=None, existing_chrom=None, new_chrom=None, sequence_length=None): ''' Takes a gff file and returns a list of lists where each parent list item is a single line of the gff file @@ -201,15 +319,51 @@ def get_in_gff_lines(in_gff): with open(in_gff, "r") as f: in_gff_lines = [] for line in f: - line_elements = line.split('\t') - # Skip comment lines - if line.startswith("#"): + # Skip empty lines + if not line.strip(): continue - # Ensure lines have 9 columns - if len(line_elements) != 9: - print("** ERROR: in_gff file does not have 9 columns, it has", len(line_elements)) - print(line_elements) - exit() + + # Handle differently based on whether we're adding a new chromosome or modifying existing + if line.startswith("##sequence-region") and new_chrom is not None: + ## Paste ##sequence-region line which only exists in gtf/gff for adding new chromosome. + ## Select user used delimiter based on content + if '\t' in line: + line_elements = line.split('\t') + line_elements.append('\t') ## Add tab to the end as a format indicator + else: + line_elements = line.split() + line_elements.append(' ') ## Add whitespace to the end as a format indicator + if not valid_gff_line(line_elements): + exit() + # Validate new_chrom value and correct if needed + original_chrom = line_elements[1] + if original_chrom != new_chrom: + print(f"** INFO: Updating chromosome name from {original_chrom} to {new_chrom} to fit the\ + input from new_chrom parameter in command-line.") + line_elements[1] = new_chrom + # Validate sequence_length value and correct if needed + original_start, original_end = line_elements[2], line_elements[3] + if original_start != "1": + print(f"** INFO: Updating start position from {original_start} to 1 to fit the\ + format requirement of annotation file.") + line_elements[2] = "1" + if original_end != str(sequence_length): + print(f"** INFO: Updating sequence length from {original_end} to {sequence_length} to fit the\ + length of sequence in input FASTA file.") + line_elements[3] = str(sequence_length) + elif line.startswith("#"): + ## Ignore other comment lines + continue + else: + ## Split, check and add feature lines + line_elements = line.split('\t') + chorme_id = existing_chrom if existing_chrom else new_chrom + if line_elements[0] != chorme_id: + print(f"** Warning: Mismatch detected between chromosome name in input annotation ({line_elements[0]}) and command line parameter ({chorme_id}).") + print(f"Using command line chromosome name: {chorme_id}") + line_elements[0] = chorme_id + if not valid_gff_line(line_elements): + exit() in_gff_lines.append(line_elements) return in_gff_lines @@ -230,7 +384,7 @@ def get_position(index, positions, upstream, downstream, chrom, seq_str, prev_mo position += lc if position > len(seq_str): print("** ERROR: Position greater than length of chromosome.") - print("Chromosome: {}\Chromosome length: {}\nPosition: \n{}".format(chrom, len(seq_str), position)) + print(f"Chromosome: {chrom}\nChromosome length: {len(seq_str)}\nPosition: {position}") exit() elif position == -1: position = len(seq_str) @@ -257,8 +411,8 @@ def get_position(index, positions, upstream, downstream, chrom, seq_str, prev_mo else: print("** ERROR: The upstream and downstream target sequences must be present and unique in the specified chromosome.") print("Chromosome: {}\n".format(chrom)) - print("Upstream sequence found {} times".format(upstream_seq_count)) - print("Downstream sequence found {} times".format(downstream_seq_count)) + print(f"Upstream sequence found {upstream_seq_count} times") + print(f"Downstream sequence found {downstream_seq_count} times") exit() else: print("** ERROR: You must specify a valid position or upstream and downstream sequences.") @@ -268,36 +422,39 @@ def get_position(index, positions, upstream, downstream, chrom, seq_str, prev_mo exit() return {'position': position, 'down_position': down_position} -def write_in_gff_lines(gff_out, in_gff_lines, position, split_features, sequence_length, chrom): +def calculate_new_length_for_in_gff(in_gff_lines, position, sequence_length): ''' - in_gff_lines: a list of lists where each nested list is a list of - columns (in gff format) associated with each new feature to insert - split_features: contains information about features in the original GFF - file that were split due to the insertion of the new sequence. - sequence_length: length of the inserted sequence, used to determine - the new end positions in the GFF file. + Calculate the new length of the chromosome after modification. + This function checks the start and end positions of the features in the GFF file + and adjusts them based on the insertion position and the length of the inserted sequence. ''' - ## Replace the chromosome ID from in_gff with the correct chromosome ID - for l in in_gff_lines: - l[0] = chrom + ## List to store new gff lines + new_gff_lines = [] # Handling of single-line comments if len(in_gff_lines) == 1: - l = in_gff_lines[0] - ## Check length - ## l[3] is start position of fasta in in.gtf and l[4] is end position - seq_id = l[0] - if int(l[4]) - int(l[3]) + 1 != sequence_length: - print(f"** WARNING: Inconsistent length for {seq_id}. Correcting start position to 1 and end position to {sequence_length}.") - ## Correct start(l[3]) to 1 and end(l[4]) to length of insert fasta - new_gff_line = modify_gff_line( - l, start=1 + position, end=sequence_length + position) - gff_out.write(new_gff_line) + ## Check if the line isn't a comment line + if in_gff_lines[0][0].startswith("##sequence-region"): + new_gff_lines.append(in_gff_lines[0]) + else: + l = in_gff_lines[0] + ## Check length + ## l[3] is start position of fasta in in.gtf and l[4] is end position + seq_id = l[0] + if int(l[4]) - int(l[3]) + 1 != sequence_length: + print(f"** WARNING: Annotation start and end positions do not match the sequence length in the FASTA input. Adjusting to match the input sequence: start=1, end={sequence_length}.\n→ Affected annotation: {in_gff_lines}") + ## Correct start(l[3]) to 1 and end(l[4]) to length of insert fasta + new_gff_line = modify_gff_line( + l, start=1 + position, end=sequence_length + position) + new_gff_lines.append(new_gff_line) # Handling of multiple-line comments else: ### Step1: extract all start and end into corresponding set() start_positions = set() end_positions = set() for l in in_gff_lines: + ## Check length and ignore comment lines + if l[0].startswith("##sequence-region"): + continue start_positions.add(int(l[3])) end_positions.add(int(l[4])) ### Step2: Find min start and max end @@ -314,11 +471,32 @@ def write_in_gff_lines(gff_out, in_gff_lines, position, split_features, sequence ### Step4: Adjust start and end. Offset will be 0 if no adjust need offset = min_start - 1 for l in in_gff_lines: + ## Update length and ignore comment lines for lenght calculation + if l[0].startswith("##sequence-region"): + new_gff_lines.append(l) + continue ### Step5: Correct start(l[3]) and end(l[4]) by minus offset new_gff_line = modify_gff_line( l, start = int(l[3]) - offset + position, end = int(l[4]) - offset + position) - gff_out.write(new_gff_line) + new_gff_lines.append(new_gff_line) + return new_gff_lines +def write_in_gff_lines(gff_out, in_gff_lines, position, split_features, sequence_length, chrom): + ''' + in_gff_lines: a list of lists where each nested list is a list of + columns (in gff format) associated with each new feature to insert + split_features: contains information about features in the original GFF + file that were split due to the insertion of the new sequence. + sequence_length: length of the inserted sequence, used to determine + the new end positions in the GFF file. + ''' + ## Replace the chromosome ID from in_gff with the correct chromosome ID + for l in in_gff_lines: + l[0] = chrom + ## Check if the lenght in_gff_lines are valid, and correct if needed + new_gff_lines = calculate_new_length_for_in_gff(in_gff_lines, position, sequence_length) + for gff_line in new_gff_lines: + gff_out.write(gff_line) ## If insertion caused any existing features to be split, add ## the split features now immediately after adding the new features for sf in split_features: @@ -359,15 +537,22 @@ def create_new_gff(new_gff_name, ref_gff, in_gff_lines, position, down_position, ref_gff_path = tmp_f.name with open(ref_gff_path, "r") as f: for line in f: - line_elements = line.split('\t') + # For header lines, handle both tab and space-delimited formats if line.startswith("#"): - if line_elements[0] == "##sequence-region" and line_elements[1] == chrom_id: + if '\t' in line: + line_elements = line.split('\t') + else: + line_elements = line.split() + + if line.startswith("##sequence-region") and line_elements[1] == chrom_id: ## Edit the length of the chromosome original_length = int(line_elements[3]) - new_length = original_length - (down_position - position) + new_seq_length + new_length = calculate_new_length(original_length, position, down_position, new_seq_length) line = line.replace(str(original_length), str(new_length)) gff_out.write(line) else: + # Regular feature lines are always tab-delimited + line_elements = line.split('\t') gff_chrom_id = line_elements[0] gff_feat_start = int(line_elements[3]) gff_feat_end = int(line_elements[4]) @@ -403,7 +588,7 @@ def create_new_gff(new_gff_name, ref_gff, in_gff_lines, position, down_position, # "chromosome" or "region") elif gff_feat_type in ['chromosome', 'region']: original_length = gff_feat_end - new_length = original_length - (down_position - position) + new_seq_length + new_length = calculate_new_length(original_length, position, down_position, new_seq_length) line = line.replace(str(original_length), str(new_length)) gff_out.write(line) @@ -449,8 +634,7 @@ def create_new_gff(new_gff_name, ref_gff, in_gff_lines, position, down_position, elif gff_feat_start <= position and gff_feat_end <= down_position: # Which side of the feature depends on the strand (we add this as a comment) x = "3" if gff_feat_strand == "+" else "5" - print("Feature cut off - {} prime side of feature cut off ({} strand)" - .format(x, gff_feat_strand)) + print(f"Feature cut off - {x} prime side of feature cut off ({gff_feat_strand} strand)") new_comment = format_comment( "{} prime side of feature cut-off by inserted sequence".format(x), gff_ext @@ -478,8 +662,7 @@ def create_new_gff(new_gff_name, ref_gff, in_gff_lines, position, down_position, and gff_feat_start <= down_position and gff_feat_end > down_position): x = "5" if gff_feat_strand == "+" else "3" - print("Feature cut off - {} prime side of feature cut off ({} strand)" - .format(x, gff_feat_strand)) + print(f"Feature cut off - {x} prime side of feature cut off ({gff_feat_strand} strand)") new_comment = format_comment( "{} prime side of feature cut-off by inserted sequence".format(x), gff_ext @@ -503,7 +686,7 @@ def create_new_gff(new_gff_name, ref_gff, in_gff_lines, position, down_position, gff_out.write(modified_line) else: - print("** Error: Unknown case for GFF modification. Exiting " + str(line_elements)) + print(f"** Error: Unknown case for GFF modification. Exiting {line_elements}") exit() # If we've iterated over the entire original gff @@ -528,6 +711,57 @@ def create_new_gff(new_gff_name, ref_gff, in_gff_lines, position, down_position, os.remove(ref_gff_path) return new_gff_name +def create_new_gff_for_existing_gff(new_gff_name, ref_gff, in_gff_lines, chrom_id, sequence_length): + """ + Appends new annotations to an existing GFF file without modifying existing features. + """ + gff_splitor = '' + ref_gff_path = ref_gff + ## Handle compressed .gz GFF files + if ref_gff.endswith('.gz'): + with gzip_module.open(ref_gff, 'rt') as f: + with tempfile.NamedTemporaryFile(delete=False, mode='w') as tmp_f: + for line in f: + tmp_f.write(line) + tmp_f.flush() + ref_gff_path = tmp_f.name + ## Open new GFF file for writing + with open(new_gff_name, "w") as gff_out: + ## Copy all existing annotations to new GFF file + with open(ref_gff_path, "r", encoding="utf-8") as f: + for line in f: + if line.startswith("##sequence-region") and gff_splitor == '': + ## Select user used delimiter based on content + if '\t' in line: + gff_splitor = ('\t') + else: + gff_splitor = (' ') + gff_out.write(line) + ## Append new annotations if present + if in_gff_lines: + ## Call calculate_new_length_for_in_gff to correct the length of the chromosome + ## Since we are not modifying existing features, we can set position to 0 + new_gff_lines = calculate_new_length_for_in_gff(in_gff_lines, 0, sequence_length) + print(f"Appending {len(in_gff_lines)} new annotations to chromosome {chrom_id}.") + for new_annotation in new_gff_lines: + if new_annotation[0] == "##sequence-region": + ## Use predefined format for sequence-region line + if gff_splitor == '': + gff_splitor = new_annotation[-1] + ## Remove format indicater, and add new line + gff_out.write(gff_splitor.join(new_annotation[:-1])+'\n') + elif new_annotation: + gff_out.write(new_annotation) + + ## Cleanup temp file if needed + if ref_gff.endswith('.gz') and ref_gff_path != ref_gff: + try: + os.remove(ref_gff_path) + except Exception as e: + print(f"Warning: Failed to delete temp file {ref_gff_path}: {e}") + + return new_gff_name + def format_comment(comment, ext): ''' Format comment according to ext (GFF or GTF) and return @@ -537,7 +771,7 @@ def format_comment(comment, ext): elif ext.lower().startswith('gff'): new_comment = "; reform_comment={}".format(comment) else: - print("** Error: Unrecognized extension {} in format_comment(). Exiting".format(ext)) + print(f"** Error: Unrecognized extension {ext} in format_comment(). Exiting") exit() return new_comment @@ -549,22 +783,31 @@ def rename_id(line): attributes = line.split('\t')[8].strip() elements = attributes.split(';') if elements[0].startswith("ID="): - print("Renaming split feature {} --> {}_split".format(elements[0], elements[0])) + print(f"Renaming split feature {elements[0]} --> {elements[0]}_split") return ("{}_split;{}".format(elements[0], ';'.join(elements[1:]))) elif elements[0].startswith("gene_id "): gene_id = re.match(r'gene_id \"(.+)\"', elements[0])[1] - print('Renaming split feature {} --> {}_split'.format(gene_id, gene_id)) + print(f"Renaming split feature {gene_id} --> {gene_id}_split") return ('gene _id "{}_split";{}'.format(gene_id, ';'.join(elements[1:]))) else: - print("This feature will not be renamed because it does not has an ID/gene_id attribute:\n", line) + print(f"This feature will not be renamed because it does not have an ID/gene_id attribute:\n{line}") return attributes - + +def calculate_new_length(original_length, position, down_position, new_seq_length): + """ + Calculate the new chromosome/region length after a modification. + Returns: The new calculated length + """ + return original_length - (down_position - position) + new_seq_length + def get_input_args(): parser = argparse.ArgumentParser() - - parser.add_argument('--chrom', type = str, required = True, - help = "Chromosome name (String)") + chrom_group = parser.add_mutually_exclusive_group(required=True) + chrom_group.add_argument('--chrom', type=str, + help="Chromosome name (String)") + chrom_group.add_argument('--new_chrom', type=str, + help="Comma-separated new chromosome name (String)") parser.add_argument('--in_fasta', type=str, required=True, help="Path(s) to new sequence(s) to be inserted into reference genome in fasta format") parser.add_argument('--in_gff', type=str, required=True, @@ -583,35 +826,46 @@ def get_input_args(): in_args = parser.parse_args() in_args.in_fasta = in_args.in_fasta.split(',') in_args.in_gff = in_args.in_gff.split(',') - if in_args.upstream_fasta: - in_args.upstream_fasta = in_args.upstream_fasta.split(',') - if in_args.downstream_fasta: - in_args.downstream_fasta = in_args.downstream_fasta.split(',') - - if in_args.position is None and (in_args.upstream_fasta is None or in_args.downstream_fasta is None): - print("** Error: You must provide either the position, or the upstream and downstream sequences.") - exit() - - if not in_args.position and len(in_args.upstream_fasta) != len(in_args.downstream_fasta): - print("** Error: The number of upstream_fasta and downstream_fasta files does not match.") + if (len(in_args.in_fasta) != len(in_args.in_gff)): + print("** Error: The number of inserted FASTA files does not match the number of annotation files, or their counts and positions do not align.") exit() - - if in_args.position is not None: - try: - in_args.position = list(map(int, in_args.position.split(','))) - iterations = iterations = len(in_args.position) - except ValueError: - print("** Error: Position must be a comma-separated list of integers, like 1,5,-1.") + else: + iterations = len(in_args.in_fasta) + ## Modify existing chrom + if in_args.chrom: + if in_args.upstream_fasta: + in_args.upstream_fasta = in_args.upstream_fasta.split(',') + if in_args.downstream_fasta: + in_args.downstream_fasta = in_args.downstream_fasta.split(',') + if in_args.position is None and (in_args.upstream_fasta is None or in_args.downstream_fasta is None): + print("** Error: You must provide either the position, or the upstream and downstream sequences.") exit() + if in_args.position is not None: + try: + in_args.position = list(map(int, in_args.position.split(','))) + except ValueError: + print("** Error: Position must be a comma-separated list of integers, like 1,5,-1.") + exit() + if iterations != len(in_args.position): + print("** Error: Position must be a equal to number of input FASTA") + exit() + else: + if iterations != len(in_args.upstream_fasta): + print("** Error: Upstream FASTA must be a equal to number of input FASTA") + exit() + if not in_args.position and len(in_args.upstream_fasta) != len(in_args.downstream_fasta): + print("** Error: The number of upstream_fasta and downstream_fasta files does not match.") + exit() + ## Add new chrom else: - iterations = len(in_args.upstream_fasta) - - if (len(in_args.in_fasta) != len(in_args.in_gff)) or (len(in_args.in_fasta) != iterations): - print("** Error: The number of inserted FASTA files does not match the number of GTF files, or their counts and positions do not align.") - exit() - + if in_args.position or in_args.upstream_fasta or in_args.downstream_fasta: + parser.error("** Error: When using --new_chrom, you cannot provide --position, --upstream_fasta, or --downstream_fasta.") + exit() + ## Convert new_chrom from string to list + in_args.new_chrom = [x.strip() for x in in_args.new_chrom.split(',')] + return in_args, iterations if __name__ == "__main__": main() - + diff --git a/test_data/17/gold.fa b/test_data/17/gold.fa new file mode 100644 index 0000000..79a60e4 --- /dev/null +++ b/test_data/17/gold.fa @@ -0,0 +1,4 @@ +>X +ZZZZABBBBBDDDDDCCCCCIIIIIKKKKK +>Y +AAAATTTTGGGGCCCC diff --git a/test_data/17/gold.gtf b/test_data/17/gold.gtf new file mode 100644 index 0000000..83f530e --- /dev/null +++ b/test_data/17/gold.gtf @@ -0,0 +1,8 @@ +X ref exon 5 25 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +X ref CDS 8 22 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +X ref start_codon 5 7 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +X ref stop_codon 23 25 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +Y ref exon 1 16 . + 0 gene_id "new_gene"; transcript_id "new_gene.1"; +Y ref CDS 4 14 . + 0 gene_id "new_gene"; transcript_id "new_gene.1"; +Y ref start_codon 1 3 . + 0 gene_id "new_gene"; transcript_id "new_gene.1"; +Y ref stop_codon 14 16 . + 0 gene_id "new_gene"; transcript_id "new_gene.1"; diff --git a/test_data/17/in.fa b/test_data/17/in.fa new file mode 100644 index 0000000..2b55705 --- /dev/null +++ b/test_data/17/in.fa @@ -0,0 +1,2 @@ +>Y +AAAATTTTGGGGCCCC diff --git a/test_data/17/in.gtf b/test_data/17/in.gtf new file mode 100644 index 0000000..7dc556d --- /dev/null +++ b/test_data/17/in.gtf @@ -0,0 +1,4 @@ +Y ref exon 1 16 . + 0 gene_id "new_gene"; transcript_id "new_gene.1"; +Y ref CDS 4 14 . + 0 gene_id "new_gene"; transcript_id "new_gene.1"; +Y ref start_codon 1 3 . + 0 gene_id "new_gene"; transcript_id "new_gene.1"; +Y ref stop_codon 14 16 . + 0 gene_id "new_gene"; transcript_id "new_gene.1"; diff --git a/test_data/17/ref.fa b/test_data/17/ref.fa new file mode 100644 index 0000000..6fa4d80 --- /dev/null +++ b/test_data/17/ref.fa @@ -0,0 +1,2 @@ +>X +ZZZZABBBBBDDDDDCCCCCIIIIIKKKKK diff --git a/test_data/17/ref.gtf b/test_data/17/ref.gtf new file mode 100644 index 0000000..6737be8 --- /dev/null +++ b/test_data/17/ref.gtf @@ -0,0 +1,4 @@ +X ref exon 5 25 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +X ref CDS 8 22 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +X ref start_codon 5 7 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +X ref stop_codon 23 25 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; diff --git a/test_data/18/gold.fa b/test_data/18/gold.fa new file mode 100644 index 0000000..0194856 --- /dev/null +++ b/test_data/18/gold.fa @@ -0,0 +1,8 @@ +>X +ZZZZABBBBBDDDDDCCCCCIIIIIKKKKK +>Y +AAAATTTTGGGGCCCC +>H +GGGGAATTCCCCGGGG +>M +CCCCGGGGAAAATTTT diff --git a/test_data/18/gold.gtf b/test_data/18/gold.gtf new file mode 100644 index 0000000..29476bf --- /dev/null +++ b/test_data/18/gold.gtf @@ -0,0 +1,18 @@ +X ref exon 5 25 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +X ref CDS 8 22 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +X ref start_codon 5 7 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +X ref stop_codon 23 25 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +##sequence-region Y 1 16 +Y ref exon 1 16 . + 0 gene_id "new_gene"; transcript_id "new_gene.1"; +Y ref CDS 4 14 . + 0 gene_id "new_gene"; transcript_id "new_gene.1"; +Y ref start_codon 1 3 . + 0 gene_id "new_gene"; transcript_id "new_gene.1"; +Y ref stop_codon 14 16 . + 0 gene_id "new_gene"; transcript_id "new_gene.1"; +##sequence-region H 1 16 +H ref exon 1 16 . + 0 gene_id "gene2"; transcript_id "gene2.1"; +H ref CDS 4 14 . + 0 gene_id "gene2"; transcript_id "gene2.1"; +H ref start_codon 1 3 . + 0 gene_id "gene2"; transcript_id "gene2.1"; +H ref stop_codon 14 16 . + 0 gene_id "gene2"; transcript_id "gene2.1"; +M ref exon 1 16 . + 0 gene_id "gene4"; transcript_id "gene4.1"; +M ref CDS 4 14 . + 0 gene_id "gene4"; transcript_id "gene4.1"; +M ref start_codon 1 3 . + 0 gene_id "gene4"; transcript_id "gene4.1"; +M ref stop_codon 14 16 . + 0 gene_id "gene4"; transcript_id "gene4.1"; diff --git a/test_data/18/in1.fa b/test_data/18/in1.fa new file mode 100644 index 0000000..2b55705 --- /dev/null +++ b/test_data/18/in1.fa @@ -0,0 +1,2 @@ +>Y +AAAATTTTGGGGCCCC diff --git a/test_data/18/in1.gtf b/test_data/18/in1.gtf new file mode 100644 index 0000000..326d9fa --- /dev/null +++ b/test_data/18/in1.gtf @@ -0,0 +1,5 @@ +##sequence-region Y.11 1 16 +Y ref exon 1 16 . + 0 gene_id "new_gene"; transcript_id "new_gene.1"; +Y ref CDS 4 14 . + 0 gene_id "new_gene"; transcript_id "new_gene.1"; +Y ref start_codon 1 3 . + 0 gene_id "new_gene"; transcript_id "new_gene.1"; +Y ref stop_codon 14 16 . + 0 gene_id "new_gene"; transcript_id "new_gene.1"; diff --git a/test_data/18/in2.fa b/test_data/18/in2.fa new file mode 100644 index 0000000..0b5cf0c --- /dev/null +++ b/test_data/18/in2.fa @@ -0,0 +1,2 @@ +>H +GGGGAATTCCCCGGGG diff --git a/test_data/18/in2.gtf b/test_data/18/in2.gtf new file mode 100644 index 0000000..9e0b425 --- /dev/null +++ b/test_data/18/in2.gtf @@ -0,0 +1,5 @@ +##sequence-region F.11 3 27 +H ref exon 1 16 . + 0 gene_id "gene2"; transcript_id "gene2.1"; +H ref CDS 4 14 . + 0 gene_id "gene2"; transcript_id "gene2.1"; +H ref start_codon 1 3 . + 0 gene_id "gene2"; transcript_id "gene2.1"; +H ref stop_codon 14 16 . + 0 gene_id "gene2"; transcript_id "gene2.1"; diff --git a/test_data/18/in3.fa b/test_data/18/in3.fa new file mode 100644 index 0000000..38306a6 --- /dev/null +++ b/test_data/18/in3.fa @@ -0,0 +1,2 @@ +>M +CCCCGGGGAAAATTTT diff --git a/test_data/18/in3.gtf b/test_data/18/in3.gtf new file mode 100644 index 0000000..ee9257c --- /dev/null +++ b/test_data/18/in3.gtf @@ -0,0 +1,4 @@ +M ref exon 1 16 . + 0 gene_id "gene4"; transcript_id "gene4.1"; +M ref CDS 4 14 . + 0 gene_id "gene4"; transcript_id "gene4.1"; +M ref start_codon 1 3 . + 0 gene_id "gene4"; transcript_id "gene4.1"; +M ref stop_codon 14 16 . + 0 gene_id "gene4"; transcript_id "gene4.1"; diff --git a/test_data/18/ref.fa b/test_data/18/ref.fa new file mode 100644 index 0000000..6fa4d80 --- /dev/null +++ b/test_data/18/ref.fa @@ -0,0 +1,2 @@ +>X +ZZZZABBBBBDDDDDCCCCCIIIIIKKKKK diff --git a/test_data/18/ref.gtf b/test_data/18/ref.gtf new file mode 100644 index 0000000..6737be8 --- /dev/null +++ b/test_data/18/ref.gtf @@ -0,0 +1,4 @@ +X ref exon 5 25 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +X ref CDS 8 22 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +X ref start_codon 5 7 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +X ref stop_codon 23 25 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; diff --git a/test_data/19/gold.fa b/test_data/19/gold.fa new file mode 100644 index 0000000..0194856 --- /dev/null +++ b/test_data/19/gold.fa @@ -0,0 +1,8 @@ +>X +ZZZZABBBBBDDDDDCCCCCIIIIIKKKKK +>Y +AAAATTTTGGGGCCCC +>H +GGGGAATTCCCCGGGG +>M +CCCCGGGGAAAATTTT diff --git a/test_data/19/gold.gtf b/test_data/19/gold.gtf new file mode 100644 index 0000000..29476bf --- /dev/null +++ b/test_data/19/gold.gtf @@ -0,0 +1,18 @@ +X ref exon 5 25 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +X ref CDS 8 22 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +X ref start_codon 5 7 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +X ref stop_codon 23 25 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +##sequence-region Y 1 16 +Y ref exon 1 16 . + 0 gene_id "new_gene"; transcript_id "new_gene.1"; +Y ref CDS 4 14 . + 0 gene_id "new_gene"; transcript_id "new_gene.1"; +Y ref start_codon 1 3 . + 0 gene_id "new_gene"; transcript_id "new_gene.1"; +Y ref stop_codon 14 16 . + 0 gene_id "new_gene"; transcript_id "new_gene.1"; +##sequence-region H 1 16 +H ref exon 1 16 . + 0 gene_id "gene2"; transcript_id "gene2.1"; +H ref CDS 4 14 . + 0 gene_id "gene2"; transcript_id "gene2.1"; +H ref start_codon 1 3 . + 0 gene_id "gene2"; transcript_id "gene2.1"; +H ref stop_codon 14 16 . + 0 gene_id "gene2"; transcript_id "gene2.1"; +M ref exon 1 16 . + 0 gene_id "gene4"; transcript_id "gene4.1"; +M ref CDS 4 14 . + 0 gene_id "gene4"; transcript_id "gene4.1"; +M ref start_codon 1 3 . + 0 gene_id "gene4"; transcript_id "gene4.1"; +M ref stop_codon 14 16 . + 0 gene_id "gene4"; transcript_id "gene4.1"; diff --git a/test_data/19/in1.fa b/test_data/19/in1.fa new file mode 100644 index 0000000..53a0c2f --- /dev/null +++ b/test_data/19/in1.fa @@ -0,0 +1,2 @@ +>Z +AAAATTTTGGGGCCCC diff --git a/test_data/19/in1.gtf b/test_data/19/in1.gtf new file mode 100644 index 0000000..26db7ce --- /dev/null +++ b/test_data/19/in1.gtf @@ -0,0 +1,5 @@ +##sequence-region Y.11 1 16 +Z ref exon 1 16 . + 0 gene_id "new_gene"; transcript_id "new_gene.1"; +Z ref CDS 4 14 . + 0 gene_id "new_gene"; transcript_id "new_gene.1"; +Z ref start_codon 1 3 . + 0 gene_id "new_gene"; transcript_id "new_gene.1"; +Z ref stop_codon 14 16 . + 0 gene_id "new_gene"; transcript_id "new_gene.1"; diff --git a/test_data/19/in2.fa b/test_data/19/in2.fa new file mode 100644 index 0000000..baf880f --- /dev/null +++ b/test_data/19/in2.fa @@ -0,0 +1,2 @@ +>Y +GGGGAATTCCCCGGGG diff --git a/test_data/19/in2.gtf b/test_data/19/in2.gtf new file mode 100644 index 0000000..9e0b425 --- /dev/null +++ b/test_data/19/in2.gtf @@ -0,0 +1,5 @@ +##sequence-region F.11 3 27 +H ref exon 1 16 . + 0 gene_id "gene2"; transcript_id "gene2.1"; +H ref CDS 4 14 . + 0 gene_id "gene2"; transcript_id "gene2.1"; +H ref start_codon 1 3 . + 0 gene_id "gene2"; transcript_id "gene2.1"; +H ref stop_codon 14 16 . + 0 gene_id "gene2"; transcript_id "gene2.1"; diff --git a/test_data/19/in3.fa b/test_data/19/in3.fa new file mode 100644 index 0000000..38306a6 --- /dev/null +++ b/test_data/19/in3.fa @@ -0,0 +1,2 @@ +>M +CCCCGGGGAAAATTTT diff --git a/test_data/19/in3.gtf b/test_data/19/in3.gtf new file mode 100644 index 0000000..46fe5e2 --- /dev/null +++ b/test_data/19/in3.gtf @@ -0,0 +1,5 @@ +##Test Data +M ref exon 1 16 . + 0 gene_id "gene4"; transcript_id "gene4.1"; +K ref CDS 4 14 . + 0 gene_id "gene4"; transcript_id "gene4.1"; +Z ref start_codon 1 3 . + 0 gene_id "gene4"; transcript_id "gene4.1"; +M ref stop_codon 14 16 . + 0 gene_id "gene4"; transcript_id "gene4.1"; diff --git a/test_data/19/ref.fa b/test_data/19/ref.fa new file mode 100644 index 0000000..6fa4d80 --- /dev/null +++ b/test_data/19/ref.fa @@ -0,0 +1,2 @@ +>X +ZZZZABBBBBDDDDDCCCCCIIIIIKKKKK diff --git a/test_data/19/ref.gtf b/test_data/19/ref.gtf new file mode 100644 index 0000000..6737be8 --- /dev/null +++ b/test_data/19/ref.gtf @@ -0,0 +1,4 @@ +X ref exon 5 25 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +X ref CDS 8 22 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +X ref start_codon 5 7 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +X ref stop_codon 23 25 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; diff --git a/test_data/20/gold.fa b/test_data/20/gold.fa new file mode 100644 index 0000000..26b1a4a --- /dev/null +++ b/test_data/20/gold.fa @@ -0,0 +1,4 @@ +>X +XZZZABBBBBDDDDDCCCCCIIIIIKKKKK +>Y +---------- diff --git a/test_data/20/gold.gtf b/test_data/20/gold.gtf new file mode 100644 index 0000000..789a83d --- /dev/null +++ b/test_data/20/gold.gtf @@ -0,0 +1,5 @@ +X ref exon 5 25 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +X ref CDS 8 22 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +X ref start_codon 5 7 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +X ref stop_codon 23 25 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +Y new exon 1 10 . + 0 gene_id "new"; transcript_id "new.1"; diff --git a/test_data/20/in.fa b/test_data/20/in.fa new file mode 100644 index 0000000..fa2f7d3 --- /dev/null +++ b/test_data/20/in.fa @@ -0,0 +1,2 @@ +>test 1 +---------- diff --git a/test_data/20/in.gtf b/test_data/20/in.gtf new file mode 100644 index 0000000..7beb1a8 --- /dev/null +++ b/test_data/20/in.gtf @@ -0,0 +1 @@ +X new exon 1 3 . + 0 gene_id "new"; transcript_id "new.1"; diff --git a/test_data/20/ref.fa b/test_data/20/ref.fa new file mode 100644 index 0000000..b73c712 --- /dev/null +++ b/test_data/20/ref.fa @@ -0,0 +1,2 @@ +>X +XZZZABBBBBDDDDDCCCCCIIIIIKKKKK diff --git a/test_data/20/ref.gtf b/test_data/20/ref.gtf new file mode 100644 index 0000000..6737be8 --- /dev/null +++ b/test_data/20/ref.gtf @@ -0,0 +1,4 @@ +X ref exon 5 25 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +X ref CDS 8 22 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +X ref start_codon 5 7 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; +X ref stop_codon 23 25 . + 0 gene_id "ref_gene"; transcript_id "ref_gene.1"; diff --git a/test_order_explanation.py b/test_order_explanation.py new file mode 100644 index 0000000..253c36b --- /dev/null +++ b/test_order_explanation.py @@ -0,0 +1,14 @@ +import unittest + +class TestOrderExample(unittest.TestCase): + def test_z_third(self): + print("This runs third despite being defined first") + + def test_a_first(self): + print("This runs first despite being defined second") + + def test_b_second(self): + print("This runs second despite being defined third") + +if __name__ == '__main__': + unittest.main() diff --git a/test_reform.py b/test_reform.py index a10d75e..fb33299 100644 --- a/test_reform.py +++ b/test_reform.py @@ -19,13 +19,18 @@ def cleanup(self): os.system(cleanup_command) print("Done") - def test_case_1(self): + def test_case_01(self): """ Case 1: Indel within a feature resulting in deleted sequence, inserted sequence, and splitting of existing features """ + print() + print("------------------------------------------") + print("Test Start") + print("------------------------------------------") + wd = os.getcwd() os.chdir('test_data/1/') @@ -59,9 +64,14 @@ def test_case_1(self): self.assertListEqual(list(gold_fa), list(new_fa)) print("Done") + print("------------------------------------------") + print("Test Done") + print("------------------------------------------") + print() + os.chdir(wd) - def test_case_2(self): + def test_case_02(self): """ Case 2: Indel resulting in truncating 3' side of feature. @@ -69,6 +79,11 @@ def test_case_2(self): of feature. All but first position of original feature remains. """ + print() + print("------------------------------------------") + print("Test Start") + print("------------------------------------------") + wd = os.getcwd() os.chdir('test_data/2/') @@ -102,9 +117,14 @@ def test_case_2(self): self.assertListEqual(list(gold_fa), list(new_fa)) print("Done") + print("------------------------------------------") + print("Test Done") + print("------------------------------------------") + print() + os.chdir(wd) - def test_case_3(self): + def test_case_03(self): """ Case 3: Deletion resulting in removal of entire feature. @@ -112,6 +132,11 @@ def test_case_3(self): at last position of feature. """ + print() + print("------------------------------------------") + print("Test Start") + print("------------------------------------------") + wd = os.getcwd() os.chdir('test_data/3/') @@ -145,14 +170,24 @@ def test_case_3(self): self.assertListEqual(list(gold_fa), list(new_fa)) print("Done") + print("------------------------------------------") + print("Test Done") + print("------------------------------------------") + print() + os.chdir(wd) - def test_case_4(self): + def test_case_04(self): """ Case 4: Indel causing feature split. """ + print() + print("------------------------------------------") + print("Test Start") + print("------------------------------------------") + wd = os.getcwd() os.chdir('test_data/4/') @@ -186,9 +221,14 @@ def test_case_4(self): self.assertListEqual(list(gold_fa), list(new_fa)) print("Done") + print("------------------------------------------") + print("Test Done") + print("------------------------------------------") + print() + os.chdir(wd) - def test_case_5(self): + def test_case_05(self): """ Case 5: 1-base deletion, 10 base insertion. @@ -197,6 +237,11 @@ def test_case_5(self): remainder of the feature to be offset 9 bases downstream. """ + print() + print("------------------------------------------") + print("Test Start") + print("------------------------------------------") + wd = os.getcwd() os.chdir('test_data/5/') @@ -230,9 +275,14 @@ def test_case_5(self): self.assertListEqual(list(gold_fa), list(new_fa)) print("Done") + print("------------------------------------------") + print("Test Done") + print("------------------------------------------") + print() + os.chdir(wd) - def test_case_6(self): + def test_case_06(self): """ Case 6: 2 bp deletion, 10 bp insertion. @@ -241,6 +291,11 @@ def test_case_6(self): by 8 bases. """ + print() + print("------------------------------------------") + print("Test Start") + print("------------------------------------------") + wd = os.getcwd() os.chdir('test_data/6/') @@ -274,9 +329,14 @@ def test_case_6(self): self.assertListEqual(list(gold_fa), list(new_fa)) print("Done") + print("------------------------------------------") + print("Test Done") + print("------------------------------------------") + print() + os.chdir(wd) - def test_case_7(self): + def test_case_07(self): """ Case 7: Simple insertion using the position argument. @@ -284,6 +344,11 @@ def test_case_7(self): offset by length of insertion. """ + print() + print("------------------------------------------") + print("Test Start") + print("------------------------------------------") + wd = os.getcwd() os.chdir('test_data/7/') @@ -316,14 +381,24 @@ def test_case_7(self): self.assertListEqual(list(gold_fa), list(new_fa)) print("Done") + print("------------------------------------------") + print("Test Done") + print("------------------------------------------") + print() + os.chdir(wd) - def test_case_8(self): + def test_case_08(self): """ Case 8: Testing truncating features on reverse strand """ + print() + print("------------------------------------------") + print("Test Start") + print("------------------------------------------") + wd = os.getcwd() os.chdir('test_data/8/') @@ -357,14 +432,24 @@ def test_case_8(self): self.assertListEqual(list(gold_fa), list(new_fa)) print("Done") + print("------------------------------------------") + print("Test Done") + print("------------------------------------------") + print() + os.chdir(wd) - def test_case_9(self): + def test_case_09(self): """ Case 9: Insertion using the position argument at position 0 """ + print() + print("------------------------------------------") + print("Test Start") + print("------------------------------------------") + wd = os.getcwd() os.chdir('test_data/9/') @@ -397,6 +482,11 @@ def test_case_9(self): self.assertListEqual(list(gold_fa), list(new_fa)) print("Done") + print("------------------------------------------") + print("Test Done") + print("------------------------------------------") + print() + os.chdir(wd) def test_case_10(self): @@ -406,6 +496,11 @@ def test_case_10(self): (end of chromosome) """ + print() + print("------------------------------------------") + print("Test Start") + print("------------------------------------------") + wd = os.getcwd() os.chdir('test_data/10/') @@ -438,6 +533,11 @@ def test_case_10(self): self.assertListEqual(list(gold_fa), list(new_fa)) print("Done") + print("------------------------------------------") + print("Test Done") + print("------------------------------------------") + print() + os.chdir(wd) def test_case_11(self): @@ -446,6 +546,11 @@ def test_case_11(self): Testing GTF3 comment format (all above are GTF) """ + print() + print("------------------------------------------") + print("Test Start") + print("------------------------------------------") + wd = os.getcwd() os.chdir('test_data/11/') @@ -478,6 +583,11 @@ def test_case_11(self): self.assertListEqual(list(gold_fa), list(new_fa)) print("Done") + print("------------------------------------------") + print("Test Done") + print("------------------------------------------") + print() + os.chdir(wd) @@ -487,6 +597,11 @@ def test_case_12(self): Testing Case 2 with .gz ref sequence input """ + print() + print("------------------------------------------") + print("Test Start") + print("------------------------------------------") + wd = os.getcwd() os.chdir('test_data/12/') @@ -520,6 +635,11 @@ def test_case_12(self): self.assertListEqual(list(gold_fa), list(new_fa)) print("Done") + print("------------------------------------------") + print("Test Done") + print("------------------------------------------") + print() + os.chdir(wd) def test_case_13(self): @@ -528,6 +648,11 @@ def test_case_13(self): Testing Case 10 with .gz ref sequence input """ + print() + print("------------------------------------------") + print("Test Start") + print("------------------------------------------") + wd = os.getcwd() os.chdir('test_data/13/') @@ -560,6 +685,11 @@ def test_case_13(self): self.assertListEqual(list(gold_fa), list(new_fa)) print("Done") + print("------------------------------------------") + print("Test Done") + print("------------------------------------------") + print() + os.chdir(wd) def test_case_14(self): @@ -568,6 +698,11 @@ def test_case_14(self): Testing Sequential Processing which use multiple up.fa and down.fa files """ + print() + print("------------------------------------------") + print("Test Start") + print("------------------------------------------") + wd = os.getcwd() os.chdir('test_data/14/') @@ -601,6 +736,11 @@ def test_case_14(self): self.assertListEqual(list(gold_fa), list(new_fa)) print("Done") + print("------------------------------------------") + print("Test Done") + print("------------------------------------------") + print() + os.chdir(wd) @@ -610,6 +750,11 @@ def test_case_15(self): Testing Sequential Processing which combine test case 9, 10, 11 """ + print() + print("------------------------------------------") + print("Test Start") + print("------------------------------------------") + wd = os.getcwd() os.chdir('test_data/15/') @@ -642,6 +787,11 @@ def test_case_15(self): self.assertListEqual(list(gold_fa), list(new_fa)) print("Done") + print("------------------------------------------") + print("Test Done") + print("------------------------------------------") + print() + os.chdir(wd) def test_case_16(self): @@ -650,6 +800,11 @@ def test_case_16(self): Testing Reform which invalid chrom """ + print() + print("------------------------------------------") + print("Test Start") + print("------------------------------------------") + wd = os.getcwd() os.chdir('test_data/16/') @@ -682,6 +837,210 @@ def test_case_16(self): self.assertListEqual(list(gold_fa), list(new_fa)) print("Done") + print("------------------------------------------") + print("Test Done") + print("------------------------------------------") + print() + + os.chdir(wd) + + def test_case_17(self): + """ + Case 17: + Testing Reform which adding new chrom + """ + + print() + print("------------------------------------------") + print("Test Start") + print("------------------------------------------") + + wd = os.getcwd() + os.chdir('test_data/17/') + + command = """ + python3 ../../reform.py \ + --new_chrom="Y" \ + --in_fasta=in.fa \ + --in_gff=in.gtf \ + --ref_fasta=ref.fa \ + --ref_gff=ref.gtf + """ + + response = subprocess.getoutput(command) + print(response) + + with open('gold.gtf', 'r') as f: + gold_gff = f.read() + with open('ref_reformed.gtf', 'r') as f: + new_gff = f.read() + print("Testing gtf") + self.assertListEqual(list(gold_gff), list(new_gff)) + print("Done") + + with open('gold.fa', 'r') as f: + gold_fa = f.read() + with open('ref_reformed.fa', 'r') as f: + new_fa = f.read() + print("Testing Fasta") + self.assertListEqual(list(gold_fa), list(new_fa)) + print("Done") + + print("------------------------------------------") + print("Test Done") + print("------------------------------------------") + print() + + os.chdir(wd) + + def test_case_18(self): + """ + Case 18: + Testing Reform which adding multiple new chroms + """ + + print() + print("------------------------------------------") + print("Test Start") + print("------------------------------------------") + + wd = os.getcwd() + os.chdir('test_data/18/') + + command = """ + python3 ../../reform.py \ + --new_chrom="Y,H,M" \ + --in_fasta=in1.fa,in2.fa,in3.fa \ + --in_gff=in1.gtf,in2.gtf,in3.gtf \ + --ref_fasta=ref.fa \ + --ref_gff=ref.gtf + """ + + response = subprocess.getoutput(command) + print(response) + + with open('gold.gtf', 'r') as f: + gold_gff = f.read() + with open('ref_reformed.gtf', 'r') as f: + new_gff = f.read() + print("Testing gtf") + self.assertListEqual(list(gold_gff), list(new_gff)) + print("Done") + + with open('gold.fa', 'r') as f: + gold_fa = f.read() + with open('ref_reformed.fa', 'r') as f: + new_fa = f.read() + print("Testing Fasta") + self.assertListEqual(list(gold_fa), list(new_fa)) + print("Done") + + print("------------------------------------------") + print("Test Done") + print("------------------------------------------") + print() + + os.chdir(wd) + + def test_case_19(self): + """ + Case 19: + Testing Reform with incorrect new chrom and comments in input FASTA + and annotation files. Also, testing reform with more formal printing + """ + + print() + print("------------------------------------------") + print("Test Start") + print("------------------------------------------") + + wd = os.getcwd() + os.chdir('test_data/19/') + + command = """ + python3 ../../reform.py \ + --new_chrom="Y,H,M" \ + --in_fasta=in1.fa,in2.fa,in3.fa \ + --in_gff=in1.gtf,in2.gtf,in3.gtf \ + --ref_fasta=ref.fa \ + --ref_gff=ref.gtf + """ + + response = subprocess.getoutput(command) + print(response) + + with open('gold.gtf', 'r') as f: + gold_gff = f.read() + with open('ref_reformed.gtf', 'r') as f: + new_gff = f.read() + print("Testing gtf") + self.assertListEqual(list(gold_gff), list(new_gff)) + print("Done") + + with open('gold.fa', 'r') as f: + gold_fa = f.read() + with open('ref_reformed.fa', 'r') as f: + new_fa = f.read() + print("Testing Fasta") + self.assertListEqual(list(gold_fa), list(new_fa)) + print("Done") + + print("------------------------------------------") + print("Test Done") + print("------------------------------------------") + print() + + os.chdir(wd) + + def test_case_20(self): + """ + Case 20: + Simple addition using the position argument. + Test auto correction for wrong input fasta name, + wrong input gtf name and length + """ + + print() + print("------------------------------------------") + print("Test Start") + print("------------------------------------------") + + wd = os.getcwd() + os.chdir('test_data/20/') + + command = """ + python3 ../../reform.py \ + --new_chrom="Y" \ + --in_fasta=in.fa \ + --in_gff=in.gtf \ + --ref_fasta=ref.fa \ + --ref_gff=ref.gtf \ + """ + + response = subprocess.getoutput(command) + print(response) + + with open('gold.gtf', 'r') as f: + gold_gff = f.read() + with open('ref_reformed.gtf', 'r') as f: + new_gff = f.read() + print("Testing GTF") + self.assertListEqual(list(gold_gff), list(new_gff)) + print("Done") + + with open('gold.fa', 'r') as f: + gold_fa = f.read() + with open('ref_reformed.fa', 'r') as f: + new_fa = f.read() + print("Testing Fasta") + self.assertListEqual(list(gold_fa), list(new_fa)) + print("Done") + + print("------------------------------------------") + print("Test Done") + print("------------------------------------------") + print() + os.chdir(wd) if __name__ == '__main__':