diff --git a/dataload/airr_cell.py b/dataload/airr_cell.py
index 8600e46..770f0cc 100644
--- a/dataload/airr_cell.py
+++ b/dataload/airr_cell.py
@@ -92,6 +92,37 @@ def processAIRRCellFile( self, file_handle, filename ):
             print("ERROR: Could not link file %s to a valid repertoire"%(filename))
             return False
 
+        # Look up the repertoire data for the record of interest. This is an array
+        # and it should be of length 1
+        repertoires = self.repository.getRepertoires(repertoire_link_field,
+                                                     repertoire_link_id)
+        if not len(repertoires) == 1:
+            print("ERROR: Could not find unique repertoire for id %s"%(repertoire_link_id))
+            return False
+        repertoire = repertoires[0]
+
+        # Get mapping of the ID fields we want to generate.
+        map_class = self.getAIRRMap().getRepertoireClass()
+        rep_id_field = self.getAIRRRepositoryField("repertoire_id", map_class)
+        data_id_field = self.getAIRRRepositoryField("data_processing_id", map_class)
+        sample_id_field = self.getAIRRRepositoryField("sample_processing_id", map_class)
+
+        # Cache some data we need to use often.
+        if rep_id_field in repertoire:
+            repertoire_id_value = repertoire[rep_id_field]
+        else:
+            repertoire_id_value = None
+
+        if data_id_field in repertoire:
+            data_processing_id_value = repertoire[data_id_field]
+        else:
+            data_processing_id_value = None
+
+        if sample_id_field in repertoire:
+            sample_processing_id_value = repertoire[sample_id_field]
+        else:
+            sample_processing_id_value = None
+
         # Get the column of values from the AIRR tag. We only want the
         # Cell related fields.
         map_column = self.getAIRRMap().getIRCellMapColumn(airr_tag)
@@ -220,7 +251,12 @@ def processAIRRCellFile( self, file_handle, filename ):
             # Set the relevant IDs for the record being inserted. It updates the dictionary
             # (passed by reference) and returns False if it fails. If it fails, don't
             # load any data.
-            if (not self.checkIDFieldsJSON(cell_dict, repertoire_link_id)):
+            if (not self.checkIDFieldsJSON(cell_dict,
+                                           repertoire_link_field, repertoire_link_id,
+                                           repertoire_id_value,
+                                           data_processing_id_value,
+                                           sample_processing_id_value)):
+
                 return False
 
             # Create the created and update values for this block of records. Note that
diff --git a/dataload/airr_clone.py b/dataload/airr_clone.py
index d253694..b2557b1 100644
--- a/dataload/airr_clone.py
+++ b/dataload/airr_clone.py
@@ -92,6 +92,37 @@ def processAIRRCloneFile( self, file_handle, filename ):
             print("ERROR: Could not link file %s to a valid repertoire"%(filename))
             return False
 
+        # Look up the repertoire data for the record of interest. This is an array
+        # and it should be of length 1
+        repertoires = self.repository.getRepertoires(repertoire_link_field,
+                                                     repertoire_link_id)
+        if not len(repertoires) == 1:
+            print("ERROR: Could not find unique repertoire for id %s"%(repertoire_link_id))
+            return False
+        repertoire = repertoires[0]
+
+        # Get mapping of the ID fields we want to generate.
+        map_class = self.getAIRRMap().getRepertoireClass()
+        rep_id_field = self.getAIRRRepositoryField("repertoire_id", map_class)
+        data_id_field = self.getAIRRRepositoryField("data_processing_id", map_class)
+        sample_id_field = self.getAIRRRepositoryField("sample_processing_id", map_class)
+
+        # Cache some data we need to use often.
+        if rep_id_field in repertoire:
+            repertoire_id_value = repertoire[rep_id_field]
+        else:
+            repertoire_id_value = None
+
+        if data_id_field in repertoire:
+            data_processing_id_value = repertoire[data_id_field]
+        else:
+            data_processing_id_value = None
+
+        if sample_id_field in repertoire:
+            sample_processing_id_value = repertoire[sample_id_field]
+        else:
+            sample_processing_id_value = None
+
         # Get the column of values from the AIRR tag. We only want the
         # Clone related fields.
         map_column = self.getAIRRMap().getIRCloneMapColumn(airr_tag)
@@ -274,7 +305,12 @@ def processAIRRCloneFile( self, file_handle, filename ):
             # Set the relevant IDs for the record being inserted. It updates the dictionary
             # (passed by reference) and returns False if it fails. If it fails, don't
             # load any data.
-            if (not self.checkIDFieldsJSON(clone_dict, repertoire_link_id)):
+            if (not self.checkIDFieldsJSON(clone_dict,
+                                           repertoire_link_field, repertoire_link_id,
+                                           repertoire_id_value,
+                                           data_processing_id_value,
+                                           sample_processing_id_value)):
+
                 return False
 
             # Create the created and update values for this block of records. Note that
diff --git a/dataload/airr_expression.py b/dataload/airr_expression.py
index 17ef847..033b4d3 100644
--- a/dataload/airr_expression.py
+++ b/dataload/airr_expression.py
@@ -98,6 +98,37 @@ def processAIRRExpressionFile( self, file_handle, filename ):
             print("ERROR: Could not link file %s to a valid repertoire"%(filename))
             return False
 
+        # Look up the repertoire data for the record of interest. This is an array
+        # and it should be of length 1
+        repertoires = self.repository.getRepertoires(repertoire_link_field,
+                                                     repertoire_link_id)
+        if not len(repertoires) == 1:
+            print("ERROR: Could not find unique repertoire for id %s"%(repertoire_link_id))
+            return False
+        repertoire = repertoires[0]
+
+        # Get mapping of the ID fields we want to generate.
+        map_class = self.getAIRRMap().getRepertoireClass()
+        rep_id_field = self.getAIRRRepositoryField("repertoire_id", map_class)
+        data_id_field = self.getAIRRRepositoryField("data_processing_id", map_class)
+        sample_id_field = self.getAIRRRepositoryField("sample_processing_id", map_class)
+
+        # Cache some data we need to use often.
+        if rep_id_field in repertoire:
+            repertoire_id_value = repertoire[rep_id_field]
+        else:
+            repertoire_id_value = None
+
+        if data_id_field in repertoire:
+            data_processing_id_value = repertoire[data_id_field]
+        else:
+            data_processing_id_value = None
+
+        if sample_id_field in repertoire:
+            sample_processing_id_value = repertoire[sample_id_field]
+        else:
+            sample_processing_id_value = None
+
         # Get the column of values from the AIRR tag. We only want the
         # Expression related fields.
         map_column = self.getAIRRMap().getExpressionMapColumn(airr_tag)
@@ -168,10 +199,10 @@ def processAIRRExpressionFile( self, file_handle, filename ):
         # Timing stuff
         t_start = time.perf_counter()
         # Left in timing code (commented out) in case we want to go back and optimize.
-        #t_flatten = 0.0
-        #t_check = 0.0
-        #t_append = 0.0
-        #t_copy = 0.0
+        t_flatten = 0.0
+        t_check = 0.0
+        t_append = 0.0
+        t_copy = 0.0
 
         # Do some setup, including getting a repository key map cache so we don't have
         # to look up repository keys all the time. This is a huge overhead without the
@@ -266,11 +297,15 @@ def processAIRRExpressionFile( self, file_handle, filename ):
                 # Set the relevant IDs for the record being inserted. It updates the dictionary
                 # (passed by reference) and returns False if it fails. If it fails, don't
                 # load any data.
-                #t_local_start = time.perf_counter()
-                if (not self.checkIDFieldsJSON(airr_expression_dict, repertoire_link_id)):
+                t_local_start = time.perf_counter()
+                if (not self.checkIDFieldsJSON(airr_expression_dict,
+                                               repertoire_link_field, repertoire_link_id,
+                                               repertoire_id_value,
+                                               data_processing_id_value,
+                                               sample_processing_id_value)):
                     return False
-                #t_local_end = time.perf_counter()
-                #t_check = t_check + (t_local_end - t_local_start)
+                t_local_end = time.perf_counter()
+                t_check = t_check + (t_local_end - t_local_start)
 
                 # Create the created and update values for this record. Note that
                 # this means that each block of inserts will have the same date.
@@ -287,21 +322,21 @@
                 # We want to insert into mongo in blocks of chunk_size records.
                 block_count = block_count + 1
                 if block_count == chunk_size:
-                    #t_insert_start = time.perf_counter()
+                    t_insert_start = time.perf_counter()
                     self.repositoryInsertRecords(block_array)
-                    #t_insert_end = time.perf_counter()
+                    t_insert_end = time.perf_counter()
                     t_end = time.perf_counter()
 
-                    #print("Info: insert time = %f"% (t_insert_end-t_insert_start),flush=True)
+                    print("Info: insert time = %f"% (t_insert_end-t_insert_start),flush=True)
                     #print("Info: flatten time = %f"% (t_flatten),flush=True)
-                    #print("Info: check time = %f"% (t_check),flush=True)
+                    print("Info: check time = %f"% (t_check),flush=True)
                     #print("Info: append time = %f"% (t_append),flush=True)
                     #print("Info: copy time = %f"% (t_copy),flush=True)
                     print("Info: Inserted %d records, time = %f (%f records/s, %f percent)"%
                          (chunk_size, t_end-t_start, chunk_size/(t_end-t_start),
                           (total_records/expression_records)*100),flush=True)
 
                     #t_flatten = 0
-                    #t_check = 0
+                    t_check = 0
                     #t_append = 0
                     #t_copy = 0
                     block_count = 0
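
Note: the repertoire lookup and ID-caching block above is added verbatim to all three loaders (airr_cell.py, airr_clone.py, airr_expression.py). As a rough illustration only, that repeated logic could be factored into a shared helper; the method name cacheRepertoireIDs and its placement are hypothetical, and the sketch assumes self.repository.getRepertoires(), self.getAIRRMap(), and self.getAIRRRepositoryField() behave exactly as they do in the diff above.

    # Hypothetical shared helper (illustrative sketch, not part of this change).
    def cacheRepertoireIDs(self, repertoire_link_field, repertoire_link_id):
        # Look up the repertoire record of interest; there must be exactly one match.
        repertoires = self.repository.getRepertoires(repertoire_link_field,
                                                     repertoire_link_id)
        if len(repertoires) != 1:
            print("ERROR: Could not find unique repertoire for id %s"%(repertoire_link_id))
            return None
        repertoire = repertoires[0]

        # Map the three AIRR ID fields to their repository field names and cache
        # their values, falling back to None when a field is absent, as the
        # per-file code above does.
        map_class = self.getAIRRMap().getRepertoireClass()
        id_fields = ["repertoire_id", "data_processing_id", "sample_processing_id"]
        return tuple(repertoire.get(self.getAIRRRepositoryField(field, map_class))
                     for field in id_fields)

Each loader would then unpack the returned tuple once, before its record loop, and pass the three cached values (together with repertoire_link_field and repertoire_link_id) to checkIDFieldsJSON as in the updated calls above.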