Changed use of checkIDFieldsJSON, caller now provides more info
so the method doesn't have to do DB look-ups.
bcorrie committed Nov 25, 2022
1 parent 6fd227d commit 151183b
Showing 3 changed files with 122 additions and 15 deletions.
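Note: the three call sites in this commit all pass the same six arguments, so the updated helper presumably has a signature along the following lines. This is a hypothetical sketch inferred from the callers, not the method's actual implementation; the dictionary keys written below are placeholders for whatever repository field names the real method uses.

    # Hypothetical sketch of the updated helper, inferred from the call sites.
    # The caller now supplies the link field/id and the pre-fetched ID values,
    # so the method no longer needs to do a DB look-up per record.
    def checkIDFieldsJSON(self, record_dict,
                          repertoire_link_field, repertoire_link_id,
                          repertoire_id_value,
                          data_processing_id_value,
                          sample_processing_id_value):
        # Update the record in place; the caller's dictionary is mutated.
        record_dict[repertoire_link_field] = repertoire_link_id
        record_dict["repertoire_id"] = repertoire_id_value
        record_dict["data_processing_id"] = data_processing_id_value
        record_dict["sample_processing_id"] = sample_processing_id_value
        # Return False on failure so the caller can abort the load.
        return True
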
38 changes: 37 additions & 1 deletion dataload/airr_cell.py
@@ -92,6 +92,37 @@ def processAIRRCellFile( self, file_handle, filename ):
print("ERROR: Could not link file %s to a valid repertoire"%(filename))
return False

# Look up the repertoire data for the record of interest. This is an array
# and it should be of length 1
repertoires = self.repository.getRepertoires(repertoire_link_field,
repertoire_link_id)
if not len(repertoires) == 1:
print("ERROR: Could not find unique repertoire for id %s"%(repertoire_link_id))
return False
repertoire = repertoires[0]

# Get mapping of the ID fields we want to generate.
map_class = self.getAIRRMap().getRepertoireClass()
rep_id_field = self.getAIRRRepositoryField("repertoire_id", map_class)
data_id_field = self.getAIRRRepositoryField("data_processing_id", map_class)
sample_id_field = self.getAIRRRepositoryField("sample_processing_id", map_class)

# Cache some data we need to use often.
if rep_id_field in repertoire:
repertoire_id_value = repertoire[rep_id_field]
else:
repertoire_id_value = None

if data_id_field in repertoire:
data_processing_id_value = repertoire[data_id_field]
else:
data_processing_id_value = None

if sample_id_field in repertoire:
sample_processing_id_value = repertoire[sample_id_field]
else:
sample_processing_id_value = None

# Get the column of values from the AIRR tag. We only want the
# Cell related fields.
map_column = self.getAIRRMap().getIRCellMapColumn(airr_tag)
@@ -220,7 +251,12 @@ def processAIRRCellFile( self, file_handle, filename ):
             # Set the relevant IDs for the record being inserted. It updates the dictionary
             # (passed by reference) and returns False if it fails. If it fails, don't
             # load any data.
-            if (not self.checkIDFieldsJSON(cell_dict, repertoire_link_id)):
+            if (not self.checkIDFieldsJSON(cell_dict,
+                                           repertoire_link_field, repertoire_link_id,
+                                           repertoire_id_value,
+                                           data_processing_id_value,
+                                           sample_processing_id_value)):
+
                 return False
 
             # Create the created and update values for this block of records. Note that
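The comment in the hunk above spells out the helper's contract: it mutates the dictionary it is given (dictionaries are passed by reference in Python) and signals failure through its return value, so the caller must check the result and stop the load on False. A minimal, generic illustration of that pattern, with made-up names rather than repository code:

    def set_ids(record, ids):
        # Mutates the caller's dict in place; no copy is made.
        if "repertoire_id" not in ids:
            return False  # signal failure; the caller aborts the load
        record.update(ids)
        return True

    cell = {"cell_id": "c1"}
    if not set_ids(cell, {"repertoire_id": "r1"}):
        raise SystemExit("ERROR: could not set ID fields")
    # cell now contains cell_id plus the injected repertoire_id
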
38 changes: 37 additions & 1 deletion dataload/airr_clone.py
@@ -92,6 +92,37 @@ def processAIRRCloneFile( self, file_handle, filename ):
print("ERROR: Could not link file %s to a valid repertoire"%(filename))
return False

# Look up the repertoire data for the record of interest. This is an array
# and it should be of length 1
repertoires = self.repository.getRepertoires(repertoire_link_field,
repertoire_link_id)
if not len(repertoires) == 1:
print("ERROR: Could not find unique repertoire for id %s"%(repertoire_link_id))
return False
repertoire = repertoires[0]

# Get mapping of the ID fields we want to generate.
map_class = self.getAIRRMap().getRepertoireClass()
rep_id_field = self.getAIRRRepositoryField("repertoire_id", map_class)
data_id_field = self.getAIRRRepositoryField("data_processing_id", map_class)
sample_id_field = self.getAIRRRepositoryField("sample_processing_id", map_class)

# Cache some data we need to use often.
if rep_id_field in repertoire:
repertoire_id_value = repertoire[rep_id_field]
else:
repertoire_id_value = None

if data_id_field in repertoire:
data_processing_id_value = repertoire[data_id_field]
else:
data_processing_id_value = None

if sample_id_field in repertoire:
sample_processing_id_value = repertoire[sample_id_field]
else:
sample_processing_id_value = None

# Get the column of values from the AIRR tag. We only want the
# Clone related fields.
map_column = self.getAIRRMap().getIRCloneMapColumn(airr_tag)
@@ -274,7 +305,12 @@ def processAIRRCloneFile( self, file_handle, filename ):
             # Set the relevant IDs for the record being inserted. It updates the dictionary
             # (passed by reference) and returns False if it fails. If it fails, don't
             # load any data.
-            if (not self.checkIDFieldsJSON(clone_dict, repertoire_link_id)):
+            if (not self.checkIDFieldsJSON(clone_dict,
+                                           repertoire_link_field, repertoire_link_id,
+                                           repertoire_id_value,
+                                           data_processing_id_value,
+                                           sample_processing_id_value)):
+
                 return False
 
             # Create the created and update values for this block of records. Note that
61 changes: 48 additions & 13 deletions dataload/airr_expression.py
@@ -98,6 +98,37 @@ def processAIRRExpressionFile( self, file_handle, filename ):
print("ERROR: Could not link file %s to a valid repertoire"%(filename))
return False

# Look up the repertoire data for the record of interest. This is an array
# and it should be of length 1
repertoires = self.repository.getRepertoires(repertoire_link_field,
repertoire_link_id)
if not len(repertoires) == 1:
print("ERROR: Could not find unique repertoire for id %s"%(repertoire_link_id))
return False
repertoire = repertoires[0]

# Get mapping of the ID fields we want to generate.
map_class = self.getAIRRMap().getRepertoireClass()
rep_id_field = self.getAIRRRepositoryField("repertoire_id", map_class)
data_id_field = self.getAIRRRepositoryField("data_processing_id", map_class)
sample_id_field = self.getAIRRRepositoryField("sample_processing_id", map_class)

# Cache some data we need to use often.
if rep_id_field in repertoire:
repertoire_id_value = repertoire[rep_id_field]
else:
repertoire_id_value = None

if data_id_field in repertoire:
data_processing_id_value = repertoire[data_id_field]
else:
data_processing_id_value = None

if sample_id_field in repertoire:
sample_processing_id_value = repertoire[sample_id_field]
else:
sample_processing_id_value = None

# Get the column of values from the AIRR tag. We only want the
# Expression related fields.
map_column = self.getAIRRMap().getExpressionMapColumn(airr_tag)
@@ -168,10 +199,10 @@ def processAIRRExpressionFile( self, file_handle, filename ):
         # Timing stuff
         t_start = time.perf_counter()
         # Left in timing code (commented out) in case we want to go back and optimize.
-        #t_flatten = 0.0
-        #t_check = 0.0
-        #t_append = 0.0
-        #t_copy = 0.0
+        t_flatten = 0.0
+        t_check = 0.0
+        t_append = 0.0
+        t_copy = 0.0
 
         # Do some setup, including getting a repository key map cache so we don't have
         # to look up repository keys all the time. This is a huge overhead without the
@@ -266,11 +297,15 @@ def processAIRRExpressionFile( self, file_handle, filename ):
             # Set the relevant IDs for the record being inserted. It updates the dictionary
             # (passed by reference) and returns False if it fails. If it fails, don't
             # load any data.
-            #t_local_start = time.perf_counter()
-            if (not self.checkIDFieldsJSON(airr_expression_dict, repertoire_link_id)):
+            t_local_start = time.perf_counter()
+            if (not self.checkIDFieldsJSON(airr_expression_dict,
+                                           repertoire_link_field, repertoire_link_id,
+                                           repertoire_id_value,
+                                           data_processing_id_value,
+                                           sample_processing_id_value)):
                 return False
-            #t_local_end = time.perf_counter()
-            #t_check = t_check + (t_local_end - t_local_start)
+            t_local_end = time.perf_counter()
+            t_check = t_check + (t_local_end - t_local_start)
 
             # Create the created and update values for this record. Note that
             # this means that each block of inserts will have the same date.
@@ -287,21 +322,21 @@
             # We want to insert into mongo in blocks of chunk_size records.
             block_count = block_count + 1
             if block_count == chunk_size:
-                #t_insert_start = time.perf_counter()
+                t_insert_start = time.perf_counter()
                 self.repositoryInsertRecords(block_array)
-                #t_insert_end = time.perf_counter()
+                t_insert_end = time.perf_counter()
                 t_end = time.perf_counter()
 
-                #print("Info: insert time = %f"% (t_insert_end-t_insert_start),flush=True)
+                print("Info: insert time = %f"% (t_insert_end-t_insert_start),flush=True)
                 #print("Info: flatten time = %f"% (t_flatten),flush=True)
-                #print("Info: check time = %f"% (t_check),flush=True)
+                print("Info: check time = %f"% (t_check),flush=True)
                 #print("Info: append time = %f"% (t_append),flush=True)
                 #print("Info: copy time = %f"% (t_copy),flush=True)
                 print("Info: Inserted %d records, time = %f (%f records/s, %f percent)"%
                       (chunk_size, t_end-t_start, chunk_size/(t_end-t_start),
                       (total_records/expression_records)*100),flush=True)
                 #t_flatten = 0
-                #t_check = 0
+                t_check = 0
                 #t_append = 0
                 #t_copy = 0
                 block_count = 0
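The airr_expression.py hunks above also re-enable part of the per-chunk timing instrumentation. The pattern is the usual accumulate-and-reset one: add up elapsed time across records with time.perf_counter(), report once per chunk of inserts, then zero the accumulator. A standalone sketch of the same pattern (the chunk size and the measured work are stand-ins, not repository code):

    import time

    chunk_size = 1000
    t_check = 0.0
    block_count = 0
    for record in range(10000):              # stand-in for the record loop
        t_local_start = time.perf_counter()
        _ = sum(range(100))                  # stand-in for checkIDFieldsJSON
        t_check = t_check + (time.perf_counter() - t_local_start)
        block_count = block_count + 1
        if block_count == chunk_size:
            print("Info: check time = %f" % t_check, flush=True)
            t_check = 0.0                    # reset for the next chunk
            block_count = 0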
