ms_autoqc.AutoQCProcessing
Module source:

```python
import warnings
warnings.simplefilter(action="ignore", category=FutureWarning)

import os, time, shutil, psutil, traceback
import pandas as pd
import numpy as np
import ms_autoqc.DatabaseFunctions as db
import ms_autoqc.SlackNotifications as slack_bot

pd.options.mode.chained_assignment = None


def sequence_is_valid(filename, contents, vendor="Thermo Fisher"):

    """
    Validates that instrument sequence file contains the correct columns.

    TODO: Add support for other mass spectrometry instrument vendors here.

    Args:
        filename (str):
            Acquisition sequence file name
        contents (io.StringIO):
            Acquisition sequence as in-memory file object
        vendor (str, default "Thermo Fisher"):
            Instrument vendor for parsing sequence

    Returns:
        True if sequence table is valid, otherwise False.
    """

    if ".csv" not in filename:
        return False

    # Select columns from sequence using correct vendor software nomenclature
    if vendor == "Thermo Fisher":

        # Attempt to load sequence file as a pandas DataFrame
        try:
            df_sequence = pd.read_csv(contents, index_col=False)
        except Exception as error:
            print("Sequence file could not be read.")
            traceback.print_exc()
            return False

        df_sequence.columns = df_sequence.iloc[0]
        df_sequence = df_sequence.drop(df_sequence.index[0])

        # Define required columns and columns found in sequence file
        required_columns = ["File Name", "Path", "Instrument Method", "Position", "Inj Vol"]
        sequence_file_columns = df_sequence.columns.tolist()

        # Check that the required columns are present
        for column in required_columns:
            if column not in sequence_file_columns:
                return False

    return True


def metadata_is_valid(filename, contents):

    """
    Validates that sample metadata file contains the required columns.

    Args:
        filename (str):
            Metadata file name
        contents (io.StringIO):
            Metadata file as in-memory file object

    Returns:
        True if metadata table is valid, otherwise False.
    """

    if ".csv" not in filename:
        return False

    # Attempt to load metadata file as a pandas DataFrame
    try:
        df_metadata = pd.read_csv(contents, index_col=False)
    except Exception as error:
        print("Metadata file could not be read.")
        traceback.print_exc()
        return False

    # Define required columns and columns found in metadata file
    required_columns = ["Filename", "Name from collaborator", "Sample Name", "Species",
                        "Matrix", "Growth-Harvest Conditions", "Treatment"]
    metadata_file_columns = df_metadata.columns.tolist()

    # Check that the required columns are present
    for column in required_columns:
        if column not in metadata_file_columns:
            return False

    return True


def chromatography_valid(chromatography):

    """
    Validates that MSP / TXT files for the given chromatography method exist.

    TODO: Per Brian, some labs don't run in both polarities. Will need to make this function flexible for that.

    Args:
        chromatography (str): Chromatography method ID to validate

    Returns:
        True if chromatography method files exist, False if not.
    """

    # Get chromatography method from database
    df_methods = db.get_chromatography_methods()
    df_methods = df_methods.loc[df_methods["method_id"] == chromatography]

    if len(df_methods) == 0:
        return False

    # Check whether the method's MSP / TXT files exist
    pos_msp_file = db.get_msp_file_path(chromatography, "Positive")
    neg_msp_file = db.get_msp_file_path(chromatography, "Negative")

    if not os.path.isfile(pos_msp_file) or not os.path.isfile(neg_msp_file):
        return False

    return True


def biological_standards_valid(chromatography, biological_standards):

    """
    Validates that the given list of biological standards have MSP files.

    Args:
        chromatography (str):
            Chromatography method ID to validate
        biological_standards (list):
            List of biological standards to validate

    Returns:
        True if all MSP / TXT files exist, False if not.
    """

    # Query the provided biological standards from the database
    df = db.get_biological_standards()
    df = df.loc[df["chromatography"] == chromatography]
    df = df.loc[df["name"].isin(biological_standards)]

    # Check whether the method's MSP / TXT files exist, and return False if they don't
    pos_msp_files = df["pos_bio_msp_file"].astype(str).tolist()
    neg_msp_files = df["neg_bio_msp_file"].astype(str).tolist()
    msp_files = pos_msp_files + neg_msp_files

    for file in msp_files:
        file_path = os.path.join(os.getcwd(), "data", "methods", file)
        if not os.path.isfile(file_path):
            return False

    return True


def convert_sequence_to_json(sequence_contents, vendor="Thermo Fisher"):

    """
    Converts sequence table to JSON string for database storage.

    TODO: Convert to "records" orient instead. Much faster to load data using pd.DataFrame(json.loads(json_string))
        instead of pd.read_json(json_string, orient="split").

    Args:
        sequence_contents (io.StringIO):
            Acquisition file as in-memory file object
        vendor (str, default "Thermo Fisher"):
            Instrument vendor for parsing sequence

    Returns:
        JSON string of acquisition sequence DataFrame in "split" format.
    """

    # Select columns from sequence using correct vendor software nomenclature
    if vendor == "Thermo Fisher":
        df_sequence = pd.read_csv(sequence_contents, index_col=False)
        df_sequence.columns = df_sequence.iloc[0]
        df_sequence = df_sequence.drop(df_sequence.index[0])

    # Convert DataFrame to JSON string
    return df_sequence.to_json(orient="split")


def convert_metadata_to_json(metadata_contents):

    """
    Converts metadata table to JSON string for database storage.

    TODO: Convert to "records" orient instead. Much faster to load data using pd.DataFrame(json.loads(json_string))
        instead of pd.read_json(json_string, orient="split").

    Args:
        metadata_contents (io.StringIO): Metadata file as in-memory file object

    Returns:
        JSON string of sample metadata DataFrame in "split" format.
    """

    # Select columns from metadata
    df_metadata = pd.read_csv(metadata_contents, index_col=False)

    # Convert DataFrame to JSON string
    return df_metadata.to_json(orient="split")


def run_msconvert(path, filename, extension, output_folder):

    """
    Makes a copy of the data file and converts it from instrument vendor format to open mzML format.

    This function runs msconvert.exe in a background process. It checks every second for 30 seconds whether the
    mzML file was created, and if the process hangs, terminates the msconvert subprocess and returns None.

    TODO: As MS-AutoQC has evolved, some arguments for this function have become redundant.
        The output folder is always fixed, so this parameter should be removed.

    Args:
        path (str):
            Data acquisition path (with "/" at the end)
        filename (str):
            Name of sample data file
        extension (str):
            Data file extension, derived from instrument vendor
        output_folder (str):
            Output directory for mzML file – this is always ../data/instrument_id_run_id/data

    Returns:
        File path for mzML file (*.mzml)
    """

    # Remove files in output folder (if any)
    try:
        for file in os.listdir(output_folder):
            os.remove(output_folder + file)
    except Exception as error:
        print(error)
    finally:
        # Copy original data file to output folder
        shutil.copy2(path + filename + "." + extension, output_folder)

    # Get MSConvert.exe
    try:
        msconvert_folder = db.get_msconvert_directory()
        msconvert_exe = '"' + msconvert_folder + '/msconvert.exe" '
    except:
        print("Failed to locate MSConvert.exe!")
        traceback.print_exc()
        return None

    # Run MSConvert in a subprocess
    command = msconvert_exe + output_folder + filename + "." + extension + " -o " + output_folder
    process = psutil.Popen(command)
    pid = process.pid

    # Check every second for 30 seconds if mzML file was created; if process hangs, terminate and return None
    for index in range(31):
        if not subprocess_is_running(pid):
            break
        else:
            if index != 30:
                time.sleep(1)
            else:
                kill_subprocess(pid)
                return None

    # Delete copy of original data file
    data_file_copy = output_folder + filename + "." + extension
    os.remove(data_file_copy)

    # Return mzML file path to indicate success
    return output_folder + filename + ".mzml"


def run_msdial_processing(filename, msdial_path, parameter_file, input_folder, output_folder):

    """
    Processes data file (in mzML format) using the MS-DIAL console app.

    TODO: As MS-AutoQC has evolved, some arguments for this function have become redundant.
        The input and output folders are fixed, so these parameters should be removed.

    Args:
        filename (str):
            Name of sample data file
        msdial_path (str):
            Path for directory storing MsdialConsoleApp.exe
        parameter_file (str):
            Path for parameters.txt file, stored in /methods directory
        input_folder (str):
            Input folder – this is always ../data/instrument_id_run_id/data
        output_folder (str):
            Output folder – this is always ../data/instrument_id_run_id/results

    Returns:
        File path for MS-DIAL result file (*.msdial)
    """

    # Navigate to directory containing MS-DIAL
    home = os.getcwd()
    os.chdir(msdial_path)

    # Run MS-DIAL in a subprocess
    command = "MsdialConsoleApp.exe lcmsdda -i " + input_folder \
              + " -o " + output_folder \
              + " -m " + parameter_file + " -p"
    process = psutil.Popen(command)
    pid = process.pid

    # Check every second for 30 seconds if process was completed; if process hangs, return None
    for index in range(31):
        if not subprocess_is_running(pid):
            break
        else:
            if index != 30:
                time.sleep(1)
            else:
                kill_subprocess(pid)
                os.chdir(home)
                return None

    # Clear data file directory for next sample
    for file in os.listdir(input_folder):
        filepath = os.path.join(input_folder, file)
        try:
            shutil.rmtree(filepath)
        except OSError:
            os.remove(filepath)

    # Return to original working directory
    os.chdir(home)

    # MS-DIAL output filename
    msdial_result = output_folder + "/" + filename.split(".")[0] + ".msdial"

    # Return .msdial file path
    return msdial_result


def peak_list_to_dataframe(sample_peak_list, df_features):

    """
    Filters duplicates and poor annotations from MS-DIAL peak table and creates DataFrame storing
    m/z, RT, and intensity data for each internal standard (or targeted metabolite) in the sample.

    TODO: Describe duplicate handling in more detail in this docstring.

    Args:
        sample_peak_list (str):
            File path for MS-DIAL peak table, an .msdial file located in /data/instrument_id_run_id/results
        df_features (DataFrame):
            An m/z - RT table derived from internal standard (or biological standard) MSP library in database

    Returns:
        DataFrame with m/z, RT, and intensity data for each internal standard / targeted metabolite in the sample.
    """

    # Convert .msdial file into a DataFrame
    df = pd.read_csv(sample_peak_list, sep="\t", engine="python", skip_blank_lines=True)
    df.rename(columns={"Title": "Name"}, inplace=True)

    # Get only the m/z, RT, and intensity columns
    df = df[["Name", "Precursor m/z", "RT (min)", "Height", "MSMS spectrum"]]

    # Query only internal standards (or targeted features for biological standard)
    feature_list = df_features["name"].astype(str).tolist()
    without_ms2_feature_list = ["w/o MS2:" + feature for feature in feature_list]
    df = df.loc[(df["Name"].isin(feature_list)) | (df["Name"].isin(without_ms2_feature_list))]

    # Label annotations with and without MS2
    with_ms2 = df["MSMS spectrum"].notnull()
    without_ms2 = df["MSMS spectrum"].isnull()
    df.loc[with_ms2, "MSMS spectrum"] = "MS2"

    # Handle annotations made without MS2
    if len(df[with_ms2]) > 0:
        df.replace(["w/o MS2:"], "", regex=True, inplace=True)  # Remove "w/o MS2:" from annotation name
        ms2_matching = True  # Boolean that says MS/MS was used for identification
    else:
        ms2_matching = False

    # Get duplicate annotations in a DataFrame
    df_duplicates = df[df.duplicated(subset=["Name"], keep=False)]

    # Remove duplicates from peak list DataFrame
    df = df[~(df.duplicated(subset=["Name"], keep=False))]

    # Handle duplicate annotations
    if len(df_duplicates) > 0:

        # Get list of annotations that have duplicates in the peak list
        annotations = df_duplicates[~df_duplicates.duplicated(subset=["Name"])]["Name"].tolist()

        # For each unique feature, choose the annotation that best matches library m/z and RT values
        for annotation in annotations:

            # Get all duplicate annotations for that feature
            df_annotation = df_duplicates[df_duplicates["Name"] == annotation]
            df_feature_in_library = df_features.loc[df_features["name"] == annotation]

            # Calculate delta m/z and delta RT
            df_annotation["Delta m/z"] = df_annotation["Precursor m/z"].astype(float) - df_feature_in_library["precursor_mz"].astype(float).values[0]
            df_annotation["Delta RT"] = df_annotation["RT (min)"].astype(float) - df_feature_in_library["retention_time"].astype(float).values[0]

            # Absolute values
            df_annotation["Delta m/z"] = df_annotation["Delta m/z"].abs()
            df_annotation["Delta RT"] = df_annotation["Delta RT"].abs()

            # First, remove duplicates without MS2 (if an MSP with MS2 spectra was used for processing)
            if ms2_matching:
                new_df = df_annotation.loc[df_annotation["MSMS spectrum"].notnull()]

                # If annotations with MS2 remain, use them moving forward
                if len(new_df) > 1:
                    df_annotation = new_df

                # If no annotations with MS2 remain, filter annotations without MS2 by height
                else:
                    # Choose the annotation with the highest intensity
                    if len(df_annotation) > 1:
                        df_annotation = df_annotation.loc[
                            df_annotation["Height"] == df_annotation["Height"].max()]

                    # If there's only one annotation without MS2 left, choose as the "correct" annotation
                    if len(df_annotation) == 1:
                        df = df.append(df_annotation, ignore_index=True)
                        continue

            # Append the annotation with the lowest delta RT and delta m/z to the peak list DataFrame
            df_rectified = df_annotation.loc[
                (df_annotation["Delta m/z"] == df_annotation["Delta m/z"].min()) &
                (df_annotation["Delta RT"] == df_annotation["Delta RT"].min())]

            # If there is no "best" feature with the lowest delta RT and lowest delta m/z, choose the lowest delta RT
            if len(df_rectified) == 0:
                df_rectified = df_annotation.loc[
                    df_annotation["Delta RT"] == df_annotation["Delta RT"].min()]

            # If the RT's are exactly the same, choose the feature between them with the lowest delta m/z
            if len(df_rectified) > 1:
                df_rectified = df_rectified.loc[
                    df_rectified["Delta m/z"] == df_rectified["Delta m/z"].min()]

            # If they both have the same delta m/z, choose the feature between them with the greatest intensity
            if len(df_rectified) > 1:
                df_rectified = df_rectified.loc[
                    df_rectified["Height"] == df_rectified["Height"].max()]

            # If at this point there's still duplicates for some reason, just choose the first one
            if len(df_rectified) > 1:
                df_rectified = df_rectified[:1]

            # Append "correct" annotation to peak list DataFrame
            df = df.append(df_rectified, ignore_index=True)

    # DataFrame readiness before return
    try:
        df.drop(columns=["Delta m/z", "Delta RT"], inplace=True)
    finally:
        df.reset_index(drop=True, inplace=True)
        return df


def qc_sample(instrument_id, run_id, polarity, df_peak_list, df_features, is_bio_standard):

    """
    Performs quality control on sample data based on user-defined criteria in Settings > QC Configurations.

    The following quality control parameters are used to determine QC pass, warning, or fail:
        1. Intensity dropouts cutoff:
            How many internal standards are missing in the sample?
        2. RT shift from library value cutoff:
            How many retention times are shifted from the expected value for the chromatography method?
        3. RT shift from in-run average cutoff:
            How many retention times are shifted from their average RT during the run?
        4. m/z shift from library value cutoff:
            How many precursor masses are shifted from the expected value for the internal standard?

    This function returns a DataFrame with a single record in the following format:

    | Sample     | Delta m/z | Delta RT | In-run delta RT | Warnings | Fails |
    | ---------- | --------- | -------- | --------------- | -------- | ----- |
    | SAMPLE_001 | 0.000001  | 0.05     | 0.00001         | Delta RT | None  |

    Confusingly, a sample has an overall QC result, as well as QC warnings and fails for each internal standard.
    This makes it easier to determine what caused the overall QC result.

    See a screenshot of the sample information card in the Overview page of the website for context.

    While the thresholds for QC pass and fail are explicit, allowing the user to determine thresholds for QC warnings
    was deemed too cumbersome. Instead, an overall QC result of "Warning" happens if any of the following are true:
        1. The number of intensity dropouts is more than ~75% of the defined cutoff
        2. The QC result is not a "Fail" and 50% or more internal standards have a QC warning

    For each internal standard, a note is added to the "Warnings" or "Fails" column of qc_dataframe based on the user's
    defined criteria in Settings > QC Configurations. If the internal standard is not marked as a "Fail", then
    "Warnings" for individual internal standards could be marked if:
        1. The delta RT is greater than ~66% of the "RT shift from library value" cutoff
        2. The in-run delta RT is greater than 80% of the "RT shift from in-run average value" cutoff
        3. The delta m/z is greater than 80% of the "m/z shift from library value" cutoff

    TODO: Define and implement quality control for biological standards.

    Args:
        instrument_id (str):
            Instrument ID
        run_id (str):
            Instrument run ID (job ID)
        polarity (str):
            Polarity, either "Pos" or "Neg"
        df_peak_list (DataFrame):
            Filtered peak table, from peak_list_to_dataframe()
        df_features (DataFrame):
            An m/z - RT table derived from internal standard (or biological standard) MSP library in database
        is_bio_standard (bool):
            Whether sample is a biological standard or not

    Returns:
        (DataFrame, str): Tuple containing QC results table and QC result (either "Pass", "Fail", or "Warning").
    """

    # Handles sample QC checks
    if not is_bio_standard:

        # Refactor internal standards DataFrame
        df_features = df_features.rename(
            columns={"name": "Name",
                     "chromatography": "Chromatography",
                     "polarity": "Polarity",
                     "precursor_mz": "Library m/z",
                     "retention_time": "Library RT",
                     "ms2_spectrum": "Library MS2",
                     "inchikey": "Library INCHIKEY"})

        # Get delta RT and delta m/z values for each internal standard
        df_compare = pd.merge(df_features, df_peak_list, on="Name")
        df_compare["Delta RT"] = df_compare["RT (min)"].astype(float) - df_compare["Library RT"].astype(float)
        df_compare["Delta m/z"] = df_compare["Precursor m/z"].astype(float) - df_compare["Library m/z"].astype(float)
        df_peak_list_copy = df_peak_list.copy()

        # Get MS-DIAL RT threshold and filter out annotations without MS2 that are outside threshold
        with_ms2 = df_compare["MSMS spectrum"].notnull()
        without_ms2 = df_compare["MSMS spectrum"].isnull()
        annotations_without_ms2 = df_compare[without_ms2]["Name"].astype(str).tolist()

        if len(df_compare[with_ms2]) > 0:
            rt_threshold = db.get_msdial_configuration_parameters("Default", parameter="post_id_rt_tolerance")
            outside_rt_threshold = df_compare["Delta RT"].abs() > rt_threshold
            annotations_to_drop = df_compare.loc[(without_ms2) & (outside_rt_threshold)]

            df_compare.drop(annotations_to_drop.index, inplace=True)
            annotations_to_drop = annotations_to_drop["Name"].astype(str).tolist()
            annotations_to_drop = df_peak_list_copy[df_peak_list_copy["Name"].isin(annotations_to_drop)]
            df_peak_list_copy.drop(annotations_to_drop.index, inplace=True)

        # Get in-run RT average for each internal standard
        df_compare["In-run RT average"] = np.nan
        df_run_retention_times = db.parse_internal_standard_data(instrument_id, run_id, "retention_time", polarity, "processing", False)

        if df_run_retention_times is not None:
            # Calculate in-run RT average for each internal standard
            for internal_standard in df_run_retention_times.columns:
                if internal_standard == "Sample":
                    continue
                in_run_average = df_run_retention_times[internal_standard].dropna().astype(float).mean()
                df_compare.loc[df_compare["Name"] == internal_standard, "In-run RT average"] = in_run_average

            # Compare each internal standard RT to in-run RT average
            df_compare["In-run delta RT"] = df_compare["RT (min)"].astype(float) - df_compare["In-run RT average"].astype(float)
        else:
            df_compare["In-run delta RT"] = np.nan

        # Prepare final DataFrame
        qc_dataframe = df_compare[["Name", "Delta m/z", "Delta RT", "In-run delta RT"]]

        # Count internal standard intensity dropouts
        qc_dataframe["Intensity dropout"] = 0
        qc_dataframe["Warnings"] = ""
        qc_dataframe["Fails"] = ""
        for feature in df_features["Name"].astype(str).tolist():
            if feature not in df_peak_list_copy["Name"].astype(str).tolist():
                row = {"Name": feature,
                       "Delta m/z": np.nan,
                       "Delta RT": np.nan,
                       "In-run delta RT": np.nan,
                       "Intensity dropout": 1,
                       "Warnings": "",
                       "Fails": ""}
                qc_dataframe = qc_dataframe.append(row, ignore_index=True)

        # Determine pass / fail based on user criteria
        qc_config = db.get_qc_configuration_parameters(instrument_id=instrument_id, run_id=run_id)
        qc_result = "Pass"

        # QC of internal standard intensity dropouts
        if qc_config["intensity_enabled"].values[0] == 1:

            # Mark fails
            qc_dataframe.loc[qc_dataframe["Intensity dropout"].astype(int) == 1, "Fails"] = "Missing"

            # Count intensity dropouts
            intensity_dropouts = qc_dataframe["Intensity dropout"].astype(int).sum()
            intensity_dropouts_cutoff = qc_config["intensity_dropouts_cutoff"].astype(int).tolist()[0]

            # Compare number of intensity dropouts to user-defined cutoff
            if intensity_dropouts >= intensity_dropouts_cutoff:
                qc_result = "Fail"

            if qc_result != "Fail":
                if intensity_dropouts > intensity_dropouts_cutoff / 1.33:
                    qc_result = "Warning"

        # QC of internal standard RT's against library RT's
        if qc_config["library_rt_enabled"].values[0] == 1:

            # Check if delta RT's are outside of user-defined cutoff
            library_rt_shift_cutoff = qc_config["library_rt_shift_cutoff"].astype(float).values[0]

            # Mark fails
            fails = qc_dataframe["Delta RT"].abs() > library_rt_shift_cutoff
            qc_dataframe.loc[fails, "Fails"] = "RT"

            # Mark warnings
            warnings = ((library_rt_shift_cutoff / 1.5) < qc_dataframe["Delta RT"].abs()) & \
                       ((qc_dataframe["Delta RT"].abs()) < library_rt_shift_cutoff)
            qc_dataframe.loc[warnings, "Warnings"] = "RT"

            if len(qc_dataframe.loc[fails]) >= len(qc_dataframe) / 2:
                qc_result = "Fail"
            else:
                if len(qc_dataframe.loc[warnings]) > len(qc_dataframe) / 2 and qc_result != "Fail":
                    qc_result = "Warning"

        # QC of internal standard RT's against in-run RT average
        if qc_config["in_run_rt_enabled"].values[0] == 1 and df_run_retention_times is not None:

            # Check if in-run delta RT's are outside of user-defined cutoff
            in_run_rt_shift_cutoff = qc_config["in_run_rt_shift_cutoff"].astype(float).values[0]

            # Mark fails
            fails = qc_dataframe["In-run delta RT"].abs() > in_run_rt_shift_cutoff
            qc_dataframe.loc[fails, "Fails"] = "In-Run RT"

            # Mark warnings
            warnings = ((in_run_rt_shift_cutoff / 1.25) < qc_dataframe["In-run delta RT"].abs()) & \
                       (qc_dataframe["In-run delta RT"].abs() < in_run_rt_shift_cutoff)
            qc_dataframe.loc[warnings, "Warnings"] = "In-Run RT"

            if len(qc_dataframe.loc[fails]) >= len(qc_dataframe) / 2:
                qc_result = "Fail"
            else:
                if len(qc_dataframe.loc[warnings]) > len(qc_dataframe) / 2 and qc_result != "Fail":
                    qc_result = "Warning"

        # QC of internal standard precursor m/z against library m/z
        if qc_config["library_mz_enabled"].values[0] == 1:

            # Check if delta m/z's are outside of user-defined cutoff
            library_mz_shift_cutoff = qc_config["library_mz_shift_cutoff"].astype(float).values[0]

            # Mark fails
            fails = qc_dataframe["Delta m/z"].abs() > library_mz_shift_cutoff
            qc_dataframe.loc[fails, "Fails"] = "m/z"

            # Mark warnings
            warnings = ((library_mz_shift_cutoff / 1.25) < qc_dataframe["Delta m/z"].abs()) & \
                       (qc_dataframe["Delta m/z"].abs() < library_mz_shift_cutoff)
            qc_dataframe.loc[warnings, "Warnings"] = "m/z"

            if len(qc_dataframe.loc[fails]) >= len(qc_dataframe) / 2:
                qc_result = "Fail"
            else:
                if len(qc_dataframe.loc[warnings]) > len(qc_dataframe) / 2 and qc_result != "Fail":
                    qc_result = "Warning"

        # Mark annotations without MS2
        qc_dataframe.loc[qc_dataframe["Name"].isin(annotations_without_ms2), "Warnings"] = "No MS2"

    # Handles biological standard QC checks
    else:
        qc_dataframe = pd.DataFrame()
        qc_result = "Pass"

    return qc_dataframe, qc_result


def convert_to_dict(sample_id, df_peak_list, qc_dataframe):

    """
    Converts DataFrames to dictionary records, with features as columns and samples as rows,
    which are then cast to string format for database storage.

    See parse_internal_standard_data() in the DatabaseFunctions module for more information.

    Format:

    | Name       | iSTD 1 | iSTD 2 | iSTD 3 | iSTD 4 | ... |
    | ---------- | ------ | ------ | ------ | ------ | --- |
    | SAMPLE_001 | 1.207  | 1.934  | 3.953  | 8.132  | ... |

    Args:
        sample_id (str):
            Sample ID
        df_peak_list (DataFrame):
            Filtered MS-DIAL peak table result
        qc_dataframe (DataFrame):
            Table of various QC results

    Returns:
        (str, str, str, str): Tuple containing dictionary records of m/z, RT, intensity, and
        QC data, respectively, cast as strings for database storage.
    """

    # m/z, RT, intensity
    df_mz = df_peak_list[["Name", "Precursor m/z"]]
    df_rt = df_peak_list[["Name", "RT (min)"]]
    df_intensity = df_peak_list[["Name", "Height"]]

    df_mz = df_mz.rename(columns={"Precursor m/z": sample_id})
    df_rt = df_rt.rename(columns={"RT (min)": sample_id})
    df_intensity = df_intensity.rename(columns={"Height": sample_id})

    df_mz = df_mz.transpose().reset_index()
    df_rt = df_rt.transpose().reset_index()
    df_intensity = df_intensity.transpose().reset_index()

    df_mz.columns = df_mz.iloc[0].astype(str).tolist()
    df_rt.columns = df_rt.iloc[0].astype(str).tolist()
    df_intensity.columns = df_intensity.iloc[0].astype(str).tolist()

    df_mz = df_mz.drop(df_mz.index[0])
    df_rt = df_rt.drop(df_rt.index[0])
    df_intensity = df_intensity.drop(df_intensity.index[0])

    mz_record = df_mz.to_dict(orient="records")[0]
    rt_record = df_rt.to_dict(orient="records")[0]
    intensity_record = df_intensity.to_dict(orient="records")[0]

    # QC results
    if len(qc_dataframe) > 0:
        for column in qc_dataframe.columns:
            if column != "Name":
                qc_dataframe.rename(columns={column: sample_id + " " + column}, inplace=True)
        qc_dataframe = qc_dataframe.transpose().reset_index()
        qc_dataframe.columns = qc_dataframe.iloc[0].astype(str).tolist()
        qc_dataframe = qc_dataframe.drop(qc_dataframe.index[0])
        qc_dataframe = qc_dataframe.fillna(" ")
        qc_record = qc_dataframe.to_dict(orient="records")
    else:
        qc_record = {}

    return str(mz_record), str(rt_record), str(intensity_record), str(qc_record)


def process_data_file(path, filename, extension, instrument_id, run_id):

    """
    Processes data file upon sample acquisition completion.

    For more details, please visit the Documentation page on the website.

    Performs the following functions:
        1. Convert data file to mzML format using MSConvert
        2. Process data file using MS-DIAL and user-defined parameter configuration
        3. Load peak table into DataFrame and filter out poor annotations
        4. Perform quality control checks based on user-defined criteria
        5. Notify user of QC warnings or fails via Slack or email
        6. Write QC results to instrument database
        7. If Google Drive sync is enabled, upload results as CSV files

    Args:
        path (str):
            Data acquisition path
        filename (str):
            Name of sample data file
        extension (str):
            Data file extension, derived from instrument vendor
        instrument_id (str):
            Instrument ID
        run_id (str):
            Instrument run ID (job ID)

    Returns:
        None
    """

    id = instrument_id.replace(" ", "_") + "_" + run_id

    # Create the necessary directories
    data_directory = os.path.join(os.getcwd(), r"data")
    mzml_file_directory = os.path.join(data_directory, id, "data")
    qc_results_directory = os.path.join(data_directory, id, "results")

    for directory in [data_directory, mzml_file_directory, qc_results_directory]:
        if not os.path.exists(directory):
            os.makedirs(directory)

    mzml_file_directory = mzml_file_directory + "/"
    qc_results_directory = qc_results_directory + "/"

    # Retrieve chromatography, samples, and biological standards using run ID
    df_run = db.get_instrument_run(instrument_id, run_id)
    chromatography = df_run["chromatography"].astype(str).values[0]

    df_samples = db.get_samples_in_run(instrument_id, run_id, sample_type="Sample")
    df_biological_standards = db.get_samples_in_run(instrument_id, run_id, sample_type="Biological Standard")

    # Retrieve MS-DIAL parameters, internal standards, and targeted features from database
    if filename in df_biological_standards["sample_id"].astype(str).tolist():

        # Get biological standard type
        biological_standard = df_biological_standards.loc[
            df_biological_standards["sample_id"] == filename]

        # Get polarity
        try:
            polarity = biological_standard["polarity"].astype(str).values[0]
        except Exception as error:
            print("Could not read polarity from database:", error)
            print("Using default positive mode.")
            polarity = "Pos"

        biological_standard = biological_standard["biological_standard"].astype(str).values[0]

        # Get parameters and features for that biological standard type
        msdial_parameters = db.get_parameter_file_path(chromatography, polarity, biological_standard)
        df_features = db.get_targeted_features(biological_standard, chromatography, polarity)
        is_bio_standard = True

    elif filename in df_samples["sample_id"].astype(str).tolist():

        # Get polarity
        try:
            polarity = df_samples.loc[df_samples["sample_id"] == filename]["polarity"].astype(str).values[0]
        except Exception as error:
            print("Could not read polarity from database:", error)
            print("Using default positive mode.")
            polarity = "Positive"

        msdial_parameters = db.get_parameter_file_path(chromatography, polarity)
        df_features = db.get_internal_standards(chromatography, polarity)
        is_bio_standard = False

    else:
        print("Error! Could not retrieve MS-DIAL libraries and parameters.")
        return

    # Get MS-DIAL directory
    msdial_directory = db.get_msdial_directory()

    # Run MSConvert
    try:
        mzml_file = run_msconvert(path, filename, extension, mzml_file_directory)

        # For active instrument runs, give 3 more attempts if MSConvert fails
        if not db.is_completed_run(instrument_id, run_id):
            for attempt in range(3):
                if not os.path.exists(mzml_file):
                    print("MSConvert crashed, trying again in 3 minutes...")
                    time.sleep(180)
                    mzml_file = run_msconvert(path, filename, extension, mzml_file_directory)
                else:
                    break
    except:
        mzml_file = None
        print("Failed to run MSConvert.")
        traceback.print_exc()

    # Run MS-DIAL
    if mzml_file is not None:
        try:
            peak_list = run_msdial_processing(filename, msdial_directory, msdial_parameters,
                                              str(mzml_file_directory), str(qc_results_directory))
        except:
            peak_list = None
            print("Failed to run MS-DIAL.")
            traceback.print_exc()

    # Send peak list to MS-AutoQC algorithm if valid
    if mzml_file is not None and peak_list is not None:

        # Convert peak list to DataFrame
        try:
            df_peak_list = peak_list_to_dataframe(peak_list, df_features)
        except:
            print("Failed to convert peak list to DataFrame.")
            traceback.print_exc()
            return

        # Execute AutoQC algorithm
        try:
            qc_dataframe, qc_result = qc_sample(instrument_id, run_id, polarity, df_peak_list, df_features, is_bio_standard)
        except:
            print("Failed to execute AutoQC algorithm.")
            traceback.print_exc()
            return

        # Convert m/z, RT, and intensity data to dictionary records in string form
        try:
            mz_record, rt_record, intensity_record, qc_record = convert_to_dict(filename, df_peak_list, qc_dataframe)
        except:
            print("Failed to convert DataFrames to dictionary record format.")
            traceback.print_exc()
            return

        # Delete MS-DIAL result file
        try:
            os.remove(qc_results_directory + filename + ".msdial")
        except Exception as error:
            print("Failed to remove MS-DIAL result file.")
            traceback.print_exc()
            return

    else:
        print("Failed to process", filename)
        mz_record = None
        rt_record = None
        intensity_record = None
        qc_record = None
        qc_result = "Fail"
        peak_list = None

    # Send email and Slack notification (if they are enabled)
    try:
        if qc_result != "Pass":
            alert = "QC " + qc_result + ": " + filename
            if peak_list is None:
                alert = "Failed to process " + filename

            # Send Slack notification
            if db.slack_notifications_are_enabled():
                slack_bot.send_message(alert)

            # Send email
            if db.email_notifications_are_enabled():
                db.send_email(alert, "Please check on your instrument run accordingly.")

    except:
        print("Failed to send Slack notification.")
        traceback.print_exc()

    try:
        # Write QC results to database and upload to Google Drive
        db.write_qc_results(filename, instrument_id, run_id, mz_record, rt_record, intensity_record, qc_record, qc_result, is_bio_standard)

        # Update sample counters to trigger dashboard update
        db.update_sample_counters_for_run(instrument_id=instrument_id, run_id=run_id, latest_sample=filename)

        # If sync is enabled, upload the QC results to Google Drive
        if db.sync_is_enabled():
            db.upload_qc_results(instrument_id, run_id)

    except:
        print("Failed to write QC results to database.")
        traceback.print_exc()
        return


def subprocess_is_running(pid):

    """
    Returns True if subprocess is still running, and False if not.

    Args:
        pid (int): Subprocess ID

    Returns:
        bool: True if subprocess is still running, False if not
    """

    if pid is None:
        return False

    time.sleep(1)

    try:
        if psutil.Process(pid).status() == "running":
            return True
        else:
            return False
    except Exception as error:
        return False


def kill_subprocess(pid):

    """
    Kills subprocess.

    Args:
        pid (int): Subprocess ID

    Returns:
        None
    """

    try:
        return psutil.Process(pid).kill()
    except Exception as error:
        print("Error killing acquisition listener.")
        traceback.print_exc()
```
`sequence_is_valid(filename, contents, vendor="Thermo Fisher")`
Validates that instrument sequence file contains the correct columns.
TODO: Add support for other mass spectrometry instrument vendors here.
Arguments:
- filename (str): Acquisition sequence file name
- contents (io.StringIO): Acquisition sequence as in-memory file object
- vendor (str, default "Thermo Fisher"): Instrument vendor for parsing sequence
Returns:
True if sequence table is valid, otherwise False.
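A minimal usage sketch with hypothetical data: the two-row header below mimics a Thermo-style sequence export in which the first line holds bracket metadata and the second holds the real column names, which is why the function promotes the first data row to column headers before checking them.

```python
import io
from ms_autoqc.AutoQCProcessing import sequence_is_valid

# Hypothetical Thermo-style sequence CSV: metadata line first, then the header row
csv_text = (
    "Bracket Type=4,,,,\n"
    "File Name,Path,Instrument Method,Position,Inj Vol\n"
    "SAMPLE_001,D:\\Data\\,C:\\Methods\\HILIC_Pos,1:A1,2\n"
)

print(sequence_is_valid("sequence.csv", io.StringIO(csv_text)))  # True
print(sequence_is_valid("sequence.txt", io.StringIO(csv_text)))  # False (not a .csv)
```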
`metadata_is_valid(filename, contents)`
Validates that sample metadata file contains the required columns.
Arguments:
- filename (str): Metadata file name
- contents (io.StringIO): Metadata file as in-memory file object
Returns:
True if metadata table is valid, otherwise False.
`chromatography_valid(chromatography)`
Validates that MSP / TXT files for the given chromatography method exist.
TODO: Per Brian, some labs don't run in both polarities. Will need to make this function flexible for that.
Arguments:
- chromatography (str): Chromatography method ID to validate
Returns:
True if chromatography method files exist, False if not.
`biological_standards_valid(chromatography, biological_standards)`
Validates that the given list of biological standards have MSP files.
Arguments:
- chromatography (str): Chromatography method ID to validate
- biological_standards (list): List of biological standards to validate
Returns:
True if all MSP / TXT files exist, False if not.
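Both validators are boolean guards, so a natural use is a pre-flight check before accepting a new QC job. A short sketch (the method and standard names are hypothetical, and both calls assume an initialized MS-AutoQC database):

```python
from ms_autoqc.AutoQCProcessing import chromatography_valid, biological_standards_valid

# Hypothetical pre-flight check before starting a run
chromatography = "HILIC"
biological_standards = ["Urine", "BRM"]  # hypothetical standard names

if not chromatography_valid(chromatography):
    raise ValueError("No MSP / TXT files found for method: " + chromatography)

if not biological_standards_valid(chromatography, biological_standards):
    raise ValueError("Missing MSP files for one or more biological standards")
```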
`convert_sequence_to_json(sequence_contents, vendor="Thermo Fisher")`
Converts sequence table to JSON string for database storage.
TODO: Convert to "records" orient instead. Much faster to load data using pd.DataFrame(json.loads(json_string)) instead of pd.read_json(json_string, orient="split").
Arguments:
- sequence_contents (io.StringIO): Acquisition file as in-memory file object
- vendor (str, default "Thermo Fisher"): Instrument vendor for parsing sequence
Returns:
JSON string of acquisition sequence DataFrame in "split" format.
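The TODO above can be made concrete with a small round-trip comparison; this sketch uses a stand-in DataFrame rather than a real sequence:

```python
import io
import json
import pandas as pd

df = pd.DataFrame({"File Name": ["S1", "S2"], "Position": ["1:A1", "1:A2"]})

# Current storage format: "split" orient, loaded back via the pandas JSON reader
split_string = df.to_json(orient="split")
df_split = pd.read_json(io.StringIO(split_string), orient="split")

# Proposed alternative from the TODO: "records" orient, loaded via the stdlib json parser
records_string = df.to_json(orient="records")
df_records = pd.DataFrame(json.loads(records_string))
```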
`convert_metadata_to_json(metadata_contents)`
Converts metadata table to JSON string for database storage.
TODO: Convert to "records" orient instead. Much faster to load data using pd.DataFrame(json.loads(json_string)) instead of pd.read_json(json_string, orient="split").
Arguments:
- metadata_contents (io.StringIO): Metadata file as in-memory file object
Returns:
JSON string of sample metadata DataFrame in "split" format.
`run_msconvert(path, filename, extension, output_folder)`
Makes a copy of data file and converts it from instrument vendor format to open mzML format.
This function runs msconvert.exe in a background process. It checks every second for 30 seconds if the mzML file was created, and if it hangs, will terminate the msconvert subprocess and return None.
TODO: As MS-AutoQC has evolved, some arguments for this function have become redundant. The output folder is always fixed, so this parameter should be removed.
Arguments:
- path (str): Data acquisition path (with "/" at the end)
- filename (str): Name of sample data file
- extension (str): Data file extension, derived from instrument vendor
- output_folder (str): Output directory for mzML file – this is always ../data/instrument_id_run_id/data
Returns:
File path for mzML file (*.mzml)
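A hypothetical invocation for a Thermo .raw file, following the trailing-slash convention noted above (all paths are illustrative):

```python
from ms_autoqc.AutoQCProcessing import run_msconvert

# Hypothetical paths; run_msconvert() expects trailing slashes on directories
mzml_file = run_msconvert(
    path="D:/Data/",
    filename="SAMPLE_001",
    extension="raw",
    output_folder="C:/MS-AutoQC/data/Instrument1_Run1/data/",
)

if mzml_file is None:
    print("MSConvert hung or crashed; no mzML file was produced.")
```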
`run_msdial_processing(filename, msdial_path, parameter_file, input_folder, output_folder)`
Processes data file (in mzML format) using the MS-DIAL console app.
TODO: As MS-AutoQC has evolved, some arguments for this function have become redundant. The input and output folders are fixed, so these parameters should be removed.
Arguments:
- filename (str): Name of sample data file
- msdial_path (str): Path for directory storing MSDialConsoleApp.exe
- parameter_file (str): Path for parameters.txt file, stored in /methods directory
- input_folder (str): Input folder – this is always ../data/instrument_id_run_id/data
- output_folder (str): Output folder – this is always ../data/instrument_id_run_id/results
Returns:
File path for MS-DIAL result file (*.msdial)
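Continuing the hypothetical example above, the mzML output can then be handed to MS-DIAL; paths and the parameter file name are illustrative (in the pipeline, the parameter file comes from db.get_parameter_file_path()):

```python
from ms_autoqc.AutoQCProcessing import run_msdial_processing

# Hypothetical paths; msdial_path is the folder containing MsdialConsoleApp.exe
peak_list = run_msdial_processing(
    filename="SAMPLE_001",
    msdial_path="C:/MSDIAL",
    parameter_file="C:/MS-AutoQC/data/methods/msdial_parameters_HILIC_Pos.txt",
    input_folder="C:/MS-AutoQC/data/Instrument1_Run1/data",
    output_folder="C:/MS-AutoQC/data/Instrument1_Run1/results",
)
# On success, peak_list is ".../results/SAMPLE_001.msdial";
# note that the input directory is cleared for the next sample.
```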
`peak_list_to_dataframe(sample_peak_list, df_features)`
Filters duplicates and poor annotations from MS-DIAL peak table and creates DataFrame storing m/z, RT, and intensity data for each internal standard (or targeted metabolite) in the sample.
TODO: Describe duplicate handling in more detail in this docstring.
Arguments:
- sample_peak_list (str): File path for MS-DIAL peak table, an .msdial file located in /data/instrument_id_run_id/results
- df_features (DataFrame): An m/z - RT table derived from internal standard (or biological standard) MSP library in database
Returns:
DataFrame with m/z, RT, and intensity data for each internal standard / targeted metabolite in the sample.
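The tie-breaking cascade for duplicate annotations can be illustrated in isolation. This simplified sketch (made-up values, and skipping the MS2 filtering step) applies the same ordering as the source: lowest delta RT, then lowest delta m/z, then highest intensity.

```python
import pandas as pd

# Two competing annotations for the same internal standard (made-up values)
df_annotation = pd.DataFrame({
    "Name": ["iSTD 1", "iSTD 1"],
    "Precursor m/z": [261.1312, 261.1455],
    "RT (min)": [4.81, 5.95],
    "Height": [120000, 45000],
})
library_mz, library_rt = 261.1310, 4.80  # hypothetical library values

df_annotation["Delta m/z"] = (df_annotation["Precursor m/z"] - library_mz).abs()
df_annotation["Delta RT"] = (df_annotation["RT (min)"] - library_rt).abs()

# Lowest delta RT wins; ties fall through to delta m/z, then intensity
best = df_annotation[df_annotation["Delta RT"] == df_annotation["Delta RT"].min()]
if len(best) > 1:
    best = best[best["Delta m/z"] == best["Delta m/z"].min()]
if len(best) > 1:
    best = best[best["Height"] == best["Height"].max()]

print(best)  # keeps the annotation at 4.81 min
```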
def qc_sample(instrument_id, run_id, polarity, df_peak_list, df_features, is_bio_standard):

    """
    Performs quality control on sample data based on user-defined criteria in Settings > QC Configurations.

    The following quality control parameters are used to determine QC pass, warning, or fail:
        1. Intensity dropouts cutoff:
            How many internal standards are missing in the sample?
        2. RT shift from library value cutoff:
            How many retention times are shifted from the expected value for the chromatography method?
        3. RT shift from in-run average cutoff:
            How many retention times are shifted from their average RT during the run?
        4. m/z shift from library value cutoff:
            How many precursor masses are shifted from the expected value for the internal standard?

    This function returns a DataFrame with a single record in the following format:

    | Sample     | Delta m/z | Delta RT | In-run delta RT | Warnings | Fails |
    | ---------- | --------- | -------- | --------------- | -------- | ----- |
    | SAMPLE_001 | 0.000001  | 0.05     | 0.00001         | Delta RT | None  |

    A sample has an overall QC result, as well as QC warnings and fails for each internal standard;
    the per-standard results make it easier to determine what caused the overall QC result.

    See a screenshot of the sample information card in the Overview page of the website for context.

    While the thresholds for QC pass and fail are explicit, allowing the user to determine thresholds for QC warnings
    was deemed too cumbersome. Instead, an overall QC result of "Warning" happens if either of the following is true:
        1. The number of intensity dropouts is 75% or more of the defined cutoff
        2. The QC result is not a "Fail" and 50% or more of the internal standards have a QC warning

    For each internal standard, a note is added to the "Warnings" or "Fails" column of qc_dataframe based on the user's
    defined criteria in Settings > QC Configurations. If the internal standard is not marked as a "Fail", then
    a "Warning" is marked for that internal standard if:
        1. The delta RT is greater than 66% of the "RT shift from library value" cutoff
        2. The in-run delta RT is greater than 80% of the "RT shift from in-run average value" cutoff
        3. The delta m/z is greater than 80% of the "m/z shift from library value" cutoff

    TODO: Define and implement quality control for biological standards.

    Args:
        instrument_id (str):
            Instrument ID
        run_id (str):
            Instrument run ID (Job ID)
        polarity (str):
            Polarity, either "Pos" or "Neg"
        df_peak_list (DataFrame):
            Filtered peak table, from peak_list_to_dataframe()
        df_features (DataFrame):
            An m/z - RT table derived from internal standard (or biological standard) MSP library in database
        is_bio_standard (bool):
            Whether sample is a biological standard or not

    Returns:
        (DataFrame, str): Tuple containing QC results table and QC result (either "Pass", "Fail", or "Warning").
    """

    # Handles sample QC checks
    if not is_bio_standard:

        # Refactor internal standards DataFrame
        df_features = df_features.rename(
            columns={"name": "Name",
                     "chromatography": "Chromatography",
                     "polarity": "Polarity",
                     "precursor_mz": "Library m/z",
                     "retention_time": "Library RT",
                     "ms2_spectrum": "Library MS2",
                     "inchikey": "Library INCHIKEY"})

        # Get delta RT and delta m/z values for each internal standard
        df_compare = pd.merge(df_features, df_peak_list, on="Name")
        df_compare["Delta RT"] = df_compare["RT (min)"].astype(float) - df_compare["Library RT"].astype(float)
        df_compare["Delta m/z"] = df_compare["Precursor m/z"].astype(float) - df_compare["Library m/z"].astype(float)
        df_peak_list_copy = df_peak_list.copy()

        # Get MS-DIAL RT threshold and filter out annotations without MS2 that are outside threshold
        with_ms2 = df_compare["MSMS spectrum"].notnull()
        without_ms2 = df_compare["MSMS spectrum"].isnull()
        annotations_without_ms2 = df_compare[without_ms2]["Name"].astype(str).tolist()

        if len(df_compare[with_ms2]) > 0:
            rt_threshold = db.get_msdial_configuration_parameters("Default", parameter="post_id_rt_tolerance")
            outside_rt_threshold = df_compare["Delta RT"].abs() > rt_threshold
            annotations_to_drop = df_compare.loc[(without_ms2) & (outside_rt_threshold)]

            df_compare.drop(annotations_to_drop.index, inplace=True)
            annotations_to_drop = annotations_to_drop["Name"].astype(str).tolist()
            annotations_to_drop = df_peak_list_copy[df_peak_list_copy["Name"].isin(annotations_to_drop)]
            df_peak_list_copy.drop(annotations_to_drop.index, inplace=True)

        # Get in-run RT average for each internal standard
        df_compare["In-run RT average"] = np.nan
        df_run_retention_times = db.parse_internal_standard_data(instrument_id, run_id, "retention_time", polarity, "processing", False)

        if df_run_retention_times is not None:
            # Calculate in-run RT average for each internal standard
            for internal_standard in df_run_retention_times.columns:
                if internal_standard == "Sample":
                    continue
                in_run_average = df_run_retention_times[internal_standard].dropna().astype(float).mean()
                df_compare.loc[df_compare["Name"] == internal_standard, "In-run RT average"] = in_run_average

            # Compare each internal standard RT to in-run RT average
            df_compare["In-run delta RT"] = df_compare["RT (min)"].astype(float) - df_compare["In-run RT average"].astype(float)
        else:
            df_compare["In-run delta RT"] = np.nan

        # Prepare final DataFrame (.copy() avoids writing to a slice of df_compare)
        qc_dataframe = df_compare[["Name", "Delta m/z", "Delta RT", "In-run delta RT"]].copy()

        # Count internal standard intensity dropouts
        qc_dataframe["Intensity dropout"] = 0
        qc_dataframe["Warnings"] = ""
        qc_dataframe["Fails"] = ""
        for feature in df_features["Name"].astype(str).tolist():
            if feature not in df_peak_list_copy["Name"].astype(str).tolist():
                row = {"Name": feature,
                       "Delta m/z": np.nan,
                       "Delta RT": np.nan,
                       "In-run delta RT": np.nan,
                       "Intensity dropout": 1,
                       "Warnings": "",
                       "Fails": ""}
                # Append the dropout record (DataFrame.append() was removed in pandas 2.0)
                qc_dataframe = pd.concat([qc_dataframe, pd.DataFrame([row])], ignore_index=True)

        # Determine pass / fail based on user criteria
        qc_config = db.get_qc_configuration_parameters(instrument_id=instrument_id, run_id=run_id)
        qc_result = "Pass"

        # QC of internal standard intensity dropouts
        if qc_config["intensity_enabled"].values[0] == 1:

            # Mark fails
            qc_dataframe.loc[qc_dataframe["Intensity dropout"].astype(int) == 1, "Fails"] = "Missing"

            # Count intensity dropouts
            intensity_dropouts = qc_dataframe["Intensity dropout"].astype(int).sum()
            intensity_dropouts_cutoff = qc_config["intensity_dropouts_cutoff"].astype(int).tolist()[0]

            # Compare number of intensity dropouts to user-defined cutoff
            if intensity_dropouts >= intensity_dropouts_cutoff:
                qc_result = "Fail"

            if qc_result != "Fail":
                if intensity_dropouts > intensity_dropouts_cutoff / 1.33:
                    qc_result = "Warning"

        # QC of internal standard RT's against library RT's
        if qc_config["library_rt_enabled"].values[0] == 1:

            # Check if delta RT's are outside of user-defined cutoff
            library_rt_shift_cutoff = qc_config["library_rt_shift_cutoff"].astype(float).values[0]

            # Mark fails
            fails = qc_dataframe["Delta RT"].abs() > library_rt_shift_cutoff
            qc_dataframe.loc[fails, "Fails"] = "RT"

            # Mark warnings
            warnings = ((library_rt_shift_cutoff / 1.5) < qc_dataframe["Delta RT"].abs()) & \
                       (qc_dataframe["Delta RT"].abs() < library_rt_shift_cutoff)
            qc_dataframe.loc[warnings, "Warnings"] = "RT"

            if len(qc_dataframe.loc[fails]) >= len(qc_dataframe) / 2:
                qc_result = "Fail"
            else:
                if len(qc_dataframe.loc[warnings]) > len(qc_dataframe) / 2 and qc_result != "Fail":
                    qc_result = "Warning"

        # QC of internal standard RT's against in-run RT average
        if qc_config["in_run_rt_enabled"].values[0] == 1 and df_run_retention_times is not None:

            # Check if in-run delta RT's are outside of user-defined cutoff
            in_run_rt_shift_cutoff = qc_config["in_run_rt_shift_cutoff"].astype(float).values[0]

            # Mark fails
            fails = qc_dataframe["In-run delta RT"].abs() > in_run_rt_shift_cutoff
            qc_dataframe.loc[fails, "Fails"] = "In-Run RT"

            # Mark warnings
            warnings = ((in_run_rt_shift_cutoff / 1.25) < qc_dataframe["In-run delta RT"].abs()) & \
                       (qc_dataframe["In-run delta RT"].abs() < in_run_rt_shift_cutoff)
            qc_dataframe.loc[warnings, "Warnings"] = "In-Run RT"

            if len(qc_dataframe.loc[fails]) >= len(qc_dataframe) / 2:
                qc_result = "Fail"
            else:
                if len(qc_dataframe.loc[warnings]) > len(qc_dataframe) / 2 and qc_result != "Fail":
                    qc_result = "Warning"

        # QC of internal standard precursor m/z against library m/z
        if qc_config["library_mz_enabled"].values[0] == 1:

            # Check if delta m/z's are outside of user-defined cutoff
            library_mz_shift_cutoff = qc_config["library_mz_shift_cutoff"].astype(float).values[0]

            # Mark fails
            fails = qc_dataframe["Delta m/z"].abs() > library_mz_shift_cutoff
            qc_dataframe.loc[fails, "Fails"] = "m/z"

            # Mark warnings
            warnings = ((library_mz_shift_cutoff / 1.25) < qc_dataframe["Delta m/z"].abs()) & \
                       (qc_dataframe["Delta m/z"].abs() < library_mz_shift_cutoff)
            qc_dataframe.loc[warnings, "Warnings"] = "m/z"

            if len(qc_dataframe.loc[fails]) >= len(qc_dataframe) / 2:
                qc_result = "Fail"
            else:
                if len(qc_dataframe.loc[warnings]) > len(qc_dataframe) / 2 and qc_result != "Fail":
                    qc_result = "Warning"

        # Mark annotations without MS2
        qc_dataframe.loc[qc_dataframe["Name"].isin(annotations_without_ms2), "Warnings"] = "No MS2"

    # Handles biological standard QC checks
    else:
        qc_dataframe = pd.DataFrame()
        qc_result = "Pass"

    return qc_dataframe, qc_result
Performs quality control on sample data based on user-defined criteria in Settings > QC Configurations.
The following quality control parameters are used to determine QC pass, warning, or fail:
1. Intensity dropouts cutoff: How many internal standards are missing in the sample?
2. RT shift from library value cutoff: How many retention times are shifted from the expected value for the chromatography method?
3. RT shift from in-run average cutoff: How many retention times are shifted from their average RT during the run?
4. m/z shift from library value cutoff: How many precursor masses are shifted from the expected value for the internal standard?
This function returns a DataFrame with a single record in the following format:
| Sample     | Delta m/z | Delta RT | In-run delta RT | Warnings | Fails |
| ---------- | --------- | -------- | --------------- | -------- | ----- |
| SAMPLE_001 | 0.000001  | 0.05     | 0.00001         | Delta RT | None  |
A sample has an overall QC result, as well as QC warnings and fails for each internal standard; the per-standard results make it easier to determine what caused the overall QC result.
See a screenshot of the sample information card in the Overview page of the website for context.
While the thresholds for QC pass and fail are explicit, allowing the user to determine thresholds for QC warnings was deemed too cumbersome. Instead, an overall QC result of "Warning" happens if either of the following is true:
1. The number of intensity dropouts is 75% or more of the defined cutoff (e.g., with a cutoff of 5 dropouts, 4 dropouts triggers a "Warning" and 5 triggers a "Fail")
2. The QC result is not a "Fail" and 50% or more of the internal standards have a QC warning
For each internal standard, a note is added to the "Warnings" or "Fails" column of qc_dataframe based on the user's defined criteria in Settings > QC Configurations. If the internal standard is not marked as a "Fail", then a "Warning" is marked for that internal standard if any of the following are true (see the illustrative snippet below):
1. The delta RT is greater than 66% of the "RT shift from library value" cutoff
2. The in-run delta RT is greater than 80% of the "RT shift from in-run average value" cutoff
3. The delta m/z is greater than 80% of the "m/z shift from library value" cutoff
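To make those fractions concrete, here is a small self-contained illustration (not part of the module) of how a single delta RT value falls into the pass / warning / fail bands, mirroring the strict inequalities used in qc_sample() above:

def classify_delta_rt(delta_rt, cutoff):
    # Mirrors qc_sample(): a delta beyond the cutoff fails; a delta inside the
    # (cutoff / 1.5, cutoff) band, i.e. above ~66% of the cutoff, warns
    if abs(delta_rt) > cutoff:
        return "Fail"
    elif cutoff / 1.5 < abs(delta_rt) < cutoff:
        return "Warning"
    return "Pass"

print(classify_delta_rt(0.08, 0.1))   # "Warning" (0.08 > 0.1 / 1.5 = 0.067)
print(classify_delta_rt(0.12, 0.1))   # "Fail"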
TODO: Define and implement quality control for biological standards.
Arguments:
- instrument_id (str): Instrument ID
- run_id (str): Instrument run ID (Job ID)
- polarity (str): Polarity, either "Pos" or "Neg"
- df_peak_list (DataFrame): Filtered peak table, from peak_list_to_dataframe()
- df_features (DataFrame): An m/z - RT table derived from internal standard (or biological standard) MSP library in database
- is_bio_standard (bool): Whether sample is a biological standard or not
Returns:
(DataFrame, str): Tuple containing QC results table and QC result (either "Pass", "Fail", or "Warning").
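For orientation, a minimal usage sketch follows. The instrument and run IDs are hypothetical, and df_peak_list and df_features are assumed to come from peak_list_to_dataframe() and db.get_internal_standards(), as in process_data_file() below:

# Illustrative call only; IDs are hypothetical and the DataFrames come from earlier steps
qc_dataframe, qc_result = qc_sample(
    instrument_id="Thermo QE 1",
    run_id="RUN_001",
    polarity="Pos",
    df_peak_list=df_peak_list,
    df_features=df_features,
    is_bio_standard=False)

if qc_result in ("Warning", "Fail"):
    # Per-standard notes live in the "Warnings" and "Fails" columns
    print(qc_dataframe[["Name", "Warnings", "Fails"]])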
def convert_to_dict(sample_id, df_peak_list, qc_dataframe):

    """
    Converts DataFrames to dictionary records, with features as columns and samples as rows,
    which are then cast to string format for database storage.

    See parse_internal_standard_data() in the DatabaseFunctions module for more information.

    Format:

    | Name       | iSTD 1 | iSTD 2 | iSTD 3 | iSTD 4 | ... |
    | ---------- | ------ | ------ | ------ | ------ | --- |
    | SAMPLE_001 | 1.207  | 1.934  | 3.953  | 8.132  | ... |

    Args:
        sample_id (str):
            Sample ID
        df_peak_list (DataFrame):
            Filtered MS-DIAL peak table result
        qc_dataframe (DataFrame):
            Table of various QC results

    Returns:
        (str, str, str, str): Tuple containing dictionary records of m/z, RT, intensity, and
        QC data, respectively, cast as strings for database storage.
    """

    # m/z, RT, intensity
    df_mz = df_peak_list[["Name", "Precursor m/z"]]
    df_rt = df_peak_list[["Name", "RT (min)"]]
    df_intensity = df_peak_list[["Name", "Height"]]

    df_mz = df_mz.rename(columns={"Precursor m/z": sample_id})
    df_rt = df_rt.rename(columns={"RT (min)": sample_id})
    df_intensity = df_intensity.rename(columns={"Height": sample_id})

    df_mz = df_mz.transpose().reset_index()
    df_rt = df_rt.transpose().reset_index()
    df_intensity = df_intensity.transpose().reset_index()

    df_mz.columns = df_mz.iloc[0].astype(str).tolist()
    df_rt.columns = df_rt.iloc[0].astype(str).tolist()
    df_intensity.columns = df_intensity.iloc[0].astype(str).tolist()

    df_mz = df_mz.drop(df_mz.index[0])
    df_rt = df_rt.drop(df_rt.index[0])
    df_intensity = df_intensity.drop(df_intensity.index[0])

    mz_record = df_mz.to_dict(orient="records")[0]
    rt_record = df_rt.to_dict(orient="records")[0]
    intensity_record = df_intensity.to_dict(orient="records")[0]

    # QC results
    if len(qc_dataframe) > 0:
        for column in qc_dataframe.columns:
            if column != "Name":
                qc_dataframe.rename(columns={column: sample_id + " " + column}, inplace=True)
        qc_dataframe = qc_dataframe.transpose().reset_index()
        qc_dataframe.columns = qc_dataframe.iloc[0].astype(str).tolist()
        qc_dataframe = qc_dataframe.drop(qc_dataframe.index[0])
        qc_dataframe = qc_dataframe.fillna(" ")
        qc_record = qc_dataframe.to_dict(orient="records")
    else:
        qc_record = {}

    return str(mz_record), str(rt_record), str(intensity_record), str(qc_record)
Converts DataFrames to dictionary records, with features as columns and samples as rows, which are then cast to string format for database storage.
See parse_internal_standard_data() in the DatabaseFunctions module for more information.
Format:
| Name       | iSTD 1 | iSTD 2 | iSTD 3 | iSTD 4 | ... |
| ---------- | ------ | ------ | ------ | ------ | --- |
| SAMPLE_001 | 1.207  | 1.934  | 3.953  | 8.132  | ... |
Arguments:
- sample_id (str): Sample ID
- df_peak_list (DataFrame): Filtered MS-DIAL peak table result
- qc_dataframe (DataFrame): Table of various QC results
Returns:
(str, str, str, str): Tuple containing dictionary records of m/z, RT, intensity, and QC data, respectively, cast as strings for database storage.
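The reshaping step is easier to see on a toy input. This self-contained snippet (not part of the module) applies the same transpose-and-relabel sequence used in convert_to_dict() to a two-standard RT table:

import pandas as pd

# Toy peak table: one RT value per internal standard
df_rt = pd.DataFrame({"Name": ["iSTD 1", "iSTD 2"], "RT (min)": [1.207, 1.934]})

# Rename the value column to the sample ID, then flip features into columns
df_rt = df_rt.rename(columns={"RT (min)": "SAMPLE_001"})
df_rt = df_rt.transpose().reset_index()
df_rt.columns = df_rt.iloc[0].astype(str).tolist()
df_rt = df_rt.drop(df_rt.index[0])

print(df_rt.to_dict(orient="records")[0])
# {'Name': 'SAMPLE_001', 'iSTD 1': 1.207, 'iSTD 2': 1.934}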
def process_data_file(path, filename, extension, instrument_id, run_id):

    """
    Processes data file upon sample acquisition completion.

    For more details, please visit the Documentation page on the website.

    Performs the following functions:
        1. Convert data file to mzML format using MSConvert
        2. Process data file using MS-DIAL and user-defined parameter configuration
        3. Load peak table into DataFrame and filter out poor annotations
        4. Perform quality control checks based on user-defined criteria
        5. Notify user of QC warnings or fails via Slack or email
        6. Write QC results to instrument database
        7. If Google Drive sync is enabled, upload results as CSV files

    Args:
        path (str):
            Data acquisition path
        filename (str):
            Name of sample data file
        extension (str):
            Data file extension, derived from instrument vendor
        instrument_id (str):
            Instrument ID
        run_id (str):
            Instrument run ID (job ID)

    Returns:
        None
    """

    id = instrument_id.replace(" ", "_") + "_" + run_id

    # Create the necessary directories
    data_directory = os.path.join(os.getcwd(), r"data")
    mzml_file_directory = os.path.join(data_directory, id, "data")
    qc_results_directory = os.path.join(data_directory, id, "results")

    for directory in [data_directory, mzml_file_directory, qc_results_directory]:
        if not os.path.exists(directory):
            os.makedirs(directory)

    mzml_file_directory = mzml_file_directory + "/"
    qc_results_directory = qc_results_directory + "/"

    # Retrieve chromatography, samples, and biological standards using run ID
    df_run = db.get_instrument_run(instrument_id, run_id)
    chromatography = df_run["chromatography"].astype(str).values[0]

    df_samples = db.get_samples_in_run(instrument_id, run_id, sample_type="Sample")
    df_biological_standards = db.get_samples_in_run(instrument_id, run_id, sample_type="Biological Standard")

    # Retrieve MS-DIAL parameters, internal standards, and targeted features from database
    if filename in df_biological_standards["sample_id"].astype(str).tolist():

        # Get biological standard type
        biological_standard = df_biological_standards.loc[
            df_biological_standards["sample_id"] == filename]

        # Get polarity
        try:
            polarity = biological_standard["polarity"].astype(str).values[0]
        except Exception as error:
            print("Could not read polarity from database:", error)
            print("Using default positive mode.")
            polarity = "Pos"

        biological_standard = biological_standard["biological_standard"].astype(str).values[0]

        # Get parameters and features for that biological standard type
        msdial_parameters = db.get_parameter_file_path(chromatography, polarity, biological_standard)
        df_features = db.get_targeted_features(biological_standard, chromatography, polarity)
        is_bio_standard = True

    elif filename in df_samples["sample_id"].astype(str).tolist():

        # Get polarity
        try:
            polarity = df_samples.loc[df_samples["sample_id"] == filename]["polarity"].astype(str).values[0]
        except Exception as error:
            print("Could not read polarity from database:", error)
            print("Using default positive mode.")
            polarity = "Positive"

        msdial_parameters = db.get_parameter_file_path(chromatography, polarity)
        df_features = db.get_internal_standards(chromatography, polarity)
        is_bio_standard = False

    else:
        print("Error! Could not retrieve MS-DIAL libraries and parameters.")
        return

    # Get MS-DIAL directory
    msdial_directory = db.get_msdial_directory()

    # Run MSConvert
    try:
        mzml_file = run_msconvert(path, filename, extension, mzml_file_directory)

        # For active instrument runs, give 3 more attempts if MSConvert fails
        if not db.is_completed_run(instrument_id, run_id):
            for attempt in range(3):
                if not os.path.exists(mzml_file):
                    print("MSConvert crashed, trying again in 3 minutes...")
                    time.sleep(180)
                    mzml_file = run_msconvert(path, filename, extension, mzml_file_directory)
                else:
                    break
    except:
        mzml_file = None
        print("Failed to run MSConvert.")
        traceback.print_exc()

    # Run MS-DIAL
    if mzml_file is not None:
        try:
            peak_list = run_msdial_processing(filename, msdial_directory, msdial_parameters,
                                              str(mzml_file_directory), str(qc_results_directory))
        except:
            peak_list = None
            print("Failed to run MS-DIAL.")
            traceback.print_exc()

    # Send peak list to MS-AutoQC algorithm if valid
    if mzml_file is not None and peak_list is not None:

        # Convert peak list to DataFrame
        try:
            df_peak_list = peak_list_to_dataframe(peak_list, df_features)
        except:
            print("Failed to convert peak list to DataFrame.")
            traceback.print_exc()
            return

        # Execute AutoQC algorithm
        try:
            qc_dataframe, qc_result = qc_sample(instrument_id, run_id, polarity, df_peak_list, df_features, is_bio_standard)
        except:
            print("Failed to execute AutoQC algorithm.")
            traceback.print_exc()
            return

        # Convert m/z, RT, and intensity data to dictionary records in string form
        try:
            mz_record, rt_record, intensity_record, qc_record = convert_to_dict(filename, df_peak_list, qc_dataframe)
        except:
            print("Failed to convert DataFrames to dictionary record format.")
            traceback.print_exc()
            return

        # Delete MS-DIAL result file
        try:
            os.remove(qc_results_directory + filename + ".msdial")
        except Exception as error:
            print("Failed to remove MS-DIAL result file.")
            traceback.print_exc()
            return

    else:
        print("Failed to process", filename)
        mz_record = None
        rt_record = None
        intensity_record = None
        qc_record = None
        qc_result = "Fail"
        peak_list = None

    # Send email and Slack notification (if they are enabled)
    try:
        if qc_result != "Pass":
            alert = "QC " + qc_result + ": " + filename
            if peak_list is None:
                alert = "Failed to process " + filename

            # Send Slack notification
            if db.slack_notifications_are_enabled():
                slack_bot.send_message(alert)

            # Send email notification
            if db.email_notifications_are_enabled():
                db.send_email(alert, "Please check on your instrument run accordingly.")

    except:
        print("Failed to send Slack and/or email notifications.")
        traceback.print_exc()

    try:
        # Write QC results to database and upload to Google Drive
        db.write_qc_results(filename, instrument_id, run_id, mz_record, rt_record, intensity_record, qc_record, qc_result, is_bio_standard)

        # Update sample counters to trigger dashboard update
        db.update_sample_counters_for_run(instrument_id=instrument_id, run_id=run_id, latest_sample=filename)

        # If sync is enabled, upload the QC results to Google Drive
        if db.sync_is_enabled():
            db.upload_qc_results(instrument_id, run_id)

    except:
        print("Failed to write QC results to database.")
        traceback.print_exc()
        return
Processes data file upon sample acquisition completion.
For more details, please visit the Documentation page on the website.
Performs the following functions:
- Convert data file to mzML format using MSConvert
- Process data file using MS-DIAL and user-defined parameter configuration
- Load peak table into DataFrame and filter out poor annotations
- Perform quality control checks based on user-defined criteria
- Notify user of QC warnings or fails via Slack or email
- Write QC results to instrument database
- If Google Drive sync is enabled, upload results as CSV files
Arguments:
- path (str): Data acquisition path
- filename (str): Name of sample data file
- extension (str): Data file extension, derived from instrument vendor
- instrument_id (str): Instrument ID
- run_id (str): Instrument run ID (job ID)
Returns:
None
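A minimal usage sketch follows, presumably mirroring how the acquisition listener invokes this function when a raw file finishes writing. All paths and IDs here are hypothetical:

# Illustrative call only; path, extension, and IDs are hypothetical
process_data_file(
    path="D:/Data/RUN_001/",       # acquisition path being watched
    filename="SAMPLE_001",
    extension="raw",               # e.g., Thermo Fisher vendor format
    instrument_id="Thermo QE 1",
    run_id="RUN_001")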
def subprocess_is_running(pid):

    """
    Returns True if subprocess is still running, and False if not.

    Args:
        pid (int): Subprocess ID

    Returns:
        bool: True if subprocess is still running, False if not
    """

    if pid is None:
        return False

    time.sleep(1)

    try:
        if psutil.Process(pid).status() == "running":
            return True
        else:
            return False
    except Exception:
        return False
Returns True if subprocess is still running, and False if not.
Arguments:
- pid (int): Subprocess ID
Returns:
bool: True if subprocess is still running, False if not
def kill_subprocess(pid):

    """
    Kills subprocess.

    Args:
        pid (int): Subprocess ID

    Returns:
        None
    """

    try:
        return psutil.Process(pid).kill()
    except Exception:
        print("Error killing acquisition listener.")
        traceback.print_exc()
Kills subprocess.
Arguments:
- pid (int): Subprocess ID
Returns:
None
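These two helpers are typically used together. The watchdog below is a hypothetical sketch (not part of the module) that polls a listener process and kills it once a caller-supplied condition says the run has finished; the poll interval is illustrative:

import time

def stop_listener_when_done(pid, is_done, poll_interval=10):
    # is_done is a caller-supplied callable returning True when the run has finished
    while subprocess_is_running(pid):
        if is_done():
            kill_subprocess(pid)
            break
        time.sleep(poll_interval)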