ms_autoqc.DatabaseFunctions
import traceback
import warnings
warnings.simplefilter(action="ignore", category=FutureWarning)

import os, io, shutil, time
import hashlib, json, ast
import pandas as pd
import numpy as np
import sqlalchemy as sa
from pydrive2.auth import GoogleAuth
from pydrive2.drive import GoogleDrive
from sqlalchemy import INTEGER, REAL, TEXT
import base64
from email.message import EmailMessage
import google.auth as google_auth
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# Set ms_autoqc/src as the working directory
src_folder = os.path.dirname(os.path.realpath(__file__))
os.chdir(src_folder)

# Initialize directories
root_directory = os.getcwd()
data_directory = os.path.join(root_directory, "data")
methods_directory = os.path.join(data_directory, "methods")
auth_directory = os.path.join(root_directory, "auth")

# Location of settings SQLite database
settings_database = "sqlite:///data/methods/Settings.db"
settings_db_file = os.path.join(methods_directory, "Settings.db")

# Google Drive authentication files
credentials_file = os.path.join(auth_directory, "credentials.txt")
alt_credentials = os.path.join(auth_directory, "email_credentials.txt")
drive_settings_file = os.path.join(auth_directory, "settings.yaml")
auth_container = [GoogleAuth(settings_file=drive_settings_file)]

"""
The functions defined below operate on two database types:

- One storing instrument run metadata, sample QC results, and biological standard QC results
- The other storing instrument metadata, workspace settings for workspace access, chromatography methods,
  biological standards, QC configurations, and MS-DIAL configurations

In addition, this file also contains methods for syncing data and settings with Google Drive.
To get an overview of all functions, please visit the documentation at https://czbiohub.github.io/MS-AutoQC.
"""

def get_database_file(instrument_id, sqlite_conn=False, zip=False):

    """
    Returns database file for a given instrument ID.

    Args:
        instrument_id (str):
            Instrument ID that specifies which database file to retrieve
        sqlite_conn (bool, default False):
            Whether to receive the path for establishing a SQLite connection
        zip (bool, default False):
            Whether to receive the path of the zipped database file in the local app directory

    Returns:
        str: Path for the database file
    """

    if zip:
        filename = instrument_id.replace(" ", "_") + ".zip"
    else:
        filename = instrument_id.replace(" ", "_") + ".db"

    if sqlite_conn:
        return "sqlite:///data/" + filename
    else:
        return os.path.join(data_directory, filename)
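
# Example (illustrative): for an instrument named "Thermo QE 1", the calls above
# resolve as follows (absolute paths abbreviated):
#   get_database_file("Thermo QE 1")                    ->  ".../data/Thermo_QE_1.db"
#   get_database_file("Thermo QE 1", sqlite_conn=True)  ->  "sqlite:///data/Thermo_QE_1.db"
#   get_database_file("Thermo QE 1", zip=True)          ->  ".../data/Thermo_QE_1.zip"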

def connect_to_database(name):

    """
    Establishes a connection to a SQLite database of choice.

    Args:
        name (str):
            Name of the database, either "Settings" or an instrument ID

    Returns:
        sqlalchemy.MetaData:
            A container object that consists of different features of a database being described
        sqlalchemy.Connection:
            An object that represents a single DBAPI connection, and always emits SQL statements within
            the context of a transaction block
    """

    if name == "Settings":
        database_file = settings_database
    else:
        database_file = get_database_file(instrument_id=name, sqlite_conn=True)

    engine = sa.create_engine(database_file)
    db_metadata = sa.MetaData(bind=engine)
    connection = engine.connect()

    return db_metadata, connection
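
# Typical usage pattern (sketch): open a connection, run statements, then close.
#   db_metadata, connection = connect_to_database("Settings")
#   workspace_table = sa.Table("workspace", db_metadata, autoload=True)
#   connection.execute(sa.select([workspace_table]))
#   connection.close()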

def create_databases(instrument_id, new_instrument=False):

    """
    Initializes SQLite databases for 1) instrument data and 2) workspace settings.

    Creates the following tables in the instrument database: "runs", "bio_qc_results", "sample_qc_results".

    Creates the following tables in the settings database: "biological_standards", "chromatography_methods",
    "email_notifications", "instruments", "gdrive_users", "internal_standards", "msdial_parameters", "qc_parameters",
    "targeted_features", "workspace".

    Args:
        instrument_id (str):
            Instrument ID to name the new database ("Thermo QE 1" becomes "Thermo_QE_1.db")
        new_instrument (bool, default False):
            Whether a new instrument database is being added to a workspace, or whether a new
            instrument database AND settings database are being created for the first time

    Returns:
        None
    """

    # Create tables for instrument database
    instrument_database = get_database_file(instrument_id=instrument_id, sqlite_conn=True)
    qc_db_engine = sa.create_engine(instrument_database)
    qc_db_metadata = sa.MetaData()

    bio_qc_results = sa.Table(
        "bio_qc_results", qc_db_metadata,
        sa.Column("id", INTEGER, primary_key=True),
        sa.Column("sample_id", TEXT),
        sa.Column("run_id", TEXT),
        sa.Column("polarity", TEXT),
        sa.Column("precursor_mz", TEXT),
        sa.Column("retention_time", TEXT),
        sa.Column("intensity", TEXT),
        sa.Column("md5", TEXT),
        sa.Column("qc_dataframe", TEXT),
        sa.Column("qc_result", TEXT),
        sa.Column("biological_standard", TEXT),
        sa.Column("position", TEXT)
    )

    runs = sa.Table(
        "runs", qc_db_metadata,
        sa.Column("id", INTEGER, primary_key=True),
        sa.Column("run_id", TEXT),
        sa.Column("chromatography", TEXT),
        sa.Column("acquisition_path", TEXT),
        sa.Column("sequence", TEXT),
        sa.Column("metadata", TEXT),
        sa.Column("status", TEXT),
        sa.Column("samples", INTEGER),
        sa.Column("completed", INTEGER),
        sa.Column("passes", INTEGER),
        sa.Column("fails", INTEGER),
        sa.Column("latest_sample", TEXT),
        sa.Column("qc_config_id", TEXT),
        sa.Column("biological_standards", TEXT),
        sa.Column("pid", INTEGER),
        sa.Column("drive_id", TEXT),
        sa.Column("sample_status", TEXT),
        sa.Column("job_type", TEXT)
    )

    sample_qc_results = sa.Table(
        "sample_qc_results", qc_db_metadata,
        sa.Column("id", INTEGER, primary_key=True),
        sa.Column("sample_id", TEXT),
        sa.Column("run_id", TEXT),
        sa.Column("polarity", TEXT),
        sa.Column("position", TEXT),
        sa.Column("md5", TEXT),
        sa.Column("precursor_mz", TEXT),
        sa.Column("retention_time", TEXT),
        sa.Column("intensity", TEXT),
        sa.Column("qc_dataframe", TEXT),
        sa.Column("qc_result", TEXT)
    )

    qc_db_metadata.create_all(qc_db_engine)

    # If only creating instrument database, save and return here
    if new_instrument:
        set_device_identity(is_instrument_computer=True, instrument_id=instrument_id)
        return None

    # Create tables for Settings.db
    settings_db_engine = sa.create_engine(settings_database)
    settings_db_metadata = sa.MetaData()

    instruments = sa.Table(
        "instruments", settings_db_metadata,
        sa.Column("id", INTEGER, primary_key=True),
        sa.Column("name", TEXT),
        sa.Column("vendor", TEXT),
        sa.Column("drive_id", TEXT),
        sa.Column("last_modified", TEXT)
    )

    biological_standards = sa.Table(
        "biological_standards", settings_db_metadata,
        sa.Column("id", INTEGER, primary_key=True),
        sa.Column("name", TEXT),
        sa.Column("identifier", TEXT),
        sa.Column("chromatography", TEXT),
        sa.Column("num_pos_features", INTEGER),
        sa.Column("num_neg_features", INTEGER),
        sa.Column("pos_bio_msp_file", TEXT),
        sa.Column("neg_bio_msp_file", TEXT),
        sa.Column("pos_parameter_file", TEXT),
        sa.Column("neg_parameter_file", TEXT),
        sa.Column("msdial_config_id", TEXT)
    )
    chromatography_methods = sa.Table(
        "chromatography_methods", settings_db_metadata,
        sa.Column("id", INTEGER, primary_key=True),
        sa.Column("method_id", TEXT),
        sa.Column("num_pos_standards", INTEGER),
        sa.Column("num_neg_standards", INTEGER),
        sa.Column("pos_istd_msp_file", TEXT),
        sa.Column("neg_istd_msp_file", TEXT),
        sa.Column("pos_parameter_file", TEXT),
        sa.Column("neg_parameter_file", TEXT),
        sa.Column("msdial_config_id", TEXT)
    )

    gdrive_users = sa.Table(
        "gdrive_users", settings_db_metadata,
        sa.Column("id", INTEGER, primary_key=True),
        sa.Column("name", TEXT),
        sa.Column("email_address", TEXT),
        sa.Column("permission_id", TEXT),
    )

    internal_standards = sa.Table(
        "internal_standards", settings_db_metadata,
        sa.Column("id", INTEGER, primary_key=True),
        sa.Column("name", TEXT),
        sa.Column("chromatography", TEXT),
        sa.Column("polarity", TEXT),
        sa.Column("precursor_mz", REAL),
        sa.Column("retention_time", REAL),
        sa.Column("ms2_spectrum", TEXT),
        sa.Column("inchikey", TEXT)
    )

    msdial_parameters = sa.Table(
        "msdial_parameters", settings_db_metadata,
        sa.Column("id", INTEGER, primary_key=True),
        sa.Column("config_name", TEXT),
        sa.Column("rt_begin", INTEGER),
        sa.Column("rt_end", INTEGER),
        sa.Column("mz_begin", INTEGER),
        sa.Column("mz_end", INTEGER),
        sa.Column("ms1_centroid_tolerance", REAL),
        sa.Column("ms2_centroid_tolerance", REAL),
        sa.Column("smoothing_method", TEXT),
        sa.Column("smoothing_level", INTEGER),
        sa.Column("min_peak_width", INTEGER),
        sa.Column("min_peak_height", INTEGER),
        sa.Column("mass_slice_width", REAL),
        sa.Column("post_id_rt_tolerance", REAL),
        sa.Column("post_id_mz_tolerance", REAL),
        sa.Column("post_id_score_cutoff", REAL),
        sa.Column("alignment_rt_tolerance", REAL),
        sa.Column("alignment_mz_tolerance", REAL),
        sa.Column("alignment_rt_factor", REAL),
        sa.Column("alignment_mz_factor", REAL),
        sa.Column("peak_count_filter", INTEGER),
        sa.Column("qc_at_least_filter", TEXT)
    )

    email_notifications = sa.Table(
        "email_notifications", settings_db_metadata,
        sa.Column("id", INTEGER, primary_key=True),
        sa.Column("email_address", TEXT),
    )

    qc_parameters = sa.Table(
        "qc_parameters", settings_db_metadata,
        sa.Column("id", INTEGER, primary_key=True),
        sa.Column("config_name", TEXT),
        sa.Column("intensity_dropouts_cutoff", INTEGER),
        sa.Column("library_rt_shift_cutoff", REAL),
        sa.Column("in_run_rt_shift_cutoff", REAL),
        sa.Column("library_mz_shift_cutoff", REAL),
        sa.Column("intensity_enabled", INTEGER),
        sa.Column("library_rt_enabled", INTEGER),
        sa.Column("in_run_rt_enabled", INTEGER),
        sa.Column("library_mz_enabled", INTEGER)
    )

    targeted_features = sa.Table(
        "targeted_features", settings_db_metadata,
        sa.Column("id", INTEGER, primary_key=True),
        sa.Column("name", TEXT),
        sa.Column("chromatography", TEXT),
        sa.Column("polarity", TEXT),
        sa.Column("biological_standard", TEXT),
        sa.Column("precursor_mz", REAL),
        sa.Column("retention_time", REAL),
        sa.Column("ms2_spectrum", TEXT),
        sa.Column("inchikey", TEXT)
    )
sa.Column("methods_last_modified", TEXT), 323 sa.Column("msdial_directory", TEXT), 324 sa.Column("is_instrument_computer", INTEGER), 325 sa.Column("instrument_identity", TEXT) 326 ) 327 328 # Insert tables into database 329 settings_db_metadata.create_all(settings_db_engine) 330 331 # Insert default configurations for MS-DIAL and MS-AutoQC 332 add_msdial_configuration("Default") 333 add_qc_configuration("Default") 334 335 # Initialize workspace metadata 336 create_workspace_metadata() 337 338 # Save device identity based on setup values 339 set_device_identity(is_instrument_computer=True, instrument_id=instrument_id) 340 return None 341 342 343def execute_vacuum(database): 344 345 """ 346 Executes VACUUM command on the database of choice. 347 348 Args: 349 database (str): name of the database, either "Settings" or Instrument ID 350 351 Returns: 352 None 353 """ 354 355 db_metadata, connection = connect_to_database(database) 356 connection.execute("VACUUM") 357 connection.close() 358 359 360def get_drive_instance(): 361 362 """ 363 Returns user-authenticated Google Drive instance. 364 """ 365 366 return GoogleDrive(auth_container[0]) 367 368 369def launch_google_drive_authentication(): 370 371 """ 372 Launches Google Drive authentication flow and sets authentication instance. 373 """ 374 375 auth_container[0] = GoogleAuth(settings_file=drive_settings_file) 376 auth_container[0].LocalWebserverAuth() 377 378 379def save_google_drive_credentials(): 380 381 """ 382 Saves Google credentials to a credentials.txt file. 383 """ 384 385 auth_container[0].SaveCredentialsFile(credentials_file) 386 387 388def initialize_google_drive(): 389 390 """ 391 Initializes instance of Google Drive using credentials.txt and settings.yaml in /auth directory 392 393 Args: 394 None 395 396 Returns: 397 bool: Whether the Google client credentials file (in the "auth" directory) exists. 398 """ 399 400 # Create Google Drive instance 401 auth_container[0] = GoogleAuth(settings_file=drive_settings_file) 402 gauth = auth_container[0] 403 404 # If no credentials file, make user authenticate 405 if not os.path.exists(credentials_file) and is_valid(): 406 gauth.LocalWebserverAuth() 407 408 # Try to load saved client credentials 409 gauth.LoadCredentialsFile(credentials_file) 410 411 # Initialize saved credentials 412 if gauth.credentials is not None: 413 414 # Refresh credentials if expired 415 if gauth.access_token_expired: 416 gauth.Refresh() 417 418 # Otherwise, authorize saved credentials 419 else: 420 gauth.Authorize() 421 422 # If no saved credentials, make user authenticate again 423 elif gauth.credentials is None: 424 gauth.LocalWebserverAuth() 425 426 if not os.path.exists(credentials_file) and is_valid(): 427 save_google_drive_credentials() 428 429 # Makes small modification for emails (for usage with Google's google.auth) 430 if not os.path.exists(alt_credentials): 431 data = None 432 with open(credentials_file, "r") as file: 433 data = json.load(file) 434 data["type"] = "authorized_user" 435 with open(alt_credentials, "w") as file: 436 json.dump(data, file) 437 438 return os.path.exists(credentials_file) 439 440 441def is_valid(instrument_id=None): 442 443 """ 444 Checks that all required tables in all databases (or a single database of choice) are present. 

def is_valid(instrument_id=None):

    """
    Checks that all required tables in all databases (or a single database of choice) are present.

    Args:
        instrument_id (str, default None):
            Specified if validating a specific database

    Returns:
        bool: True if all required tables are present, False if not
    """

    # Validate settings database
    settings_db_required_tables = ["biological_standards", "chromatography_methods", "email_notifications", "instruments",
        "gdrive_users", "internal_standards", "msdial_parameters", "qc_parameters", "targeted_features", "workspace"]

    try:
        settings_db_tables = sa.create_engine(settings_database).table_names()
        if len(settings_db_tables) < len(settings_db_required_tables):
            return False
    except:
        return False

    # Validate instrument databases
    instrument_db_required_tables = ["bio_qc_results", "runs", "sample_qc_results"]

    # If given an instrument ID, only validate that instrument's database
    try:
        if instrument_id is not None:
            database = get_database_file(instrument_id, sqlite_conn=True)
            instrument_db_tables = sa.create_engine(database).table_names()
            if len(instrument_db_tables) < len(instrument_db_required_tables):
                return False

        # Otherwise, validate all instrument databases
        else:
            database_files = [file.replace(".db", "") for file in os.listdir(data_directory) if ".db" in file and "journal.db" not in file]
            databases = [get_database_file(f, sqlite_conn=True) for f in database_files]

            for database in databases:
                instrument_db_tables = sa.create_engine(database).table_names()
                if len(instrument_db_tables) < len(instrument_db_required_tables):
                    return False
    except:
        return False

    return True


def sync_is_enabled():

    """
    Checks whether Google Drive sync is enabled simply by querying whether Google Drive IDs exist in the database.

    Typically used for separating sync-specific functionality.

    Returns:
        bool: Whether Google Drive sync is enabled or not
    """

    if not is_valid():
        return False

    df_workspace = get_table("Settings", "workspace")
    gdrive_folder_id = df_workspace["gdrive_folder_id"].values[0]
    methods_zip_file_id = df_workspace["methods_zip_file_id"].values[0]

    if gdrive_folder_id is not None and methods_zip_file_id is not None:
        if gdrive_folder_id != "None" and methods_zip_file_id != "None":
            if gdrive_folder_id != "" and methods_zip_file_id != "":
                return True

    return False


def email_notifications_are_enabled():

    """
    Checks whether email notifications are enabled.

    Returns True if databases are valid, Google Drive sync is enabled, and if email addresses were
    registered by user in Settings > General. Returns False if any condition is not met.

    Returns:
        bool: True if email notifications are enabled, False if not
    """

    if not is_valid():
        return False

    if not sync_is_enabled():
        return False

    if len(get_table("Settings", "email_notifications")) > 0:
        return True

    return False
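
# These checks are typically used as guards around sync-specific code paths,
# e.g. (caller sketch; the upload step itself lives outside this excerpt):
#   if sync_is_enabled():
#       zip_database(instrument_id=instrument_id)
#       # ...upload the ZIP archive to Google Drive...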

def slack_notifications_are_enabled():

    """
    Checks whether Slack notifications are enabled.

    Returns True if user enabled Slack notifications in Settings > General, and False if not.

    Returns:
        bool: True if Slack notifications are enabled, False if not
    """

    if not is_valid():
        return False

    try:
        return bool(get_table("Settings", "workspace")["slack_enabled"].astype(int).tolist()[0])
    except:
        return False


def is_instrument_computer():

    """
    Checks whether user's device is the instrument computer.

    This is specified during setup. If the user created a new instrument, or signed in as an instrument device, then
    this will return True. If the user signed in to their workspace from a non-instrument device, this will return False.

    Typically used to organize / hide UI functions for instrument and non-instrument devices
    that MS-AutoQC is installed on.

    Returns:
        True if device is instrument computer, False if not
    """

    return bool(get_table("Settings", "workspace")["is_instrument_computer"].astype(int).tolist()[0])


def get_md5_for_settings_db():

    """
    Calculates and returns MD5 checksum for the settings database file.

    Typically used for checking whether the user changed settings and prompting a Google Drive sync (if sync is enabled).

    Returns:
        An MD5 checksum of /data/methods/Settings.db
    """

    hash_md5 = hashlib.md5()

    with open(settings_db_file, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)

    return hash_md5.hexdigest()


def settings_were_modified(md5_checksum):

    """
    Checks whether settings database file has been modified.

    This is done by comparing the checksum computed when Settings were opened (given as a parameter)
    with the checksum computed when Settings were closed (in this function call).

    Args:
        md5_checksum (str):
            An MD5 checksum of /data/methods/Settings.db that was computed when the user opened Settings in the app

    Returns:
        bool: True if checksums don't match, False if checksums match.
    """

    if md5_checksum != get_md5_for_settings_db():
        return True
    else:
        return False
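
# Intended open/close pattern (sketch):
#   checksum_on_open = get_md5_for_settings_db()    # when the user opens Settings
#   ...user edits settings...
#   if settings_were_modified(checksum_on_open):    # when the user closes Settings
#       ...trigger a Google Drive sync of Settings.db (if sync is enabled)...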

def zip_database(instrument_id=None, filename=None):

    """
    Compresses instrument database file into a ZIP archive in /data directory.

    Used for fast downloads / uploads over network connections to Google Drive (if Google Drive sync is enabled).

    The zip archive is accessible by filename and path in the /data directory. For example, zipping
    the database for "Thermo QE 1" will generate a zip file with path "../data/Thermo_QE_1.zip".

    Args:
        instrument_id (str, default None):
            If specified, selects a database to zip by instrument ID (ex: "Thermo QE 1")
        filename (str, default None):
            If specified, selects a database to zip by filename (ex: "Thermo_QE_1.zip")

    Returns:
        None
    """

    if instrument_id is None and filename is None:
        return None

    if filename is not None:
        db_zip_file = os.path.join(data_directory, filename)
        filename = filename.replace(".zip", ".db")

    elif instrument_id is not None:
        db_zip_file = get_database_file(instrument_id, zip=True)
        filename = instrument_id.replace(" ", "_") + ".db"

    file_without_extension = db_zip_file.replace(".zip", "")
    shutil.make_archive(file_without_extension, "zip", data_directory, filename)


def unzip_database(instrument_id=None, filename=None):

    """
    Unzips ZIP archive containing instrument database file and deletes the archive when complete.

    Args:
        instrument_id (str, default None):
            If specified, selects a database to unzip by instrument ID (ex: "Thermo QE 1")
        filename (str, default None):
            If specified, selects a database to unzip by filename (ex: "Thermo_QE_1.zip")

    Returns:
        None
    """

    if instrument_id is None and filename is None:
        return None

    if instrument_id is not None:
        db_zip_file = get_database_file(instrument_id, zip=True)
    elif filename is not None:
        db_zip_file = os.path.join(data_directory, filename)

    shutil.unpack_archive(db_zip_file, data_directory, "zip")
    os.remove(db_zip_file)


def zip_methods():

    """
    Compresses methods directory into a ZIP archive in /data directory.

    Returns:
        Path for zip archive of methods directory (ex: "../data/methods.zip")
    """

    output_directory_and_name = os.path.join(data_directory, "methods.zip").replace(".zip", "")
    shutil.make_archive(output_directory_and_name, "zip", methods_directory)
    return output_directory_and_name + ".zip"


def unzip_methods():

    """
    Unzips ZIP archive containing methods directory and deletes the archive when complete.
    """

    input_zip = os.path.join(data_directory, "methods.zip")
    shutil.unpack_archive(input_zip, methods_directory, "zip")
    os.remove(input_zip)


def zip_csv_files(input_directory, output_directory_and_name):

    """
    Compresses CSV files into a ZIP archive in /data directory.

    Used for fast upload of instrument run data to Google Drive during an active instrument run (if Google Drive sync is enabled).

    Args:
        input_directory (str):
            The temporary directory for files pertaining to an instrument run, denoted as "Instrument_ID_Run_ID".
            For example, a job with ID "BRDE001" created under instrument with ID "Thermo QE 1" would have its files
            stored in "/data/Thermo_QE_1_BRDE001".
        output_directory_and_name (str):
            Essentially, the file path for the ZIP archive (ex: "/data/Instrument_ID_Run_ID").

    Returns:
        Path for zip archive of CSV files with instrument run data (ex: "../data/Instrument_ID_Run_ID.zip")
    """

    shutil.make_archive(output_directory_and_name, "zip", input_directory)
    return output_directory_and_name + ".zip"


def unzip_csv_files(input_zip, output_directory):

    """
    Unzips ZIP archive of CSV files and deletes the archive upon completion.
    """

    shutil.unpack_archive(input_zip, output_directory, "zip")
    os.remove(input_zip)


def get_table(database_name, table_name):

    """
    Retrieves table from database as a pandas DataFrame object.

    TODO: Improve this function to accept column and record queries

    Args:
        database_name (str):
            The database to query, using instrument ID or "Settings"
        table_name (str):
            The table to retrieve

    Returns:
        DataFrame of table.
    """

    if database_name == "Settings":
        database = settings_database
    else:
        database = get_database_file(database_name, sqlite_conn=True)

    engine = sa.create_engine(database)
    return pd.read_sql("SELECT * FROM " + table_name, engine)
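
# Example (illustrative): load the full "runs" table for an instrument and
# filter it client-side, since get_table() does not yet accept queries.
#   df_runs = get_table("Thermo QE 1", "runs")
#   active_runs = df_runs.loc[df_runs["status"] == "Active"]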

def generate_client_settings_yaml(client_id, client_secret):

    """
    Generates a settings.yaml file for Google authentication in the /auth directory.

    Client ID and client secret are generated and provided by the user in the Google Cloud Console.

    See: https://docs.iterative.ai/PyDrive2/oauth/#automatic-and-custom-authentication-with-settings-yaml

    Args:
        client_id (str):
            The Client ID of the MS-AutoQC application, generated and provided by the user
        client_secret (str):
            The Client Secret of the MS-AutoQC application, generated and provided by the user

    Returns:
        None
    """

    auth_directory = os.path.join(os.getcwd(), "auth")
    if not os.path.exists(auth_directory):
        os.makedirs(auth_directory)

    settings_yaml_file = os.path.join(auth_directory, "settings.yaml")

    lines = [
        "client_config_backend: settings",
        "client_config:",
        "  client_id: " + client_id,
        "  client_secret: " + client_secret,
        "\n",
        "save_credentials: True",
        "save_credentials_backend: file",
        "save_credentials_file: auth/credentials.txt",
        "\n",
        "get_refresh_token: True",
        "\n",
        "oauth_scope:",
        "  - https://www.googleapis.com/auth/drive",
        "  - https://www.googleapis.com/auth/gmail.send",
        "  - https://www.googleapis.com/auth/userinfo.email"
    ]

    with open(settings_yaml_file, "w") as file:
        for line in lines:
            file.write(line)
            if line != "\n" and line != lines[-1]:
                file.write("\n")
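
# The generated auth/settings.yaml has this shape (illustrative, values elided):
#   client_config_backend: settings
#   client_config:
#     client_id: <client_id>
#     client_secret: <client_secret>
#   save_credentials: True
#   save_credentials_backend: file
#   save_credentials_file: auth/credentials.txt
#   get_refresh_token: True
#   oauth_scope:
#     - https://www.googleapis.com/auth/drive
#     - https://www.googleapis.com/auth/gmail.send
#     - https://www.googleapis.com/auth/userinfo.email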

def insert_google_drive_ids(instrument_id, gdrive_folder_id, instrument_db_file_id, methods_zip_file_id):

    """
    Inserts Google Drive IDs into corresponding tables to enable Google Drive sync.

    This function is called when a user creates a new instrument in their workspace.

    The IDs for the following files / folders in Google Drive are stored in the database:
        1. MS-AutoQC folder
        2. Instrument database zip file
        3. Methods directory zip file

    Args:
        instrument_id (str):
            Instrument ID
        gdrive_folder_id (str):
            Google Drive ID for the MS-AutoQC folder (found in the user's root directory in Drive)
        instrument_db_file_id (str):
            Google Drive ID for the instrument database ZIP file
        methods_zip_file_id (str):
            Google Drive ID for the methods directory ZIP file

    Returns:
        None
    """

    db_metadata, connection = connect_to_database("Settings")
    instruments_table = sa.Table("instruments", db_metadata, autoload=True)
    workspace_table = sa.Table("workspace", db_metadata, autoload=True)

    # Instruments database
    connection.execute((
        sa.update(instruments_table)
        .where((instruments_table.c.name == instrument_id))
        .values(drive_id=instrument_db_file_id)
    ))

    # MS-AutoQC folder and Methods folder
    connection.execute((
        sa.update(workspace_table)
        .where((workspace_table.c.id == 1))
        .values(gdrive_folder_id=gdrive_folder_id,
                methods_zip_file_id=methods_zip_file_id)
    ))

    connection.close()


def insert_new_instrument(name, vendor):

    """
    Inserts a new instrument into the "instruments" table in the Settings database.

    The name is the instrument ID, and the vendor is one of 5 options: Thermo Fisher, Agilent, Bruker, Sciex, and Waters.

    Args:
        name (str):
            Instrument ID
        vendor (str):
            Instrument vendor

    Returns:
        None
    """

    # Connect to database
    db_metadata, connection = connect_to_database("Settings")

    # Get "instruments" table
    instruments_table = sa.Table("instruments", db_metadata, autoload=True)

    # Prepare insert of new instrument
    insert_instrument = instruments_table.insert().values(
        {"name": name,
         "vendor": vendor}
    )

    # Execute the insert, then close the connection
    connection.execute(insert_instrument)
    connection.close()


def get_instruments_list():

    """
    Returns list of instruments in database.
    """

    # Connect to SQLite database
    engine = sa.create_engine(settings_database)

    # Get instruments table as DataFrame
    df_instruments = pd.read_sql("SELECT * FROM instruments", engine)

    # Return list of instruments
    return df_instruments["name"].astype(str).tolist()


def get_instrument(instrument_id):

    """
    Returns record from "instruments" table as a DataFrame for a given instrument.

    Args:
        instrument_id (str): Instrument ID

    Returns:
        DataFrame containing the name, vendor, and drive_id for the given instrument
    """

    engine = sa.create_engine(settings_database)
    return pd.read_sql("SELECT * FROM instruments WHERE name = '" + instrument_id + "'", engine)


def get_filenames_from_sequence(sequence, vendor="Thermo Fisher"):

    """
    Filters preblanks, washes, and shutdown injections from sequence file, and simultaneously assigns
    polarity to each sample based on presence of "Pos" or "Neg" in Instrument Method column.

    This function is called upon starting a new QC job.

    TODO: Adapt this function for other instrument vendors.
    TODO: Check the method filename, not entire file path, for "Pos" and "Neg".
        A folder containing "Pos" or "Neg" will give incorrect polarity assignments.

    Args:
        sequence (str):
            The acquisition sequence file, encoded as a JSON string in "split" format
        vendor (str):
            The instrument vendor (see to-do statements)

    Returns:
        DataFrame of acquisition sequence, with preblanks / washes / shutdowns filtered out and polarities assigned
    """

    df_sequence = pd.read_json(sequence, orient="split")

    # Filter out preblanks
    df_sequence = df_sequence.loc[
        ~((df_sequence["File Name"].str.contains(r"_BK_", na=False)) &
          (df_sequence["File Name"].str.contains(r"_pre_", na=False)))]

    # Filter out wash and shutdown
    df_sequence = df_sequence.loc[
        ~(df_sequence["File Name"].str.contains(r"_wash_", na=False)) &
        ~(df_sequence["File Name"].str.contains(r"shutdown", na=False))]

    # Derive polarity from instrument method filename
    df_sequence.loc[df_sequence["Instrument Method"].str.contains(r"Pos", na=False), "Polarity"] = "Pos"
    df_sequence.loc[df_sequence["Instrument Method"].str.contains(r"Neg", na=False), "Polarity"] = "Neg"

    return df_sequence
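
# Example input (sketch): a minimal sequence encoded in pandas "split" format.
# Column names match those accessed above; values are illustrative.
#   sequence = json.dumps({
#       "columns": ["File Name", "Instrument Method", "Position"],
#       "index": [0, 1],
#       "data": [["QC01_Pos", "C:/Methods/HILIC_Pos", "1:A,1"],
#                ["QC01_Neg", "C:/Methods/HILIC_Neg", "1:A,2"]],
#   })
#   df = get_filenames_from_sequence(sequence)  # assigns Polarity "Pos" / "Neg"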

def get_polarity_for_sample(instrument_id, run_id, sample_id, status):

    """
    Returns polarity for a given sample.

    TODO: Loading hundreds of rows of data before querying for one sample is massively inefficient.
        This function was written in haste and can be easily implemented in a much better way.

    Args:
        instrument_id (str): Instrument ID
        run_id (str): Instrument run ID (job ID)
        sample_id (str): Sample ID
        status (str): Job status

    Returns:
        Polarity for the given sample, as either "Pos" or "Neg".
    """

    if get_device_identity() != instrument_id and sync_is_enabled():
        if status == "Complete":
            df = get_samples_in_run(instrument_id, run_id, "Both")
        elif status == "Active":
            df = get_samples_from_csv(instrument_id, run_id, "Both")
    else:
        df = get_samples_in_run(instrument_id, run_id, "Both")

    try:
        polarity = df.loc[df["sample_id"] == sample_id]["polarity"].astype(str).values[0]
    except:
        print("Could not find polarity for sample in database.")
        polarity = "Neg" if "Neg" in sample_id else "Pos"

    return polarity


def insert_new_run(run_id, instrument_id, chromatography, bio_standards, path, sequence, metadata, qc_config_id, job_type):

    """
    Initializes sample records in database for a new QC job.

    Performs the following functions:
        1. Inserts a record for the new instrument run into the "runs" table
        2. Inserts sample rows into the "sample_qc_results" table
        3. Inserts biological standard sample rows into the "bio_qc_results" table

    Args:
        run_id (str):
            Instrument run ID (job ID)
        instrument_id (str):
            Instrument ID
        chromatography (str):
            Chromatography method
        bio_standards (str):
            Biological standards
        path (str):
            Data acquisition path
        sequence (str):
            Acquisition sequence table, as JSON string in "split" format
        metadata (str):
            Sample metadata table, as JSON string in "records" format
        qc_config_id (str):
            Name of QC configuration
        job_type (str):
            Either "completed" or "active"

    Returns:
        None
    """

    # Get list of samples from sequence
    df_sequence = get_filenames_from_sequence(sequence)

    samples = df_sequence["File Name"].astype(str).tolist()
    polarities = df_sequence["Polarity"].astype(str).tolist()
    positions = df_sequence["Position"].astype(str).tolist()

    num_samples = len(samples)

    # Connect to database
    db_metadata, connection = connect_to_database(instrument_id)

    # Get relevant tables
    runs_table = sa.Table("runs", db_metadata, autoload=True)
    sample_qc_results_table = sa.Table("sample_qc_results", db_metadata, autoload=True)
    bio_qc_results_table = sa.Table("bio_qc_results", db_metadata, autoload=True)

    # Get identifiers for biological standard (if any)
    identifiers = get_biological_standard_identifiers(bio_standards)

    # Prepare insert of user-inputted run data
    insert_run = runs_table.insert().values(
        {"run_id": run_id,
         "chromatography": chromatography,
         "acquisition_path": path,
         "sequence": sequence,
         "metadata": metadata,
         "status": "Active",
         "samples": num_samples,
         "completed": 0,
         "passes": 0,
         "fails": 0,
         "qc_config_id": qc_config_id,
         "biological_standards": str(bio_standards),
         "job_type": job_type})

    insert_samples = []

    for index, sample in enumerate(samples):
        # Check if the biological standard identifier is in the sample name
        is_bio_standard = False

        for identifier in identifiers.keys():
            if identifier in sample:
                is_bio_standard = True
                break

        # Prepare insert of the sample row into the "sample_qc_results" table
        if not is_bio_standard:
            insert_sample = sample_qc_results_table.insert().values(
                {"sample_id": sample,
                 "run_id": run_id,
                 "polarity": polarities[index],
                 "position": positions[index]})

        # Prepare insert of the sample row into the "bio_qc_results" table
        else:
            insert_sample = bio_qc_results_table.insert().values(
                {"sample_id": sample,
                 "run_id": run_id,
                 "polarity": polarities[index],
                 "biological_standard": identifiers[identifier],
                 "position": positions[index]})

        # Add this INSERT query into the list of insert queries
        insert_samples.append(insert_sample)

    # Execute INSERT to database
    connection.execute(insert_run)

    for insert_sample in insert_samples:
        connection.execute(insert_sample)

    # Close the connection
    connection.close()


def get_instrument_run(instrument_id, run_id):

    """
    Returns DataFrame of given instrument run from "runs" table.

    Args:
        instrument_id (str): Instrument ID
        run_id (str): Run ID

    Returns:
        DataFrame containing record for instrument run
    """

    database = get_database_file(instrument_id=instrument_id, sqlite_conn=True)
    engine = sa.create_engine(database)
    query = "SELECT * FROM runs WHERE run_id = '" + run_id + "'"
    df_instrument_run = pd.read_sql(query, engine)
    return df_instrument_run


def get_instrument_run_from_csv(instrument_id, run_id):

    """
    Returns DataFrame of selected instrument run from CSV files during active instrument runs.

    This function is called when a user views an active instrument run from an external device
    (to prevent downloading / uploading the database file with each sample acquisition).

    Args:
        instrument_id (str): Instrument ID
        run_id (str): Run ID

    Returns:
        DataFrame containing record for instrument run
    """

    id = instrument_id.replace(" ", "_") + "_" + run_id
    run_csv_file = os.path.join(data_directory, id, "csv", "run.csv")
    return pd.read_csv(run_csv_file, index_col=False)


def get_instrument_runs(instrument_id, as_list=False):

    """
    Returns DataFrame of all runs on a given instrument from "runs" table.

    Args:
        instrument_id (str):
            Instrument ID
        as_list (bool, default False):
            If True, returns only a list of names of instrument runs (jobs)

    Returns:
        DataFrame containing records for instrument runs (QC jobs) for the given instrument
    """

    database = get_database_file(instrument_id, sqlite_conn=True)
    engine = sa.create_engine(database)
    df = pd.read_sql("SELECT * FROM runs", engine)

    if as_list:
        return df["run_id"].astype(str).tolist()
    else:
        return df
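
# Example (illustrative): list all QC job IDs recorded for an instrument
#   job_ids = get_instrument_runs("Thermo QE 1", as_list=True)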

def delete_instrument_run(instrument_id, run_id):

    """
    Deletes all records for an instrument run (QC job) from the database.

    Args:
        instrument_id (str): Instrument ID
        run_id (str): Run ID

    Returns:
        None
    """

    # Connect to database
    db_metadata, connection = connect_to_database(instrument_id)

    # Get relevant tables
    runs_table = sa.Table("runs", db_metadata, autoload=True)
    sample_qc_results_table = sa.Table("sample_qc_results", db_metadata, autoload=True)
    bio_qc_results_table = sa.Table("bio_qc_results", db_metadata, autoload=True)

    # Delete from each table
    for table in [runs_table, sample_qc_results_table, bio_qc_results_table]:
        connection.execute((
            sa.delete(table).where(table.c.run_id == run_id)
        ))

    # Close the connection
    connection.close()


def get_acquisition_path(instrument_id, run_id):

    """
    Retrieves acquisition path for a given instrument run.

    Args:
        instrument_id (str): Instrument ID
        run_id (str): Run ID

    Returns:
        Acquisition path for the given instrument run
    """

    return get_instrument_run(instrument_id, run_id)["acquisition_path"].astype(str).tolist()[0]


def get_md5(instrument_id, sample_id):

    """
    Returns MD5 checksum for a data file in "sample_qc_results" table.

    Used for comparing MD5 checksums during active instrument runs.

    TODO: This function will return incorrect results if two different instrument runs
        have samples with the same sample ID. It needs to include "run_id" in the SQL query.

    Args:
        instrument_id (str): Instrument ID
        sample_id (str): Sample ID

    Returns:
        MD5 checksum stored for the data file.
    """

    # Connect to database
    database = get_database_file(instrument_id, sqlite_conn=True)
    engine = sa.create_engine(database)

    # Check if sample is a biological standard
    table = "sample_qc_results"

    for identifier in get_biological_standard_identifiers().keys():
        if identifier in sample_id:
            table = "bio_qc_results"
            break

    # Get sample from correct table
    df_sample_qc_results = pd.read_sql(
        "SELECT * FROM " + table + " WHERE sample_id = '" + sample_id + "'", engine)

    return df_sample_qc_results["md5"].astype(str).values[0]
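
# Sketch of the intended monitoring pattern during an active run (the helper
# name is hypothetical; this module only stores and retrieves the checksum):
#   current_md5 = compute_md5_of_data_file(sample_path)  # hypothetical caller-side helper
#   if current_md5 == get_md5(instrument_id, sample_id):
#       # File unchanged since last check; acquisition for this sample is likely complete
#       ...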

def update_md5_checksum(instrument_id, sample_id, md5_checksum):

    """
    Updates MD5 checksum for a data file during sample acquisition.

    TODO: This function will return incorrect results if two different instrument runs
        have samples with the same sample ID. It needs to include "run_id" in the SQL query.

    Args:
        instrument_id (str):
            Instrument ID
        sample_id (str):
            Sample ID (filename) of data file
        md5_checksum (str):
            MD5 checksum for the sample data file

    Returns:
        None
    """

    # Connect to database
    db_metadata, connection = connect_to_database(instrument_id)

    # Check if sample is a biological standard and get relevant table
    qc_results_table = sa.Table("sample_qc_results", db_metadata, autoload=True)

    for identifier in get_biological_standard_identifiers().keys():
        if identifier in sample_id:
            qc_results_table = sa.Table("bio_qc_results", db_metadata, autoload=True)
            break

    # Prepare update of MD5 checksum at sample row
    update_md5 = (
        sa.update(qc_results_table)
        .where(qc_results_table.c.sample_id == sample_id)
        .values(md5=md5_checksum)
    )

    # Execute UPDATE into database, then close the connection
    connection.execute(update_md5)
    connection.close()


def write_qc_results(sample_id, instrument_id, run_id, json_mz, json_rt, json_intensity, qc_dataframe, qc_result, is_bio_standard):

    """
    Writes QC results (as dictionary records) to sample record upon MS-DIAL processing completion.

    QC results consist of m/z, RT, and intensity data for internal standards (or targeted metabolites in biological standards),
    as well as a DataFrame containing delta m/z, delta RT, in-run delta RT, warnings, and fails (qc_dataframe) and overall QC result
    (which will be "Pass" or "Fail").

    The data is encoded as dictionary in "records" format: [{'col1': 1, 'col2': 0.5}, {'col1': 2, 'col2': 0.75}].
    This dictionary is cast to a string before being passed to this function.

    TODO: Update names of arguments from json_x to record_x, as the data is no longer encoded as JSON strings.
        The data is now encoded in "records" format as a string.

    Args:
        sample_id (str):
            Sample ID
        instrument_id (str):
            Instrument ID
        run_id (str):
            Instrument run ID (Job ID)
        json_mz (str):
            String dict of internal standard m/z data in "records" format
        json_rt (str):
            String dict of internal standard RT data in "records" format
        json_intensity (str):
            String dict of internal standard intensity data in "records" format
        qc_dataframe (str):
            String dict of various QC data in "records" format
        qc_result (str):
            QC result for sample, either "Pass" or "Fail"
        is_bio_standard (bool):
            Whether the sample is a biological standard

    Returns:
        None
    """

    # Connect to database
    db_metadata, connection = connect_to_database(instrument_id)

    # Get "sample_qc_results" or "bio_qc_results" table
    if not is_bio_standard:
        qc_results_table = sa.Table("sample_qc_results", db_metadata, autoload=True)
    else:
        qc_results_table = sa.Table("bio_qc_results", db_metadata, autoload=True)

    # Prepare update (insert) of QC results to correct sample row
    update_qc_results = (
        sa.update(qc_results_table)
        .where((qc_results_table.c.sample_id == sample_id)
               & (qc_results_table.c.run_id == run_id))
        .values(precursor_mz=json_mz,
                retention_time=json_rt,
                intensity=json_intensity,
                qc_dataframe=qc_dataframe,
                qc_result=qc_result)
    )

    # Execute UPDATE into database, then close the connection
    connection.execute(update_qc_results)
    connection.close()
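
# Illustrative round-trip for the "records" encoding described above (ast is
# imported at the top of this module):
#   qc_dataframe = str(df.to_dict(orient="records"))  # before calling write_qc_results
#   records = ast.literal_eval(qc_dataframe)          # when reading results back
#   df = pd.DataFrame(records)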

def get_chromatography_methods():

    """
    Returns DataFrame of chromatography methods from the Settings database.
    """

    engine = sa.create_engine(settings_database)
    df_methods = pd.read_sql("SELECT * FROM chromatography_methods", engine)
    return df_methods


def get_chromatography_methods_list():

    """
    Returns list of chromatography method IDs from the Settings database.
    """

    df_methods = get_chromatography_methods()
    return df_methods["method_id"].astype(str).tolist()

def insert_chromatography_method(method_id):

    """
    Inserts new chromatography method in the "chromatography_methods" table of the Settings database.

    Args:
        method_id (str): Name of the chromatography method

    Returns:
        None
    """

    # Connect to database
    db_metadata, connection = connect_to_database("Settings")

    # Get "chromatography_methods" table and "biological_standards" table
    chromatography_table = sa.Table("chromatography_methods", db_metadata, autoload=True)
    biological_standards_table = sa.Table("biological_standards", db_metadata, autoload=True)

    # Execute insert of chromatography method
    insert_method = chromatography_table.insert().values(
        {"method_id": method_id,
         "num_pos_standards": 0,
         "num_neg_standards": 0,
         "pos_istd_msp_file": "",
         "neg_istd_msp_file": "",
         "pos_parameter_file": "",
         "neg_parameter_file": "",
         "msdial_config_id": "Default"})

    connection.execute(insert_method)

    # Execute insert of method for each biological standard
    df_biological_standards = get_biological_standards()
    biological_standards = df_biological_standards["name"].astype(str).unique().tolist()
    identifiers = df_biological_standards["identifier"].astype(str).tolist()

    for index, biological_standard in enumerate(biological_standards):
        insert_method_for_bio_standard = biological_standards_table.insert().values({
            "name": biological_standard,
            "identifier": identifiers[index],
            "chromatography": method_id,
            "num_pos_features": 0,
            "num_neg_features": 0,
            "msdial_config_id": "Default"})
        connection.execute(insert_method_for_bio_standard)

    # Close the connection
    connection.close()

def remove_chromatography_method(method_id):

    """
    Deletes chromatography method and all associated records from the Settings database.

    Details:
        1. Removes chromatography method in "chromatography_methods" table
        2. Removes method from "biological_standards" table
        3. Removes associated internal standards from "internal_standards" table
        4. Removes associated targeted features from "targeted_features" table
        5. Deletes corresponding MSPs from folders
        6. Deletes corresponding MSPs from Google Drive (if sync is enabled)

    Args:
        method_id (str): Name of the chromatography method

    Returns:
        None
    """

    # Delete corresponding MSPs from "methods" directory
    df = get_table("Settings", "chromatography_methods")
    df = df.loc[df["method_id"] == method_id]

    df2 = get_table("Settings", "biological_standards")
    df2 = df2.loc[df2["chromatography"] == method_id]

    files_to_delete = df["pos_istd_msp_file"].astype(str).tolist() + df["neg_istd_msp_file"].astype(str).tolist() + \
        df2["pos_bio_msp_file"].astype(str).tolist() + df2["neg_bio_msp_file"].astype(str).tolist()

    for file in os.listdir(methods_directory):
        if file in files_to_delete:
            os.remove(os.path.join(methods_directory, file))

    # Connect to database and get relevant tables
    db_metadata, connection = connect_to_database("Settings")
    chromatography_table = sa.Table("chromatography_methods", db_metadata, autoload=True)
    biological_standards_table = sa.Table("biological_standards", db_metadata, autoload=True)
    internal_standards_table = sa.Table("internal_standards", db_metadata, autoload=True)
    targeted_features_table = sa.Table("targeted_features", db_metadata, autoload=True)

    delete_queries = []

    # Remove from "chromatography_methods" table
    delete_chromatography_method = (
        sa.delete(chromatography_table)
        .where((chromatography_table.c.method_id == method_id))
    )

    delete_queries.append(delete_chromatography_method)

    # Remove all entries in other tables associated with chromatography
    for table in [biological_standards_table, internal_standards_table, targeted_features_table]:
        delete_from_table = (
            sa.delete(table)
            .where((table.c.chromatography == method_id))
        )
        delete_queries.append(delete_from_table)

    # Execute all deletes, then close the connection
    for query in delete_queries:
        connection.execute(query)

    connection.close()


def update_msdial_config_for_internal_standards(chromatography, config_id):

    """
    Updates MS-DIAL configuration for a given chromatography method.

    This MS-DIAL configuration will be used to generate a parameters file
    for processing samples run with this chromatography method.

    Args:
        chromatography (str):
            Chromatography method ID (name)
        config_id (str):
            MS-DIAL configuration ID (name)

    Returns:
        None
    """

    # Connect to database and get relevant tables
    db_metadata, connection = connect_to_database("Settings")
    methods_table = sa.Table("chromatography_methods", db_metadata, autoload=True)

    # Update MS-DIAL configuration for chromatography method
    update_msdial_config = (
        sa.update(methods_table)
        .where(methods_table.c.method_id == chromatography)
        .values(msdial_config_id=config_id)
    )

    connection.execute(update_msdial_config)
    connection.close()
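
# For reference, a single record in the MSP files parsed below looks like this
# (illustrative values; the parser reads NAME, PRECURSORMZ, RETENTIONTIME,
# INCHIKEY, and the tab-delimited peak list following "Num Peaks"):
#   NAME: Caffeine
#   PRECURSORMZ: 195.0877
#   RETENTIONTIME: 4.25
#   INCHIKEY: RYYVLZVUVIJVGH-UHFFFAOYSA-N
#   Num Peaks: 2
#   138.0662<tab>1250
#   110.0713<tab>430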

def add_msp_to_database(msp_file, chromatography, polarity, bio_standard=None):

    """
    Parses compounds from MSP into the Settings database.

    This function writes features from an MSP file into the "internal_standards" or "targeted_features" table,
    and inserts location of pos/neg MSP files into "chromatography_methods" table.

    TODO: The MSP/TXT libraries have standardized names; there is no need to store the filename in the database.

    Args:
        msp_file (io.StringIO):
            In-memory text-stream file object
        chromatography (str):
            Chromatography method ID (name)
        polarity (str):
            Polarity for which the MSP should be used ("Positive Mode" or "Negative Mode")
        bio_standard (str, default None):
            Parses MSP and applies to biological standard within a chromatography-polarity combination

    Returns:
        None
    """

    # Connect to database
    db_metadata, connection = connect_to_database("Settings")

    # Write MSP file to folder, store file path in database (further down in function)
    if not os.path.exists(methods_directory):
        os.makedirs(methods_directory)

    if bio_standard is not None:
        if polarity == "Positive Mode":
            filename = bio_standard.replace(" ", "_") + "_" + chromatography + "_Pos.msp"
        elif polarity == "Negative Mode":
            filename = bio_standard.replace(" ", "_") + "_" + chromatography + "_Neg.msp"
    else:
        if polarity == "Positive Mode":
            filename = chromatography + "_Pos.msp"
        elif polarity == "Negative Mode":
            filename = chromatography + "_Neg.msp"

    msp_file_path = os.path.join(methods_directory, filename)

    with open(msp_file_path, "w") as file:
        msp_file.seek(0)
        shutil.copyfileobj(msp_file, file)

    # Read MSP file
    with open(msp_file_path, "r") as msp:

        list_of_features = []

        # Split MSP into list of compounds
        data = msp.read().split("\n\n")
        data = [element.split("\n") for element in data]

        # Add each line of each compound into a list
        for feature in data:
            if len(feature) != 1:
                list_of_features.append(feature)

        features_dict = {}
        added_features = []

        # Iterate through features in MSP
        for feature_index, feature in enumerate(list_of_features):

            features_dict[feature_index] = {
                "Name": None,
                "Precursor m/z": None,
                "Retention time": None,
                "INCHIKEY": None,
                "MS2 spectrum": None
            }

            # Iterate through each line of each feature in the MSP
            for data_index, feature_data in enumerate(feature):

                # Capture name, inchikey, m/z, and RT
                if "NAME" in feature_data.upper():
                    feature_name = feature_data.split(": ")[-1]
                    if feature_name not in added_features:
                        added_features.append(feature_name)
                        features_dict[feature_index]["Name"] = feature_name
                        continue
                    else:
                        break
                elif "PRECURSORMZ" in feature_data.upper():
                    features_dict[feature_index]["Precursor m/z"] = feature_data.split(": ")[-1]
                    continue
                elif "INCHIKEY" in feature_data.upper():
                    features_dict[feature_index]["INCHIKEY"] = feature_data.split(": ")[-1]
                    continue
                elif "RETENTIONTIME" in feature_data.upper():
                    features_dict[feature_index]["Retention time"] = feature_data.split(": ")[-1]
                    continue

                # Capture MS2 spectrum
                elif "Num Peaks" in feature_data:

                    # Get number of peaks in MS2 spectrum
                    num_peaks = int(feature_data.split(": ")[-1])

                    # Each line in the MSP corresponds to a peak
                    start_index = data_index + 1
                    end_index = data_index + num_peaks + 1

                    # Each peak is represented as a string, e.g. "56.04977\t247187"
                    peaks_in_spectrum = []
                    for peak in feature[start_index:end_index]:
                        peaks_in_spectrum.append(peak.replace("\t", ":"))

                    features_dict[feature_index]["MS2 spectrum"] = str(peaks_in_spectrum)
                    break

    features_dict = {key: value for key, value in features_dict.items() if value["Name"] is not None}
"56.04977\t247187" 1655 peaks_in_spectrum = [] 1656 for peak in feature[start_index:end_index]: 1657 peaks_in_spectrum.append(peak.replace("\t", ":")) 1658 1659 features_dict[feature_index]["MS2 spectrum"] = str(peaks_in_spectrum) 1660 break 1661 1662 features_dict = { key:value for key, value in features_dict.items() if value["Name"] is not None } 1663 1664 # Adding MSP for biological standards 1665 if bio_standard is not None: 1666 1667 # Get "targeted_features" table 1668 targeted_features_table = sa.Table("targeted_features", db_metadata, autoload=True) 1669 1670 # Prepare DELETE of old targeted features 1671 delete_old_targeted_features = ( 1672 sa.delete(targeted_features_table) 1673 .where((targeted_features_table.c.chromatography == chromatography) 1674 & (targeted_features_table.c.polarity == polarity) 1675 & (targeted_features_table.c.biological_standard == bio_standard)) 1676 ) 1677 1678 # Execute DELETE 1679 connection.execute(delete_old_targeted_features) 1680 1681 # Execute INSERT of each targeted feature into targeted_features table 1682 for feature in features_dict: 1683 insert_feature = targeted_features_table.insert().values( 1684 {"name": features_dict[feature]["Name"], 1685 "chromatography": chromatography, 1686 "polarity": polarity, 1687 "biological_standard": bio_standard, 1688 "precursor_mz": features_dict[feature]["Precursor m/z"], 1689 "retention_time": features_dict[feature]["Retention time"], 1690 "ms2_spectrum": features_dict[feature]["MS2 spectrum"], 1691 "inchikey": features_dict[feature]["INCHIKEY"]}) 1692 connection.execute(insert_feature) 1693 1694 # Get "biological_standards" table 1695 biological_standards_table = sa.Table("biological_standards", db_metadata, autoload=True) 1696 1697 # Write location of msp file to respective cell 1698 if polarity == "Positive Mode": 1699 update_msp_file = ( 1700 sa.update(biological_standards_table) 1701 .where((biological_standards_table.c.chromatography == chromatography) 1702 & (biological_standards_table.c.name == bio_standard)) 1703 .values(num_pos_features=len(features_dict), 1704 pos_bio_msp_file=filename) 1705 ) 1706 elif polarity == "Negative Mode": 1707 update_msp_file = ( 1708 sa.update(biological_standards_table) 1709 .where((biological_standards_table.c.chromatography == chromatography) 1710 & (biological_standards_table.c.name == bio_standard)) 1711 .values(num_neg_features=len(features_dict), 1712 neg_bio_msp_file=filename) 1713 ) 1714 1715 # Execute UPDATE of MSP file location 1716 connection.execute(update_msp_file) 1717 1718 # Adding MSP for internal standards 1719 else: 1720 1721 # Get internal_standards table 1722 internal_standards_table = sa.Table("internal_standards", db_metadata, autoload=True) 1723 1724 # Prepare DELETE of old internal standards 1725 delete_old_internal_standards = ( 1726 sa.delete(internal_standards_table) 1727 .where((internal_standards_table.c.chromatography == chromatography) 1728 & (internal_standards_table.c.polarity == polarity)) 1729 ) 1730 1731 # Execute DELETE 1732 connection.execute(delete_old_internal_standards) 1733 1734 # Execute INSERT of each internal standard into internal_standards table 1735 for feature in features_dict: 1736 insert_feature = internal_standards_table.insert().values( 1737 {"name": features_dict[feature]["Name"], 1738 "chromatography": chromatography, 1739 "polarity": polarity, 1740 "precursor_mz": features_dict[feature]["Precursor m/z"], 1741 "retention_time": features_dict[feature]["Retention time"], 1742 "ms2_spectrum": 

        # Get "chromatography" table
        chromatography_table = sa.Table("chromatography_methods", db_metadata, autoload=True)

        # Write location of MSP file to respective cell
        if polarity == "Positive Mode":
            update_msp_file = (
                sa.update(chromatography_table)
                    .where(chromatography_table.c.method_id == chromatography)
                    .values(num_pos_standards=len(features_dict),
                            pos_istd_msp_file=filename)
            )
        elif polarity == "Negative Mode":
            update_msp_file = (
                sa.update(chromatography_table)
                    .where(chromatography_table.c.method_id == chromatography)
                    .values(num_neg_standards=len(features_dict),
                            neg_istd_msp_file=filename)
            )

        # Execute UPDATE of MSP file location
        connection.execute(update_msp_file)

    # If the corresponding TXT library existed, delete it
    txt_library = os.path.join(methods_directory, filename.replace(".msp", ".txt"))
    if os.path.exists(txt_library):
        os.remove(txt_library)

    # Close the connection
    connection.close()
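
# Example (illustrative sketch, not part of the module): calling add_msp_to_database()
# with a minimal in-memory MSP. The method name "HILIC" and the field values are
# hypothetical; the chromatography method would need to exist in the database first.
#
#   msp_text = (
#       "NAME: Internal Standard 1\n"
#       "PRECURSORMZ: 301.1234\n"
#       "RETENTIONTIME: 1.21\n"
#       "INCHIKEY: XXXXXXXXXXXXXX-XXXXXXXXXX-X\n"
#       "Num Peaks: 2\n"
#       "56.04977\t247187\n"
#       "84.04420\t103921\n"
#   )
#   add_msp_to_database(io.StringIO(msp_text), "HILIC", "Positive Mode")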


def add_csv_to_database(csv_file, chromatography, polarity):

    """
    Parses compounds from a CSV file into the Settings database.

    Parses compounds from a CSV into the "internal_standards" table, and stores
    the location of the pos/neg TXT files in the "chromatography_methods" table.

    TODO: The MSP/TXT libraries have standardized names; there is no need to store the filename in the database.

    Args:
        csv_file (io.StringIO):
            In-memory text-stream file object
        chromatography (str):
            Chromatography method ID (name)
        polarity (str):
            Polarity for which the library applies ("Positive Mode" or "Negative Mode")

    Returns:
        None
    """

    # Convert CSV file into Python dictionary
    df_internal_standards = pd.read_csv(csv_file, index_col=False)
    internal_standards_dict = df_internal_standards.to_dict("index")

    # Create methods directory if it doesn't already exist
    if not os.path.exists(methods_directory):
        os.makedirs(methods_directory)

    # Name file accordingly
    if polarity == "Positive Mode":
        filename = chromatography + "_Pos.txt"
    elif polarity == "Negative Mode":
        filename = chromatography + "_Neg.txt"

    txt_file_path = os.path.join(methods_directory, filename)

    # Write CSV columns to tab-delimited text file
    df_internal_standards.to_csv(txt_file_path, sep="\t", index=False)

    # Connect to database
    db_metadata, connection = connect_to_database("Settings")

    # Get internal_standards table
    internal_standards_table = sa.Table("internal_standards", db_metadata, autoload=True)

    # Prepare DELETE of old internal standards
    delete_old_internal_standards = (
        sa.delete(internal_standards_table)
            .where((internal_standards_table.c.chromatography == chromatography)
                & (internal_standards_table.c.polarity == polarity))
    )

    # Execute DELETE
    connection.execute(delete_old_internal_standards)

    # Execute INSERT of each internal standard into internal_standards table
    for row in internal_standards_dict.keys():
        insert_standard = internal_standards_table.insert().values(
            {"name": internal_standards_dict[row]["Common Name"],
             "chromatography": chromatography,
             "polarity": polarity,
             "precursor_mz": internal_standards_dict[row]["MS1 m/z"],
             "retention_time": internal_standards_dict[row]["RT (min)"]})
        connection.execute(insert_standard)

    # Get "chromatography" table
    chromatography_table = sa.Table("chromatography_methods", db_metadata, autoload=True)

    # Write location of TXT file to respective cell
    if polarity == "Positive Mode":
        update_msp_file = (
            sa.update(chromatography_table)
                .where(chromatography_table.c.method_id == chromatography)
                .values(num_pos_standards=len(internal_standards_dict),
                        pos_istd_msp_file=filename)
        )
    elif polarity == "Negative Mode":
        update_msp_file = (
            sa.update(chromatography_table)
                .where(chromatography_table.c.method_id == chromatography)
                .values(num_neg_standards=len(internal_standards_dict),
                        neg_istd_msp_file=filename)
        )

    # Execute UPDATE of TXT file location
    connection.execute(update_msp_file)

    # If the corresponding MSP library existed, delete it
    msp_library = os.path.join(methods_directory, filename.replace(".txt", ".msp"))
    if os.path.exists(msp_library):
        os.remove(msp_library)

    # Close the connection
    connection.close()
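
# Example (illustrative, not part of the module): the CSV passed to add_csv_to_database()
# is expected to contain at least the columns referenced above ("Common Name", "MS1 m/z",
# "RT (min)"). The values below are made up.
#
#   csv_text = (
#       "Common Name,MS1 m/z,RT (min)\n"
#       "Internal Standard 1,301.1234,1.21\n"
#       "Internal Standard 2,450.9876,4.56\n"
#   )
#   add_csv_to_database(io.StringIO(csv_text), "HILIC", "Negative Mode")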


def get_msdial_configurations():

    """
    Returns list of user configurations of MS-DIAL parameters from the Settings database.
    """

    engine = sa.create_engine(settings_database)
    df_msdial_configurations = pd.read_sql("SELECT * FROM msdial_parameters", engine)
    return df_msdial_configurations["config_name"].astype(str).tolist()


def generate_msdial_parameters_file(chromatography, polarity, msp_file_path, bio_standard=None):

    """
    Uses parameters from a user-curated MS-DIAL configuration to create a parameters.txt file for MS-DIAL.

    TODO: Currently, this function is only called upon a new job setup. To allow changes during a QC job,
        this function should be called every time the user saves a configuration in Settings > MS-DIAL Configurations.

    Args:
        chromatography (str):
            Chromatography method ID (name)
        polarity (str):
            Polarity ("Positive" or "Negative")
        msp_file_path (str):
            MSP library file path
        bio_standard (str, default None):
            Specifies that the parameters file is for a biological standard

    Returns:
        None
    """

    # Get parameters of selected configuration
    if bio_standard is not None:
        df_bio_standards = get_biological_standards()
        df_bio_standards = df_bio_standards.loc[
            (df_bio_standards["chromatography"] == chromatography) & (df_bio_standards["name"] == bio_standard)]
        config_name = df_bio_standards["msdial_config_id"].astype(str).values[0]
    else:
        df_methods = get_chromatography_methods()
        df_methods = df_methods.loc[df_methods["method_id"] == chromatography]
        config_name = df_methods["msdial_config_id"].astype(str).values[0]

    parameters = get_msdial_configuration_parameters(config_name)

    # Create "methods" directory if it does not exist
    if not os.path.exists(methods_directory):
        os.makedirs(methods_directory)

    # Name parameters file accordingly
    if bio_standard is not None:
        if polarity == "Positive":
            filename = bio_standard.replace(" ", "_") + "_" + config_name.replace(" ", "_") + "_Parameters_Pos.txt"
        elif polarity == "Negative":
            filename = bio_standard.replace(" ", "_") + "_" + config_name.replace(" ", "_") + "_Parameters_Neg.txt"
    else:
        if polarity == "Positive":
            filename = chromatography.replace(" ", "_") + "_" + config_name.replace(" ", "_") + "_Parameters_Pos.txt"
        elif polarity == "Negative":
            filename = chromatography.replace(" ", "_") + "_" + config_name.replace(" ", "_") + "_Parameters_Neg.txt"

    parameters_file = os.path.join(methods_directory, filename)

    # Some specifications based on polarity / file type for the parameters
    if polarity == "Positive":
        adduct_type = "[M+H]+"
    elif polarity == "Negative":
        adduct_type = "[M-H]-"

    if msp_file_path.endswith(".msp"):
        filepath = "MSP file: " + msp_file_path
    elif msp_file_path.endswith(".txt"):
        filepath = "Text file: " + msp_file_path

    # Text file contents
    lines = [
        "#Data type",
        "MS1 data type: Centroid",
        "MS2 data type: Centroid",
        "Ion mode: " + polarity,
        "DIA file:", "\n",

        "#Data collection parameters",
        "Retention time begin: " + str(parameters[0]),
        "Retention time end: " + str(parameters[1]),
        "Mass range begin: " + str(parameters[2]),
        "Mass range end: " + str(parameters[3]), "\n",

        "#Centroid parameters",
        "MS1 tolerance for centroid: " + str(parameters[4]),
        "MS2 tolerance for centroid: " + str(parameters[5]), "\n",

        "#Peak detection parameters",
        "Smoothing method: " + str(parameters[6]),
        "Smoothing level: " + str(parameters[7]),
        "Minimum peak width: " + str(parameters[8]),
        "Minimum peak height: " + str(parameters[9]),
        "Mass slice width: " + str(parameters[10]), "\n",

        "#Deconvolution parameters",
        "Sigma window value: 0.5",
        "Amplitude cut off: 0", "\n",

        "#Adduct list",
        "Adduct list: " + adduct_type, "\n",

        "#Text file and post identification (retention time and accurate mass based) setting",
        filepath,
        "Retention time tolerance for post identification: " + str(parameters[11]),
        "Accurate ms1 tolerance for post identification: " + str(parameters[12]),
        "Post identification score cut off: " + str(parameters[13]), "\n",

        "#Alignment parameters setting",
        "Retention time tolerance for alignment: " + str(parameters[14]),
        "MS1 tolerance for alignment: " + str(parameters[15]),
        "Retention time factor for alignment: " + str(parameters[16]),
        "MS1 factor for alignment: " + str(parameters[17]),
        "Peak count filter: " + str(parameters[18]),
        "QC at least filter: " + str(parameters[19]),
    ]

    # Write parameters to a text file
    with open(parameters_file, "w") as file:
        for line in lines:
            file.write(line)
            if line != "\n":
                file.write("\n")

    # Write path of parameters text file to chromatography method in database
    db_metadata, connection = connect_to_database("Settings")
    chromatography_table = sa.Table("chromatography_methods", db_metadata, autoload=True)
    biological_standards_table = sa.Table("biological_standards", db_metadata, autoload=True)

    # For processing biological standard samples
    if bio_standard is not None:
        if polarity == "Positive":
            update_parameter_file = (
                sa.update(biological_standards_table)
                    .where((biological_standards_table.c.chromatography == chromatography)
                        & (biological_standards_table.c.name == bio_standard))
                    .values(pos_parameter_file=parameters_file)
            )
        elif polarity == "Negative":
            update_parameter_file = (
                sa.update(biological_standards_table)
                    .where((biological_standards_table.c.chromatography == chromatography)
                        & (biological_standards_table.c.name == bio_standard))
                    .values(neg_parameter_file=parameters_file)
            )
    # For processing samples with internal standards
    else:
        if polarity == "Positive":
            update_parameter_file = (
                sa.update(chromatography_table)
                    .where(chromatography_table.c.method_id == chromatography)
                    .values(pos_parameter_file=parameters_file)
            )
        elif polarity == "Negative":
            update_parameter_file = (
                sa.update(chromatography_table)
                    .where(chromatography_table.c.method_id == chromatography)
                    .values(neg_parameter_file=parameters_file)
            )

    connection.execute(update_parameter_file)
    connection.close()
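
# Example (illustrative, not part of the module): generating a parameters file for a
# hypothetical "HILIC" method in positive mode, using the MSP registered for it.
#
#   msp_path = get_msp_file_path("HILIC", "Positive")
#   generate_msdial_parameters_file("HILIC", "Positive", msp_path)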


def add_msdial_configuration(msdial_config_name):

    """
    Inserts a new user configuration of MS-DIAL parameters into the "msdial_parameters" table in the Settings database.

    Args:
        msdial_config_name (str): MS-DIAL configuration ID

    Returns:
        None
    """

    # Connect to database
    db_metadata, connection = connect_to_database("Settings")

    # Get MS-DIAL parameters table
    msdial_parameters_table = sa.Table("msdial_parameters", db_metadata, autoload=True)

    # Prepare INSERT of a new configuration with default parameter values
    insert_config = msdial_parameters_table.insert().values(
        {"config_name": msdial_config_name,
         "rt_begin": 0,
         "rt_end": 100,
         "mz_begin": 0,
         "mz_end": 2000,
         "ms1_centroid_tolerance": 0.008,
         "ms2_centroid_tolerance": 0.01,
         "smoothing_method": "LinearWeightedMovingAverage",
         "smoothing_level": 3,
         "min_peak_width": 3,
         "min_peak_height": 35000,
         "mass_slice_width": 0.1,
         "post_id_rt_tolerance": 0.3,
         "post_id_mz_tolerance": 0.008,
         "post_id_score_cutoff": 85,
         "alignment_rt_tolerance": 0.05,
         "alignment_mz_tolerance": 0.008,
         "alignment_rt_factor": 0.5,
         "alignment_mz_factor": 0.5,
         "peak_count_filter": 0,
         "qc_at_least_filter": "True"}
    )

    # Execute INSERT to database, then close the connection
    connection.execute(insert_config)
    connection.close()


def remove_msdial_configuration(msdial_config_name):

    """
    Deletes a user configuration of MS-DIAL parameters from the "msdial_parameters" table.

    Args:
        msdial_config_name (str): MS-DIAL configuration ID

    Returns:
        None
    """

    # Connect to database
    db_metadata, connection = connect_to_database("Settings")

    # Get MS-DIAL parameters table
    msdial_parameters_table = sa.Table("msdial_parameters", db_metadata, autoload=True)

    # Prepare DELETE of MS-DIAL configuration
    delete_config = (
        sa.delete(msdial_parameters_table)
            .where(msdial_parameters_table.c.config_name == msdial_config_name)
    )

    # Execute DELETE, then close the connection
    connection.execute(delete_config)
    connection.close()


def get_msdial_configuration_parameters(msdial_config_name, parameter=None):

    """
    Returns tuple of parameters defined for a selected MS-DIAL configuration.

    TODO: The MS-DIAL configuration is returned as a tuple for a concise implementation of get_msdial_parameters_for_config()
        in the DashWebApp module. While convenient there, this function is not optimal for maintainability. It should
        return the entire DataFrame record instead.

    See update_msdial_configuration() for details on the parameters.

    Args:
        msdial_config_name (str):
            MS-DIAL configuration ID
        parameter (str, default None):
            If specified, returns only the value for the given parameter

    Returns:
        Tuple of parameters for the given MS-DIAL configuration, or a single parameter value.
    """

    # Get "msdial_parameters" table from database as a DataFrame
    engine = sa.create_engine(settings_database)
    df_configurations = pd.read_sql("SELECT * FROM msdial_parameters", engine)

    # Get selected configuration
    selected_config = df_configurations.loc[
        df_configurations["config_name"] == msdial_config_name]

    selected_config.drop(["id", "config_name"], inplace=True, axis=1)

    if parameter is not None:
        return selected_config[parameter].values[0]
    else:
        return tuple(selected_config.to_records(index=False)[0])
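
# Example (illustrative, not part of the module): retrieving a full parameter tuple
# versus a single value. "Default" is assumed to be an existing configuration name.
#
#   params = get_msdial_configuration_parameters("Default")    # tuple of 20 values
#   rt_end = get_msdial_configuration_parameters("Default", parameter="rt_end")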
2138 """ 2139 2140 # Get "msdial_parameters" table from database as a DataFrame 2141 engine = sa.create_engine(settings_database) 2142 df_configurations = pd.read_sql("SELECT * FROM msdial_parameters", engine) 2143 2144 # Get selected configuration 2145 selected_config = df_configurations.loc[ 2146 df_configurations["config_name"] == msdial_config_name] 2147 2148 selected_config.drop(["id", "config_name"], inplace=True, axis=1) 2149 2150 if parameter is not None: 2151 return selected_config[parameter].values[0] 2152 else: 2153 return tuple(selected_config.to_records(index=False)[0]) 2154 2155 2156def update_msdial_configuration(config_name, rt_begin, rt_end, mz_begin, mz_end, ms1_centroid_tolerance, 2157 ms2_centroid_tolerance, smoothing_method, smoothing_level, mass_slice_width, min_peak_width, min_peak_height, 2158 post_id_rt_tolerance, post_id_mz_tolerance, post_id_score_cutoff, alignment_rt_tolerance, alignment_mz_tolerance, 2159 alignment_rt_factor, alignment_mz_factor, peak_count_filter, qc_at_least_filter): 2160 2161 """ 2162 Updates and saves changes of all parameters for a selected MS-DIAL configuration. 2163 2164 For details on MS-DIAL parameters, see: https://mtbinfo-team.github.io/mtbinfo.github.io/MS-DIAL/tutorial#section-2-3 2165 2166 Args: 2167 config_name (str): 2168 Name / ID of MS-DIAL configuration 2169 rt_begin (int): 2170 Minimum retention time in RT range for analysis range 2171 rt_end (int): 2172 Maximum retention time in RT range for analysis 2173 mz_begin (float): 2174 Minimum precursor mass in m/z range for analysis range 2175 mz_end (float): 2176 Maximum precursor mass in m/z range for analysis range 2177 ms1_centroid_tolerance (float): 2178 MS1 centroid tolerance 2179 ms2_centroid_tolerance (float): 2180 MS2 centroid tolerance 2181 smoothing_method (str): 2182 Peak smoothing method for peak detection 2183 smoothing_level (int): 2184 Peak smoothing level 2185 mass_slice_width (float): 2186 Mass slice width 2187 min_peak_width (int): 2188 Minimum peak width threshold 2189 min_peak_height (int): 2190 Minimum peak height threshold 2191 post_id_rt_tolerance (float): 2192 Post-identification retention time tolerance 2193 post_id_mz_tolerance (float): 2194 Post-identification precursor m/z tolerance 2195 post_id_score_cutoff (int): 2196 Similarity score cutoff after peak identification 2197 alignment_rt_tolerance (float): 2198 Post-alignment retention time tolerance 2199 alignment_mz_tolerance (float): 2200 Post-alignment precursor m/z tolerance 2201 alignment_rt_factor (float): 2202 Post-alignment retention time factor 2203 alignment_mz_factor (float): 2204 Post-alignment precursor m/z tolerance 2205 peak_count_filter (int): 2206 Peak count filter 2207 qc_at_least_filter (str): 2208 QC at least filter 2209 2210 Returns: 2211 None 2212 """ 2213 2214 # Connect to database 2215 db_metadata, connection = connect_to_database("Settings") 2216 2217 # Get MS-DIAL parameters table 2218 msdial_parameters_table = sa.Table("msdial_parameters", db_metadata, autoload=True) 2219 2220 # Prepare insert of user-inputted MS-DIAL parameters 2221 update_parameters = ( 2222 sa.update(msdial_parameters_table) 2223 .where(msdial_parameters_table.c.config_name == config_name) 2224 .values(rt_begin=rt_begin, 2225 rt_end=rt_end, 2226 mz_begin=mz_begin, 2227 mz_end=mz_end, 2228 ms1_centroid_tolerance=ms1_centroid_tolerance, 2229 ms2_centroid_tolerance=ms2_centroid_tolerance, 2230 smoothing_method=smoothing_method, 2231 smoothing_level=smoothing_level, 2232 min_peak_width=min_peak_width, 2233 


def get_msp_file_path(chromatography, polarity, bio_standard=None):

    """
    Returns the file path of the MSP library for a selected chromatography / polarity combination
    (stored in the methods folder upon user upload) for MS-DIAL parameter file generation.

    TODO: Once added to the workspace, MSP / TXT library filenames are standardized. There is no need to
        store / retrieve them from the database. Get the file path using the filename, e.g.
        return directory + chromatography + "_" + polarity + ".msp".

    Args:
        chromatography (str):
            Chromatography method ID
        polarity (str):
            Polarity, either "Positive" or "Negative"
        bio_standard (str, default None):
            Name of biological standard

    Returns:
        MSP / TXT library file path.
    """

    # Connect to database
    engine = sa.create_engine(settings_database)

    if bio_standard is not None:
        # Get selected biological standard
        query = "SELECT * FROM biological_standards WHERE name = '" + bio_standard + "' AND chromatography='" + chromatography + "'"
        df_biological_standards = pd.read_sql(query, engine)

        # Get file path of MSP in requested polarity
        if polarity == "Positive":
            msp_file_path = df_biological_standards["pos_bio_msp_file"].astype(str).values[0]
        elif polarity == "Negative":
            msp_file_path = df_biological_standards["neg_bio_msp_file"].astype(str).values[0]

    else:
        # Get selected chromatography method
        query = "SELECT * FROM chromatography_methods WHERE method_id='" + chromatography + "'"
        df_methods = pd.read_sql(query, engine)

        # Get file path of MSP in requested polarity
        if polarity == "Positive":
            msp_file_path = df_methods["pos_istd_msp_file"].astype(str).values[0]
        elif polarity == "Negative":
            msp_file_path = df_methods["neg_istd_msp_file"].astype(str).values[0]

    msp_file_path = os.path.join(methods_directory, msp_file_path)

    # Return file path
    return msp_file_path


def get_parameter_file_path(chromatography, polarity, biological_standard=None):

    """
    Returns the file path of the MS-DIAL parameters file stored in the database.

    TODO: Once generated, MS-DIAL parameter filenames are standardized. There is no need to store / retrieve them
        from the database. Get the file path using the filename, e.g.
        return directory + chromatography + "_" + polarity + "_Parameters.txt".

    Args:
        chromatography (str):
            Chromatography method ID
        polarity (str):
            Polarity, either "Pos" or "Neg"
        biological_standard (str, default None):
            Name of biological standard

    Returns:
        File path for MS-DIAL parameters.txt file.
    """

    engine = sa.create_engine(settings_database)

    if biological_standard is not None:
        query = "SELECT * FROM biological_standards WHERE chromatography='" + chromatography + \
                "' AND name ='" + biological_standard + "'"
    else:
        query = "SELECT * FROM chromatography_methods WHERE method_id='" + chromatography + "'"

    df = pd.read_sql(query, engine)

    if polarity == "Pos":
        parameter_file = df["pos_parameter_file"].astype(str).values[0]
    elif polarity == "Neg":
        parameter_file = df["neg_parameter_file"].astype(str).values[0]

    return parameter_file
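
# Illustrative sketch (not part of the module): one way the TODOs above could be realized,
# deriving library paths from the standardized filenames instead of the database. This
# assumes the naming scheme used by add_msp_to_database() and add_csv_to_database().
#
#   def build_istd_library_path(chromatography, polarity, extension=".msp"):
#       # polarity is "Pos" or "Neg"; extension is ".msp" or ".txt"
#       return os.path.join(methods_directory, chromatography + "_" + polarity + extension)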
2321 """ 2322 2323 engine = sa.create_engine(settings_database) 2324 2325 if biological_standard is not None: 2326 query = "SELECT * FROM biological_standards WHERE chromatography='" + chromatography + \ 2327 "' AND name ='" + biological_standard + "'" 2328 else: 2329 query = "SELECT * FROM chromatography_methods WHERE method_id='" + chromatography + "'" 2330 2331 df = pd.read_sql(query, engine) 2332 2333 if polarity == "Pos": 2334 parameter_file = df["pos_parameter_file"].astype(str).values[0] 2335 elif polarity == "Neg": 2336 parameter_file = df["neg_parameter_file"].astype(str).values[0] 2337 2338 return parameter_file 2339 2340 2341def get_msdial_directory(): 2342 2343 """ 2344 Returns location of MS-DIAL directory. 2345 """ 2346 2347 return get_table("Settings", "workspace")["msdial_directory"].astype(str).values[0] 2348 2349 2350def get_msconvert_directory(): 2351 2352 """ 2353 Returns location of MSConvert directory. 2354 2355 This function uses the MS-DIAL directory path to retrieve user ID, which it then uses to 2356 retrieve the path for MSConvert.exe in C:/Users/<username>/AppData/Local/Apps. 2357 2358 TODO: There is probably a better way to implement this. 2359 2360 Returns: 2361 Location of MSConvert directory in C:/Users/<username>/AppData/Local/Apps/ProteoWizard. 2362 """ 2363 2364 user = get_msdial_directory().replace("\\", "/").split("/")[2] 2365 msconvert_folder = [f.path for f in os.scandir("C:/Users/" + user + "/AppData/Local/Apps/") if f.is_dir() and "ProteoWizard" in f.name][0] 2366 return msconvert_folder 2367 2368 2369def update_msdial_directory(msdial_directory): 2370 2371 """ 2372 Updates location of MS-DIAL directory, stored in "workspace" table of the Settings database. 2373 2374 Args: 2375 msdial_directory (str): New MS-DIAL directory location 2376 2377 Returns: 2378 None 2379 """ 2380 2381 db_metadata, connection = connect_to_database("Settings") 2382 workspace_table = sa.Table("workspace", db_metadata, autoload=True) 2383 2384 update_msdial_directory = ( 2385 sa.update(workspace_table) 2386 .where(workspace_table.c.id == 1) 2387 .values(msdial_directory=msdial_directory) 2388 ) 2389 2390 connection.execute(update_msdial_directory) 2391 connection.close() 2392 2393 2394def get_internal_standards_dict(chromatography, value_type): 2395 2396 """ 2397 Returns dictionary of internal standard keys mapped to either m/z or RT values. 2398 2399 This function is used to establish a y-axis range for internal standard retention time plots. 2400 See load_istd_rt_plot() in the PlotGeneration module. 2401 2402 TODO: This function needs to filter for polarity! 2403 2404 Args: 2405 chromatography (str): 2406 Chromatography method to retrieve internal standards for 2407 value_type (str): 2408 Data type ("precursor_mz", "retention_time", "ms2_spectrum") 2409 2410 Returns: 2411 Dictionary with key-value pairs of { internal_standard: value_type } 2412 """ 2413 2414 engine = sa.create_engine(settings_database) 2415 query = "SELECT * FROM internal_standards " + "WHERE chromatography='" + chromatography + "'" 2416 df_internal_standards = pd.read_sql(query, engine) 2417 2418 dict = {} 2419 keys = df_internal_standards["name"].astype(str).tolist() 2420 values = df_internal_standards[value_type].astype(float).tolist() 2421 2422 for index, key in enumerate(keys): 2423 dict[key] = values[index] 2424 2425 return dict 2426 2427 2428def get_internal_standards(chromatography, polarity): 2429 2430 """ 2431 Returns DataFrame of internal standards for a given chromatography method and polarity. 


def get_internal_standards(chromatography, polarity):

    """
    Returns DataFrame of internal standards for a given chromatography method and polarity.

    Args:
        chromatography (str):
            Chromatography method ID
        polarity (str):
            Polarity (either "Pos" or "Neg")

    Returns:
        DataFrame of the "internal_standards" table from the Settings database, filtered by chromatography and polarity.
    """

    if polarity == "Pos":
        polarity = "Positive Mode"
    elif polarity == "Neg":
        polarity = "Negative Mode"

    engine = sa.create_engine(settings_database)

    query = "SELECT * FROM internal_standards " + \
            "WHERE chromatography='" + chromatography + "' AND polarity='" + polarity + "'"

    return pd.read_sql(query, engine)


def get_targeted_features(biological_standard, chromatography, polarity):

    """
    Returns DataFrame of metabolite targets for a given biological standard, chromatography, and polarity.

    Args:
        biological_standard (str):
            Name of biological standard
        chromatography (str):
            Chromatography method ID (name)
        polarity (str):
            Polarity (either "Pos" or "Neg")

    Returns:
        DataFrame of the "targeted_features" table from the Settings database, filtered by biological standard,
        chromatography, and polarity.
    """

    if polarity == "Pos":
        polarity = "Positive Mode"
    elif polarity == "Neg":
        polarity = "Negative Mode"

    engine = sa.create_engine(settings_database)

    query = "SELECT * FROM targeted_features " + \
            "WHERE chromatography='" + chromatography + \
            "' AND polarity='" + polarity + \
            "' AND biological_standard ='" + biological_standard + "'"

    return pd.read_sql(query, engine)


def get_biological_standards():

    """
    Returns DataFrame of the "biological_standards" table from the Settings database.
    """

    # Get table from database as a DataFrame
    engine = sa.create_engine(settings_database)
    df_biological_standards = pd.read_sql("SELECT * FROM biological_standards", engine)
    return df_biological_standards


def get_biological_standards_list():

    """
    Returns list of biological standards from the Settings database.
    """

    df_biological_standards = get_biological_standards()
    return df_biological_standards["name"].astype(str).unique().tolist()


def add_biological_standard(name, identifier):

    """
    Creates new biological standard with name and identifier.

    The biological standard identifier is a text substring used to distinguish between a sample and a biological
    standard. MS-AutoQC checks filenames in the sequence for this identifier to process samples accordingly.

    Args:
        name (str):
            Name of biological standard
        identifier (str):
            String identifier in filename for biological standard

    Returns:
        None
    """

    # Get list of chromatography methods
    chromatography_methods = get_chromatography_methods()["method_id"].tolist()

    # Connect to database and get "biological_standards" table
    db_metadata, connection = connect_to_database("Settings")
    biological_standards_table = sa.Table("biological_standards", db_metadata, autoload=True)

    # Insert a biological standard row for each chromatography
    for method in chromatography_methods:
        insert = biological_standards_table.insert().values({
            "name": name,
            "identifier": identifier,
            "chromatography": method,
            "num_pos_features": 0,
            "num_neg_features": 0,
            "msdial_config_id": "Default"
        })
        connection.execute(insert)

    # Close the connection
    connection.close()
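
# Example (illustrative, not part of the module): registering a biological standard.
# With the hypothetical identifier below, any sample filename containing "UrineQC"
# would be treated as this biological standard during a run.
#
#   add_biological_standard("Pooled Urine", "UrineQC")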


def remove_biological_standard(name):

    """
    Deletes biological standard and corresponding MSPs from the Settings database.

    Args:
        name (str): Name of the biological standard

    Returns:
        None
    """

    # Delete corresponding MSPs from "methods" directory
    df = get_table("Settings", "biological_standards")
    df = df.loc[df["name"] == name]
    files_to_delete = df["pos_bio_msp_file"].astype(str).tolist() + df["neg_bio_msp_file"].astype(str).tolist()

    for file in os.listdir(methods_directory):
        if file in files_to_delete:
            os.remove(os.path.join(methods_directory, file))

    # Connect to database and get relevant tables
    db_metadata, connection = connect_to_database("Settings")
    biological_standards_table = sa.Table("biological_standards", db_metadata, autoload=True)
    targeted_features_table = sa.Table("targeted_features", db_metadata, autoload=True)

    # Remove biological standard
    delete_biological_standard = (
        sa.delete(biological_standards_table)
            .where(biological_standards_table.c.name == name)
    )
    connection.execute(delete_biological_standard)

    # Remove targeted features for that biological standard
    delete_targeted_features = (
        sa.delete(targeted_features_table)
            .where(targeted_features_table.c.biological_standard == name)
    )
    connection.execute(delete_targeted_features)

    # Close the connection
    connection.close()


def update_msdial_config_for_bio_standard(biological_standard, chromatography, config_id):

    """
    Updates the MS-DIAL configuration for a given biological standard and chromatography method combination.

    Args:
        biological_standard (str):
            Name of the biological standard
        chromatography (str):
            Chromatography method
        config_id (str):
            Name of MS-DIAL configuration to set for this biological standard - chromatography combination

    Returns:
        None
    """

    # Connect to database and get relevant tables
    db_metadata, connection = connect_to_database("Settings")
    biological_standards_table = sa.Table("biological_standards", db_metadata, autoload=True)

    # Update MS-DIAL configuration for biological standard
    update_msdial_config = (
        sa.update(biological_standards_table)
            .where((biological_standards_table.c.name == biological_standard)
                & (biological_standards_table.c.chromatography == chromatography))
            .values(msdial_config_id=config_id)
    )

    connection.execute(update_msdial_config)
    connection.close()


def get_biological_standard_identifiers(bio_standards=None):

    """
    Returns dictionary of identifiers for a given list of biological standards.

    If no list is provided, returns a dict of identifiers for all biological standards.

    Args:
        bio_standards (list, default None): List of biological standards

    Returns:
        Dictionary with key-value pairs of { identifier: biological_standard }
    """

    df_bio_standards = get_biological_standards()

    identifiers = {}

    if bio_standards is not None:
        if len(bio_standards) > 0:
            for bio_standard in bio_standards:
                df = df_bio_standards.loc[df_bio_standards["name"] == bio_standard]
                identifier = df["identifier"].astype(str).unique().tolist()[0]
                identifiers[identifier] = bio_standard
    else:
        names = df_bio_standards["name"].astype(str).unique().tolist()
        ids = df_bio_standards["identifier"].astype(str).unique().tolist()
        for index, name in enumerate(names):
            identifiers[ids[index]] = names[index]

    return identifiers


def get_qc_configurations():

    """
    Returns DataFrame of the "qc_parameters" table from the Settings database.
    """

    engine = sa.create_engine(settings_database)
    return pd.read_sql("SELECT * FROM qc_parameters", engine)


def get_qc_configurations_list():

    """
    Returns list of names of QC configurations from the Settings database.
    """

    return get_qc_configurations()["config_name"].astype(str).tolist()


def add_qc_configuration(qc_config_name):

    """
    Adds a new QC configuration to the "qc_parameters" table in the Settings database.

    Args:
        qc_config_name (str): Name of the QC configuration

    Returns:
        None
    """

    # Connect to database
    db_metadata, connection = connect_to_database("Settings")

    # Get QC parameters table
    qc_parameters_table = sa.Table("qc_parameters", db_metadata, autoload=True)

    # Prepare INSERT of a new QC configuration with default values
    insert_config = qc_parameters_table.insert().values(
        {"config_name": qc_config_name,
         "intensity_dropouts_cutoff": 4,
         "library_rt_shift_cutoff": 0.1,
         "in_run_rt_shift_cutoff": 0.05,
         "library_mz_shift_cutoff": 0.005,
         "intensity_enabled": True,
         "library_rt_enabled": True,
         "in_run_rt_enabled": True,
         "library_mz_enabled": True}
    )

    # Execute INSERT to database, then close the connection
    connection.execute(insert_config)
    connection.close()


def remove_qc_configuration(qc_config_name):

    """
    Deletes QC configuration from the "qc_parameters" table in the Settings database.

    Args:
        qc_config_name (str): Name of the QC configuration

    Returns:
        None
    """

    # Connect to database
    db_metadata, connection = connect_to_database("Settings")

    # Get QC parameters table
    qc_parameters_table = sa.Table("qc_parameters", db_metadata, autoload=True)

    # Prepare DELETE of QC configuration
    delete_config = (
        sa.delete(qc_parameters_table)
            .where(qc_parameters_table.c.config_name == qc_config_name)
    )

    # Execute DELETE, then close the connection
    connection.execute(delete_config)
    connection.close()


def get_qc_configuration_parameters(config_name=None, instrument_id=None, run_id=None):

    """
    Returns DataFrame of parameters for a selected QC configuration.

    The DataFrame has columns for each parameter, as well as for whether the parameter is enabled.

    Args:
        config_name (str, default None):
            Name of QC configuration
        instrument_id (str, default None):
            Instrument ID (name)
        run_id (str, default None):
            Instrument run ID (job ID)

    Returns:
        DataFrame of parameters for QC configuration.
    """

    df_configurations = get_table("Settings", "qc_parameters")

    # Get selected configuration
    if config_name is not None:
        selected_config = df_configurations.loc[df_configurations["config_name"] == config_name]

    elif instrument_id is not None and run_id is not None:
        df_runs = get_table(instrument_id, "runs")
        config_name = df_runs.loc[df_runs["run_id"] == run_id]["qc_config_id"].values[0]
        selected_config = df_configurations.loc[
            df_configurations["config_name"] == config_name]

    selected_config.drop(inplace=True, columns=["id", "config_name"])

    # Cast the stored integer flags back to booleans
    # (probably not the most efficient way to do this...)
    for column in ["intensity_enabled", "library_rt_enabled", "in_run_rt_enabled", "library_mz_enabled"]:
        selected_config.loc[selected_config[column] == 1, column] = True
        selected_config.loc[selected_config[column] == 0, column] = False

    # Return parameters of selected configuration
    return selected_config
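
# Example (illustrative, not part of the module): looking up QC parameters either by
# configuration name or by instrument run. "Default", "QE 1", and "Run001" are
# hypothetical values.
#
#   df_config = get_qc_configuration_parameters(config_name="Default")
#   df_config = get_qc_configuration_parameters(instrument_id="QE 1", run_id="Run001")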
2860 """ 2861 2862 if sample_type == "Sample": 2863 df = get_table(instrument_id, "sample_qc_results") 2864 2865 elif sample_type == "Biological Standard": 2866 df = get_table(instrument_id, "bio_qc_results") 2867 2868 elif sample_type == "Both": 2869 df_samples = get_table(instrument_id, "sample_qc_results") 2870 df_bio_standards = get_table(instrument_id, "bio_qc_results") 2871 df_bio_standards.drop(columns=["biological_standard"], inplace=True) 2872 df = df_bio_standards.append(df_samples, ignore_index=True) 2873 2874 return df.loc[df["run_id"] == run_id] 2875 2876 2877def get_samples_from_csv(instrument_id, run_id, sample_type="Both"): 2878 2879 """ 2880 Returns DataFrame of samples in a given run using CSV files from Google Drive. 2881 2882 CSV files of the run metadata, samples, and biological standards tables are stored 2883 in the ../data/Instrument_ID_Run_ID/csv directory, and removed on job completion. 2884 2885 Args: 2886 instrument_id (str): 2887 Instrument ID 2888 run_id (str): 2889 Instrument run ID (job ID) 2890 sample_type (str): 2891 Sample type, either "Sample" or "Biological Standard" or "Both" 2892 2893 Returns: 2894 DataFrame of samples for a given instrument run. 2895 """ 2896 2897 id = instrument_id.replace(" ", "_") + "_" + run_id 2898 csv_directory = os.path.join(data_directory, id, "csv") 2899 2900 samples_csv = os.path.join(csv_directory, "samples.csv") 2901 bio_standards_csv = os.path.join(csv_directory, "bio_standards.csv") 2902 2903 if sample_type == "Sample": 2904 df = pd.read_csv(samples_csv, index_col=False) 2905 2906 elif sample_type == "Biological Standard": 2907 df = pd.read_csv(bio_standards_csv, index_col=False) 2908 2909 elif sample_type == "Both": 2910 df_samples = pd.read_csv(samples_csv, index_col=False) 2911 df_bio_standards = pd.read_csv(bio_standards_csv, index_col=False) 2912 df_bio_standards.drop(columns=["biological_standard"], inplace=True) 2913 df = df_bio_standards.append(df_samples, ignore_index=True) 2914 2915 df = df.loc[df["run_id"] == run_id] 2916 2917 try: 2918 df.drop(columns=["id"], inplace=True) 2919 finally: 2920 return df 2921 2922 2923def get_next_sample(sample_id, instrument_id, run_id): 2924 2925 """ 2926 Returns sample following the given sample, or None if last sample. 2927 2928 Args: 2929 sample_id (str): 2930 Sample ID 2931 instrument_id (str): 2932 Instrument ID 2933 run_id (str): 2934 Instrument run ID (job ID) 2935 2936 Returns: 2937 str: The next sample in the instrument run after the given sample ID, or None if last sample. 2938 """ 2939 2940 # Get list of samples in run 2941 samples = get_samples_in_run(instrument_id, run_id, "Both")["sample_id"].astype(str).tolist() 2942 2943 # Find sample in list 2944 sample_index = samples.index(sample_id) 2945 next_sample_index = sample_index + 1 2946 2947 # Return next sample 2948 if next_sample_index != len(samples): 2949 return samples[next_sample_index] 2950 else: 2951 return None 2952 2953 2954def get_remaining_samples(instrument_id, run_id): 2955 2956 """ 2957 Returns list of samples remaining in a given instrument run (QC job). 2958 2959 TODO: This function should just return the samples with null values in the "qc_result" column. 2960 The "latest_sample" value in the "runs" table may be unreliable. 2961 2962 Args: 2963 instrument_id (str): 2964 Instrument ID 2965 run_id (str): 2966 Instrument run ID (job ID) 2967 2968 Returns: 2969 list: List of samples remaining in a QC job. 
2970 """ 2971 2972 # Get last processed sample in run 2973 df_run = get_instrument_run(instrument_id, run_id) 2974 latest_sample = df_run["latest_sample"].astype(str).values[0] 2975 2976 # Get list of samples in run 2977 samples = get_samples_in_run(instrument_id, run_id, "Both")["sample_id"].astype(str).tolist() 2978 2979 # Return all samples if beginning of run 2980 if latest_sample == "None": 2981 return samples 2982 2983 # Get index of latest sample 2984 latest_sample_index = samples.index(latest_sample) 2985 2986 # Return list of samples starting at latest sample 2987 return samples[latest_sample_index:len(samples)] 2988 2989 2990def get_unprocessed_samples(instrument_id, run_id): 2991 2992 """ 2993 For an active run, returns 1) a list of samples that were not processed due to error / runtime termination, 2994 and 2) the current sample being monitored / processed. 2995 2996 Args: 2997 instrument_id (str): 2998 Instrument ID 2999 run_id (str): 3000 Instrument run ID (job ID) 3001 3002 Returns: 3003 tuple: List of unprocessed samples for the given instrument run, and current sample being monitored / processed. 3004 """ 3005 3006 # Get samples in run 3007 df_samples = get_samples_in_run(instrument_id, run_id, "Both") 3008 3009 # Get list of samples in run 3010 samples = df_samples["sample_id"].astype(str).tolist() 3011 3012 # Construct dictionary of unprocessed samples in instrument run 3013 df_unprocessed_samples = df_samples.loc[df_samples["qc_result"].isnull()] 3014 unprocessed_samples = df_unprocessed_samples["sample_id"].astype(str).tolist() 3015 3016 # Get acquisition path, data files, and data file extension 3017 acquisition_path = get_acquisition_path(instrument_id, run_id) 3018 extension = get_data_file_type(instrument_id) 3019 directory_files = os.listdir(acquisition_path) 3020 data_files = [file.split(".")[0] for file in directory_files if file.split(".")[0] in unprocessed_samples] 3021 3022 # Mark acquired data files 3023 df_unprocessed_samples.loc[ 3024 df_unprocessed_samples["sample_id"].isin(data_files), "found"] = "Found" 3025 unprocessed_samples = df_unprocessed_samples.dropna(subset=["found"])["sample_id"].astype(str).tolist() 3026 3027 # Get current sample 3028 if len(unprocessed_samples) > 0: 3029 current_sample = unprocessed_samples[-1] 3030 del unprocessed_samples[-1] 3031 else: 3032 current_sample = None 3033 3034 # Return as tuple 3035 return unprocessed_samples, current_sample 3036 3037 3038def get_current_sample(instrument_id, run_id): 3039 3040 """ 3041 Returns the current sample being monitored / processed. 3042 3043 TODO: The "latest_sample" is the last sample to be processed. Nomenclature needs to be updated in many places. 3044 3045 Args: 3046 instrument_id (str): 3047 Instrument ID 3048 run_id (str): 3049 Instrument run ID (job ID) 3050 3051 Returns: 3052 str: Current sample being monitored / processed. 3053 """ 3054 3055 # Get latest sample in run 3056 df_run = get_instrument_run(instrument_id, run_id) 3057 latest_sample = df_run["latest_sample"].astype(str).values[0] 3058 3059 # Return second sample if beginning of run 3060 if latest_sample == "None": 3061 return samples[1] 3062 3063 3064def parse_internal_standard_data(instrument_id, run_id, result_type, polarity, load_from, as_json=True): 3065 3066 """ 3067 Parses data from database into JSON-ified DataFrame for samples (as rows) vs. internal standards (as columns). 


def parse_internal_standard_data(instrument_id, run_id, result_type, polarity, load_from, as_json=True):

    """
    Parses data from the database into a JSON-ified DataFrame of samples (as rows) vs. internal standards (as columns).

    Data is stored in a column (for example, "retention_time") as a single-record string dict with the following structure:

    | Sample     | iSTD 1 | iSTD 2 | ... |
    | ---------- | ------ | ------ | ... |
    | SAMPLE_001 | 1.207  | 1.934  | ... |

    These records are concatenated together in this function using pd.DataFrame(), which is 100x faster than pd.concat().

    Args:
        instrument_id (str):
            Instrument ID
        run_id (str):
            Instrument run ID (job ID)
        result_type (str):
            Column in sample_qc_results table to parse (either "retention_time" or "precursor_mz" or "intensity")
        polarity (str):
            Polarity ("Pos" or "Neg")
        load_from (str):
            Specifies whether to load data from CSV files (during Google Drive sync of an active run) or the instrument database
        as_json (bool, default True):
            Whether to return the table as a JSON string or as a DataFrame

    Returns:
        DataFrame of samples (rows) vs. internal standards (columns), as a JSON string by default.
    """

    # Get relevant QC results table from database
    if load_from == "database" or load_from == "processing":
        df_samples = get_samples_in_run(instrument_id, run_id, "Sample")
    elif load_from == "csv":
        df_samples = get_samples_from_csv(instrument_id, run_id, "Sample")

    # Filter by polarity
    df_samples = df_samples.loc[df_samples["polarity"] == polarity]
    sample_ids = df_samples["sample_id"].astype(str).tolist()

    # Return None if results are None
    if load_from == "processing":
        if len(df_samples[result_type].dropna()) == 0:
            return None

    # Initialize DataFrame with individual records of sample data
    results = df_samples[result_type].astype(str).tolist()
    results = [ast.literal_eval(result) if result != "None" and result != "nan" else {} for result in results]
    df_results = pd.DataFrame(results)
    df_results.drop(columns=["Name"], inplace=True)
    df_results["Sample"] = sample_ids

    # Return DataFrame as JSON string
    if as_json:
        return df_results.to_json(orient="records")
    else:
        return df_results
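
# Worked example (illustrative, not part of the module): each database cell holds a
# stringified dict, and a list of such dicts becomes the samples-vs-standards table.
#
#   record = "{'Name': 'SAMPLE_001', 'iSTD 1': 1.207, 'iSTD 2': 1.934}"
#   rows = [ast.literal_eval(record)]        # one dict per sample
#   df = pd.DataFrame(rows)                  # columns: Name, iSTD 1, iSTD 2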


def parse_biological_standard_data(instrument_id, run_id, result_type, polarity, biological_standard, load_from, as_json=True):

    """
    Parses biological standard data into a JSON-ified DataFrame of targeted features (as columns) vs. instrument runs (as rows).

    The bio_qc_results table in the instrument database is first filtered by biological standard, chromatography,
    and polarity. Then, the sample name is replaced with the instrument run it was associated with.

    Data is stored in a column (for example, "intensity") as a single-record string dict with the following structure:

    | Name               | Metabolite 1 | Metabolite 2 | ... |
    | ------------------ | ------------ | ------------ | ... |
    | INSTRUMENT_RUN_001 | 13597340     | 53024853     | ... |

    These records are concatenated together in this function using pd.DataFrame(), which is 100x faster than pd.concat():

    | Name               | Metabolite 1 | Metabolite 2 | ... |
    | ------------------ | ------------ | ------------ | ... |
    | INSTRUMENT_RUN_001 | 13597340     | 53024853     | ... |
    | INSTRUMENT_RUN_002 | 23543246     | 102030406    | ... |
    | ...                | ...          | ...          | ... |

    Args:
        instrument_id (str):
            Instrument ID
        run_id (str):
            Instrument run ID (job ID)
        result_type (str):
            Column in bio_qc_results table to parse (either "retention_time" or "precursor_mz" or "intensity")
        polarity (str):
            Polarity ("Pos" or "Neg")
        biological_standard (str):
            Name of biological standard
        load_from (str):
            Specifies whether to load data from CSV files (during Google Drive sync of an active run) or the instrument database
        as_json (bool, default True):
            Whether to return the table as a JSON string or as a DataFrame

    Returns:
        JSON-ified DataFrame of targeted features for a biological standard (columns) vs. instrument runs (rows).
    """

    # Get relevant QC results table from database
    if load_from == "database":
        df_samples = get_table(instrument_id, "bio_qc_results")
    elif load_from == "csv":
        id = instrument_id.replace(" ", "_") + "_" + run_id
        bio_standards_csv = os.path.join(data_directory, id, "csv", "bio_standards.csv")
        df_samples = pd.read_csv(bio_standards_csv, index_col=False)

    # Filter by biological standard type
    df_samples = df_samples.loc[df_samples["biological_standard"] == biological_standard]

    # Filter by polarity
    df_samples = df_samples.loc[df_samples["polarity"] == polarity]

    # Get the chromatography used for the given run
    df_runs = get_table(instrument_id, "runs")
    chromatography = df_runs.loc[df_runs["run_id"] == run_id]["chromatography"].values[0]

    # Filter by chromatography
    run_ids = df_runs.loc[df_runs["chromatography"] == chromatography]["run_id"].astype(str).tolist()
    df_samples = df_samples.loc[df_samples["run_id"].isin(run_ids)]
    run_ids = df_samples["run_id"].astype(str).tolist()

    # Initialize DataFrame with individual records of sample data
    results = df_samples[result_type].fillna('{}').tolist()
    results = [ast.literal_eval(result) if result != "None" and result != "nan" else {} for result in results]
    df_results = pd.DataFrame(results)
    df_results["Name"] = run_ids

    # Return DataFrame as JSON string
    if as_json:
        return df_results.to_json(orient="records")
    else:
        return df_results


def parse_internal_standard_qc_data(instrument_id, run_id, polarity, result_type, load_from, as_json=True):

    """
    Parses QC data into a JSON-ified DataFrame of samples (as rows) vs. internal standards (as columns).

    The QC DataFrame is stored in the "qc_dataframe" column as a single-record string dict with the following structure:

    | Sample     | Delta m/z | Delta RT | In-run delta RT | Warnings | Fails |
    | ---------- | --------- | -------- | --------------- | -------- | ----- |
    | SAMPLE_001 | 0.000001  | 0.001    | 0.00001         | None     | None  |

    These records are concatenated together in this function using pd.DataFrame(), which is 100x faster than pd.concat().

    Args:
        instrument_id (str):
            Instrument ID
        run_id (str):
            Instrument run ID (job ID)
        polarity (str):
            Polarity ("Pos" or "Neg")
        result_type (str):
            Key in the stored qc_dataframe to parse (one of "Delta m/z", "Delta RT", "In-run delta RT",
            "Intensity dropout", "Warnings", "Fails")
        load_from (str):
            Specifies whether to load data from CSV files (during Google Drive sync of an active run) or the instrument database
        as_json (bool, default True):
            Whether to return the table as a JSON string or as a DataFrame

    Returns:
        JSON-ified DataFrame of QC data for samples (as rows) vs. internal standards (as columns).
    """

    # Get relevant QC results table from database
    if load_from == "database" or load_from == "processing":
        df_samples = get_samples_in_run(instrument_id, run_id, "Sample")
    elif load_from == "csv":
        df_samples = get_samples_from_csv(instrument_id, run_id, "Sample")

    # Filter by polarity
    df_samples = df_samples.loc[df_samples["polarity"] == polarity]

    # For results DataFrame, each index corresponds to the result type
    get_result_index = {
        "Delta m/z": 0,
        "Delta RT": 1,
        "In-run delta RT": 2,
        "Intensity dropout": 3,
        "Warnings": 4,
        "Fails": 5
    }

    # Get list of results using result type
    sample_ids = df_samples["sample_id"].astype(str).tolist()
    results = df_samples["qc_dataframe"].fillna('[{}, {}, {}, {}, {}, {}]').astype(str).tolist()

    type_index = get_result_index[result_type]
    results = [ast.literal_eval(result)[type_index] for result in results]
    df_results = pd.DataFrame(results)
    df_results.drop(columns=["Name"], inplace=True)
    df_results["Sample"] = sample_ids

    # Return DataFrame as JSON string
    if as_json:
        return df_results.to_json(orient="records")
    else:
        return df_results


def get_workspace_users_list():

    """
    Returns a list of users that have access to the MS-AutoQC workspace.
    """

    return get_table("Settings", "gdrive_users")["email_address"].astype(str).tolist()


def add_user_to_workspace(email_address):

    """
    Gives user access to the workspace in Google Drive and stores their email address in the database.

    Access is granted by sharing the MS-AutoQC folder in Google Drive with the user's Google account.

    Args:
        email_address (str): Email address for Google account to grant access to the workspace.

    Returns:
        None on success, or an error string ("User already exists" / "Error") on failure.
    """

    if email_address in get_workspace_users_list():
        return "User already exists"

    # Get Google Drive instance
    drive = get_drive_instance()

    # Get ID of MS-AutoQC folder in Google Drive
    gdrive_folder_id = get_drive_folder_id()

    if gdrive_folder_id is not None:
        # Add user access by updating permissions
        folder = drive.CreateFile({"id": gdrive_folder_id})
        permission = folder.InsertPermission({
            "type": "user",
            "role": "writer",
            "value": email_address})

        # Insert user email address in "gdrive_users" table
        db_metadata, connection = connect_to_database("Settings")
        gdrive_users_table = sa.Table("gdrive_users", db_metadata, autoload=True)

        insert_user_email = gdrive_users_table.insert().values(
            {"name": permission["name"],
             "email_address": email_address,
             "permission_id": permission["id"]})

        connection.execute(insert_user_email)
        connection.close()

    else:
        return "Error"


def delete_user_from_workspace(email_address):

    """
    Removes user access to the workspace in Google Drive and deletes their email address from the database.

    Args:
        email_address (str): Email address for Google account whose access will be revoked.

    Returns:
        None on success, or an error string ("User does not exist" / "Error") on failure.
    """

    if email_address not in get_workspace_users_list():
        return "User does not exist"

    # Get Google Drive instance
    drive = get_drive_instance()

    # Get ID of MS-AutoQC folder in Google Drive
    gdrive_folder_id = get_drive_folder_id()

    if gdrive_folder_id is not None:
        # Get permission ID of user from database
        folder = drive.CreateFile({"id": gdrive_folder_id})
        df_gdrive_users = get_table("Settings", "gdrive_users")
        df_gdrive_users = df_gdrive_users.loc[df_gdrive_users["email_address"] == email_address]
        permission_id = df_gdrive_users["permission_id"].astype(str).values[0]

        # Delete user access by updating permissions
        folder.DeletePermission(permission_id)

        # Delete user email address from "gdrive_users" table
        db_metadata, connection = connect_to_database("Settings")
        gdrive_users_table = sa.Table("gdrive_users", db_metadata, autoload=True)

        delete_user_email = (
            sa.delete(gdrive_users_table)
                .where(gdrive_users_table.c.email_address == email_address)
        )

        connection.execute(delete_user_email)
        connection.close()

    else:
        return "Error"
3389 """ 3390 3391 if len(sample_list) == 0: 3392 return pd.DataFrame() 3393 3394 database = get_database_file(instrument_id=instrument_id, sqlite_conn=True) 3395 engine = sa.create_engine(database) 3396 3397 sample_list = str(sample_list).replace("[", "(").replace("]", ")") 3398 3399 if is_bio_standard: 3400 query = "SELECT sample_id, qc_result FROM bio_qc_results WHERE sample_id in " + sample_list 3401 else: 3402 query = "SELECT sample_id, qc_result FROM sample_qc_results WHERE sample_id in " + sample_list 3403 3404 return pd.read_sql(query, engine) 3405 3406 3407def create_workspace_metadata(): 3408 3409 """ 3410 Creates record in "workspace" table to store various metadata. 3411 """ 3412 3413 db_metadata, connection = connect_to_database("Settings") 3414 workspace_table = sa.Table("workspace", db_metadata, autoload=True) 3415 connection.execute(workspace_table.insert().values({"id": 1})) 3416 connection.close() 3417 3418 3419def get_device_identity(): 3420 3421 """ 3422 Returns device identity (either an Instrument ID or "Shared user"). 3423 """ 3424 3425 return get_table("Settings", "workspace")["instrument_identity"].astype(str).tolist()[0] 3426 3427 3428def set_device_identity(is_instrument_computer, instrument_id): 3429 3430 """ 3431 Indicates whether the user's device is the instrument PC or not. 3432 3433 Args: 3434 is_instrument_computer (bool): 3435 Whether the device is an instrument computer or not 3436 instrument_id (str): 3437 Instrument ID (if None, set to "Shared user") 3438 3439 Returns: 3440 None 3441 """ 3442 3443 if not is_instrument_computer: 3444 instrument_id = "Shared user" 3445 3446 db_metadata, connection = connect_to_database("Settings") 3447 workspace_table = sa.Table("workspace", db_metadata, autoload=True) 3448 3449 update_identity = ( 3450 sa.update(workspace_table) 3451 .where(workspace_table.c.id == 1) 3452 .values( 3453 is_instrument_computer=is_instrument_computer, 3454 instrument_identity=instrument_id 3455 ) 3456 ) 3457 3458 connection.execute(update_identity) 3459 connection.close() 3460 3461 3462def run_is_on_instrument_pc(instrument_id, run_id): 3463 3464 """ 3465 Validates that the current device is the instrument PC on which the run was started. 3466 3467 TODO: Use this function in PlotGeneration and DashWebApp module. 3468 3469 Args: 3470 instrument_id (str): 3471 Instrument ID 3472 run_id (str): 3473 Instrument run ID 3474 3475 Returns: 3476 True if instrument run was started on the current device, and False if not. 3477 """ 3478 3479 instrument_id = get_instrument_run(instrument_id, run_id)["instrument_id"].astype(str).tolist()[0] 3480 device_identity = get_table("Settings", "workspace")["instrument_identity"].astype(str).tolist()[0] 3481 3482 if instrument_id == device_identity: 3483 return True 3484 else: 3485 return False 3486 3487 3488def update_slack_bot_token(slack_bot_token): 3489 3490 """ 3491 Updates Slack bot user OAuth 2.0 token in "workspace" table of Settings database. 
3492
3493 For details on the Slack API, see: https://slack.dev/python-slack-sdk/
3494
3495 Args:
3496 slack_bot_token (str): Slack bot user OAuth token
3497
3498 Returns:
3499 None
3500 """
3501
3502 db_metadata, connection = connect_to_database("Settings")
3503 workspace_table = sa.Table("workspace", db_metadata, autoload=True)
3504
3505 update_slack_bot_token = (
3506 sa.update(workspace_table)
3507 .where(workspace_table.c.id == 1)
3508 .values(slack_bot_token=slack_bot_token)
3509 )
3510
3511 connection.execute(update_slack_bot_token)
3512 connection.close()
3513
3514
3515def get_slack_bot_token():
3516
3517 """
3518 Returns Slack bot token stored in "workspace" table of Settings database.
3519 """
3520
3521 return get_table("Settings", "workspace")["slack_bot_token"].astype(str).values[0]
3522
3523
3524def update_slack_channel(slack_channel, notifications_enabled):
3525
3526 """
3527 Updates Slack channel registered for notifications in "workspace" table of Settings database.
3528
3529 Args:
3530 slack_channel (str):
3531 Slack channel to post messages to
3532 notifications_enabled (bool):
3533 Whether to send Slack notifications for QC warnings and fails
3534
3535 Returns:
3536 None
3537 """
3538
3539 db_metadata, connection = connect_to_database("Settings")
3540 workspace_table = sa.Table("workspace", db_metadata, autoload=True)
3541
3542 update_slack_channel = (
3543 sa.update(workspace_table)
3544 .where(workspace_table.c.id == 1)
3545 .values(
3546 slack_channel=slack_channel.replace("#", ""),
3547 slack_enabled=notifications_enabled)
3548 )
3549
3550 connection.execute(update_slack_channel)
3551 connection.close()
3552
3553
3554def get_slack_channel():
3555
3556 """
3557 Returns Slack channel registered for notifications.
3558 """
3559
3560 return get_table("Settings", "workspace")["slack_channel"].astype(str).values[0]
3561
3562
3563def get_slack_notifications_toggled():
3564
3565 """
3566 Returns Slack notification toggle setting.
3567 """
3568
3569 try:
3570 return get_table("Settings", "workspace")["slack_enabled"].astype(int).tolist()[0]
3571 except:
3572 return None
3573
3574
3575def get_email_notifications_list(as_string=False):
3576
3577 """
3578 Returns list of emails registered for email notifications for QC warnings and fails.
3579
3580 Args:
3581 as_string (bool, default False):
3582 Whether to return the list as a string (for Gmail API) or as list object (for display in Settings page)
3583
3584 Returns:
3585 List of emails registered for QC warning/fail notifications.
3586 """
3587
3588 email_list = get_table("Settings", "email_notifications")["email_address"].astype(str).tolist()
3589
3590 if as_string:
3591 email_list_string = ""
3592
3593 for i, email in enumerate(email_list):
3594 email_list_string += email
3595 if i != len(email_list) - 1:
3596 email_list_string += ","
3597
3598 return email_list_string
3599
3600 else:
3601 return email_list
3602
3603
3604def register_email_for_notifications(email_address):
3605
3606 """
3607 Inserts email address into "email_notifications" table in Settings database.
3608
3609 Args:
3610 email_address (str): Email address to register for notifications.
3611 3612 Returns: 3613 None 3614 """ 3615 3616 db_metadata, connection = connect_to_database("Settings") 3617 email_notifications_table = sa.Table("email_notifications", db_metadata, autoload=True) 3618 3619 insert_email_address = email_notifications_table.insert().values({ 3620 "email_address": email_address 3621 }) 3622 3623 connection.execute(insert_email_address) 3624 connection.close() 3625 3626 3627def delete_email_from_notifications(email_address): 3628 3629 """ 3630 Deletes email address from "email_notifications" table in Settings database. 3631 3632 Args: 3633 email_address (str): Email address to unsubscribe from notifications. 3634 3635 Returns: 3636 None 3637 """ 3638 3639 db_metadata, connection = connect_to_database("Settings") 3640 email_notifications_table = sa.Table("email_notifications", db_metadata, autoload=True) 3641 3642 delete_email_address = ( 3643 sa.delete(email_notifications_table) 3644 .where((email_notifications_table.c.email_address == email_address)) 3645 ) 3646 3647 connection.execute(delete_email_address) 3648 connection.close() 3649 3650 3651def get_completed_samples_count(instrument_id, run_id, status): 3652 3653 """ 3654 Returns tuple containing count for completed samples and total samples in a given instrument run. 3655 3656 Args: 3657 instrument_id (str): 3658 Instrument ID 3659 run_id (str): 3660 Instrument run ID (job ID) 3661 status (str): 3662 Instrument run (QC job) status, either "Active" or "Complete" 3663 3664 Returns: 3665 Tuple with number of completed samples and total samples for a given instrument run. 3666 """ 3667 3668 if status == "Active" and sync_is_enabled(): 3669 if get_device_identity() == instrument_id: 3670 df_instrument_run = get_instrument_run(instrument_id, run_id) 3671 else: 3672 df_instrument_run = get_instrument_run_from_csv(instrument_id, run_id) 3673 else: 3674 df_instrument_run = get_instrument_run(instrument_id, run_id) 3675 3676 completed = df_instrument_run["completed"].astype(int).tolist()[0] 3677 total_samples = df_instrument_run["samples"].astype(int).tolist()[0] 3678 return (completed, total_samples) 3679 3680 3681def get_run_progress(instrument_id, run_id, status): 3682 3683 """ 3684 Returns progress of instrument run as a percentage of samples completed. 3685 3686 Args: 3687 instrument_id (str): 3688 Instrument ID 3689 run_id (str): 3690 Instrument run ID (job ID) 3691 status (str): 3692 Instrument run (QC job) status, either "Active" or "Complete" 3693 3694 Returns: 3695 float: Percent of samples processed for the given instrument run. 3696 """ 3697 3698 completed, total_samples = get_completed_samples_count(instrument_id, run_id, status) 3699 percent_complete = (completed / total_samples) * 100 3700 return round(percent_complete, 1) 3701 3702 3703def update_sample_counters_for_run(instrument_id, run_id, latest_sample): 3704 3705 """ 3706 Increments "completed" count, as well as "pass" and "fail" counts accordingly. 3707 3708 TODO: The "latest_sample" is the last sample to be processed / completed. 3709 Nomenclature should be updated for clarity. 
3710 3711 Args: 3712 instrument_id (str): 3713 Instrument ID 3714 run_id (str): 3715 Instrument run ID (job ID) 3716 latest_sample (str): 3717 Last sample to be processed 3718 3719 Returns: 3720 None 3721 """ 3722 3723 df = get_samples_in_run(instrument_id, run_id, "Both") 3724 3725 try: 3726 passes = int(df["qc_result"].value_counts()["Pass"]) 3727 except: 3728 passes = 0 3729 3730 try: 3731 warnings = int(df["qc_result"].value_counts()["Warning"]) 3732 except: 3733 warnings = 0 3734 3735 try: 3736 fails = int(df["qc_result"].value_counts()["Fail"]) 3737 except: 3738 fails = 0 3739 3740 completed = passes + fails 3741 3742 db_metadata, connection = connect_to_database(instrument_id) 3743 instrument_runs_table = sa.Table("runs", db_metadata, autoload=True) 3744 3745 update_status = ( 3746 sa.update(instrument_runs_table) 3747 .where(instrument_runs_table.c.run_id == run_id) 3748 .values( 3749 completed=completed, 3750 passes=passes, 3751 fails=fails, 3752 latest_sample=latest_sample 3753 ) 3754 ) 3755 3756 connection.execute(update_status) 3757 connection.close() 3758 3759 3760def mark_run_as_completed(instrument_id, run_id): 3761 3762 """ 3763 Marks instrument run status as completed. 3764 3765 Args: 3766 instrument_id (str): 3767 Instrument ID 3768 run_id (str): 3769 Instrument run ID (job ID) 3770 3771 Returns: 3772 None 3773 """ 3774 3775 db_metadata, connection = connect_to_database(instrument_id) 3776 instrument_runs_table = sa.Table("runs", db_metadata, autoload=True) 3777 3778 update_status = ( 3779 sa.update(instrument_runs_table) 3780 .where(instrument_runs_table.c.run_id == run_id) 3781 .values(status="Complete") 3782 ) 3783 3784 connection.execute(update_status) 3785 connection.close() 3786 3787 3788def skip_sample(instrument_id, run_id): 3789 3790 """ 3791 Skips sample by setting "latest_sample" value for instrument run to the next sample. 3792 3793 This function was used after restarting the acquisition listener when MS-DIAL got stuck processing a corrupted file. 3794 Now that MS-DIAL runs in the background, it is deprecated and should be removed. 3795 3796 Args: 3797 instrument_id (str): 3798 Instrument ID 3799 run_id (str): 3800 Instrument run ID (job ID) 3801 3802 Returns: 3803 None 3804 """ 3805 3806 # Get next sample 3807 samples = get_remaining_samples(instrument_id, run_id) 3808 next_sample = samples[1] 3809 3810 # Set latest sample to next sample 3811 db_metadata, connection = connect_to_database(instrument_id) 3812 instrument_runs_table = sa.Table("runs", db_metadata, autoload=True) 3813 3814 connection.execute(( 3815 sa.update(instrument_runs_table) 3816 .where(instrument_runs_table.c.run_id == run_id) 3817 .values(latest_sample=next_sample) 3818 )) 3819 3820 connection.close() 3821 3822 3823def store_pid(instrument_id, run_id, pid): 3824 3825 """ 3826 Stores acquisition listener subprocess ID to allow for checkup and termination. 
3827
3828 Args:
3829 instrument_id (str):
3830 Instrument ID
3831 run_id (str):
3832 Instrument run ID (job ID)
3833 pid (str):
3834 Process ID for acquisition listener subprocess
3835
3836 Returns:
3837 None
3838 """
3839
3840 db_metadata, connection = connect_to_database(instrument_id)
3841 instrument_runs_table = sa.Table("runs", db_metadata, autoload=True)
3842
3843 update_pid = (
3844 sa.update(instrument_runs_table)
3845 .where(instrument_runs_table.c.run_id == run_id)
3846 .values(pid=pid)
3847 )
3848
3849 connection.execute(update_pid)
3850 connection.close()
3851
3852
3853def get_pid(instrument_id, run_id):
3854
3855 """
3856 Retrieves acquisition listener process ID from "runs" table in the instrument database.
3857
3858 Args:
3859 instrument_id (str):
3860 Instrument ID
3861 run_id (str):
3862 Instrument run ID (job ID)
3863
3864 Returns:
3865 int: Process ID for the acquisition listener subprocess, or None if unavailable
3866 """
3867
3868 try:
3869 return get_instrument_run(instrument_id, run_id)["pid"].astype(int).tolist()[0]
3870 except:
3871 return None
3872
3873
3874def upload_to_google_drive(file_dict):
3875
3876 """
3877 Uploads files to MS-AutoQC folder in Google Drive.
3878
3879 Args:
3880 file_dict (dict):
3881 Dictionary with key-value structure { filename : file path }
3882
3883 Returns:
3884 dict: Dictionary with key-value structure { filename : Google Drive ID }
3885 """
3886
3887 # Get Google Drive instance
3888 drive = get_drive_instance()
3889
3890 # Get Google Drive ID for the MS-AutoQC folder
3891 folder_id = get_drive_folder_id()
3892
3893 # Store Drive IDs of uploaded file(s)
3894 drive_ids = {}
3895
3896 # Validate Google Drive folder ID
3897 if folder_id is not None:
3898 if folder_id != "None" and folder_id != "":
3899
3900 # Upload each file to Google Drive
3901 for filename in file_dict.keys():
3902 if os.path.exists(file_dict[filename]):
3903 metadata = {
3904 "title": filename,
3905 "parents": [{"id": folder_id}],
3906 }
3907 file = drive.CreateFile(metadata=metadata)
3908 file.SetContentFile(file_dict[filename])
3909 file.Upload()
3910
3911 drive_ids[file["title"]] = file["id"]
3912
3913 return drive_ids
3914
3915
3916def upload_qc_results(instrument_id, run_id):
3917
3918 """
3919 Uploads QC results for a given instrument run to Google Drive as CSV files.
3920
3921 Args:
3922 instrument_id (str):
3923 Instrument ID
3924 run_id (str):
3925 Instrument run ID (job ID)
3926
3927 Returns:
3928 None
3929 """
3930
3931 id = instrument_id.replace(" ", "_") + "_" + run_id
3932
3933 # Get Google Drive instance
3934 drive = get_drive_instance()
3935
3936 # Define file names and file paths
3937 run_filename = "run.csv"
3938 samples_csv_filename = "samples.csv"
3939 bio_standards_csv_filename = "bio_standards.csv"
3940
3941 run_directory = os.path.join(data_directory, id)
3942 if not os.path.exists(run_directory):
3943 os.makedirs(run_directory)
3944
3945 csv_directory = os.path.join(run_directory, "csv")
3946 if not os.path.exists(csv_directory):
3947 os.makedirs(csv_directory)
3948
3949 run_csv_path = os.path.join(csv_directory, run_filename)
3950 samples_csv_path = os.path.join(csv_directory, samples_csv_filename)
3951 bio_standards_csv_path = os.path.join(csv_directory, bio_standards_csv_filename)
3952
3953 # Convert sample and biological standard QC results from database into CSV files
3954 df_run = get_instrument_run(instrument_id, run_id)
3955 df_run.to_csv(run_csv_path, index=False)
3956
3957 df_samples = get_samples_in_run(instrument_id=instrument_id, run_id=run_id, sample_type="Sample")
3958 if len(df_samples) > 0:
3959 df_samples.to_csv(samples_csv_path, index=False)
3960
3961 df_bio_standards = get_table(instrument_id, "bio_qc_results")
3962 if len(df_bio_standards) > 0:
3963 df_bio_standards.to_csv(bio_standards_csv_path, index=False)
3964
3965 # Compress CSV files into a ZIP archive for faster upload
3966 zip_filename = id + ".zip"
3967 zip_file_path = zip_csv_files(
3968 input_directory=csv_directory, output_directory_and_name=os.path.join(run_directory, id))
3969
3970 zip_file = {zip_filename: zip_file_path}
3971
3972 # Get Google Drive ID for the CSV files ZIP archive
3973 zip_file_drive_id = get_instrument_run(instrument_id, run_id)["drive_id"].tolist()[0]
3974
3975 # Update existing ZIP archive in Google Drive
3976 if zip_file_drive_id is not None:
3977
3978 file = drive.CreateFile({
3979 "id": zip_file_drive_id,
3980 "title": zip_filename,
3981 })
3982
3983 # Execute upload
3984 file.SetContentFile(zip_file_path)
3985 file.Upload()
3986
3987 # If ZIP file Drive ID does not exist, this is the first upload for this run
3988 else:
3989
3990 # Upload CSV files ZIP archive to Google Drive for first time
3991 drive_id = upload_to_google_drive(zip_file)[zip_filename]
3992
3993 # Store Drive ID of ZIP file in local database
3994 db_metadata, connection = connect_to_database(instrument_id)
3995 runs_table = sa.Table("runs", db_metadata, autoload=True)
3996
3997 connection.execute((
3998 sa.update(runs_table)
3999 .where(runs_table.c.run_id == run_id)
4000 .values(drive_id=drive_id)
4001 ))
4002
4003 connection.close()
4004
4005
4006def download_qc_results(instrument_id, run_id):
4007
4008 """
4009 Downloads CSV files of QC results from Google Drive and stores in /data directory.
4010
4011 Args:
4012 instrument_id (str):
4013 Instrument ID
4014 run_id (str):
4015 Instrument run ID (job ID)
4016
4017 Returns:
4018 tuple: Paths of run.csv, samples.csv, and bio_standards.csv, respectively.
4019 """ 4020 4021 id = instrument_id.replace(" ", "_") + "_" + run_id 4022 4023 # Get Google Drive instance 4024 drive = get_drive_instance() 4025 4026 # Initialize directories 4027 run_directory = os.path.join(data_directory, id) 4028 if not os.path.exists(run_directory): 4029 os.makedirs(run_directory) 4030 4031 csv_directory = os.path.join(run_directory, "csv") 4032 if not os.path.exists(csv_directory): 4033 os.makedirs(csv_directory) 4034 4035 # Zip file 4036 zip_filename = id + ".zip" 4037 zip_file_path = os.path.join(run_directory, zip_filename) 4038 4039 # Get Google Drive folder ID 4040 gdrive_folder_id = get_drive_folder_id() 4041 4042 # Find and download ZIP archive of CSV files from Google Drive 4043 for file in drive.ListFile({"q": "'" + gdrive_folder_id + "' in parents and trashed=false"}).GetList(): 4044 if file["title"] == zip_filename: 4045 os.chdir(run_directory) 4046 file.GetContentFile(file["title"]) 4047 os.chdir(root_directory) 4048 break 4049 4050 # Unzip archive 4051 unzip_csv_files(zip_file_path, csv_directory) 4052 4053 # Define and return file paths 4054 run_csv = os.path.join(csv_directory, "run.csv") 4055 samples_csv = os.path.join(csv_directory, "samples.csv") 4056 bio_standards_csv_file = os.path.join(csv_directory, "bio_standards.csv") 4057 4058 return (run_csv, samples_csv, bio_standards_csv_file) 4059 4060 4061def get_drive_folder_id(): 4062 4063 """ 4064 Returns Google Drive ID for the MS-AutoQC folder (found in user's root Drive directory). 4065 """ 4066 4067 return get_table("Settings", "workspace")["gdrive_folder_id"].values[0] 4068 4069 4070def get_database_drive_id(instrument_id): 4071 4072 """ 4073 Returns Google Drive ID for a given instrument's database. 4074 4075 Args: 4076 instrument_id (str): Instrument ID 4077 4078 Returns: 4079 str: Google Drive ID for the instrument database ZIP archive. 4080 """ 4081 4082 df = get_table("Settings", "instruments") 4083 return df.loc[df["name"] == instrument_id]["drive_id"].values[0] 4084 4085 4086def upload_database(instrument_id, sync_settings=False): 4087 4088 """ 4089 Uploads database file and methods directory to Google Drive as ZIP archives. 4090 4091 Args: 4092 instrument_id (str): 4093 Instrument ID for the instrument database to upload 4094 sync_settings (bool, default False): 4095 Whether to upload methods directory as well 4096 4097 Returns: 4098 str: Timestamp upon upload completion. 
4099 """ 4100 4101 # Get Google Drive ID's for the MS-AutoQC folder and database file 4102 gdrive_folder_id = get_drive_folder_id() 4103 instrument_db_file_id = get_database_drive_id(instrument_id) 4104 4105 # Get Google Drive instance 4106 drive = get_drive_instance() 4107 4108 # Vacuum database to optimize size 4109 execute_vacuum(instrument_id) 4110 4111 # Upload methods directory to Google Drive 4112 if sync_settings == True: 4113 upload_methods() 4114 4115 # Upload database to Google Drive 4116 if gdrive_folder_id is not None and instrument_db_file_id is not None: 4117 4118 # Upload zipped database 4119 zip_database(instrument_id=instrument_id) 4120 file = drive.CreateFile( 4121 {"id": instrument_db_file_id, "title": instrument_id.replace(" ", "_") + ".zip"}) 4122 file.SetContentFile(get_database_file(instrument_id, zip=True)) 4123 file.Upload() 4124 4125 # Save modifiedDate of database file 4126 remember_last_modified(database=instrument_id, modified_date=file["modifiedDate"]) 4127 4128 else: 4129 return None 4130 4131 return time.strftime("%H:%M:%S") 4132 4133 4134def download_database(instrument_id, sync_settings=False): 4135 4136 """ 4137 Downloads instrument database ZIP file from Google Drive. 4138 4139 This function is called when accessing an instrument database from a device other than the given instrument. 4140 4141 Args: 4142 instrument_id (str): 4143 Instrument ID for the instrument database to download 4144 sync_settings (bool, default False): 4145 Whether to download methods directory as well 4146 4147 Returns: 4148 str: Timestamp upon download completion. 4149 """ 4150 4151 db_zip_file = instrument_id.replace(" ", "_") + ".zip" 4152 4153 # If the database was not modified by another instrument, skip download (for instruments only) 4154 if not database_was_modified(instrument_id): 4155 return None 4156 4157 # Get Google Drive instance 4158 drive = get_drive_instance() 4159 4160 # Get Google Drive ID's for the MS-AutoQC folder and database file 4161 gdrive_folder_id = get_drive_folder_id() 4162 instrument_db_file_id = get_instrument(instrument_id)["drive_id"].values[0] 4163 4164 # If Google Drive folder is found, look for database next 4165 if gdrive_folder_id is not None and instrument_db_file_id is not None: 4166 4167 # Download newly added / modified MSP files in MS-AutoQC > methods 4168 if sync_settings == True: 4169 download_methods(skip_check=True) 4170 4171 try: 4172 for file in drive.ListFile({"q": "'" + gdrive_folder_id + "' in parents and trashed=false"}).GetList(): 4173 if file["title"] == db_zip_file: 4174 4175 # Download and unzip database 4176 os.chdir(data_directory) # Change to data directory 4177 file.GetContentFile(file["title"]) # Download database and get file ID 4178 os.chdir(root_directory) # Return to root directory 4179 unzip_database(instrument_id=instrument_id) # Unzip database 4180 4181 # Save modifiedDate of database file 4182 remember_last_modified(database=instrument_id, modified_date=file["modifiedDate"]) 4183 4184 except Exception as error: 4185 print("Error downloading database from Google Drive:", error) 4186 return None 4187 else: 4188 return None 4189 4190 return time.strftime("%H:%M:%S") 4191 4192 4193def upload_methods(): 4194 4195 """ 4196 Uploads methods directory ZIP archive to Google Drive. 
4197 """
4198
4199 df_workspace = get_table("Settings", "workspace")
4200 methods_zip_file_id = df_workspace["methods_zip_file_id"].values[0]
4201
4202 # Vacuum database to optimize size
4203 execute_vacuum("Settings")
4204
4205 # Get Google Drive instance
4206 drive = get_drive_instance()
4207
4208 # Upload methods ZIP archive to Google Drive
4209 if methods_zip_file_id is not None:
4210
4211 # Upload zipped methods directory
4212 methods_zip_file = zip_methods()
4213 file = drive.CreateFile({"id": methods_zip_file_id, "title": "methods.zip"})
4214 file.SetContentFile(methods_zip_file)
4215 file.Upload()
4216
4217 # Save modifiedDate of methods ZIP file
4218 remember_last_modified(database="Settings", modified_date=file["modifiedDate"])
4219
4220 else:
4221 return None
4222
4223
4224def download_methods(skip_check=False):
4225
4226 """
4227 Downloads methods directory ZIP archive from Google Drive.
4228
4229 Args:
4230 skip_check (bool, default False): If True, skips checking whether database was modified
4231
4232 Returns:
4233 str: Timestamp upon download completion, or None if the download was skipped or unsuccessful
4234 """
4235
4236 # If the database was not modified by another instrument, skip download (for instruments only)
4237 if not skip_check:
4238 if not database_was_modified("Settings"):
4239 return None
4240
4241 # Get device identity
4242 instrument_bool = is_instrument_computer()
4243 device_identity = get_device_identity()
4244
4245 # Get MS-DIAL directory
4246 try:
4247 msdial_directory = get_msdial_directory()
4248 except:
4249 msdial_directory = None
4250
4251 # Get Google Drive instance
4252 drive = get_drive_instance()
4253
4254 # Get Google Drive folder ID
4255 gdrive_folder_id = get_drive_folder_id()
4256
4257 try:
4258 # Download and unzip methods directory
4259 for file in drive.ListFile({"q": "'" + gdrive_folder_id + "' in parents and trashed=false"}).GetList():
4260 if file["title"] == "methods.zip":
4261 os.chdir(data_directory) # Change to data directory
4262 file.GetContentFile(file["title"]) # Download methods ZIP archive
4263 os.chdir(root_directory) # Return to root directory
4264 unzip_methods() # Unzip methods directory
4265
4266 # Save modifiedDate of methods directory
4267 remember_last_modified(database="Settings", modified_date=file["modifiedDate"])
4268
4269 except Exception as error:
4270 print("Error downloading methods from Google Drive:", error)
4271 return None
4272
4273 # Update MS-DIAL directory
4274 update_msdial_directory(msdial_directory)
4275
4276 # Update user device identity
4277 set_device_identity(is_instrument_computer=instrument_bool, instrument_id=device_identity)
4278 return time.strftime("%H:%M:%S")
4279
4280
4281def remember_last_modified(database, modified_date):
4282
4283 """
4284 Stores last modified time of database file in Google Drive.
4285
4286 This function is called after file upload, and used for comparison before download.
4287
4288 Args:
4289 database (str):
4290 Name of database (either Instrument ID or "Settings")
4291 modified_date (str):
4292 Modified date of file uploaded to Google Drive
4293
4294 Returns:
4295 None
4296 """
4297
4298 db_metadata, connection = connect_to_database("Settings")
4299 instruments_table = sa.Table("instruments", db_metadata, autoload=True)
4300 workspace_table = sa.Table("workspace", db_metadata, autoload=True)
4301
4302 if database == "Settings":
4303 connection.execute((
4304 sa.update(workspace_table)
4305 .where((workspace_table.c.id == 1))
4306 .values(methods_last_modified=modified_date)
4307 ))
4308 else:
4309 connection.execute((
4310 sa.update(instruments_table)
4311 .where((instruments_table.c.name == database))
4312 .values(last_modified=modified_date)
4313 ))
4314
4315 connection.close()
4316
4317
4318def database_was_modified(database_name):
4319
4320 """
4321 Returns True if workspace file was modified by another instrument PC in Google Drive, and False if not.
4322
4323 Args:
4324 database_name (str): Name of database
4325
4326 Returns:
4327 bool: True if the file was modified by another instrument PC in Google Drive, and False if not.
4328 """
4329
4330 # Get Google Drive folder ID from database
4331 gdrive_folder_id = get_drive_folder_id()
4332
4333 # Compare "last modified" values
4334 if database_name == "Settings":
4335 local_last_modified = get_table("Settings", "workspace")["methods_last_modified"].values[0]
4336 filename = "methods.zip"
4337 else:
4338 local_last_modified = get_instrument(database_name)["last_modified"].values[0]
4339 filename = database_name.replace(" ", "_") + ".zip"
4340
4341 # Get Google Drive instance
4342 drive = get_drive_instance()
4343
4344 drive_last_modified = None
4345 for file in drive.ListFile({"q": "'" + gdrive_folder_id + "' in parents and trashed=false"}).GetList():
4346 if file["title"] == filename:
4347 drive_last_modified = file["modifiedDate"]
4348 break
4349
4350 if local_last_modified == drive_last_modified:
4351 return False
4352 else:
4353 return True
4354
4355
4356def send_sync_signal(folder_id):
4357
4358 """
4359 Uploads empty file to signal that an instrument PC is syncing to Google Drive.
4360
4361 TODO: This method is deprecated. Please remove if no plans for usage.
4362
4363 Args:
4364 folder_id (str): Google Drive folder ID
4365
4366 Returns:
4367 bool: True if sync signal was sent, False if not.
4368 """
4369
4370 # Get Google Drive instance
4371 drive = get_drive_instance()
4372
4373 try:
4374 drive.CreateFile(metadata={"title": "Syncing", "parents": [{"id": folder_id}]}).Upload()
4375 return True
4376 except:
4377 return False
4378
4379
4380def safe_to_upload(folder_id):
4381
4382 """
4383 Returns False if another device is currently uploading to Google Drive, else True.
4384
4385 TODO: This method is deprecated. Please remove if no plans for usage.
4386
4387 Args:
4388 folder_id (str): Google Drive folder ID
4389
4390 Returns:
4391 bool: False if another device is currently uploading to Google Drive, True if not.
4392 """
4393
4394 # Get Google Drive instance
4395 drive = get_drive_instance()
4396
4397 for file in drive.ListFile({"q": "'" + folder_id + "' in parents and trashed=false"}).GetList():
4398 if file["title"] == "Syncing":
4399 return False
4400
4401 return True
4402
4403
4404def remove_sync_signal(folder_id):
4405
4406 """
4407 Removes empty signal file to indicate that an instrument PC has completed syncing to Google Drive.
4408
4409 TODO: This method is deprecated.
Please remove if no plans for usage. 4410 4411 Args: 4412 folder_id (str): Google Drive folder ID 4413 4414 Returns: 4415 bool: True if sync signal was removed, False if not. 4416 """ 4417 4418 # Get Google Drive instance 4419 drive = get_drive_instance() 4420 4421 try: 4422 for file in drive.ListFile({"q": "'" + folder_id + "' in parents and trashed=false"}).GetList(): 4423 if file["title"] == "Syncing": 4424 file.Delete() 4425 return True 4426 except: 4427 return False 4428 4429 4430def delete_active_run_csv_files(instrument_id, run_id): 4431 4432 """ 4433 Checks for and deletes CSV files from Google Drive at the end of an active instrument run. 4434 4435 Args: 4436 instrument_id (str): 4437 Instrument ID 4438 run_id (str): 4439 Instrument run ID (job ID) 4440 4441 Returns: 4442 None 4443 """ 4444 4445 id = instrument_id.replace(" ", "_") + "_" + run_id 4446 4447 # Find zip archive of CSV files in Google Drive and delete it 4448 drive = get_drive_instance() 4449 gdrive_folder_id = get_drive_folder_id() 4450 4451 if gdrive_folder_id is not None: 4452 drive_file_list = drive.ListFile({"q": "'" + gdrive_folder_id + "' in parents and trashed=false"}).GetList() 4453 for file in drive_file_list: 4454 if file["title"] == id + ".zip": 4455 file.Delete() 4456 break 4457 4458 # Delete Drive ID from database 4459 db_metadata, connection = connect_to_database(instrument_id) 4460 runs_table = sa.Table("runs", db_metadata, autoload=True) 4461 4462 connection.execute(( 4463 sa.update(runs_table) 4464 .where(runs_table.c.run_id == run_id) 4465 .values(drive_id=None) 4466 )) 4467 4468 connection.close() 4469 4470 4471def sync_on_run_completion(instrument_id, run_id): 4472 4473 """ 4474 Syncs database with Google Drive at the end of an active instrument run. 4475 4476 Performs the following actions: 4477 1. Upload database to Google Drive 4478 2. Delete active run CSV files 4479 4480 Args: 4481 instrument_id (str): 4482 Instrument ID 4483 run_id (str): 4484 Instrument run ID (job ID) 4485 4486 Returns: 4487 None 4488 """ 4489 4490 # Get Google Drive instance and folder ID 4491 drive = get_drive_instance() 4492 gdrive_folder_id = get_drive_folder_id() 4493 4494 # Upload database to Google Drive 4495 try: 4496 upload_database(instrument_id) 4497 except Exception as error: 4498 print("sync_on_run_completion() – Error uploading database during sync", error) 4499 return None 4500 4501 # Delete active run CSV files 4502 try: 4503 delete_active_run_csv_files(instrument_id, run_id) 4504 except Exception as error: 4505 print("sync_on_run_completion() – Error deleting CSV files after sync", error) 4506 return None 4507 4508 4509def get_data_file_type(instrument_id): 4510 4511 """ 4512 Returns expected data file extension based on instrument vendor type. 4513 4514 TODO: Modify this function as needed when adding support for other instrument vendors. 4515 4516 Args: 4517 instrument_id (str): Instrument ID 4518 4519 Returns: 4520 Data file extension for instrument vendor. 
4521 """ 4522 4523 engine = sa.create_engine(settings_database) 4524 df_instruments = pd.read_sql("SELECT * FROM instruments WHERE name='" + instrument_id + "'", engine) 4525 vendor = df_instruments["vendor"].astype(str).tolist()[0] 4526 4527 if vendor == "Thermo Fisher": 4528 return "raw" 4529 elif vendor == "Agilent": 4530 return "d" 4531 elif vendor == "Bruker": 4532 return "baf" 4533 elif vendor == "Waters": 4534 return "raw" 4535 elif vendor == "Sciex": 4536 return "wiff2" 4537 4538 4539def is_completed_run(instrument_id, run_id): 4540 4541 """ 4542 Returns True if the given QC job is for a completed run, and False if for an active run. 4543 4544 Args: 4545 instrument_id (str): 4546 Instrument ID 4547 run_id (str): 4548 Instrument run ID (job ID) 4549 4550 Returns: 4551 bool: True if the job is for a completed run, and False if job is for an active run. 4552 """ 4553 4554 try: 4555 job_type = get_instrument_run(instrument_id, run_id)["job_type"].astype(str).values[0] 4556 if job_type == "completed": 4557 return True 4558 else: 4559 return False 4560 except: 4561 print("Could not get MS-AutoQC job type.") 4562 traceback.print_exc() 4563 return False 4564 4565 4566def delete_temp_directory(instrument_id, run_id): 4567 4568 """ 4569 Deletes temporary data file directory in local app directory. 4570 4571 This function is called at the end of an instrument run (QC job). 4572 4573 Args: 4574 instrument_id (str): 4575 Instrument ID 4576 run_id (str): 4577 Instrument run ID (job ID) 4578 4579 Returns: 4580 None 4581 """ 4582 4583 # Delete temporary data file directory 4584 try: 4585 id = instrument_id.replace(" ", "_") + "_" + run_id 4586 temp_directory = os.path.join(data_directory, id) 4587 if os.path.exists(temp_directory): 4588 shutil.rmtree(temp_directory) 4589 except: 4590 print("Could not delete temporary data directory.") 4591 4592 4593def pipeline_valid(module=None): 4594 4595 """ 4596 Validates that MSConvert and MS-DIAL dependencies are installed. 4597 4598 This function is called during job setup validation. 4599 4600 Args: 4601 module (str, default None): If specified, only validates given module. 4602 4603 Returns: 4604 bool: Whether MSConvert.exe and MsdialConsoleApp.exe exist. 4605 """ 4606 4607 try: 4608 msconvert_installed = os.path.exists(os.path.join(get_msconvert_directory(), "msconvert.exe")) 4609 except: 4610 msconvert_installed = False 4611 4612 try: 4613 msdial_installed = os.path.exists(os.path.join(get_msdial_directory(), "MsdialConsoleApp.exe")) 4614 except: 4615 msdial_installed = False 4616 4617 if module == "msdial": 4618 return msdial_installed 4619 elif module == "msconvert": 4620 return msconvert_installed 4621 else: 4622 return msconvert_installed and msdial_installed 4623 4624 4625def send_email(subject, message_body): 4626 4627 """ 4628 Sends email using Google authenticated credentials. 4629 4630 This function is called for QC warnings and fails if: 4631 1. Google Drive sync is enabled 4632 2. Email addresses are registered for notifications 4633 4634 Args: 4635 subject (str): 4636 Subject of email 4637 message_body (str): 4638 Body of email 4639 4640 Returns: 4641 On success, an email.message.EmailMessage object. 
4642 """ 4643 4644 try: 4645 credentials = google_auth.load_credentials_from_file(alt_credentials)[0] 4646 4647 service = build("gmail", "v1", credentials=credentials) 4648 message = EmailMessage() 4649 4650 message.set_content(message_body) 4651 4652 message["Subject"] = subject 4653 message["To"] = get_email_notifications_list(as_string=True) 4654 4655 encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode() 4656 create_message = { "raw": encoded_message } 4657 4658 send_message = (service.users().messages().send(userId="me", body=create_message).execute()) 4659 4660 except Exception as error: 4661 traceback.print_exc() 4662 send_message = None 4663 4664 return send_message
The functions defined below operate on two database types:
- One storing instrument run metadata, sample QC results, and biological standard QC results
- The other storing instrument metadata, workspace settings for workspace access, chromatography methods, biological standards, QC configurations, and MS-DIAL configurations
In addition, this file also contains methods for syncing data and settings with Google Drive. To get an overview of all functions, please visit the documentation on https://czbiohub.github.io/MS-AutoQC.
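For orientation, here is a minimal sketch of a typical Google Drive sync sequence using the functions documented below (the instrument ID "Thermo QE 1" is illustrative, and Drive sync is assumed to be configured for the workspace):

    from ms_autoqc import DatabaseFunctions as db

    # Authenticate with Google Drive using the files in the /auth directory
    db.initialize_google_drive()

    # On the instrument PC: vacuum, zip, and upload the instrument database
    # (sync_settings=True also uploads the methods directory)
    db.upload_database("Thermo QE 1", sync_settings=True)

    # On another device: pull the latest copy of the instrument database
    db.download_database("Thermo QE 1", sync_settings=True)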
51def get_database_file(instrument_id, sqlite_conn=False, zip=False): 52 53 """ 54 Returns database file for a given instrument ID. 55 56 Args: 57 instrument_id (str): 58 Instrument ID that specifies which database file to retrieve 59 sqlite_conn (bool, default False): 60 Whether to receive the path for establishing a SQLite connection 61 zip (bool, default False): 62 Whether to receive the path of the database file in the local app directory 63 64 Returns: 65 str: Path for the database file 66 """ 67 68 if zip: 69 filename = instrument_id.replace(" ", "_") + ".zip" 70 else: 71 filename = instrument_id.replace(" ", "_") + ".db" 72 73 if sqlite_conn: 74 return "sqlite:///data/" + filename 75 else: 76 return os.path.join(data_directory, filename)
Returns database file for a given instrument ID.
Arguments:
- instrument_id (str): Instrument ID that specifies which database file to retrieve
- sqlite_conn (bool, default False): Whether to receive the path for establishing a SQLite connection
- zip (bool, default False): Whether to receive the path of the database file in the local app directory
Returns:
str: Path for the database file
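For example, assuming an instrument named "Thermo QE 1" (return values shown are illustrative):

    from ms_autoqc import DatabaseFunctions as db

    db.get_database_file("Thermo QE 1")                    # /path/to/data/Thermo_QE_1.db
    db.get_database_file("Thermo QE 1", sqlite_conn=True)  # "sqlite:///data/Thermo_QE_1.db"
    db.get_database_file("Thermo QE 1", zip=True)          # /path/to/data/Thermo_QE_1.zip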
79def connect_to_database(name): 80 81 """ 82 Establishes a connection to a SQLite database of choice 83 84 Args: 85 name (str): 86 Name of the database, either "Settings" or an instrument ID 87 88 Returns: 89 sqlalchemy.MetaData: 90 A container object that consists of different features of a database being described 91 sqlalchemy.Connection: 92 An object that represents a single DBAPI connection, and always emits SQL statements within 93 the context of a transaction block 94 """ 95 96 if name == "Settings": 97 database_file = settings_database 98 else: 99 database_file = get_database_file(instrument_id=name, sqlite_conn=True) 100 101 engine = sa.create_engine(database_file) 102 db_metadata = sa.MetaData(bind=engine) 103 connection = engine.connect() 104 105 return db_metadata, connection
Establishes a connection to a SQLite database of choice.
Arguments:
- name (str): Name of the database, either "Settings" or an instrument ID
Returns:
- sqlalchemy.MetaData: A container object that consists of different features of a database being described
- sqlalchemy.Connection: An object that represents a single DBAPI connection, and always emits SQL statements within the context of a transaction block
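A short sketch of the pattern used throughout this module (SQLAlchemy 1.x style, with tables autoloaded from the bound metadata; the "workspace" table and its columns are created by create_databases):

    import sqlalchemy as sa
    from ms_autoqc import DatabaseFunctions as db

    db_metadata, connection = db.connect_to_database("Settings")
    workspace_table = sa.Table("workspace", db_metadata, autoload=True)

    # Update a column in the single workspace record, then close the connection
    connection.execute(
        sa.update(workspace_table)
        .where(workspace_table.c.id == 1)
        .values(slack_enabled=0)
    )
    connection.close()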
108def create_databases(instrument_id, new_instrument=False): 109 110 """ 111 Initializes SQLite databases for 1) instrument data and 2) workspace settings. 112 113 Creates the following tables in the instrument database: "runs", "bio_qc_results", "sample_qc_results". 114 115 Creates the following tables in the settings database: "biological_standards", "chromatography_methods", 116 "email_notifications", "instruments", "gdrive_users", "internal_standards", "msdial_parameters", "qc_parameters", 117 "targeted_features", "workspace". 118 119 Args: 120 instrument_id (str): 121 Instrument ID to name the new database ("Thermo QE 1" becomes "Thermo_QE_1.db") 122 new_instrument (bool, default False): 123 Whether a new instrument database is being added to a workspace, or whether a new 124 instrument database AND settings database are being created for the first time 125 126 Returns: 127 None 128 """ 129 130 # Create tables for instrument database 131 instrument_database = get_database_file(instrument_id=instrument_id, sqlite_conn=True) 132 qc_db_engine = sa.create_engine(instrument_database) 133 qc_db_metadata = sa.MetaData() 134 135 bio_qc_results = sa.Table( 136 "bio_qc_results", qc_db_metadata, 137 sa.Column("id", INTEGER, primary_key=True), 138 sa.Column("sample_id", TEXT), 139 sa.Column("run_id", TEXT), 140 sa.Column("polarity", TEXT), 141 sa.Column("precursor_mz", TEXT), 142 sa.Column("retention_time", TEXT), 143 sa.Column("intensity", TEXT), 144 sa.Column("md5", TEXT), 145 sa.Column("qc_dataframe", TEXT), 146 sa.Column("qc_result", TEXT), 147 sa.Column("biological_standard", TEXT), 148 sa.Column("position", TEXT) 149 ) 150 151 runs = sa.Table( 152 "runs", qc_db_metadata, 153 sa.Column("id", INTEGER, primary_key=True), 154 sa.Column("run_id", TEXT), 155 sa.Column("chromatography", TEXT), 156 sa.Column("acquisition_path", TEXT), 157 sa.Column("sequence", TEXT), 158 sa.Column("metadata", TEXT), 159 sa.Column("status", TEXT), 160 sa.Column("samples", INTEGER), 161 sa.Column("completed", INTEGER), 162 sa.Column("passes", INTEGER), 163 sa.Column("fails", INTEGER), 164 sa.Column("latest_sample", TEXT), 165 sa.Column("qc_config_id", TEXT), 166 sa.Column("biological_standards", TEXT), 167 sa.Column("pid", INTEGER), 168 sa.Column("drive_id", TEXT), 169 sa.Column("sample_status", TEXT), 170 sa.Column("job_type", TEXT) 171 ) 172 173 sample_qc_results = sa.Table( 174 "sample_qc_results", qc_db_metadata, 175 sa.Column("id", INTEGER, primary_key=True), 176 sa.Column("sample_id", TEXT), 177 sa.Column("run_id", TEXT), 178 sa.Column("polarity", TEXT), 179 sa.Column("position", TEXT), 180 sa.Column("md5", TEXT), 181 sa.Column("precursor_mz", TEXT), 182 sa.Column("retention_time", TEXT), 183 sa.Column("intensity", TEXT), 184 sa.Column("qc_dataframe", TEXT), 185 sa.Column("qc_result", TEXT) 186 ) 187 188 qc_db_metadata.create_all(qc_db_engine) 189 190 # If only creating instrument database, save and return here 191 if new_instrument: 192 set_device_identity(is_instrument_computer=True, instrument_id=instrument_id) 193 return None 194 195 # Create tables for Settings.db 196 settings_db_engine = sa.create_engine(settings_database) 197 settings_db_metadata = sa.MetaData() 198 199 instruments = sa.Table( 200 "instruments", settings_db_metadata, 201 sa.Column("id", INTEGER, primary_key=True), 202 sa.Column("name", TEXT), 203 sa.Column("vendor", TEXT), 204 sa.Column("drive_id", TEXT), 205 sa.Column("last_modified", TEXT) 206 ) 207 208 biological_standards = sa.Table( 209 "biological_standards", settings_db_metadata, 
210 sa.Column("id", INTEGER, primary_key=True), 211 sa.Column("name", TEXT), 212 sa.Column("identifier", TEXT), 213 sa.Column("chromatography", TEXT), 214 sa.Column("num_pos_features", INTEGER), 215 sa.Column("num_neg_features", INTEGER), 216 sa.Column("pos_bio_msp_file", TEXT), 217 sa.Column("neg_bio_msp_file", TEXT), 218 sa.Column("pos_parameter_file", TEXT), 219 sa.Column("neg_parameter_file", TEXT), 220 sa.Column("msdial_config_id", TEXT) 221 ) 222 223 chromatography_methods = sa.Table( 224 "chromatography_methods", settings_db_metadata, 225 sa.Column("id", INTEGER, primary_key=True), 226 sa.Column("method_id", TEXT), 227 sa.Column("num_pos_standards", INTEGER), 228 sa.Column("num_neg_standards", INTEGER), 229 sa.Column("pos_istd_msp_file", TEXT), 230 sa.Column("neg_istd_msp_file", TEXT), 231 sa.Column("pos_parameter_file", TEXT), 232 sa.Column("neg_parameter_file", TEXT), 233 sa.Column("msdial_config_id", TEXT) 234 ) 235 236 gdrive_users = sa.Table( 237 "gdrive_users", settings_db_metadata, 238 sa.Column("id", INTEGER, primary_key=True), 239 sa.Column("name", TEXT), 240 sa.Column("email_address", TEXT), 241 sa.Column("permission_id", TEXT), 242 ) 243 244 internal_standards = sa.Table( 245 "internal_standards", settings_db_metadata, 246 sa.Column("id", INTEGER, primary_key=True), 247 sa.Column("name", TEXT), 248 sa.Column("chromatography", TEXT), 249 sa.Column("polarity", TEXT), 250 sa.Column("precursor_mz", REAL), 251 sa.Column("retention_time", REAL), 252 sa.Column("ms2_spectrum", TEXT), 253 sa.Column("inchikey", TEXT) 254 ) 255 256 msdial_parameters = sa.Table( 257 "msdial_parameters", settings_db_metadata, 258 sa.Column("id", INTEGER, primary_key=True), 259 sa.Column("config_name", TEXT), 260 sa.Column("rt_begin", INTEGER), 261 sa.Column("rt_end", INTEGER), 262 sa.Column("mz_begin", INTEGER), 263 sa.Column("mz_end", INTEGER), 264 sa.Column("ms1_centroid_tolerance", REAL), 265 sa.Column("ms2_centroid_tolerance", REAL), 266 sa.Column("smoothing_method", TEXT), 267 sa.Column("smoothing_level", INTEGER), 268 sa.Column("min_peak_width", INTEGER), 269 sa.Column("min_peak_height", INTEGER), 270 sa.Column("mass_slice_width", REAL), 271 sa.Column("post_id_rt_tolerance", REAL), 272 sa.Column("post_id_mz_tolerance", REAL), 273 sa.Column("post_id_score_cutoff", REAL), 274 sa.Column("alignment_rt_tolerance", REAL), 275 sa.Column("alignment_mz_tolerance", REAL), 276 sa.Column("alignment_rt_factor", REAL), 277 sa.Column("alignment_mz_factor", REAL), 278 sa.Column("peak_count_filter", INTEGER), 279 sa.Column("qc_at_least_filter", TEXT) 280 ) 281 282 email_notifications = sa.Table( 283 "email_notifications", settings_db_metadata, 284 sa.Column("id", INTEGER, primary_key=True), 285 sa.Column("email_address", TEXT), 286 ) 287 288 qc_parameters = sa.Table( 289 "qc_parameters", settings_db_metadata, 290 sa.Column("id", INTEGER, primary_key=True), 291 sa.Column("config_name", TEXT), 292 sa.Column("intensity_dropouts_cutoff", INTEGER), 293 sa.Column("library_rt_shift_cutoff", REAL), 294 sa.Column("in_run_rt_shift_cutoff", REAL), 295 sa.Column("library_mz_shift_cutoff", REAL), 296 sa.Column("intensity_enabled", INTEGER), 297 sa.Column("library_rt_enabled", INTEGER), 298 sa.Column("in_run_rt_enabled", INTEGER), 299 sa.Column("library_mz_enabled", INTEGER) 300 ) 301 302 targeted_features = sa.Table( 303 "targeted_features", settings_db_metadata, 304 sa.Column("id", INTEGER, primary_key=True), 305 sa.Column("name", TEXT), 306 sa.Column("chromatography", TEXT), 307 sa.Column("polarity", TEXT), 308 
sa.Column("biological_standard", TEXT), 309 sa.Column("precursor_mz", REAL), 310 sa.Column("retention_time", REAL), 311 sa.Column("ms2_spectrum", TEXT), 312 sa.Column("inchikey", TEXT) 313 ) 314 315 workspace = sa.Table( 316 "workspace", settings_db_metadata, 317 sa.Column("id", INTEGER, primary_key=True), 318 sa.Column("slack_bot_token", TEXT), 319 sa.Column("slack_channel", TEXT), 320 sa.Column("slack_enabled", INTEGER), 321 sa.Column("gdrive_folder_id", TEXT), 322 sa.Column("methods_zip_file_id", TEXT), 323 sa.Column("methods_last_modified", TEXT), 324 sa.Column("msdial_directory", TEXT), 325 sa.Column("is_instrument_computer", INTEGER), 326 sa.Column("instrument_identity", TEXT) 327 ) 328 329 # Insert tables into database 330 settings_db_metadata.create_all(settings_db_engine) 331 332 # Insert default configurations for MS-DIAL and MS-AutoQC 333 add_msdial_configuration("Default") 334 add_qc_configuration("Default") 335 336 # Initialize workspace metadata 337 create_workspace_metadata() 338 339 # Save device identity based on setup values 340 set_device_identity(is_instrument_computer=True, instrument_id=instrument_id) 341 return None
Initializes SQLite databases for 1) instrument data and 2) workspace settings.
Creates the following tables in the instrument database: "runs", "bio_qc_results", "sample_qc_results".
Creates the following tables in the settings database: "biological_standards", "chromatography_methods", "email_notifications", "instruments", "gdrive_users", "internal_standards", "msdial_parameters", "qc_parameters", "targeted_features", "workspace".
Arguments:
- instrument_id (str): Instrument ID to name the new database ("Thermo QE 1" becomes "Thermo_QE_1.db")
- new_instrument (bool, default False): Whether a new instrument database is being added to a workspace, or whether a new instrument database AND settings database are being created for the first time
Returns:
None
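A sketch of the two setup paths (the instrument IDs are illustrative):

    from ms_autoqc import DatabaseFunctions as db

    # First-time workspace setup: creates Thermo_QE_1.db and Settings.db,
    # then adds default MS-DIAL and QC configurations
    db.create_databases("Thermo QE 1")

    # Adding another instrument to an existing workspace: creates the
    # instrument database only and skips Settings.db
    db.create_databases("Thermo QE 2", new_instrument=True)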
344def execute_vacuum(database): 345 346 """ 347 Executes VACUUM command on the database of choice. 348 349 Args: 350 database (str): name of the database, either "Settings" or Instrument ID 351 352 Returns: 353 None 354 """ 355 356 db_metadata, connection = connect_to_database(database) 357 connection.execute("VACUUM") 358 connection.close()
Executes VACUUM command on the database of choice.
Arguments:
- database (str): Name of the database, either "Settings" or an instrument ID
Returns:
None
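For example, upload_database() and upload_methods() call this before zipping so the uploaded archives stay small:

    from ms_autoqc import DatabaseFunctions as db

    db.execute_vacuum("Settings")      # compact the settings database
    db.execute_vacuum("Thermo QE 1")   # compact an instrument database (ID is illustrative)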
361def get_drive_instance(): 362 363 """ 364 Returns user-authenticated Google Drive instance. 365 """ 366 367 return GoogleDrive(auth_container[0])
Returns user-authenticated Google Drive instance.
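A sketch of how the returned instance is used elsewhere in this module, for example to list the contents of the MS-AutoQC folder (the folder ID comes from the "workspace" table):

    from ms_autoqc import DatabaseFunctions as db

    drive = db.get_drive_instance()
    folder_id = db.get_drive_folder_id()

    # List files in the MS-AutoQC folder, as done during database and methods downloads
    query = "'" + folder_id + "' in parents and trashed=false"
    for file in drive.ListFile({"q": query}).GetList():
        print(file["title"], file["id"])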
370def launch_google_drive_authentication(): 371 372 """ 373 Launches Google Drive authentication flow and sets authentication instance. 374 """ 375 376 auth_container[0] = GoogleAuth(settings_file=drive_settings_file) 377 auth_container[0].LocalWebserverAuth()
Launches Google Drive authentication flow and sets authentication instance.
380def save_google_drive_credentials(): 381 382 """ 383 Saves Google credentials to a credentials.txt file. 384 """ 385 386 auth_container[0].SaveCredentialsFile(credentials_file)
Saves Google credentials to a credentials.txt file.
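A sketch of the first-time authentication flow (run on a machine with a browser; credentials are written to the module's default path in the /auth directory):

    from ms_autoqc import DatabaseFunctions as db

    # Opens a browser window for Google sign-in via a local webserver flow
    db.launch_google_drive_authentication()

    # Persist the resulting credentials to auth/credentials.txt for future sessions
    db.save_google_drive_credentials()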
389def initialize_google_drive(): 390 391 """ 392 Initializes instance of Google Drive using credentials.txt and settings.yaml in /auth directory 393 394 Args: 395 None 396 397 Returns: 398 bool: Whether the Google client credentials file (in the "auth" directory) exists. 399 """ 400 401 # Create Google Drive instance 402 auth_container[0] = GoogleAuth(settings_file=drive_settings_file) 403 gauth = auth_container[0] 404 405 # If no credentials file, make user authenticate 406 if not os.path.exists(credentials_file) and is_valid(): 407 gauth.LocalWebserverAuth() 408 409 # Try to load saved client credentials 410 gauth.LoadCredentialsFile(credentials_file) 411 412 # Initialize saved credentials 413 if gauth.credentials is not None: 414 415 # Refresh credentials if expired 416 if gauth.access_token_expired: 417 gauth.Refresh() 418 419 # Otherwise, authorize saved credentials 420 else: 421 gauth.Authorize() 422 423 # If no saved credentials, make user authenticate again 424 elif gauth.credentials is None: 425 gauth.LocalWebserverAuth() 426 427 if not os.path.exists(credentials_file) and is_valid(): 428 save_google_drive_credentials() 429 430 # Makes small modification for emails (for usage with Google's google.auth) 431 if not os.path.exists(alt_credentials): 432 data = None 433 with open(credentials_file, "r") as file: 434 data = json.load(file)