ms_autoqc.AutoQCProcessing

import warnings
warnings.simplefilter(action="ignore", category=FutureWarning)

import os, time, shutil, psutil, traceback
import pandas as pd
import numpy as np
import ms_autoqc.DatabaseFunctions as db
import ms_autoqc.SlackNotifications as slack_bot

pd.options.mode.chained_assignment = None

def sequence_is_valid(filename, contents, vendor="Thermo Fisher"):

    """
    Validates that instrument sequence file contains the correct columns.

    TODO: Add support for other mass spectrometry instrument vendors here.

    Args:
        filename (str):
            Acquisition sequence file name
        contents (io.StringIO):
            Acquisition sequence as in-memory file object
        vendor (str, default "Thermo Fisher"):
            Instrument vendor for parsing sequence

    Returns:
        True if sequence table is valid, otherwise False.
    """

    if ".csv" not in filename:
        return False

    # Select columns from sequence using correct vendor software nomenclature
    if vendor == "Thermo Fisher":

        # Attempt to load sequence file as a pandas DataFrame
        try:
            df_sequence = pd.read_csv(contents, index_col=False)
        except Exception as error:
            print("Sequence file could not be read.")
            traceback.print_exc()
            return False

        # The first row of a Thermo sequence CSV contains the actual column headers
        df_sequence.columns = df_sequence.iloc[0]
        df_sequence = df_sequence.drop(df_sequence.index[0])

        # Define required columns and columns found in sequence file
        required_columns = ["File Name", "Path", "Instrument Method", "Position", "Inj Vol"]
        sequence_file_columns = df_sequence.columns.tolist()

        # Check that the required columns are present
        for column in required_columns:
            if column not in sequence_file_columns:
                return False

    return True

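As a minimal sketch, an upload handler might use this validator as follows (the file name here is hypothetical):

    import io

    # Hypothetical sequence export from the vendor software
    with open("sequence.csv") as sequence_file:
        contents = io.StringIO(sequence_file.read())

    if sequence_is_valid("sequence.csv", contents):
        contents.seek(0)  # rewind, since read_csv consumed the in-memory file
        sequence_json = convert_sequence_to_json(contents)
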
def metadata_is_valid(filename, contents):

    """
    Validates that sample metadata file contains the required columns.

    Args:
        filename (str):
            Metadata file name
        contents (io.StringIO):
            Metadata file as in-memory file object

    Returns:
        True if metadata table is valid, otherwise False.
    """

    if ".csv" not in filename:
        return False

    # Attempt to load metadata file as a pandas DataFrame
    try:
        df_metadata = pd.read_csv(contents, index_col=False)
    except Exception as error:
        print("Metadata file could not be read.")
        traceback.print_exc()
        return False

    # Define required columns and columns found in metadata file
    required_columns = ["Filename", "Name from collaborator", "Sample Name", "Species",
                        "Matrix", "Growth-Harvest Conditions", "Treatment"]
    metadata_file_columns = df_metadata.columns.tolist()

    # Check that the required columns are present
    for column in required_columns:
        if column not in metadata_file_columns:
            return False

    return True

def chromatography_valid(chromatography):

    """
    Validates that MSP / TXT files for the given chromatography method exist.

    TODO: Per Brian, some labs don't run in both polarities. Will need to make this function flexible for that.

    Args:
        chromatography (str): Chromatography method ID to validate

    Returns:
        True if chromatography method files exist, False if not.
    """

    # Get chromatography method from database
    df_methods = db.get_chromatography_methods()
    df_methods = df_methods.loc[df_methods["method_id"] == chromatography]

    if len(df_methods) == 0:
        return False

    # Check whether the method's MSP / TXT files exist
    pos_msp_file = db.get_msp_file_path(chromatography, "Positive")
    neg_msp_file = db.get_msp_file_path(chromatography, "Negative")

    if not os.path.isfile(pos_msp_file) or not os.path.isfile(neg_msp_file):
        return False

    return True

def biological_standards_valid(chromatography, biological_standards):

    """
    Validates that the given list of biological standards have MSP files.

    Args:
        chromatography (str):
            Chromatography method ID to validate
        biological_standards (list):
            List of biological standards to validate

    Returns:
        True if all MSP / TXT files exist, False if not.
    """

    # Query the provided biological standards from the database
    df = db.get_biological_standards()
    df = df.loc[df["chromatography"] == chromatography]
    df = df.loc[df["name"].isin(biological_standards)]

    # Check whether the method's MSP / TXT files exist, and return False if they don't
    pos_msp_files = df["pos_bio_msp_file"].astype(str).tolist()
    neg_msp_files = df["neg_bio_msp_file"].astype(str).tolist()
    msp_files = pos_msp_files + neg_msp_files

    for file in msp_files:
        file_path = os.path.join(os.getcwd(), "data", "methods", file)
        if not os.path.isfile(file_path):
            return False

    return True

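A brief sketch of how the two validators above might be combined before a run is created (the method ID and standard names are hypothetical):

    chromatography = "HILIC"                 # hypothetical method ID in the database
    bio_standards = ["Urine", "Plasma"]      # hypothetical biological standards

    if not chromatography_valid(chromatography):
        raise ValueError("Chromatography method is missing MSP / TXT files.")

    if not biological_standards_valid(chromatography, bio_standards):
        raise ValueError("One or more biological standards are missing MSP files.")
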
def convert_sequence_to_json(sequence_contents, vendor="Thermo Fisher"):

    """
    Converts sequence table to JSON string for database storage.

    TODO: Convert to "records" orient instead. Much faster to load data using pd.DataFrame(json.loads(json_string))
        instead of pd.read_json(json_string, orient="split").

    Args:
        sequence_contents (io.StringIO):
            Acquisition file as in-memory file object
        vendor (str, default "Thermo Fisher"):
            Instrument vendor for parsing sequence

    Returns:
        JSON string of acquisition sequence DataFrame in "split" format.
    """

    # Select columns from sequence using correct vendor software nomenclature
    if vendor == "Thermo Fisher":
        # The first row of a Thermo sequence CSV contains the actual column headers
        df_sequence = pd.read_csv(sequence_contents, index_col=False)
        df_sequence.columns = df_sequence.iloc[0]
        df_sequence = df_sequence.drop(df_sequence.index[0])

    # Convert DataFrame to JSON string
    return df_sequence.to_json(orient="split")

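To make the TODO above concrete, a sketch comparing the two round trips (the sample DataFrame is made up):

    import json

    df_sequence = pd.DataFrame({"File Name": ["SAMPLE_001"], "Position": ["R:A1"]})

    # Current approach: "split" orient, loaded back with the slower read_json parser
    json_string = df_sequence.to_json(orient="split")
    df = pd.read_json(json_string, orient="split")

    # Proposed approach: "records" orient, loaded back via json.loads
    json_string = df_sequence.to_json(orient="records")
    df = pd.DataFrame(json.loads(json_string))
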
def convert_metadata_to_json(metadata_contents):

    """
    Converts metadata file to JSON string for database storage.

    TODO: Convert to "records" orient instead. Much faster to load data using pd.DataFrame(json.loads(json_string))
        instead of pd.read_json(json_string, orient="split").

    Args:
        metadata_contents (io.StringIO): Metadata file as in-memory file object

    Returns:
        JSON string of sample metadata DataFrame in "split" format.
    """

    # Load metadata file as a pandas DataFrame
    df_metadata = pd.read_csv(metadata_contents, index_col=False)

    # Convert DataFrame to JSON string
    return df_metadata.to_json(orient="split")

def run_msconvert(path, filename, extension, output_folder):

    """
    Makes a copy of data file and converts it from instrument vendor format to open mzML format.

    This function runs msconvert.exe in a background process. It checks every second for 30 seconds if the
    mzML file was created, and if it hangs, will terminate the msconvert subprocess and return None.

    TODO: As MS-AutoQC has evolved, some arguments for this function have become redundant.
        The output folder is always fixed, so this parameter should be removed.

    Args:
        path (str):
            Data acquisition path (with "/" at the end)
        filename (str):
            Name of sample data file
        extension (str):
            Data file extension, derived from instrument vendor
        output_folder (str):
            Output directory for mzML file – this is always ../data/instrument_id_run_id/data

    Returns:
        File path for mzML file (*.mzml)
    """

    # Remove files in output folder (if any)
    try:
        for file in os.listdir(output_folder):
            os.remove(output_folder + file)
    except Exception as error:
        print(error)
    finally:
        # Copy original data file to output folder
        shutil.copy2(path + filename + "." + extension, output_folder)

    # Get MSConvert.exe
    try:
        msconvert_folder = db.get_msconvert_directory()
        msconvert_exe = '"' + msconvert_folder + '/msconvert.exe" '
    except:
        print("Failed to locate MSConvert.exe!")
        traceback.print_exc()
        return None

    # Run MSConvert in a subprocess
    command = msconvert_exe + output_folder + filename + "." + extension + " -o " + output_folder
    process = psutil.Popen(command)
    pid = process.pid

    # Check every second for 30 seconds if mzML file was created; if process hangs, terminate and return None
    for index in range(31):
        if not subprocess_is_running(pid):
            break
        else:
            if index != 30:
                time.sleep(1)
            else:
                kill_subprocess(pid)
                return None

    # Delete copy of original data file
    data_file_copy = output_folder + filename + "." + extension
    os.remove(data_file_copy)

    # Return mzML file path to indicate success
    return output_folder + filename + ".mzml"

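As an illustration, a hypothetical call for a Thermo .raw file (all paths below are made up):

    output_folder = "C:/Users/Lab/ms_autoqc/data/QE1_RUN_001/data/"
    mzml_file = run_msconvert("D:/Xcalibur/Data/", "SAMPLE_001", "raw", output_folder)

    if mzml_file is None:
        print("MSConvert hung or failed to produce an mzML file.")
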
def run_msdial_processing(filename, msdial_path, parameter_file, input_folder, output_folder):

    """
    Processes data file (in mzML format) using the MS-DIAL console app.

    TODO: As MS-AutoQC has evolved, some arguments for this function have become redundant.
        The input and output folders are fixed, so these parameters should be removed.

    Args:
        filename (str):
            Name of sample data file
        msdial_path (str):
            Path for directory storing MsdialConsoleApp.exe
        parameter_file (str):
            Path for parameters.txt file, stored in /methods directory
        input_folder (str):
            Input folder – this is always ../data/instrument_id_run_id/data
        output_folder (str):
            Output folder – this is always ../data/instrument_id_run_id/results

    Returns:
        File path for MS-DIAL result file (*.msdial)
    """

    # Navigate to directory containing MS-DIAL
    home = os.getcwd()
    os.chdir(msdial_path)

    # Run MS-DIAL in a subprocess
    command = "MsdialConsoleApp.exe lcmsdda -i " + input_folder \
              + " -o " + output_folder \
              + " -m " + parameter_file + " -p"
    process = psutil.Popen(command)
    pid = process.pid

    # Check every second for 30 seconds if process was completed; if process hangs, return None
    for index in range(31):
        if not subprocess_is_running(pid):
            break
        else:
            if index != 30:
                time.sleep(1)
            else:
                kill_subprocess(pid)
                os.chdir(home)
                return None

    # Clear data file directory for next sample
    for file in os.listdir(input_folder):
        filepath = os.path.join(input_folder, file)
        try:
            shutil.rmtree(filepath)
        except OSError:
            os.remove(filepath)

    # Return to original working directory
    os.chdir(home)

    # MS-DIAL output filename
    msdial_result = output_folder + "/" + filename.split(".")[0] + ".msdial"

    # Return .msdial file path
    return msdial_result

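Continuing the hypothetical example above, the mzML file would then be handed to MS-DIAL (the parameter file path is an assumption for illustration):

    msdial_path = db.get_msdial_directory()
    parameter_file = "C:/Users/Lab/ms_autoqc/data/methods/HILIC_Pos_Parameters.txt"  # hypothetical

    peak_list = run_msdial_processing("SAMPLE_001", msdial_path, parameter_file,
        input_folder="C:/Users/Lab/ms_autoqc/data/QE1_RUN_001/data",
        output_folder="C:/Users/Lab/ms_autoqc/data/QE1_RUN_001/results")
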
def peak_list_to_dataframe(sample_peak_list, df_features):

    """
    Filters duplicates and poor annotations from MS-DIAL peak table and creates DataFrame storing
    m/z, RT, and intensity data for each internal standard (or targeted metabolite) in the sample.

    TODO: Describe duplicate handling in more detail in this docstring.

    Args:
        sample_peak_list (str):
            File path for MS-DIAL peak table, an .msdial file located in /data/instrument_id_run_id/results
        df_features (DataFrame):
            An m/z - RT table derived from internal standard (or biological standard) MSP library in database

    Returns:
        DataFrame with m/z, RT, and intensity data for each internal standard / targeted metabolite in the sample.
    """

    # Convert .msdial file into a DataFrame
    df = pd.read_csv(sample_peak_list, sep="\t", engine="python", skip_blank_lines=True)
    df.rename(columns={"Title": "Name"}, inplace=True)

    # Get only the m/z, RT, and intensity columns
    df = df[["Name", "Precursor m/z", "RT (min)", "Height", "MSMS spectrum"]]

    # Query only internal standards (or targeted features for biological standard)
    feature_list = df_features["name"].astype(str).tolist()
    without_ms2_feature_list = ["w/o MS2:" + feature for feature in feature_list]
    df = df.loc[(df["Name"].isin(feature_list)) | (df["Name"].isin(without_ms2_feature_list))]

    # Label annotations with MS2
    with_ms2 = df["MSMS spectrum"].notnull()
    df.loc[with_ms2, "MSMS spectrum"] = "MS2"

    # Handle annotations made without MS2
    if len(df[with_ms2]) > 0:
        df.replace(["w/o MS2:"], "", regex=True, inplace=True)      # Remove "w/o MS2" from annotation name
        ms2_matching = True                                         # Boolean that says MS/MS was used for identification
    else:
        ms2_matching = False

    # Get duplicate annotations in a DataFrame
    df_duplicates = df[df.duplicated(subset=["Name"], keep=False)]

    # Remove duplicates from peak list DataFrame
    df = df[~(df.duplicated(subset=["Name"], keep=False))]

    # Handle duplicate annotations
    if len(df_duplicates) > 0:

        # Get list of annotations that have duplicates in the peak list
        annotations = df_duplicates[~df_duplicates.duplicated(subset=["Name"])]["Name"].tolist()

        # For each unique feature, choose the annotation that best matches library m/z and RT values
        for annotation in annotations:

            # Get all duplicate annotations for that feature
            df_annotation = df_duplicates[df_duplicates["Name"] == annotation]
            df_feature_in_library = df_features.loc[df_features["name"] == annotation]

            # Calculate absolute delta m/z and delta RT against library values
            df_annotation["Delta m/z"] = df_annotation["Precursor m/z"].astype(float) - df_feature_in_library["precursor_mz"].astype(float).values[0]
            df_annotation["Delta RT"] = df_annotation["RT (min)"].astype(float) - df_feature_in_library["retention_time"].astype(float).values[0]
            df_annotation["Delta m/z"] = df_annotation["Delta m/z"].abs()
            df_annotation["Delta RT"] = df_annotation["Delta RT"].abs()

            # First, remove duplicates without MS2 (if an MSP with MS2 spectra was used for processing)
            if ms2_matching:
                new_df = df_annotation.loc[df_annotation["MSMS spectrum"].notnull()]

                # If annotations with MS2 remain, use them moving forward
                if len(new_df) > 1:
                    df_annotation = new_df

                # If no annotations with MS2 remain, filter annotations without MS2 by height
                else:
                    # Choose the annotation with the highest intensity
                    if len(df_annotation) > 1:
                        df_annotation = df_annotation.loc[
                            df_annotation["Height"] == df_annotation["Height"].max()]

                    # If there's only one annotation without MS2 left, choose it as the "correct" annotation
                    if len(df_annotation) == 1:
                        df = pd.concat([df, df_annotation], ignore_index=True)
                        continue

            # Keep the annotation with the lowest delta RT and delta m/z
            df_rectified = df_annotation.loc[
                (df_annotation["Delta m/z"] == df_annotation["Delta m/z"].min()) &
                (df_annotation["Delta RT"] == df_annotation["Delta RT"].min())]

            # If no single annotation has both the lowest delta RT and lowest delta m/z, choose the lowest delta RT
            if len(df_rectified) == 0:
                df_rectified = df_annotation.loc[
                    df_annotation["Delta RT"] == df_annotation["Delta RT"].min()]

            # If the RT's are exactly the same, choose the annotation with the lowest delta m/z
            if len(df_rectified) > 1:
                df_rectified = df_rectified.loc[
                    df_rectified["Delta m/z"] == df_rectified["Delta m/z"].min()]

            # If the delta m/z's are also the same, choose the annotation with the greatest intensity
            if len(df_rectified) > 1:
                df_rectified = df_rectified.loc[
                    df_rectified["Height"] == df_rectified["Height"].max()]

            # If there are still duplicates at this point, just choose the first one
            if len(df_rectified) > 1:
                df_rectified = df_rectified[:1]

            # Append "correct" annotation to peak list DataFrame
            df = pd.concat([df, df_rectified], ignore_index=True)

    # Clean up helper columns before returning
    df = df.drop(columns=["Delta m/z", "Delta RT"], errors="ignore")
    df.reset_index(drop=True, inplace=True)
    return df

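To make the tie-breaking above concrete, a toy sketch with made-up values for one internal standard:

    # Two candidate annotations with identical delta RT's
    df_annotation = pd.DataFrame({
        "Name": ["1_Methionine_d8", "1_Methionine_d8"],
        "Delta m/z": [0.0002, 0.0050],
        "Delta RT": [0.01, 0.01],
        "Height": [250000, 90000],
    })

    # Same delta RT, so the row with the smaller delta m/z wins
    best = df_annotation.loc[df_annotation["Delta m/z"] == df_annotation["Delta m/z"].min()]
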
def qc_sample(instrument_id, run_id, polarity, df_peak_list, df_features, is_bio_standard):

    """
    Performs quality control on sample data based on user-defined criteria in Settings > QC Configurations.

    The following quality control parameters are used to determine QC pass, warning, or fail:
        1. Intensity dropouts cutoff:
            How many internal standards are missing in the sample?
        2. RT shift from library value cutoff:
            How many retention times are shifted from the expected value for the chromatography method?
        3. RT shift from in-run average cutoff:
            How many retention times are shifted from their average RT during the run?
        4. m/z shift from library value cutoff:
            How many precursor masses are shifted from the expected value for the internal standard?

    This function returns a DataFrame with a single record in the following format:

    | Sample     | Delta m/z | Delta RT | In-run delta RT | Warnings  | Fails |
    | ---------- | --------- | -------- | --------------- | --------- | ----- |
    | SAMPLE_001 | 0.000001  | 0.05     | 0.00001         | Delta RT  | None  |

    Confusingly, a sample has an overall QC result, as well as QC warnings and fails for each internal standard.
    This makes it easier to determine what caused the overall QC result.

    See a screenshot of the sample information card in the Overview page of the website for context.

    While the thresholds for QC pass and fail are explicit, allowing the user to determine thresholds for QC warnings
    was deemed too cumbersome. Instead, an overall QC result of "Warning" happens if any of the following are true:
        1. The number of intensity dropouts is 75% or more of the defined cutoff
        2. The QC result is not a "Fail" and 50% or more internal standards have a QC warning

    For each internal standard, a note is added to the "Warnings" or "Fails" column of qc_dataframe based on the user's
    defined criteria in Settings > QC Configurations. If the internal standard is not marked as a "Fail", then
    "Warnings" for individual internal standards are marked if:
        1. The delta RT is greater than 66% of the "RT shift from library value" cutoff
        2. The in-run delta RT is greater than 80% of the "RT shift from in-run average value" cutoff
        3. The delta m/z is greater than 80% of the "m/z shift from library value" cutoff

    TODO: Define and implement quality control for biological standards.

    Args:
        instrument_id (str):
            Instrument ID
        run_id (str):
            Instrument run ID (Job ID)
        polarity (str):
            Polarity, either "Pos" or "Neg"
        df_peak_list (DataFrame):
            Filtered peak table, from peak_list_to_dataframe()
        df_features (DataFrame):
            An m/z - RT table derived from internal standard (or biological standard) MSP library in database
        is_bio_standard (bool):
            Whether sample is a biological standard or not

    Returns:
        (DataFrame, str): Tuple containing QC results table and QC result (either "Pass", "Fail", or "Warning").
    """

    # Handles sample QC checks
    if not is_bio_standard:

        # Refactor internal standards DataFrame
        df_features = df_features.rename(
            columns={"name": "Name",
                     "chromatography": "Chromatography",
                     "polarity": "Polarity",
                     "precursor_mz": "Library m/z",
                     "retention_time": "Library RT",
                     "ms2_spectrum": "Library MS2",
                     "inchikey": "Library INCHIKEY"})

        # Get delta RT and delta m/z values for each internal standard
        df_compare = pd.merge(df_features, df_peak_list, on="Name")
        df_compare["Delta RT"] = df_compare["RT (min)"].astype(float) - df_compare["Library RT"].astype(float)
        df_compare["Delta m/z"] = df_compare["Precursor m/z"].astype(float) - df_compare["Library m/z"].astype(float)
        df_peak_list_copy = df_peak_list.copy()

        # Get MS-DIAL RT threshold and filter out annotations without MS2 that are outside threshold
        with_ms2 = df_compare["MSMS spectrum"].notnull()
        without_ms2 = df_compare["MSMS spectrum"].isnull()
        annotations_without_ms2 = df_compare[without_ms2]["Name"].astype(str).tolist()

        if len(df_compare[with_ms2]) > 0:
            rt_threshold = db.get_msdial_configuration_parameters("Default", parameter="post_id_rt_tolerance")
            outside_rt_threshold = df_compare["Delta RT"].abs() > rt_threshold
            annotations_to_drop = df_compare.loc[(without_ms2) & (outside_rt_threshold)]

            df_compare.drop(annotations_to_drop.index, inplace=True)
            annotations_to_drop = annotations_to_drop["Name"].astype(str).tolist()
            annotations_to_drop = df_peak_list_copy[df_peak_list_copy["Name"].isin(annotations_to_drop)]
            df_peak_list_copy.drop(annotations_to_drop.index, inplace=True)

        # Get in-run RT average for each internal standard
        df_compare["In-run RT average"] = np.nan
        df_run_retention_times = db.parse_internal_standard_data(instrument_id, run_id, "retention_time", polarity, "processing", False)

        if df_run_retention_times is not None:
            # Calculate in-run RT average for each internal standard
            for internal_standard in df_run_retention_times.columns:
                if internal_standard == "Sample":
                    continue
                in_run_average = df_run_retention_times[internal_standard].dropna().astype(float).mean()
                df_compare.loc[df_compare["Name"] == internal_standard, "In-run RT average"] = in_run_average

            # Compare each internal standard RT to in-run RT average
            df_compare["In-run delta RT"] = df_compare["RT (min)"].astype(float) - df_compare["In-run RT average"].astype(float)
        else:
            df_compare["In-run delta RT"] = np.nan

        # Prepare final DataFrame
        qc_dataframe = df_compare[["Name", "Delta m/z", "Delta RT", "In-run delta RT"]].copy()

        # Count internal standard intensity dropouts
        qc_dataframe["Intensity dropout"] = 0
        qc_dataframe["Warnings"] = ""
        qc_dataframe["Fails"] = ""
        for feature in df_features["Name"].astype(str).tolist():
            if feature not in df_peak_list_copy["Name"].astype(str).tolist():
                row = {"Name": feature,
                       "Delta m/z": np.nan,
                       "Delta RT": np.nan,
                       "In-run delta RT": np.nan,
                       "Intensity dropout": 1,
                       "Warnings": "",
                       "Fails": ""}
                qc_dataframe = pd.concat([qc_dataframe, pd.DataFrame([row])], ignore_index=True)

        # Determine pass / fail based on user criteria
        qc_config = db.get_qc_configuration_parameters(instrument_id=instrument_id, run_id=run_id)
        qc_result = "Pass"

        # QC of internal standard intensity dropouts
        if qc_config["intensity_enabled"].values[0] == 1:

            # Mark fails
            qc_dataframe.loc[qc_dataframe["Intensity dropout"].astype(int) == 1, "Fails"] = "Missing"

            # Count intensity dropouts
            intensity_dropouts = qc_dataframe["Intensity dropout"].astype(int).sum()
            intensity_dropouts_cutoff = qc_config["intensity_dropouts_cutoff"].astype(int).tolist()[0]

            # Compare number of intensity dropouts to user-defined cutoff
            if intensity_dropouts >= intensity_dropouts_cutoff:
                qc_result = "Fail"

            if qc_result != "Fail":
                if intensity_dropouts > intensity_dropouts_cutoff / 1.33:
                    qc_result = "Warning"

        # QC of internal standard RT's against library RT's
        if qc_config["library_rt_enabled"].values[0] == 1:

            # Check if delta RT's are outside of user-defined cutoff
            library_rt_shift_cutoff = qc_config["library_rt_shift_cutoff"].astype(float).values[0]

            # Mark fails
            fails = qc_dataframe["Delta RT"].abs() > library_rt_shift_cutoff
            qc_dataframe.loc[fails, "Fails"] = "RT"

            # Mark warnings
            warnings = ((library_rt_shift_cutoff / 1.5) < qc_dataframe["Delta RT"].abs()) & \
                       ((qc_dataframe["Delta RT"].abs()) < library_rt_shift_cutoff)
            qc_dataframe.loc[warnings, "Warnings"] = "RT"

            if len(qc_dataframe.loc[fails]) >= len(qc_dataframe) / 2:
                qc_result = "Fail"
            else:
                if len(qc_dataframe.loc[warnings]) > len(qc_dataframe) / 2 and qc_result != "Fail":
                    qc_result = "Warning"

        # QC of internal standard RT's against in-run RT average
        if qc_config["in_run_rt_enabled"].values[0] == 1 and df_run_retention_times is not None:

            # Check if in-run delta RT's are outside of user-defined cutoff
            in_run_rt_shift_cutoff = qc_config["in_run_rt_shift_cutoff"].astype(float).values[0]

            # Mark fails
            fails = qc_dataframe["In-run delta RT"].abs() > in_run_rt_shift_cutoff
            qc_dataframe.loc[fails, "Fails"] = "In-Run RT"

            # Mark warnings
            warnings = ((in_run_rt_shift_cutoff / 1.25) < qc_dataframe["In-run delta RT"].abs()) & \
                       (qc_dataframe["In-run delta RT"].abs() < in_run_rt_shift_cutoff)
            qc_dataframe.loc[warnings, "Warnings"] = "In-Run RT"

            if len(qc_dataframe.loc[fails]) >= len(qc_dataframe) / 2:
                qc_result = "Fail"
            else:
                if len(qc_dataframe.loc[warnings]) > len(qc_dataframe) / 2 and qc_result != "Fail":
                    qc_result = "Warning"

        # QC of internal standard precursor m/z against library m/z
        if qc_config["library_mz_enabled"].values[0] == 1:

            # Check if delta m/z's are outside of user-defined cutoff
            library_mz_shift_cutoff = qc_config["library_mz_shift_cutoff"].astype(float).values[0]

            # Mark fails
            fails = qc_dataframe["Delta m/z"].abs() > library_mz_shift_cutoff
            qc_dataframe.loc[fails, "Fails"] = "m/z"

            # Mark warnings
            warnings = ((library_mz_shift_cutoff / 1.25) < qc_dataframe["Delta m/z"].abs()) & \
                       (qc_dataframe["Delta m/z"].abs() < library_mz_shift_cutoff)
            qc_dataframe.loc[warnings, "Warnings"] = "m/z"

            if len(qc_dataframe.loc[fails]) >= len(qc_dataframe) / 2:
                qc_result = "Fail"
            else:
                if len(qc_dataframe.loc[warnings]) > len(qc_dataframe) / 2 and qc_result != "Fail":
                    qc_result = "Warning"

        # Mark annotations without MS2
        qc_dataframe.loc[qc_dataframe["Name"].isin(annotations_without_ms2), "Warnings"] = "No MS2"

    # Handles biological standard QC checks
    else:
        qc_dataframe = pd.DataFrame()
        qc_result = "Pass"

    return qc_dataframe, qc_result

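As a worked example of the warning band for the library RT check (the cutoff value is hypothetical):

    library_rt_shift_cutoff = 0.3   # minutes, from Settings > QC Configurations
    delta_rt = 0.25

    fail = abs(delta_rt) > library_rt_shift_cutoff                                      # False
    warn = (library_rt_shift_cutoff / 1.5) < abs(delta_rt) < library_rt_shift_cutoff    # True, since 0.2 < 0.25 < 0.3
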
def convert_to_dict(sample_id, df_peak_list, qc_dataframe):

    """
    Converts DataFrames to dictionary records, with features as columns and samples as rows,
    which are then cast to string format for database storage.

    See parse_internal_standard_data() in the DatabaseFunctions module for more information.

    Format:

    | Name       | iSTD 1 | iSTD 2 | iSTD 3 | iSTD 4 | ... |
    | ---------- | ------ | ------ | ------ | ------ | ... |
    | SAMPLE_001 | 1.207  | 1.934  | 3.953  | 8.132  | ... |

    Args:
        sample_id (str):
            Sample ID
        df_peak_list (DataFrame):
            Filtered MS-DIAL peak table result
        qc_dataframe (DataFrame):
            Table of various QC results

    Returns:
        (str, str, str, str): Tuple containing dictionary records of m/z, RT, intensity, and
        QC data, respectively, cast as strings for database storage.
    """

    # m/z, RT, intensity
    df_mz = df_peak_list[["Name", "Precursor m/z"]]
    df_rt = df_peak_list[["Name", "RT (min)"]]
    df_intensity = df_peak_list[["Name", "Height"]]

    df_mz = df_mz.rename(columns={"Precursor m/z": sample_id})
    df_rt = df_rt.rename(columns={"RT (min)": sample_id})
    df_intensity = df_intensity.rename(columns={"Height": sample_id})

    df_mz = df_mz.transpose().reset_index()
    df_rt = df_rt.transpose().reset_index()
    df_intensity = df_intensity.transpose().reset_index()

    df_mz.columns = df_mz.iloc[0].astype(str).tolist()
    df_rt.columns = df_rt.iloc[0].astype(str).tolist()
    df_intensity.columns = df_intensity.iloc[0].astype(str).tolist()

    df_mz = df_mz.drop(df_mz.index[0])
    df_rt = df_rt.drop(df_rt.index[0])
    df_intensity = df_intensity.drop(df_intensity.index[0])

    mz_record = df_mz.to_dict(orient="records")[0]
    rt_record = df_rt.to_dict(orient="records")[0]
    intensity_record = df_intensity.to_dict(orient="records")[0]

    # QC results
    if len(qc_dataframe) > 0:
        for column in qc_dataframe.columns:
            if column != "Name":
                qc_dataframe.rename(columns={column: sample_id + " " + column}, inplace=True)
        qc_dataframe = qc_dataframe.transpose().reset_index()
        qc_dataframe.columns = qc_dataframe.iloc[0].astype(str).tolist()
        qc_dataframe = qc_dataframe.drop(qc_dataframe.index[0])
        qc_dataframe = qc_dataframe.fillna(" ")
        qc_record = qc_dataframe.to_dict(orient="records")
    else:
        qc_record = {}

    return str(mz_record), str(rt_record), str(intensity_record), str(qc_record)

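Since the records are stored as strings, a sketch of reading one back out of storage (the stored string below is a made-up example):

    import ast

    rt_record = "{'Name': 'SAMPLE_001', '1_Methionine_d8': '1.207'}"  # hypothetical stored string
    df_row = pd.DataFrame([ast.literal_eval(rt_record)])
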
def process_data_file(path, filename, extension, instrument_id, run_id):

    """
    Processes data file upon sample acquisition completion.

    For more details, please visit the Documentation page on the website.

    Performs the following functions:
        1. Convert data file to mzML format using MSConvert
        2. Process data file using MS-DIAL and user-defined parameter configuration
        3. Load peak table into DataFrame and filter out poor annotations
        4. Perform quality control checks based on user-defined criteria
        5. Notify user of QC warnings or fails via Slack or email
        6. Write QC results to instrument database
        7. If Google Drive sync is enabled, upload results as CSV files

    Args:
        path (str):
            Data acquisition path
        filename (str):
            Name of sample data file
        extension (str):
            Data file extension, derived from instrument vendor
        instrument_id (str):
            Instrument ID
        run_id (str):
            Instrument run ID (job ID)

    Returns:
        None
    """

    id = instrument_id.replace(" ", "_") + "_" + run_id

    # Create the necessary directories
    data_directory = os.path.join(os.getcwd(), "data")
    mzml_file_directory = os.path.join(data_directory, id, "data")
    qc_results_directory = os.path.join(data_directory, id, "results")

    for directory in [data_directory, mzml_file_directory, qc_results_directory]:
        if not os.path.exists(directory):
            os.makedirs(directory)

    mzml_file_directory = mzml_file_directory + "/"
    qc_results_directory = qc_results_directory + "/"

    # Retrieve chromatography, samples, and biological standards using run ID
    df_run = db.get_instrument_run(instrument_id, run_id)
    chromatography = df_run["chromatography"].astype(str).values[0]

    df_samples = db.get_samples_in_run(instrument_id, run_id, sample_type="Sample")
    df_biological_standards = db.get_samples_in_run(instrument_id, run_id, sample_type="Biological Standard")

    # Retrieve MS-DIAL parameters, internal standards, and targeted features from database
    if filename in df_biological_standards["sample_id"].astype(str).tolist():

        # Get biological standard type
        biological_standard = df_biological_standards.loc[
            df_biological_standards["sample_id"] == filename]

        # Get polarity
        try:
            polarity = biological_standard["polarity"].astype(str).values[0]
        except Exception as error:
            print("Could not read polarity from database:", error)
            print("Using default positive mode.")
            polarity = "Pos"

        biological_standard = biological_standard["biological_standard"].astype(str).values[0]

        # Get parameters and features for that biological standard type
        msdial_parameters = db.get_parameter_file_path(chromatography, polarity, biological_standard)
        df_features = db.get_targeted_features(biological_standard, chromatography, polarity)
        is_bio_standard = True

    elif filename in df_samples["sample_id"].astype(str).tolist():

        # Get polarity
        try:
            polarity = df_samples.loc[df_samples["sample_id"] == filename]["polarity"].astype(str).values[0]
        except Exception as error:
            print("Could not read polarity from database:", error)
            print("Using default positive mode.")
            polarity = "Positive"

        msdial_parameters = db.get_parameter_file_path(chromatography, polarity)
        df_features = db.get_internal_standards(chromatography, polarity)
        is_bio_standard = False

    else:
        print("Error! Could not retrieve MS-DIAL libraries and parameters.")
        return

    # Get MS-DIAL directory
    msdial_directory = db.get_msdial_directory()

    # Run MSConvert
    try:
        mzml_file = run_msconvert(path, filename, extension, mzml_file_directory)

        # For active instrument runs, give 3 more attempts if MSConvert fails
        if not db.is_completed_run(instrument_id, run_id):
            for attempt in range(3):
                if mzml_file is None or not os.path.exists(mzml_file):
                    print("MSConvert crashed, trying again in 3 minutes...")
                    time.sleep(180)
                    mzml_file = run_msconvert(path, filename, extension, mzml_file_directory)
                else:
                    break
    except:
        mzml_file = None
        print("Failed to run MSConvert.")
        traceback.print_exc()

    # Run MS-DIAL
    if mzml_file is not None:
        try:
            peak_list = run_msdial_processing(filename, msdial_directory, msdial_parameters,
                str(mzml_file_directory), str(qc_results_directory))
        except:
            peak_list = None
            print("Failed to run MS-DIAL.")
            traceback.print_exc()

    # Send peak list to MS-AutoQC algorithm if valid
    if mzml_file is not None and peak_list is not None:

        # Convert peak list to DataFrame
        try:
            df_peak_list = peak_list_to_dataframe(peak_list, df_features)
        except:
            print("Failed to convert peak list to DataFrame.")
            traceback.print_exc()
            return

        # Execute AutoQC algorithm
        try:
            qc_dataframe, qc_result = qc_sample(instrument_id, run_id, polarity, df_peak_list, df_features, is_bio_standard)
        except:
            print("Failed to execute AutoQC algorithm.")
            traceback.print_exc()
            return

        # Convert m/z, RT, and intensity data to dictionary records in string form
        try:
            mz_record, rt_record, intensity_record, qc_record = convert_to_dict(filename, df_peak_list, qc_dataframe)
        except:
            print("Failed to convert DataFrames to dictionary record format.")
            traceback.print_exc()
            return

        # Delete MS-DIAL result file
        try:
            os.remove(qc_results_directory + filename + ".msdial")
        except Exception as error:
            print("Failed to remove MS-DIAL result file.")
            traceback.print_exc()
            return

    else:
        print("Failed to process", filename)
        mz_record = None
        rt_record = None
        intensity_record = None
        qc_record = None
        qc_result = "Fail"
        peak_list = None

    # Send email and Slack notifications (if they are enabled)
    try:
        if qc_result != "Pass":
            alert = "QC " + qc_result + ": " + filename
            if peak_list is None:
                alert = "Failed to process " + filename

            # Send Slack notification
            if db.slack_notifications_are_enabled():
                slack_bot.send_message(alert)

            # Send email
            if db.email_notifications_are_enabled():
                db.send_email(alert, "Please check on your instrument run accordingly.")

    except:
        print("Failed to send notifications.")
        traceback.print_exc()

    try:
        # Write QC results to database
        db.write_qc_results(filename, instrument_id, run_id, mz_record, rt_record, intensity_record, qc_record, qc_result, is_bio_standard)

        # Update sample counters to trigger dashboard update
        db.update_sample_counters_for_run(instrument_id=instrument_id, run_id=run_id, latest_sample=filename)

        # If sync is enabled, upload the QC results to Google Drive
        if db.sync_is_enabled():
            db.upload_qc_results(instrument_id, run_id)

    except:
        print("Failed to write QC results to database.")
        traceback.print_exc()
        return

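Putting the pipeline together, a hypothetical call as the acquisition listener might issue it (IDs and paths made up):

    process_data_file(
        path="D:/Xcalibur/Data/",
        filename="SAMPLE_001",
        extension="raw",
        instrument_id="QE 1",
        run_id="RUN_001")
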
def subprocess_is_running(pid):

    """
    Returns True if subprocess is still running, and False if not.

    Args:
        pid (int): Subprocess ID

    Returns:
        bool: True if subprocess is still running, False if not
    """

    if pid is None:
        return False

    time.sleep(1)

    try:
        if psutil.Process(pid).status() == "running":
            return True
        else:
            return False
    except Exception as error:
        return False

def kill_subprocess(pid):

    """
    Kills subprocess.

    Args:
        pid (int): Subprocess ID

    Returns:
        None
    """

    try:
        return psutil.Process(pid).kill()
    except Exception as error:
        print("Error killing subprocess.")
        traceback.print_exc()
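
Together, these two helpers implement the polling watchdog used by run_msconvert() and run_msdial_processing(); in isolation, the pattern looks like this (the command is a stand-in):

    process = psutil.Popen(["ping", "localhost"])   # stand-in for MSConvert / MS-DIAL
    pid = process.pid

    for second in range(31):
        if not subprocess_is_running(pid):          # sleeps one second per call
            break
        if second == 30:
            kill_subprocess(pid)                    # give up after ~30 seconds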
def sequence_is_valid(filename, contents, vendor='Thermo Fisher'):
13def sequence_is_valid(filename, contents, vendor="Thermo Fisher"):
14
15    """
16    Validates that instrument sequence file contains the correct columns.
17
18    TODO: Add support for other mass spectrometry instrument vendors here.
19
20    Args:
21        filename (str):
22            Acquisition sequence file name
23        contents (io.StringIO):
24            Acquisition sequence as in-memory file object
25        vendor (str, default "Thermo Fisher"):
26            Instrument vendor for parsing sequence
27
28    Returns:
29        True if sequence table is valid, otherwise False.
30    """
31
32    if ".csv" not in filename:
33        return False
34
35    # Select columns from sequence using correct vendor software nomenclature
36    if vendor == "Thermo Fisher":
37
38        # Attempt to load sequence file as a pandas DataFrame
39        try:
40            df_sequence = pd.read_csv(contents, index_col=False)
41        except Exception as error:
42            print("Sequence file could not be read.")
43            traceback.print_exc()
44            return False
45
46        df_sequence.columns = df_sequence.iloc[0]
47        df_sequence = df_sequence.drop(df_sequence.index[0])
48
49        # Define required columns and columns found in sequence file
50        required_columns = ["File Name", "Path", "Instrument Method", "Position", "Inj Vol"]
51        sequence_file_columns = df_sequence.columns.tolist()
52
53        # Check that the required columns are present
54        for column in required_columns:
55            if column not in sequence_file_columns:
56                return False
57
58    return True

Validates that instrument sequence file contains the correct columns.

TODO: Add support for other mass spectrometry instrument vendors here.

Arguments:
  • filename (str): Acquisition sequence file name
  • contents (io.StringIO): Acquisition sequence as in-memory file object
  • vendor (str, default "Thermo Fisher"): Instrument vendor for parsing sequence
Returns:

True if sequence table is valid, otherwise False.

def metadata_is_valid(filename, contents):
61def metadata_is_valid(filename, contents):
62
63    """
64    Validates that sample metadata file contains the required columns.
65
66    Args:
67        filename (str):
68            Metadata file name
69        contents (io.StringIO):
70            Metadata file as in-memory file object
71
72    Returns:
73        True if metadata table is valid, otherwise False.
74    """
75
76    if ".csv" not in filename:
77        return False
78
79    # Attempt to load metadata file as a pandas DataFrame
80    try:
81        df_metadata = pd.read_csv(contents, index_col=False)
82    except Exception as error:
83        print("Metadata file could not be read.")
84        traceback.print_exc()
85        return False
86
87    # Define required columns and columns found in metadata file
88    required_columns = ["Filename", "Name from collaborator", "Sample Name", "Species",
89                        "Matrix", "Growth-Harvest Conditions", "Treatment"]
90    metadata_file_columns = df_metadata.columns.tolist()
91
92    # Check that the required columns are present
93    for column in required_columns:
94        if column not in metadata_file_columns:
95            return False
96
97    return True

Validates that sample metadata file contains the required columns.

Arguments:
  • filename (str): Metadata file name
  • contents (io.StringIO): Metadata file as in-memory file object
Returns:

True if metadata table is valid, otherwise False.

def chromatography_valid(chromatography):
100def chromatography_valid(chromatography):
101
102    """
103    Validates that MSP / TXT files for the given chromatography method exist.
104
105    TODO: Per Brian, some labs don't run in both polarities. Will need to make this function flexible for that.
106
107    Args:
108        chromatography (str): Chromatography method ID to validate
109
110    Returns:
111        True if chromatography method files exist, False if not.
112    """
113
114    # Get chromatography method from database
115    df_methods = db.get_chromatography_methods()
116    df_methods = df_methods.loc[df_methods["method_id"] == chromatography]
117
118    if len(df_methods) == 0:
119        return False
120
121    # Check whether the method's MSP / TXT files exist
122    pos_msp_file = db.get_msp_file_path(chromatography, "Positive")
123    neg_msp_file = db.get_msp_file_path(chromatography, "Negative")
124
125    if not os.path.isfile(pos_msp_file) or not os.path.isfile(neg_msp_file):
126        return False
127
128    return True

Validates that MSP / TXT files for the given chromatography method exist.

TODO: Per Brian, some labs don't run in both polarities. Will need to make this function flexible for that.

Arguments:
  • chromatography (str): Chromatography method ID to validate
Returns:

True if chromatography method files exist, False if not.

def biological_standards_valid(chromatography, biological_standards):
131def biological_standards_valid(chromatography, biological_standards):
132
133    """
134    Validates that the given list of biological standards have MSP files.
135
136    Args:
137        chromatography (str):
138            Chromatography method ID to validate
139        biological_standards (list):
140            List of biological standards to validate
141
142    Returns:
143        True if all MSP / TXT files exist, False if not.
144    """
145
146    # Query the provided biological standards from the database
147    df = db.get_biological_standards()
148    df = df.loc[df["chromatography"] == chromatography]
149    df = df.loc[df["name"].isin(biological_standards)]
150
151    # Check whether the method's MSP / TXT files exist, and return False if they don't
152    pos_msp_files = df["pos_bio_msp_file"].astype(str).tolist()
153    neg_msp_files = df["neg_bio_msp_file"].astype(str).tolist()
154    msp_files = pos_msp_files + neg_msp_files
155
156    for file in msp_files:
157        file_path = os.path.join(os.getcwd(), "data", "methods", file)
158        if not os.path.isfile(file_path):
159            return False
160
161    return True

Validates that the given list of biological standards have MSP files.

Arguments:
  • chromatography (str): Chromatography method ID to validate
  • biological_standards (list): List of biological standards to validate
Returns:

True if all MSP / TXT files exist, False if not.

def convert_sequence_to_json(sequence_contents, vendor='Thermo Fisher'):
164def convert_sequence_to_json(sequence_contents, vendor="Thermo Fisher"):
165
166    """
167    Converts sequence table to JSON string for database storage.
168
169    TODO: Convert to "records" orient instead. Much faster to load data using pd.DataFrame(json.loads(json_string))
170        instead of pd.read_json(json_string, orient="split").
171
172    Args:
173        sequence_contents (io.StringIO):
174            Acquisition file as in-memory file object
175        vendor (str, default "Thermo Fisher"):
176            Instrument vendor for parsing sequence
177
178    Returns:
179        JSON string of acquisition sequence DataFrame in "split" format.
180    """
181
182    # Select columns from sequence using correct vendor software nomenclature
183    if vendor == "Thermo Fisher":
184        df_sequence = pd.read_csv(sequence_contents, index_col=False)
185        df_sequence.columns = df_sequence.iloc[0]
186        df_sequence = df_sequence.drop(df_sequence.index[0])
187
188    # Convert DataFrames to JSON strings
189    return df_sequence.to_json(orient="split")

Converts sequence table to JSON string for database storage.

TODO: Convert to "records" orient instead. Much faster to load data using pd.DataFrame(json.loads(json_string)) instead of pd.read_json(json_string, orient="split").

Arguments:
  • sequence_contents (io.StringIO): Acquisition file as in-memory file object
  • vendor (str, default "Thermo Fisher"): Instrument vendor for parsing sequence
Returns:

JSON string of acquisition sequence DataFrame in "split" format.

def convert_metadata_to_json(metadata_contents):
192def convert_metadata_to_json(metadata_contents):
193
194    """
195    Converts sequence and metadata files to JSON strings for database storage.
196
197    TODO: Convert to "records" orient instead. Much faster to load data using pd.DataFrame(json.loads(json_string))
198        instead of pd.read_json(json_string, orient="split").
199
200    Args:
201        metadata_contents (io.StringIO): Metadata file as in-memory file object
202
203    Returns:
204        JSON string of sample metadata DataFrame in "split" format.
205    """
206
207    # Select columns from metadata
208    df_metadata = pd.read_csv(metadata_contents, index_col=False)
209
210    # Convert DataFrames to JSON strings
211    return df_metadata.to_json(orient="split")

Converts sequence and metadata files to JSON strings for database storage.

TODO: Convert to "records" orient instead. Much faster to load data using pd.DataFrame(json.loads(json_string)) instead of pd.read_json(json_string, orient="split").

Arguments:
  • metadata_contents (io.StringIO): Metadata file as in-memory file object
Returns:

JSON string of sample metadata DataFrame in "split" format.

def run_msconvert(path, filename, extension, output_folder):
214def run_msconvert(path, filename, extension, output_folder):
215
216    """
217    Makes a copy of data file and converts it from instrument vendor format to open mzML format.
218
219    This function runs msconvert.exe in a background process. It checks every second for 30 seconds if the
220    mzML file was created, and if it hangs, will terminate the msconvert subprocess and return None.
221
222    TODO: As MS-AutoQC has evolved, some arguments for this function have become redundant.
223        The output folder is always fixed, so this parameter should be removed.
224
225    Args:
226        path (str):
227            Data acquisition path (with "/" at the end)
228        filename (str):
229            Name of sample data file
230        extension (str):
231            Data file extension, derived from instrument vendor
232        output_folder (str):
233            Output directory for mzML file – this is always ../data/instrument_id_run_id/data
234
235    Returns:
236        File path for mzML file (*.mzml)
237    """
238
239    # Remove files in output folder (if any)
240    try:
241        for file in os.listdir(output_folder):
242            os.remove(output_folder + file)
243    except Exception as error:
244        print(error)
245    finally:
246        # Copy original data file to output folder
247        shutil.copy2(path + filename + "." + extension, output_folder)
248
249    # Get MSConvert.exe
250    try:
251        msconvert_folder = db.get_msconvert_directory()
252        msconvert_exe = '"' + msconvert_folder + '/msconvert.exe" '
253    except:
254        print("Failed to locate MSConvert.exe!")
255        traceback.print_exc()
256        return None
257
258    # Run MSConvert in a subprocess
259    command = msconvert_exe + output_folder + filename + "." + extension + " -o " + output_folder
260    process = psutil.Popen(command)
261    pid = process.pid
262
263    # Check every second for 30 seconds if mzML file was created; if process hangs, terminate and return None
264    for index in range(31):
265        if not subprocess_is_running(pid):
266            break
267        else:
268            if index != 30:
269                time.sleep(1)
270            else:
271                kill_subprocess(pid)
272                return None
273
274    # Delete copy of original data file
275    data_file_copy = output_folder + filename + "." + extension
276    os.remove(data_file_copy)
277
278    # Return mzML file path to indicate success
279    return output_folder + filename + ".mzml"

Makes a copy of data file and converts it from instrument vendor format to open mzML format.

This function runs msconvert.exe in a background process. It checks every second for 30 seconds if the mzML file was created, and if it hangs, will terminate the msconvert subprocess and return None.

TODO: As MS-AutoQC has evolved, some arguments for this function have become redundant. The output folder is always fixed, so this parameter should be removed.

Arguments:
  • path (str): Data acquisition path (with "/" at the end)
  • filename (str): Name of sample data file
  • extension (str): Data file extension, derived from instrument vendor
  • output_folder (str): Output directory for mzML file – this is always ../data/instrument_id_run_id/data
Returns:

File path for mzML file (*.mzml)

def run_msdial_processing(filename, msdial_path, parameter_file, input_folder, output_folder):
282def run_msdial_processing(filename, msdial_path, parameter_file, input_folder, output_folder):
283
284    """
285    Processes data file (in mzML format) using the MS-DIAL console app.
286
287    TODO: As MS-AutoQC has evolved, some arguments for this function have become redundant.
288        The input and output folders are fixed, so these parameters should be removed.
289
290    Args:
291        filename (str):
292            Name of sample data file
293        msdial_path (str):
294            Path for directory storing MSDialConsoleApp.exe
295        parameter_file (str):
296            Path for parameters.txt file, stored in /methods directory
297        input_folder (str):
298            Input folder – this is always ../data/instrument_id_run_id/data
299        output_folder (str):
300            Output folder – this is always ../data/instrument_id_run_id/results
301
302    Returns:
303        File path for MS-DIAL result file (*.msdial)
304    """
305
306    # Navigate to directory containing MS-DIAL
307    home = os.getcwd()
308    os.chdir(msdial_path)
309
310    # Run MS-DIAL in a subprocess
311    command = "MsdialConsoleApp.exe lcmsdda -i " + input_folder \
312              + " -o " + output_folder \
313              + " -m " + parameter_file + " -p"
314    process = psutil.Popen(command)
315    pid = process.pid
316
317    # Check every second for 30 seconds if the process has completed; if it hangs, kill the subprocess and return None
318    for index in range(31):
319        if not subprocess_is_running(pid):
320            break
321        else:
322            if index != 30:
323                time.sleep(1)
324            else:
325                kill_subprocess(pid)
326                os.chdir(home)
327                return None
328
329    # Clear data file directory for next sample
330    for file in os.listdir(input_folder):
331        filepath = os.path.join(input_folder, file)
332        try:
333            shutil.rmtree(filepath)
334        except OSError:
335            os.remove(filepath)
336
337    # Return to original working directory
338    os.chdir(home)
339
340    # MS-DIAL output filename
341    msdial_result = output_folder + "/" + filename.split(".")[0] + ".msdial"
342
343    # Return .msdial file path
344    return msdial_result

Processes data file (in mzML format) using the MS-DIAL console app.

TODO: As MS-AutoQC has evolved, some arguments for this function have become redundant. The input and output folders are fixed, so these parameters should be removed.

Arguments:
  • filename (str): Name of sample data file
  • msdial_path (str): Path for directory storing MSDialConsoleApp.exe
  • parameter_file (str): Path for parameters.txt file, stored in /methods directory
  • input_folder (str): Input folder – this is always ../data/instrument_id_run_id/data
  • output_folder (str): Output folder – this is always ../data/instrument_id_run_id/results
Returns:

File path for MS-DIAL result file (*.msdial)
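A minimal usage sketch (the run directories and parameter file are hypothetical; db.get_msdial_directory() is the same lookup used by process_data_file() below):

    import ms_autoqc.DatabaseFunctions as db
    from ms_autoqc.AutoQCProcessing import run_msdial_processing

    # Hypothetical run directories and MS-DIAL parameter file
    input_folder = "C:/MS-AutoQC/data/Thermo_QE_1_Run42/data"
    output_folder = "C:/MS-AutoQC/data/Thermo_QE_1_Run42/results"
    parameter_file = "C:/MS-AutoQC/methods/msdial_parameters_1.txt"

    peak_list = run_msdial_processing("SAMPLE_001.mzml", db.get_msdial_directory(),
        parameter_file, input_folder, output_folder)

    if peak_list is not None:
        print("Peak table written to:", peak_list)  # .../results/SAMPLE_001.msdial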

def peak_list_to_dataframe(sample_peak_list, df_features):
347def peak_list_to_dataframe(sample_peak_list, df_features):
348
349    """
350    Filters duplicates and poor annotations from MS-DIAL peak table and creates DataFrame storing
351    m/z, RT, and intensity data for each internal standard (or targeted metabolite) in the sample.
352
353    TODO: Describe duplicate handling in more detail in this docstring.
354
355    Args:
356        sample_peak_list (str):
357            File path for MS-DIAL peak table, an .msdial file located in /data/instrument_id_run_id/results
358        df_features (DataFrame):
359            An m/z - RT table derived from internal standard (or biological standard) MSP library in database
360
361    Returns:
362        DataFrame with m/z, RT, and intensity data for each internal standard / targeted metabolite in the sample.
363    """
364
365    # Convert .msdial file into a DataFrame
366    df = pd.read_csv(sample_peak_list, sep="\t", engine="python", skip_blank_lines=True)
367    df.rename(columns={"Title": "Name"}, inplace=True)
368
369    # Get only the m/z, RT, and intensity columns
370    df = df[["Name", "Precursor m/z", "RT (min)", "Height", "MSMS spectrum"]]
371
372    # Query only internal standards (or targeted features for biological standard)
373    feature_list = df_features["name"].astype(str).tolist()
374    without_ms2_feature_list = ["w/o MS2:" + feature for feature in feature_list]
375    df = df.loc[(df["Name"].isin(feature_list)) | (df["Name"].isin(without_ms2_feature_list))]
376
377    # Label annotations with and without MS2
378    with_ms2 = df["MSMS spectrum"].notnull()
379    without_ms2 = df["MSMS spectrum"].isnull()
380    df.loc[with_ms2, "MSMS spectrum"] = "MS2"
381
382    # If any annotations matched with MS2, strip the "w/o MS2:" prefix and enable MS2-based duplicate filtering
383    if len(df[with_ms2]) > 0:
384        df.replace(["w/o MS2:"], "", regex=True, inplace=True)      # Remove "w/o MS2" from annotation name
385        ms2_matching = True                                         # Boolean that says MS/MS was used for identification
386    else:
387        ms2_matching = False
388
389    # Get duplicate annotations in a DataFrame
390    df_duplicates = df[df.duplicated(subset=["Name"], keep=False)]
391
392    # Remove duplicates from peak list DataFrame
393    df = df[~(df.duplicated(subset=["Name"], keep=False))]
394
395    # Handle duplicate annotations
396    if len(df_duplicates) > 0:
397
398        # Get list of annotations that have duplicates in the peak list
399        annotations = df_duplicates[~df_duplicates.duplicated(subset=["Name"])]["Name"].tolist()
400
401        # For each unique feature, choose the annotation that best matches library m/z and RT values
402        for annotation in annotations:
403
404            # Get all duplicate annotations for that feature
405            df_annotation = df_duplicates[df_duplicates["Name"] == annotation]
406            df_feature_in_library = df_features.loc[df_features["name"] == annotation]
407
408            # Calculate delta m/z and delta RT
409            df_annotation["Delta m/z"] = df_annotation["Precursor m/z"].astype(float) - df_feature_in_library["precursor_mz"].astype(float).values[0]
410            df_annotation["Delta RT"] = df_annotation["RT (min)"].astype(float) - df_feature_in_library["retention_time"].astype(float).values[0]
411
412            # Absolute values
413            df_annotation["Delta m/z"] = df_annotation["Delta m/z"].abs()
414            df_annotation["Delta RT"] = df_annotation["Delta RT"].abs()
415
416            # First, remove duplicates without MS2 (if an MSP with MS2 spectra was used for processing)
417            if ms2_matching:
418                new_df = df_annotation.loc[df_annotation["MSMS spectrum"].notnull()]
419
420                # If annotations with MS2 remain, use them moving forward
421                if len(new_df) > 0:
422                    df_annotation = new_df
423
424                # If no annotations with MS2 remain, filter annotations without MS2 by height
425                else:
426                    # Choose the annotation with the highest intensity
427                    if len(df_annotation) > 1:
428                        df_annotation = df_annotation.loc[
429                            df_annotation["Height"] == df_annotation["Height"].max()]
430
431                    # If there's only one annotation without MS2 left, choose as the "correct" annotation
432                    if len(df_annotation) == 1:
433                        df = df.append(df_annotation, ignore_index=True)
434                        continue
435
436            # Append the annotation with the lowest delta RT and delta m/z to the peak list DataFrame
437            df_rectified = df_annotation.loc[
438                (df_annotation["Delta m/z"] == df_annotation["Delta m/z"].min()) &
439                (df_annotation["Delta RT"] == df_annotation["Delta RT"].min())]
440
441            # If there is no "best" feature with the lowest delta RT and lowest delta m/z, choose the lowest delta RT
442            if len(df_rectified) == 0:
443                df_rectified = df_annotation.loc[
444                    df_annotation["Delta RT"] == df_annotation["Delta RT"].min()]
445
446            # If the RT's are exactly the same, choose the feature between them with the lowest delta m/z
447            if len(df_rectified) > 1:
448                df_rectified = df_rectified.loc[
449                    df_rectified["Delta m/z"] == df_rectified["Delta m/z"].min()]
450
451            # If they both have the same delta m/z, choose the feature between them with the greatest intensity
452            if len(df_rectified) > 1:
453                df_rectified = df_rectified.loc[
454                    df_rectified["Height"] == df_rectified["Height"].max()]
455
456            # If at this point there are still duplicates for some reason, just choose the first one
457            if len(df_rectified) > 1:
458                df_rectified = df_rectified[:1]
459
460            # Append "correct" annotation to peak list DataFrame
461            df = df.append(df_rectified, ignore_index=True)
462
463    # Drop helper columns (if present) and reset index before returning
464    df.drop(columns=["Delta m/z", "Delta RT"], inplace=True, errors="ignore")
465    df.reset_index(drop=True, inplace=True)
466    return df

Filters duplicates and poor annotations from MS-DIAL peak table and creates DataFrame storing m/z, RT, and intensity data for each internal standard (or targeted metabolite) in the sample.

TODO: Describe duplicate handling in more detail in this docstring.

Arguments:
  • sample_peak_list (str): File path for MS-DIAL peak table, an .msdial file located in /data/instrument_id_run_id/results
  • df_features (DataFrame): An m/z - RT table derived from internal standard (or biological standard) MSP library in database
Returns:

DataFrame with m/z, RT, and intensity data for each internal standard / targeted metabolite in the sample.
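A minimal usage sketch (the chromatography method, polarity, and file path are illustrative; db.get_internal_standards() is the same lookup used by process_data_file() below):

    import ms_autoqc.DatabaseFunctions as db
    from ms_autoqc.AutoQCProcessing import peak_list_to_dataframe

    # Illustrative chromatography method and polarity
    df_features = db.get_internal_standards("HILIC", "Pos")

    sample_peak_list = "C:/MS-AutoQC/data/Thermo_QE_1_Run42/results/SAMPLE_001.msdial"
    df_peak_list = peak_list_to_dataframe(sample_peak_list, df_features)

    # One row per internal standard, with duplicate annotations already resolved
    print(df_peak_list[["Name", "Precursor m/z", "RT (min)", "Height"]])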

def qc_sample(instrument_id, run_id, polarity, df_peak_list, df_features, is_bio_standard):
471def qc_sample(instrument_id, run_id, polarity, df_peak_list, df_features, is_bio_standard):
472
473    """
474    Performs quality control on sample data based on user-defined criteria in Settings > QC Configurations.
475
476    The following quality control parameters are used to determine QC pass, warning, or fail:
477        1. Intensity dropouts cutoff:
478            How many internal standards are missing in the sample?
479        2. RT shift from library value cutoff:
480            How many retention times are shifted from the expected value for the chromatography method?
481        3. RT shift from in-run average cutoff:
482            How many retention times are shifted from their average RT during the run?
483        4. m/z shift from library value cutoff:
484            How many precursor masses are shifted from the expected value for the internal standard?
485
486    This function returns a DataFrame with a single record in the following format:
487
488    | Sample     | Delta m/z | Delta RT | In-run delta RT | Warnings  | Fails |
489    | ---------- | --------- | -------- | --------------- | --------- | ----- |
490    | SAMPLE_001 | 0.000001  | 0.05     | 0.00001         | Delta RT  | None  |
491
492    Confusingly, a sample has an overall QC result, as well as QC warnings and fails for each internal standard.
493    This makes it easier to determine what caused the overall QC result.
494
495    See a screenshot of the sample information card in the Overview page of the website for context.
496
497    While the thresholds for QC pass and fail are explicit, allowing the user to determine thresholds for QC warnings
498    was deemed too cumbersome. Instead, an overall QC result of "Warning" happens if either of the following is true:
499        1. The number of intensity dropouts exceeds 75% of the defined cutoff
500        2. The QC result is not a "Fail" and more than 50% of internal standards have a QC warning
501
502    For each internal standard, a note is added to the "Warning" or "Fail" column of qc_dataframe based on the user's
503    defined criteria in Settings > QC Configurations. If the internal standard is not marked as a "Fail", then
504    "Warnings" for individual internal standards could be marked if:
505        1. The delta RT is greater than 66% of the "RT shift from library value" cutoff
506        2. The In-run delta RT is greater than 80% of the "RT shift from in-run average value" cutoff
507        3. The delta m/z is greater than 80% of the "m/z shift from library value" cutoff
508
509    TODO: Define and implement quality control for biological standards.
510
511    Args:
512        instrument_id (str):
513            Instrument ID
514        run_id (str):
515            Instrument run ID (Job ID)
516        polarity (str):
517            Polarity, either "Pos" or "Neg"
518        df_peak_list (DataFrame):
519            Filtered peak table, from peak_list_to_dataframe()
520        df_features (DataFrame):
521            An m/z - RT table derived from internal standard (or biological standard) MSP library in database
522        is_bio_standard (bool):
523            Whether sample is a biological standard or not
524
525    Returns:
526        (DataFrame, str): Tuple containing QC results table and QC result (either "Pass", "Fail", or "Warning").
527    """
528
529    # Handles sample QC checks
530    if not is_bio_standard:
531
532        # Refactor internal standards DataFrame
533        df_features = df_features.rename(
534            columns={"name": "Name",
535                     "chromatography": "Chromatography",
536                     "polarity": "Polarity",
537                     "precursor_mz": "Library m/z",
538                     "retention_time": "Library RT",
539                     "ms2_spectrum": "Library MS2",
540                     "inchikey": "Library INCHIKEY"})
541
542        # Get delta RT and delta m/z values for each internal standard
543        df_compare = pd.merge(df_features, df_peak_list, on="Name")
544        df_compare["Delta RT"] = df_compare["RT (min)"].astype(float) - df_compare["Library RT"].astype(float)
545        df_compare["Delta m/z"] = df_compare["Precursor m/z"].astype(float) - df_compare["Library m/z"].astype(float)
546        df_peak_list_copy = df_peak_list.copy()
547
548        # Get MS-DIAL RT threshold and filter out annotations without MS2 that are outside threshold
549        with_ms2 = df_compare["MSMS spectrum"].notnull()
550        without_ms2 = df_compare["MSMS spectrum"].isnull()
551        annotations_without_ms2 = df_compare[without_ms2]["Name"].astype(str).tolist()
552
553        if len(df_compare[with_ms2]) > 0:
554            rt_threshold = db.get_msdial_configuration_parameters("Default", parameter="post_id_rt_tolerance")
555            outside_rt_threshold = df_compare["Delta RT"].abs() > rt_threshold
556            annotations_to_drop = df_compare.loc[(without_ms2) & (outside_rt_threshold)]
557
558            df_compare.drop(annotations_to_drop.index, inplace=True)
559            annotations_to_drop = annotations_to_drop["Name"].astype(str).tolist()
560            annotations_to_drop = df_peak_list_copy[df_peak_list_copy["Name"].isin(annotations_to_drop)]
561            df_peak_list_copy.drop(annotations_to_drop.index, inplace=True)
562
563        # Get in-run RT average for each internal standard
564        df_compare["In-run RT average"] = np.nan
565        df_run_retention_times = db.parse_internal_standard_data(instrument_id, run_id, "retention_time", polarity, "processing", False)
566
567        if df_run_retention_times is not None:
568            # Calculate in-run RT average for each internal standard
569            for internal_standard in df_run_retention_times.columns:
570                if internal_standard == "Sample":
571                    continue
572                in_run_average = df_run_retention_times[internal_standard].dropna().astype(float).mean()
573                df_compare.loc[df_compare["Name"] == internal_standard, "In-run RT average"] = in_run_average
574
575            # Compare each internal standard RT to in-run RT average
576            df_compare["In-run delta RT"] = df_compare["RT (min)"].astype(float) - df_compare["In-run RT average"].astype(float)
577        else:
578            df_compare["In-run delta RT"] = np.nan
579
580        # Prepare final DataFrame
581        qc_dataframe = df_compare[["Name", "Delta m/z", "Delta RT", "In-run delta RT"]]
582
583        # Count internal standard intensity dropouts
584        qc_dataframe["Intensity dropout"] = 0
585        qc_dataframe["Warnings"] = ""
586        qc_dataframe["Fails"] = ""
587        for feature in df_features["Name"].astype(str).tolist():
588            if feature not in df_peak_list_copy["Name"].astype(str).tolist():
589                row = {"Name": feature,
590                       "Delta m/z": np.nan,
591                       "Delta RT": np.nan,
592                       "In-run delta RT": np.nan,
593                       "Intensity dropout": 1,
594                       "Warnings": "",
595                       "Fails": ""}
596                qc_dataframe = qc_dataframe.append(row, ignore_index=True)
597
598        # Determine pass / fail based on user criteria
599        qc_config = db.get_qc_configuration_parameters(instrument_id=instrument_id, run_id=run_id)
600        qc_result = "Pass"
601
602        # QC of internal standard intensity dropouts
603        if qc_config["intensity_enabled"].values[0] == 1:
604
605            # Mark fails
606            qc_dataframe.loc[qc_dataframe["Intensity dropout"].astype(int) == 1, "Fails"] = "Missing"
607
608            # Count intensity dropouts
609            intensity_dropouts = qc_dataframe["Intensity dropout"].astype(int).sum()
610            intensity_dropouts_cutoff = qc_config["intensity_dropouts_cutoff"].astype(int).tolist()[0]
611
612            # Compare number of intensity dropouts to user-defined cutoff
613            if intensity_dropouts >= intensity_dropouts_cutoff:
614                qc_result = "Fail"
615
616            if qc_result != "Fail":
617                if intensity_dropouts > intensity_dropouts_cutoff / 1.33:
618                    qc_result = "Warning"
619
620        # QC of internal standard RT's against library RT's
621        if qc_config["library_rt_enabled"].values[0] == 1:
622
623            # Check if delta RT's are outside of user-defined cutoff
624            library_rt_shift_cutoff = qc_config["library_rt_shift_cutoff"].astype(float).values[0]
625
626            # Mark fails
627            fails = qc_dataframe["Delta RT"].abs() > library_rt_shift_cutoff
628            qc_dataframe.loc[fails, "Fails"] = "RT"
629
630            # Mark warnings
631            warnings = ((library_rt_shift_cutoff / 1.5) < qc_dataframe["Delta RT"].abs()) & \
632                       ((qc_dataframe["Delta RT"].abs()) < library_rt_shift_cutoff)
633            qc_dataframe.loc[warnings, "Warnings"] = "RT"
634
635            if len(qc_dataframe.loc[fails]) >= len(qc_dataframe) / 2:
636                qc_result = "Fail"
637            else:
638                if len(qc_dataframe.loc[warnings]) > len(qc_dataframe) / 2 and qc_result != "Fail":
639                    qc_result = "Warning"
640
641        # QC of internal standard RT's against in-run RT average
642        if qc_config["in_run_rt_enabled"].values[0] == 1 and df_run_retention_times is not None:
643
644            # Check if in-run delta RT's are outside of user-defined cutoff
645            in_run_rt_shift_cutoff = qc_config["in_run_rt_shift_cutoff"].astype(float).values[0]
646
647            # Mark fails
648            fails = qc_dataframe["In-run delta RT"].abs() > in_run_rt_shift_cutoff
649            qc_dataframe.loc[fails, "Fails"] = "In-Run RT"
650
651            # Mark warnings
652            warnings = ((in_run_rt_shift_cutoff / 1.25) < qc_dataframe["In-run delta RT"].abs()) & \
653                       (qc_dataframe["In-run delta RT"].abs() < in_run_rt_shift_cutoff)
654            qc_dataframe.loc[warnings, "Warnings"] = "In-Run RT"
655
656            if len(qc_dataframe.loc[fails]) >= len(qc_dataframe) / 2:
657                qc_result = "Fail"
658            else:
659                if len(qc_dataframe.loc[warnings]) > len(qc_dataframe) / 2 and qc_result != "Fail":
660                    qc_result = "Warning"
661
662        # QC of internal standard precursor m/z against library m/z
663        if qc_config["library_mz_enabled"].values[0] == 1:
664
665            # Check if delta m/z's are outside of user-defined cutoff
666            library_mz_shift_cutoff = qc_config["library_mz_shift_cutoff"].astype(float).values[0]
667
668            # Mark fails
669            fails = qc_dataframe["Delta m/z"].abs() > library_mz_shift_cutoff
670            qc_dataframe.loc[fails, "Fails"] = "m/z"
671
672            # Mark warnings
673            warnings = ((library_mz_shift_cutoff / 1.25) < qc_dataframe["Delta m/z"].abs()) & \
674                       (qc_dataframe["Delta m/z"].abs() < library_mz_shift_cutoff)
675            qc_dataframe.loc[warnings, "Warnings"] = "m/z"
676
677            if len(qc_dataframe.loc[fails]) >= len(qc_dataframe) / 2:
678                qc_result = "Fail"
679            else:
680                if len(qc_dataframe.loc[warnings]) > len(qc_dataframe) / 2 and qc_result != "Fail":
681                    qc_result = "Warning"
682
683        # Mark annotations without MS2
684        qc_dataframe.loc[qc_dataframe["Name"].isin(annotations_without_ms2), "Warnings"] = "No MS2"
685
686    # Handles biological standard QC checks
687    else:
688        qc_dataframe = pd.DataFrame()
689        qc_result = "Pass"
690
691    return qc_dataframe, qc_result

Performs quality control on sample data based on user-defined criteria in Settings > QC Configurations.

The following quality control parameters are used to determine QC pass, warning, or fail:
  1. Intensity dropouts cutoff: How many internal standards are missing in the sample?
  2. RT shift from library value cutoff: How many retention times are shifted from the expected value for the chromatography method?
  3. RT shift from in-run average cutoff: How many retention times are shifted from their average RT during the run?
  4. m/z shift from library value cutoff: How many precursor masses are shifted from the expected value for the internal standard?

This function returns a DataFrame with a single record in the following format:

| Sample     | Delta m/z | Delta RT | In-run delta RT | Warnings | Fails |
| ---------- | --------- | -------- | --------------- | -------- | ----- |
| SAMPLE_001 | 0.000001  | 0.05     | 0.00001         | Delta RT | None  |

Confusingly, a sample has an overall QC result, as well as QC warnings and fails for each internal standard. This makes it easier to determine what caused the overall QC result.

See a screenshot of the sample information card in the Overview page of the website for context.

While the thresholds for QC pass and fail are explicit, allowing the user to determine thresholds for QC warnings was deemed too cumbersome. Instead, an overall QC result of "Warning" happens if either of the following is true:
  1. The number of intensity dropouts exceeds 75% of the defined cutoff
  2. The QC result is not a "Fail" and more than 50% of internal standards have a QC warning

For each internal standard, a note is added to the "Warning" or "Fail" column of qc_dataframe based on the user's defined criteria in Settings > QC Configurations. If the internal standard is not marked as a "Fail", then "Warnings" for individual internal standards could be marked if:
  1. The delta RT is greater than 66% of the "RT shift from library value" cutoff
  2. The in-run delta RT is greater than 80% of the "RT shift from in-run average value" cutoff
  3. The delta m/z is greater than 80% of the "m/z shift from library value" cutoff

TODO: Define and implement quality control for biological standards.

Arguments:
  • instrument_id (str): Instrument ID
  • run_id (str): Instrument run ID (Job ID)
  • polarity (str): Polarity, either "Pos" or "Neg"
  • df_peak_list (DataFrame): Filtered peak table, from peak_list_to_dataframe()
  • df_features (DataFrame): An m/z - RT table derived from internal standard (or biological standard) MSP library in database
  • is_bio_standard (bool): Whether sample is a biological standard or not
Returns:

(DataFrame, str): Tuple containing QC results table and QC result (either "Pass", "Fail", or "Warning").
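A minimal usage sketch (the instrument ID, run ID, and inputs are illustrative and would normally come from the preceding processing steps):

    import ms_autoqc.DatabaseFunctions as db
    from ms_autoqc.AutoQCProcessing import peak_list_to_dataframe, qc_sample

    # Illustrative inputs from the previous steps
    df_features = db.get_internal_standards("HILIC", "Pos")
    df_peak_list = peak_list_to_dataframe(
        "C:/MS-AutoQC/data/Thermo_QE_1_Run42/results/SAMPLE_001.msdial", df_features)

    qc_dataframe, qc_result = qc_sample("Thermo QE 1", "Run42", "Pos",
        df_peak_list, df_features, is_bio_standard=False)

    print("Overall QC result:", qc_result)  # "Pass", "Warning", or "Fail"
    print(qc_dataframe[["Name", "Warnings", "Fails"]])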

def convert_to_dict(sample_id, df_peak_list, qc_dataframe):
694def convert_to_dict(sample_id, df_peak_list, qc_dataframe):
695
696    """
697    Converts DataFrames to dictionary records, with features as columns and samples as rows,
698    which are then cast to string format for database storage.
699
700    See parse_internal_standard_data() in the DatabaseFunctions module for more information.
701
702    Format:
703
704    | Name       | iSTD 1 | iSTD 2 | iSTD 3 | iSTD 4 | ... |
705    | ---------- | ------ | ------ | ------ | ------ | ... |
706    | SAMPLE_001 | 1.207  | 1.934  | 3.953  | 8.132  | ... |
707
708    Args:
709        sample_id (str):
710            Sample ID
711        df_peak_list (DataFrame):
712            Filtered MS-DIAL peak table result
713        qc_dataframe (DataFrame):
714            Table of various QC results
715
716    Returns:
717        (str, str, str, str): Tuple containing dictionary records of m/z, RT, intensity, and
718        QC data, respectively, cast as strings for database storage.
719    """
720
721    # Split peak list into m/z, RT, and intensity tables; each is pivoted so features become columns and the sample becomes a single row
722    df_mz = df_peak_list[["Name", "Precursor m/z"]]
723    df_rt = df_peak_list[["Name", "RT (min)"]]
724    df_intensity = df_peak_list[["Name", "Height"]]
725
726    df_mz = df_mz.rename(columns={"Precursor m/z": sample_id})
727    df_rt = df_rt.rename(columns={"RT (min)": sample_id})
728    df_intensity = df_intensity.rename(columns={"Height": sample_id})
729
730    df_mz = df_mz.transpose().reset_index()
731    df_rt = df_rt.transpose().reset_index()
732    df_intensity = df_intensity.transpose().reset_index()
733
734    df_mz.columns = df_mz.iloc[0].astype(str).tolist()
735    df_rt.columns = df_rt.iloc[0].astype(str).tolist()
736    df_intensity.columns = df_intensity.iloc[0].astype(str).tolist()
737
738    df_mz = df_mz.drop(df_mz.index[0])
739    df_rt = df_rt.drop(df_rt.index[0])
740    df_intensity = df_intensity.drop(df_intensity.index[0])
741
742    mz_record = df_mz.to_dict(orient="records")[0]
743    rt_record = df_rt.to_dict(orient="records")[0]
744    intensity_record = df_intensity.to_dict(orient="records")[0]
745
746    # QC results
747    if len(qc_dataframe) > 0:
748        for column in qc_dataframe.columns:
749            if column != "Name":
750                qc_dataframe.rename(columns={column: sample_id + " " + column}, inplace=True)
751        qc_dataframe = qc_dataframe.transpose().reset_index()
752        qc_dataframe.columns = qc_dataframe.iloc[0].astype(str).tolist()
753        qc_dataframe = qc_dataframe.drop(qc_dataframe.index[0])
754        qc_dataframe = qc_dataframe.fillna(" ")
755        qc_record = qc_dataframe.to_dict(orient="records")
756    else:
757        qc_record = {}
758
759    return str(mz_record), str(rt_record), str(intensity_record), str(qc_record)

Converts DataFrames to dictionary records, with features as columns and samples as rows, which are then cast to string format for database storage.

See parse_internal_standard_data() in the DatabaseFunctions module for more information.

Format:

| Name       | iSTD 1 | iSTD 2 | iSTD 3 | iSTD 4 | ... |
| ---------- | ------ | ------ | ------ | ------ | --- |
| SAMPLE_001 | 1.207  | 1.934  | 3.953  | 8.132  | ... |

Arguments:
  • sample_id (str): Sample ID
  • df_peak_list (DataFrame): Filtered MS-DIAL peak table result
  • qc_dataframe (DataFrame): Table of various QC results
Returns:

(str, str, str, str): Tuple containing dictionary records of m/z, RT, intensity, and QC data, respectively, cast as strings for database storage.
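A minimal sketch using toy stand-ins for the outputs of peak_list_to_dataframe() and qc_sample() (the feature names and values below are illustrative):

    import pandas as pd
    from ms_autoqc.AutoQCProcessing import convert_to_dict

    # Toy stand-ins for the real peak list and QC table
    df_peak_list = pd.DataFrame({
        "Name": ["iSTD 1", "iSTD 2"],
        "Precursor m/z": [150.0583, 263.1372],
        "RT (min)": [1.207, 1.934],
        "Height": [120000, 98000]})
    qc_dataframe = pd.DataFrame({
        "Name": ["iSTD 1", "iSTD 2"],
        "Delta RT": [0.01, -0.02],
        "Warnings": ["", "RT"],
        "Fails": ["", ""]})

    mz_record, rt_record, intensity_record, qc_record = convert_to_dict(
        "SAMPLE_001", df_peak_list, qc_dataframe)

    print(rt_record)  # "{'Name': 'SAMPLE_001', 'iSTD 1': 1.207, 'iSTD 2': 1.934}"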

def process_data_file(path, filename, extension, instrument_id, run_id):
762def process_data_file(path, filename, extension, instrument_id, run_id):
763
764    """
765    Processes data file upon sample acquisition completion.
766
767    For more details, please visit the Documentation page on the website.
768
769    Performs the following functions:
770        1. Convert data file to mzML format using MSConvert
771        2. Process data file using MS-DIAL and user-defined parameter configuration
772        3. Load peak table into DataFrame and filter out poor annotations
773        4. Perform quality control checks based on user-defined criteria
774        5. Notify user of QC warnings or fails via Slack or email
775        6. Write QC results to instrument database
776        7. If Google Drive sync is enabled, upload results as CSV files
777
778    Args:
779        path (str):
780            Data acquisition path
781        filename (str):
782            Name of sample data file
783        extension (str):
784            Data file extension, derived from instrument vendor
785        instrument_id (str):
786            Instrument ID
787        run_id (str):
788            Instrument run ID (job ID)
789
790    Returns:
791        None
792    """
793
794    id = instrument_id.replace(" ", "_") + "_" + run_id
795
796    # Create the necessary directories
797    data_directory = os.path.join(os.getcwd(), r"data")
798    mzml_file_directory = os.path.join(data_directory, id, "data")
799    qc_results_directory = os.path.join(data_directory, id, "results")
800
801    for directory in [data_directory, mzml_file_directory, qc_results_directory]:
802        if not os.path.exists(directory):
803            os.makedirs(directory)
804
805    mzml_file_directory = mzml_file_directory + "/"
806    qc_results_directory = qc_results_directory + "/"
807
808    # Retrieve chromatography, samples, and biological standards using run ID
809    df_run = db.get_instrument_run(instrument_id, run_id)
810    chromatography = df_run["chromatography"].astype(str).values[0]
811
812    df_samples = db.get_samples_in_run(instrument_id, run_id, sample_type="Sample")
813    df_biological_standards = db.get_samples_in_run(instrument_id, run_id, sample_type="Biological Standard")
814
815    # Retrieve MS-DIAL parameters, internal standards, and targeted features from database
816    if filename in df_biological_standards["sample_id"].astype(str).tolist():
817
818        # Get biological standard type
819        biological_standard = df_biological_standards.loc[
820            df_biological_standards["sample_id"] == filename]
821
822        # Get polarity
823        try:
824            polarity = biological_standard["polarity"].astype(str).values[0]
825        except Exception as error:
826            print("Could not read polarity from database:", error)
827            print("Using default positive mode.")
828            polarity = "Pos"
829
830        biological_standard = biological_standard["biological_standard"].astype(str).values[0]
831
832        # Get parameters and features for that biological standard type
833        msdial_parameters = db.get_parameter_file_path(chromatography, polarity, biological_standard)
834        df_features = db.get_targeted_features(biological_standard, chromatography, polarity)
835        is_bio_standard = True
836
837    elif filename in df_samples["sample_id"].astype(str).tolist():
838
839        # Get polarity
840        try:
841            polarity = df_samples.loc[df_samples["sample_id"] == filename]["polarity"].astype(str).values[0]
842        except Exception as error:
843            print("Could not read polarity from database:", error)
844            print("Using default positive mode.")
845            polarity = "Positive"
846
847        msdial_parameters = db.get_parameter_file_path(chromatography, polarity)
848        df_features = db.get_internal_standards(chromatography, polarity)
849        is_bio_standard = False
850
851    else:
852        print("Error! Could not retrieve MS-DIAL libraries and parameters.")
853        return
854
855    # Get MS-DIAL directory
856    msdial_directory = db.get_msdial_directory()
857
858    # Run MSConvert
859    try:
860        mzml_file = run_msconvert(path, filename, extension, mzml_file_directory)
861
862        # For active instrument runs, give 3 more attempts if MSConvert fails
863        if not db.is_completed_run(instrument_id, run_id):
864            for attempt in range(3):
865                if mzml_file is None or not os.path.exists(mzml_file):
866                    print("MSConvert crashed, trying again in 3 minutes...")
867                    time.sleep(180)
868                    mzml_file = run_msconvert(path, filename, extension, mzml_file_directory)
869                else:
870                    break
871    except:
872        mzml_file = None
873        print("Failed to run MSConvert.")
874        traceback.print_exc()
875
876    # Run MS-DIAL
877    if mzml_file is not None:
878        try:
879            peak_list = run_msdial_processing(filename, msdial_directory, msdial_parameters,
880                str(mzml_file_directory), str(qc_results_directory))
881        except:
882            peak_list = None
883            print("Failed to run MS-DIAL.")
884            traceback.print_exc()
885
886    # Send peak list to MS-AutoQC algorithm if valid
887    if mzml_file is not None and peak_list is not None:
888
889        # Convert peak list to DataFrame
890        try:
891            df_peak_list = peak_list_to_dataframe(peak_list, df_features)
892        except:
893            print("Failed to convert peak list to DataFrame.")
894            traceback.print_exc()
895            return
896
897        # Execute AutoQC algorithm
898        try:
899            qc_dataframe, qc_result = qc_sample(instrument_id, run_id, polarity, df_peak_list, df_features, is_bio_standard)
900        except:
901            print("Failed to execute AutoQC algorithm.")
902            traceback.print_exc()
903            return
904
905        # Convert m/z, RT, and intensity data to dictionary records in string form
906        try:
907            mz_record, rt_record, intensity_record, qc_record = convert_to_dict(filename, df_peak_list, qc_dataframe)
908        except:
909            print("Failed to convert DataFrames to dictionary record format.")
910            traceback.print_exc()
911            return
912
913        # Delete MS-DIAL result file
914        try:
915            os.remove(qc_results_directory + filename + ".msdial")
916        except Exception as error:
917            print("Failed to remove MS-DIAL result file.")
918            traceback.print_exc()
919            return
920
921    else:
922        print("Failed to process", filename)
923        mz_record = None
924        rt_record = None
925        intensity_record = None
926        qc_record = None
927        qc_result = "Fail"
928        peak_list = None
929
930    # Send email and Slack notification (if they are enabled)
931    try:
932        if qc_result != "Pass":
933            alert = "QC " + qc_result + ": " + filename
934            if peak_list is None:
935                alert = "Failed to process " + filename
936
937            # Send Slack
938            if db.slack_notifications_are_enabled():
939                slack_bot.send_message(alert)
940
941            # Send email
942            if db.email_notifications_are_enabled():
943                db.send_email(alert, "Please check on your instrument run accordingly.")
944
945    except:
946        print("Failed to send Slack / email notification.")
947        traceback.print_exc()
948
949    try:
950        # Write QC results to database and upload to Google Drive
951        db.write_qc_results(filename, instrument_id, run_id, mz_record, rt_record, intensity_record, qc_record, qc_result, is_bio_standard)
952
953        # Update sample counters to trigger dashboard update
954        db.update_sample_counters_for_run(instrument_id=instrument_id, run_id=run_id, latest_sample=filename)
955
956        # If sync is enabled, upload the QC results to Google Drive
957        if db.sync_is_enabled():
958            db.upload_qc_results(instrument_id, run_id)
959
960    except:
961        print("Failed to write QC results to database.")
962        traceback.print_exc()
963        return

Processes data file upon sample acquisition completion.

For more details, please visit the Documentation page on the website.

Performs the following functions:
  1. Convert data file to mzML format using MSConvert
  2. Process data file using MS-DIAL and user-defined parameter configuration
  3. Load peak table into DataFrame and filter out poor annotations
  4. Perform quality control checks based on user-defined criteria
  5. Notify user of QC warnings or fails via Slack or email
  6. Write QC results to instrument database
  7. If Google Drive sync is enabled, upload results as CSV files

Arguments:
  • path (str): Data acquisition path
  • filename (str): Name of sample data file
  • extension (str): Data file extension, derived from instrument vendor
  • instrument_id (str): Instrument ID
  • run_id (str): Instrument run ID (job ID)
Returns:

None
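A minimal usage sketch (the acquisition path, file name, instrument ID, and run ID are hypothetical; this function is intended to run as soon as a sample finishes acquiring):

    from ms_autoqc.AutoQCProcessing import process_data_file

    process_data_file(
        path="C:/Xcalibur/Data/",  # hypothetical acquisition path
        filename="SAMPLE_001",
        extension="raw",
        instrument_id="Thermo QE 1",
        run_id="Run42")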

def subprocess_is_running(pid):
966def subprocess_is_running(pid):
967
968    """
969    Returns True if subprocess is still running, and False if not.
970
971    Args:
972        pid (int): Subprocess ID
973
974    Returns:
975        bool: True if subprocess is still running, False if not
976    """
977
978    if pid is None:
979        return False
980
981    time.sleep(1)
982
983    try:
984        if psutil.Process(pid).status() == "running":
985            return True
986        else:
987            return False
988    except Exception as error:
989        return False

Returns True if subprocess is still running, and False if not.

Arguments:
  • pid (int): Subprocess ID
Returns:

bool: True if subprocess is still running, False if not
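A minimal sketch (notepad.exe is an arbitrary placeholder for a long-running command; note that this helper sleeps one second before polling):

    import psutil
    from ms_autoqc.AutoQCProcessing import subprocess_is_running

    process = psutil.Popen("notepad.exe")  # placeholder long-running command
    print(subprocess_is_running(process.pid))  # True while the process is running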

def kill_subprocess(pid):
 992def kill_subprocess(pid):
 993
 994    """
 995    Kills subprocess.
 996
 997    Args:
 998        pid (int): Subprocess ID
 999
1000    Returns:
1001        None
1002    """
1003
1004    try:
1005        return psutil.Process(pid).kill()
1006    except Exception as error:
1007        print("Error killing subprocess.")
1008        traceback.print_exc()

Kills subprocess.

Arguments:
  • pid (int): Subprocess ID
Returns:

None
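A minimal sketch of the watchdog pattern that run_msconvert() and run_msdial_processing() build from these two helpers (notepad.exe is an arbitrary placeholder command):

    import time
    import psutil
    from ms_autoqc.AutoQCProcessing import subprocess_is_running, kill_subprocess

    process = psutil.Popen("notepad.exe")  # placeholder long-running command
    pid = process.pid

    # Poll roughly once per second; kill the subprocess if it is still alive at the end
    for second in range(31):
        if not subprocess_is_running(pid):
            break  # subprocess finished on its own
        if second == 30:
            kill_subprocess(pid)  # still running after ~30 seconds: give up
        else:
            time.sleep(1)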