ms_autoqc.AcquisitionListener

import os, sys, time
import logging
import hashlib
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import ms_autoqc.DatabaseFunctions as db
import ms_autoqc.AutoQCProcessing as qc

class DataAcquisitionEventHandler(FileSystemEventHandler):

    """
    Event handler that detects when a data file has finished sample acquisition.
    For more information, see: https://python-watchdog.readthedocs.io/en/stable/
    """

    def __init__(self, observer, path, filenames, extension, instrument_id, run_id, current_sample):

        self.observer = observer
        self.path = path
        self.filenames = filenames
        self.extension = extension
        self.instrument_id = instrument_id
        self.run_id = run_id
        self.current_sample = current_sample


    def on_created(self, event):

        """
        Listens to the data acquisition path and starts watching newly created data files,
        but only if the filename exists in the sequence.

        If the acquisition listener process was restarted, the last sample being acquired
        or monitored will be reprocessed.

        Args:
            event (FileCreatedEvent): Event representing file / directory creation.

        Returns:
            None
        """

        # Strip the directory path and file extension from the filename
        path = event.src_path.replace("\\", "/")
        filename = path.split("/")[-1].split(".")[0]

        # For restarted jobs: process the sample that was being acquired when the job was interrupted
        if self.current_sample is not None:
            if os.path.exists(self.path + self.current_sample + "." + self.extension):
                self.trigger_pipeline(self.path, self.current_sample, self.extension)
                self.current_sample = None

        # Route data file to pipeline
        if not event.is_directory and filename in self.filenames:
            self.trigger_pipeline(self.path, filename, self.extension)


    def watch_file(self, path, filename, extension, next_sample=None):

        """
        Returns True if the MD5 checksums match AND the next sample in the sequence has begun
        acquiring, effectively determining whether sample acquisition has been completed.

        Checksum matching verifies that the MD5 checksum computed for the file matches the
        checksum that was written to the database 3 minutes earlier.

        If watching the last sample in the sequence, this function skips the check for the next sample.

        Args:
            path (str): Data acquisition path
            filename (str): Name of sample data file
            extension (str): Data file extension, derived from instrument vendor
            next_sample (str, default None): Next sample in sequence after current sample being watched

        Returns:
            bool: True if data acquisition is deemed complete.
        """

        # Write initial MD5 checksum to database
        md5_checksum = get_md5(path + filename + "." + extension)
        db.update_md5_checksum(self.instrument_id, filename, md5_checksum)

        # Watch the file for as long as the acquisition path exists
        while os.path.exists(path):

            # Wait 3 minutes
            print("Waiting 3 minutes...")
            time.sleep(180)

            # Compare checksums
            new_md5 = get_md5(path + filename + "." + extension)
            old_md5 = db.get_md5(self.instrument_id, filename)

            # If the MD5 checksum is unchanged after 3 minutes, route the file to the pipeline
            if new_md5 == old_md5:
                print("MD5 checksums matched.")

                if next_sample is None:
                    print("Preparing to process file.")
                    time.sleep(180)
                    return True
                else:
                    if os.path.exists(path + next_sample + "." + extension):
                        print("Preparing to process file.")
                        time.sleep(180)
                        return True
                    else:
                        print("Waiting for instrument to start acquiring next sample:", next_sample)
                        db.update_md5_checksum(self.instrument_id, filename, new_md5)
            else:
                print("MD5 checksums do not match.")
                db.update_md5_checksum(self.instrument_id, filename, new_md5)

        # The acquisition path disappeared before the file stabilized
        return False


    def trigger_pipeline(self, path, filename, extension):

        """
        Wrapper function that routes a data file to the monitoring and processing functions.

        This function is called every time a data file is created in the data acquisition path.
        See watch_file() and process_data_file() for more information.

        At the end of the instrument run, marks the job as completed and kills the listener process.

        Args:
            path (str): Data acquisition path
            filename (str): Name of sample data file
            extension (str): Data file extension, derived from instrument vendor

        Returns:
            None
        """

        print("Watching file:", filename)

        # Get the next sample in the sequence, falling back to None (e.g. for the last sample)
        try:
            next_sample = db.get_next_sample(filename, self.instrument_id, self.run_id)
        except Exception:
            next_sample = None

        # Start watching file until sample acquisition is complete
        try:
            sample_acquired = self.watch_file(path, filename, extension, next_sample)
        except Exception as error:
            print("Error while watching file:", error)
            sample_acquired = None

        # Route data file to MS-AutoQC pipeline
        if sample_acquired:
            print("Data acquisition completed for", filename)
            qc.process_data_file(path, filename, extension, self.instrument_id, self.run_id)
            print("Data processing for", filename, "complete.")

        # If the data file was the last sample in the sequence, stop listening
        if filename == self.filenames[-1]:
            print("Last sample acquired. Instrument run complete.")
            self.observer.stop()

            # Terminate acquisition listener process
            print("Terminating acquisition listener process.")
            terminate_job(self.instrument_id, self.run_id)


def start_listener(path, instrument_id, run_id):

    """
    Initializes the acquisition listener process to process data files upon sample acquisition completion.

    If the QC job is for a completed instrument run (i.e. all data files have been previously acquired and exist
    in a directory), then this function simply iterates through the list of filenames and processes each sample.

    If the QC job is for an active instrument run, this function initializes the Watchdog file monitor to capture
    incoming data files and wait for them to finish writing before processing each sample.

    In addition, to handle crashes, restarted jobs, and other events, this function checks for and processes
    all unprocessed samples in active instrument runs. Unprocessed samples are defined as data files that exist
    but do not have QC results in the database.

    For more information on the Watchdog package, see: https://python-watchdog.readthedocs.io/en/stable/

    Args:
        path (str): Data acquisition path
        instrument_id (str): Instrument ID
        run_id (str): Instrument run ID (job ID)

    Returns:
        None
    """

    print("Run monitoring initiated for", path)

    # Check if MS-AutoQC job type is active monitoring or bulk QC
    is_completed_run = db.is_completed_run(instrument_id, run_id)

    # Retrieve filenames for remaining samples in the run
    filenames = db.get_remaining_samples(instrument_id, run_id)

    # Get data file extension
    extension = db.get_data_file_type(instrument_id)

    # Format acquisition path with forward slashes and a trailing slash
    path = path.replace("\\", "/")
    if not path.endswith("/"):
        path = path + "/"

    if is_completed_run:

        # Iterate through files and process each one
        for filename in filenames:

            # If file is not in directory, skip it
            full_path = path + filename + "." + extension

            if not os.path.exists(full_path):
                continue

            # Process data file
            qc.process_data_file(path, filename, extension, instrument_id, run_id)
            print("Data processing for", filename, "complete.")

        print("Last sample acquired. QC job complete.")
        terminate_job(instrument_id, run_id)

    else:
        # Get samples that may have been left unprocessed due to an error or accidental termination
        missing_samples, current_sample = db.get_unprocessed_samples(instrument_id, run_id)
        print("Current sample:", current_sample)

        # Check for missed samples and process them before starting the file monitor
        if len(missing_samples) > 0:

            # Iterate through files and process each one
            for filename in missing_samples:

                # If file is not in directory, skip it
                full_path = path + filename + "." + extension
                if not os.path.exists(full_path):
                    continue

                # Process data file
                qc.process_data_file(path, filename, extension, instrument_id, run_id)
                print("Data processing for", filename, "complete.")

        # Start file monitor and process files as they are created
        logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S")

        observer = Observer()
        event_handler = DataAcquisitionEventHandler(observer, path, filenames, extension, instrument_id, run_id, current_sample)
        observer.schedule(event_handler, path, recursive=True)
        observer.start()

        try:
            while observer.is_alive():
                observer.join(1)
        finally:
            observer.stop()
            observer.join()


def terminate_job(instrument_id, run_id):

    """
    Wraps up the QC job after the last data file has been routed to the pipeline.

    Performs the following functions:
        1. Marks instrument run as completed
        2. Uploads database to Google Drive (if Google Drive sync is enabled)
        3. Deletes temporary data file directory in /data
        4. Kills acquisition listener process

    Args:
        instrument_id (str): Instrument ID
        run_id (str): Instrument run ID (job ID)

    Returns:
        None
    """

    # Mark instrument run as completed
    db.mark_run_as_completed(instrument_id, run_id)

    # Sync database on run completion
    if db.sync_is_enabled():
        db.sync_on_run_completion(instrument_id, run_id)

    # Delete temporary data file directory
    db.delete_temp_directory(instrument_id, run_id)

    # Kill acquisition listener
    pid = db.get_pid(instrument_id, run_id)
    qc.kill_subprocess(pid)


def get_md5(file_path):

    """
    Computes the MD5 checksum for a given file.

    The file is read in 4 KB chunks so that large data files are not loaded into memory at once.

    Args:
        file_path (str): File path

    Returns:
        str: MD5 checksum for the given file.
    """

    hash_md5 = hashlib.md5()

    # Read the file in chunks to keep memory usage constant
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)

    return hash_md5.hexdigest()


if __name__ == "__main__":

    # Start listening to data file directory
    start_listener(path=sys.argv[1], instrument_id=sys.argv[2], run_id=sys.argv[3])

class DataAcquisitionEventHandler(watchdog.events.FileSystemEventHandler):

Event handler that detects when a data file has finished sample acquisition. For more information, see: https://python-watchdog.readthedocs.io/en/stable/

DataAcquisitionEventHandler(observer, path, filenames, extension, instrument_id, run_id, current_sample)
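
A minimal sketch of how this handler might be wired to a Watchdog observer, mirroring start_listener(); the directory, sample names, and IDs below are placeholders rather than values from the module:

from watchdog.observers import Observer
from ms_autoqc.AcquisitionListener import DataAcquisitionEventHandler

observer = Observer()
event_handler = DataAcquisitionEventHandler(
    observer=observer,
    path="C:/Data/Run-001/",                  # hypothetical acquisition path (must end with "/")
    filenames=["sample_001", "sample_002"],   # hypothetical sample sequence
    extension="raw",                          # data file extension for the instrument vendor
    instrument_id="Instrument-1",             # hypothetical instrument ID
    run_id="Run-001",                         # hypothetical run ID
    current_sample=None,                      # set only when resuming an interrupted job
)
observer.schedule(event_handler, "C:/Data/Run-001/", recursive=True)
observer.start()
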
def on_created(self, event):

Listens to the data acquisition path and starts watching newly created data files, but only if the filename exists in the sequence.

If the acquisition listener process was restarted, the last sample being acquired or monitored will be reprocessed.

Arguments:
  • event (FileCreatedEvent): Event representing file / directory creation.
Returns:
  • None
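
As an illustration, an event's source path is reduced to a bare sample name before the sequence check (the path below is a hypothetical event.src_path):

src_path = "C:\\Data\\Run-001\\sample_001.raw"
path = src_path.replace("\\", "/")
filename = path.split("/")[-1].split(".")[0]
assert filename == "sample_001"   # this name is then checked against self.filenames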

def watch_file(self, path, filename, extension, next_sample=None):

Returns True if the MD5 checksums match AND the next sample in the sequence has begun acquiring, effectively determining whether sample acquisition has been completed.

Checksum matching verifies that the MD5 checksum computed for the file matches the checksum that was written to the database 3 minutes earlier.

If watching the last sample in the sequence, this function skips the check for the next sample.

Arguments:
  • path (str): Data acquisition path
  • filename (str): Name of sample data file
  • extension (str): Data file extension, derived from instrument vendor
  • next_sample (str, default None): Next sample in sequence after current sample being watched
Returns:
  • bool: True if data acquisition is deemed complete.
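
The completion check reduces to checksum stability: if a file's MD5 does not change over a polling interval, the instrument has stopped writing to it. A self-contained sketch of the same idea without the database layer (the helper names and the 3-minute interval are illustrative):

import hashlib, os, time

def compute_md5(file_path):
    # Chunked MD5, equivalent to get_md5() in this module
    md5 = hashlib.md5()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            md5.update(chunk)
    return md5.hexdigest()

def wait_until_stable(file_path, interval=180):
    # Return True once two consecutive polls yield the same checksum
    last_md5 = compute_md5(file_path)
    while os.path.exists(file_path):
        time.sleep(interval)
        current_md5 = compute_md5(file_path)
        if current_md5 == last_md5:
            return True    # no writes during the interval; acquisition likely complete
        last_md5 = current_md5
    return False           # file disappeared before stabilizing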

def trigger_pipeline(self, path, filename, extension):

Wrapper function that routes a data file to the monitoring and processing functions.

This function is called every time a data file is created in the data acquisition path. See watch_file() and process_data_file() for more information.

At the end of the instrument run, marks the job as completed and kills the listener process.

Arguments:
  • path (str): Data acquisition path
  • filename (str): Name of sample data file
  • extension (str): Data file extension, derived from instrument vendor
Returns:
  • None

Inherited Members:
  • watchdog.events.FileSystemEventHandler: dispatch, on_any_event, on_moved, on_deleted, on_modified, on_closed
def start_listener(path, instrument_id, run_id):

Initializes the acquisition listener process to process data files upon sample acquisition completion.

If the QC job is for a completed instrument run (i.e. all data files have been previously acquired and exist in a directory), then this function simply iterates through the list of filenames and processes each sample.

If the QC job is for an active instrument run, this function initializes the Watchdog file monitor to capture incoming data files and wait for them to finish writing before processing each sample.

In addition, to handle crashes, restarted jobs, and other events, this function checks for and processes all unprocessed samples in active instrument runs. Unprocessed samples are defined as data files that exist but do not have QC results in the database.

For more information on the Watchdog package, see: https://python-watchdog.readthedocs.io/en/stable/

Arguments:
  • path (str): Data acquisition path
  • instrument_id (str): Instrument ID
  • run_id (str): Instrument run ID (job ID)
Returns:
  • None
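
Given the module's __main__ entry point, the listener takes three positional arguments. A hedged sketch of launching it as a background process (the script path and IDs are placeholders; the launch mechanism MS-AutoQC actually uses may differ):

import subprocess, sys

process = subprocess.Popen([
    sys.executable, "AcquisitionListener.py",
    "C:/Data/Run-001",   # path: data acquisition directory (hypothetical)
    "Instrument-1",      # instrument_id (hypothetical)
    "Run-001",           # run_id (hypothetical)
])
print("Listener PID:", process.pid)   # terminate_job() later looks up a stored PID to kill the listener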

def terminate_job(instrument_id, run_id):

Wraps up the QC job after the last data file has been routed to the pipeline.

Performs the following functions:
  1. Marks instrument run as completed
  2. Uploads database to Google Drive (if Google Drive sync is enabled)
  3. Deletes temporary data file directory in /data
  4. Kills acquisition listener process
Arguments:
  • instrument_id (str): Instrument ID
  • run_id (str): Instrument run ID (job ID)
Returns:
  • None

def get_md5(file_path):

Computes the MD5 checksum for a given file. The file is read in 4 KB chunks so that large data files are not loaded into memory at once.

Arguments:
  • file_path (str): File path
Returns:
  • str: MD5 checksum for the given file.
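
Usage sketch, pairing get_md5() with the polling logic used in watch_file() (the file path is a placeholder):

import time
from ms_autoqc.AcquisitionListener import get_md5

first = get_md5("C:/Data/Run-001/sample_001.raw")
time.sleep(180)
second = get_md5("C:/Data/Run-001/sample_001.raw")

if first == second:
    print("File unchanged for 3 minutes; acquisition likely complete.")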