ms_autoqc.AcquisitionListener
import os, sys, time, ast, shutil
import logging
import traceback
from datetime import datetime, timedelta
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import hashlib
import ms_autoqc.DatabaseFunctions as db
import ms_autoqc.AutoQCProcessing as qc

class DataAcquisitionEventHandler(FileSystemEventHandler):

    """
    Event handler that alerts when the data file has completed sample acquisition.
    For more information, see: https://python-watchdog.readthedocs.io/en/stable/
    """

    def __init__(self, observer, path, filenames, extension, instrument_id, run_id, current_sample):

        self.observer = observer
        self.path = path
        self.filenames = filenames
        self.extension = extension
        self.instrument_id = instrument_id
        self.run_id = run_id
        self.current_sample = current_sample


    def on_created(self, event):

        """
        Listens to data acquisition path and starts watching newly-created data files,
        but only if the filename exists in the sequence.

        If the acquisition listener process was restarted, the last sample being acquired
        or monitored will be reprocessed.

        Args:
            event (FileCreatedEvent): Event representing file / directory creation.

        Returns:
            None
        """

        # Remove directory path and file extension from filename
        path = event.src_path.replace("\\", "/")
        filename = path.split("/")[-1].split(".")[0]

        # For restarted jobs: process the sample that was being acquired when the job was interrupted
        if self.current_sample is not None:
            if os.path.exists(self.path + self.current_sample + "." + self.extension):
                self.trigger_pipeline(self.path, self.current_sample, self.extension)
            self.current_sample = None

        # Route data file to pipeline
        if not event.is_directory and filename in self.filenames:
            self.trigger_pipeline(self.path, filename, self.extension)


    def watch_file(self, path, filename, extension, next_sample=None):

        """
        Returns True if MD5 checksums match AND next sample in sequence has begun acquiring,
        effectively determining whether sample acquisition has been completed.

        Checksum matching is checking if the MD5 checksum computed for the file matches the
        MD5 checksum that was written to the database 3 minutes ago.

        If watching the last sample in the sequence, this function will skip checking for the next sample.

        Args:
            path (str): Data acquisition path
            filename (str): Name of sample data file
            extension (str): Data file extension, derived from instrument vendor
            next_sample (str, default None): Next sample in sequence after current sample being watched

        Returns:
            bool: True if data acquisition is deemed complete.
        """

        # Write initial MD5 checksum to database
        md5_checksum = get_md5(path + filename + "." + extension)
        db.update_md5_checksum(self.instrument_id, filename, md5_checksum)

        # Watch file indefinitely
        while os.path.exists(path):

            # Wait 3 minutes
            print("Waiting 3 minutes...")
            time.sleep(180)

            # Compare checksums
            new_md5 = get_md5(path + filename + "." + extension)
            old_md5 = db.get_md5(self.instrument_id, filename)

            # If the MD5 checksum after 3 mins is the same as before, route to pipeline
            if new_md5 == old_md5:
                print("MD5 checksums matched.")

                if next_sample is None:
                    print("Preparing to process file.")
                    time.sleep(180)
                    return True
                else:
                    if os.path.exists(path + next_sample + "." + extension):
                        print("Preparing to process file.")
                        time.sleep(180)
                        return True
                    else:
                        print("Waiting for instrument to start acquiring next sample:", next_sample)
                        db.update_md5_checksum(self.instrument_id, filename, new_md5)
            else:
                print("MD5 checksums do not match.")
                db.update_md5_checksum(self.instrument_id, filename, new_md5)


    def trigger_pipeline(self, path, filename, extension):

        """
        Wrapper function that routes data file to monitoring and processing functions.

        This function is called every time a data file is created in the data acquisition path.
        See watch_file() and process_data_file() for more information.

        At the end of the instrument run, marks job as completed and kills listener process.

        Args:
            path (str): Data acquisition path
            filename (str): Name of sample data file
            extension (str): Data file extension, derived from instrument vendor

        Returns:
            None
        """

        print("Watching file:", filename)

        # Get next sample
        try:
            next_sample = db.get_next_sample(filename, self.instrument_id, self.run_id)
        except:
            next_sample = None

        # Start watching file until sample acquisition is complete
        try:
            sample_acquired = self.watch_file(path, filename, extension, next_sample)
        except Exception as error:
            print("Error while watching file:", error)
            sample_acquired = None

        # Route data file to MS-AutoQC pipeline
        if sample_acquired:
            print("Data acquisition completed for", filename)
            qc.process_data_file(path, filename, extension, self.instrument_id, self.run_id)
            print("Data processing for", filename, "complete.")

        # Check if data file was the last sample in the sequence
        if filename == self.filenames[-1]:
            # If so, stop acquisition listening
            print("Last sample acquired. Instrument run complete.")
            self.observer.stop()

            # Terminate acquisition listener process
            print("Terminating acquisition listener process.")
            terminate_job(self.instrument_id, self.run_id)


def start_listener(path, instrument_id, run_id):

    """
    Initializes acquisition listener process to process data files upon sample acquisition completion.

    If the QC job is for a completed instrument run (i.e. all data files have been previously acquired and exist
    in a directory), then this function simply iterates through the list of filenames and processes each sample.

    If the QC job is for an active instrument run, this function initializes the Watchdog file monitor to capture
    incoming data files and wait for them to finish writing before processing each sample.

    In addition, to handle crashes, restarted jobs, and other events, this function checks for and processes
    all unprocessed samples in active instrument runs. Unprocessed samples are defined as data files that exist but
    do not have QC results in the database.

    For more information on the Watchdog package, see: https://python-watchdog.readthedocs.io/en/stable/

    Args:
        path (str): Data acquisition path
        instrument_id (str): Instrument ID
        run_id (str): Instrument run ID (job ID)

    Returns:
        None
    """

    print("Run monitoring initiated for", path)

    # Check if MS-AutoQC job type is active monitoring or bulk QC
    is_completed_run = db.is_completed_run(instrument_id, run_id)

    # Retrieve filenames for samples in run
    filenames = db.get_remaining_samples(instrument_id, run_id)

    # Get data file extension
    extension = db.get_data_file_type(instrument_id)

    # Format acquisition path
    path = path.replace("\\", "/")
    path = path + "/" if path[-1] != "/" else path

    if is_completed_run:

        # Iterate through files and process each one
        for filename in filenames:

            # If file is not in directory, skip it
            full_path = path + filename + "." + extension

            if not os.path.exists(full_path):
                continue

            # Process data file
            qc.process_data_file(path, filename, extension, instrument_id, run_id)
            print("Data processing for", filename, "complete.")

        print("Last sample acquired. QC job complete.")
        terminate_job(instrument_id, run_id)

    else:
        # Get samples that may have been unprocessed due to an error or accidental termination
        missing_samples, current_sample = db.get_unprocessed_samples(instrument_id, run_id)
        print("Current sample:", current_sample)

        # Check for missed samples and process them before starting file monitor
        if len(missing_samples) > 0:

            # Iterate through files and process each one
            for filename in missing_samples:

                # If file is not in directory, skip it
                full_path = path + filename + "." + extension
                if not os.path.exists(full_path):
                    continue

                # Process data file
                qc.process_data_file(path, filename, extension, instrument_id, run_id)
                print("Data processing for", filename, "complete.")

        # Start file monitor and process files as they are created
        logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S")

        observer = Observer()
        event_handler = DataAcquisitionEventHandler(observer, path, filenames, extension, instrument_id, run_id, current_sample)
        observer.schedule(event_handler, path, recursive=True)
        observer.start()

        try:
            while observer.is_alive():
                observer.join(1)
        finally:
            observer.stop()
            observer.join()


def terminate_job(instrument_id, run_id):

    """
    Wraps up QC job after the last data file has been routed to the pipeline.

    Performs the following functions:
    1. Marks instrument run as completed
    2. Uploads database to Google Drive (if Google Drive sync is enabled)
    3. Deletes temporary data file directory in /data
    4. Kills acquisition listener process

    Args:
        instrument_id (str):
            Instrument ID
        run_id (str):
            Instrument run ID (job ID)

    Returns:
        None
    """

    # Mark instrument run as completed
    db.mark_run_as_completed(instrument_id, run_id)

    # Sync database on run completion
    if db.sync_is_enabled():
        db.sync_on_run_completion(instrument_id, run_id)

    # Delete temporary data file directory
    db.delete_temp_directory(instrument_id, run_id)

    # Kill acquisition listener
    pid = db.get_pid(instrument_id, run_id)
    qc.kill_subprocess(pid)


def get_md5(file_path):

    """
    Computes MD5 checksum for a given file.

    Args:
        file_path (str): File path

    Returns:
        str: MD5 checksum for the given file.
    """

    hash_md5 = hashlib.md5()

    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)

    return hash_md5.hexdigest()


if __name__ == "__main__":

    # Start listening to data file directory
    start_listener(path=sys.argv[1], instrument_id=sys.argv[2], run_id=sys.argv[3])
class DataAcquisitionEventHandler(FileSystemEventHandler)
Event handler that alerts when the data file has completed sample acquisition. For more information, see: https://python-watchdog.readthedocs.io/en/stable/
def __init__(self, observer, path, filenames, extension, instrument_id, run_id, current_sample)
def on_created(self, event)
Listens to the data acquisition path and starts watching newly created data files, but only if the filename exists in the sequence.
If the acquisition listener process was restarted, the last sample being acquired or monitored will be reprocessed.
Arguments:
- event (FileCreatedEvent): Event representing file / directory creation.
Returns:
None
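The filename handling assumes each data file is named after its sample. A standalone sketch of the same extraction step (the path below is hypothetical):

src_path = "C:\\Xcalibur\\Data\\Run42\\Sample_001.raw"  # hypothetical Windows path from event.src_path

# Normalize separators, then strip the directory and the extension
path = src_path.replace("\\", "/")
filename = path.split("/")[-1].split(".")[0]

print(filename)  # Sample_001
# Note: split(".")[0] truncates at the first dot, so a sample named
# "Sample.001" would be shortened to "Sample".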
def watch_file(self, path, filename, extension, next_sample=None)
Returns True if MD5 checksums match AND next sample in sequence has begun acquiring, effectively determining whether sample acquisition has been completed.
Checksum matching compares the MD5 checksum computed for the file against the MD5 checksum that was written to the database 3 minutes earlier.
If watching the last sample in the sequence, this function will skip checking for the next sample.
Arguments:
- path (str): Data acquisition path
- filename (str): Name of sample data file
- extension (str): Data file extension, derived from instrument vendor
- next_sample (str, default None): Next sample in sequence after current sample being watched
Returns:
bool: True if data acquisition is deemed complete.
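Stripped of the database round-trip, the completion check is a checksum-stability loop: hash the file, wait, hash again, and declare acquisition complete once two consecutive checksums agree. A minimal self-contained sketch of that idea (the real method persists checksums with db.update_md5_checksum and also waits for the next sample to appear):

import hashlib, os, time

def wait_until_stable(file_path, interval=180):
    """Return True once two consecutive MD5 checksums of the file match."""
    def md5(path):
        digest = hashlib.md5()
        with open(path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                digest.update(chunk)
        return digest.hexdigest()

    last = md5(file_path)
    while os.path.exists(file_path):
        time.sleep(interval)       # give the instrument time to write more data
        current = md5(file_path)
        if current == last:
            return True            # checksum unchanged: the file stopped growing
        last = current             # checksum changed: keep waiting
    return False                   # file disappeared while waiting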
def trigger_pipeline(self, path, filename, extension)
Wrapper function that routes the data file to the monitoring and processing functions.
This function is called every time a data file is created in the data acquisition path. See watch_file() and process_data_file() for more information.
At the end of the instrument run, this function marks the job as completed and kills the listener process.
Arguments:
- path (str): Data acquisition path
- filename (str): Name of sample data file
- extension (str): Data file extension, derived from instrument vendor
Returns:
None
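Because trigger_pipeline carries all per-sample logic, one file could in principle be reprocessed by constructing the handler directly. A hypothetical sketch (all paths and IDs below are made up, and passing the last filename in the sequence would also terminate the job):

from watchdog.observers import Observer
import ms_autoqc.AcquisitionListener as listener

observer = Observer()
handler = listener.DataAcquisitionEventHandler(
    observer=observer,
    path="C:/Xcalibur/Data/Run42/",          # hypothetical acquisition path
    filenames=["Sample_001", "Sample_002"],  # hypothetical sequence
    extension="raw",
    instrument_id="Instrument_1",            # hypothetical instrument ID
    run_id="Run42",                          # hypothetical run ID
    current_sample=None,
)
handler.trigger_pipeline("C:/Xcalibur/Data/Run42/", "Sample_001", "raw")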
Inherited Members
- watchdog.events.FileSystemEventHandler:
  - dispatch
  - on_any_event
  - on_moved
  - on_deleted
  - on_modified
  - on_closed
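Only on_created is overridden above; the inherited dispatch method routes each filesystem event to the matching on_* hook, all of which are no-ops unless overridden. A subclass that also reacted to deletions would only need, for example:

from watchdog.events import FileSystemEventHandler

class DeletionAwareHandler(FileSystemEventHandler):

    def on_created(self, event):
        print("Created:", event.src_path)

    def on_deleted(self, event):
        # Inherited as a no-op; overriding it is all that is
        # needed to be notified of deletions
        print("Deleted:", event.src_path)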
def start_listener(path, instrument_id, run_id)
Initializes the acquisition listener process to process data files upon sample acquisition completion.
If the QC job is for a completed instrument run (i.e. all data files have been previously acquired and exist in a directory), then this function simply iterates through the list of filenames and processes each sample.
If the QC job is for an active instrument run, this function initializes the Watchdog file monitor to capture incoming data files and wait for them to finish writing before processing each sample.
In addition, to handle crashes, restarted jobs, and other events, this function checks for and processes all unprocessed samples in active instrument runs. Unprocessed samples are defined as data files that exist but do not have QC results in the database.
For more information on the Watchdog package, see: https://python-watchdog.readthedocs.io/en/stable/
Arguments:
- path (str): Data acquisition path
- instrument_id (str): Instrument ID
- run_id (str): Instrument run ID (job ID)
Returns:
None
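The __main__ guard at the bottom of the module means the listener is launched as its own process, with the acquisition path, instrument ID, and run ID passed as positional arguments. One plausible way to spawn it (assuming the ms_autoqc package is importable; the IDs below are hypothetical, and MS-AutoQC's own launch code may differ):

import subprocess, sys

process = subprocess.Popen([
    sys.executable, "-m", "ms_autoqc.AcquisitionListener",
    "C:/Xcalibur/Data/Run42/",   # path
    "Instrument_1",              # instrument_id (hypothetical)
    "Run42",                     # run_id (hypothetical)
])
print("Listener started with PID:", process.pid)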
def terminate_job(instrument_id, run_id)
Wraps up the QC job after the last data file has been routed to the pipeline.
Performs the following functions:
- Marks instrument run as completed
- Uploads database to Google Drive (if Google Drive sync is enabled)
- Deletes temporary data file directory in /data
- Kills acquisition listener process
Arguments:
- instrument_id (str): Instrument ID
- run_id (str): Instrument run ID (job ID)
Returns:
None
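Step 4 terminates the listener via the PID stored in the database. The source of qc.kill_subprocess is not shown here; purely as an illustration of PID-based termination, a sketch using the third-party psutil package (an assumed dependency, not confirmed by this module):

import psutil

def kill_by_pid(pid):
    # Hypothetical stand-in for qc.kill_subprocess: terminate the
    # process and any children it spawned
    try:
        parent = psutil.Process(pid)
        for child in parent.children(recursive=True):
            child.terminate()
        parent.terminate()
    except psutil.NoSuchProcess:
        pass  # process already exited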
def get_md5(file_path)
Computes MD5 checksum for a given file.
Arguments:
- file_path (str): File path
Returns:
str: MD5 checksum for the given file.
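The 4096-byte chunked read keeps memory use constant even for multi-gigabyte vendor files. On Python 3.11+, hashlib.file_digest performs the same chunked hashing in one call:

import hashlib

def get_md5_alt(file_path):
    # Equivalent to get_md5() on Python 3.11+; file_digest handles
    # the chunked reading internally
    with open(file_path, "rb") as f:
        return hashlib.file_digest(f, "md5").hexdigest()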