Source code for pylimer_tools.io.read_lammps_output_file

"""
This module provides a few functions to read LAMMPS' output files, including:

- log files (thermo output)
- dump files (focusing on the coordinates of atoms)
- data files (the LAMMPS structure)
- averaged data (from :code:`fix ave/time...` or :code:`fix ave/hist...`)
- correlation data (from :code:`fix ave/correlate/...`)

"""

import os
import re
import warnings
from typing import List, Union

import pandas as pd

from pylimer_tools.io.extract_thermo_data import extract_thermo_params
from pylimer_tools.utils.cache_utility import do_cache, load_cache
from pylimer_tools_cpp import AtomStyle, Universe, UniverseSequence


def read_log_file(
        filepath, lines_to_read_to_detect_header=500000) -> pd.DataFrame:
    """
    Read a LAMMPS log (thermo output) file.

    :param filepath: Path to the LAMMPS log file
    :type filepath: str
    :param lines_to_read_to_detect_header: Maximum number of lines to read when detecting the header
    :type lines_to_read_to_detect_header: int
    :return: DataFrame containing the parsed thermo data
    :rtype: pd.DataFrame
    """
    return extract_thermo_params(
        filepath,
        header=None,
        texts_to_read=500000,
        lines_to_read_to_detect_header=lines_to_read_to_detect_header,
    )
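
# Usage sketch for read_log_file (the log file name below is hypothetical):
#
#   thermo = read_log_file("equilibration.log")
#   # thermo is a pd.DataFrame with one column per thermo keyword, e.g.:
#   print(thermo.columns.tolist())   # e.g. ["Step", "Temp", "Press", ...], depending on thermo_style
#   print(thermo["Temp"].mean())
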
def read_dump_file(
        data_file, dump_file, atom_style: Union[List[AtomStyle], None] = None
) -> UniverseSequence:
    """
    Read a LAMMPS dump file (a series of structure snapshots) into a UniverseSequence.

    :param data_file: Path to the LAMMPS data file containing structure information
    :type data_file: str
    :param dump_file: Path to the LAMMPS dump file containing trajectory information
    :type dump_file: str
    :param atom_style: The atom style(s) used in the data file
    :type atom_style: Union[List[AtomStyle], None]
    :return: Sequence of Universe objects representing the trajectory
    :rtype: UniverseSequence
    """
    u_s = UniverseSequence()
    if atom_style is not None:
        u_s.set_data_file_atom_style(atom_style)
    u_s.initialize_from_dump_file(data_file, dump_file)
    return u_s
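
# Usage sketch for read_dump_file (file names are hypothetical):
#
#   trajectory = read_dump_file("network.data", "network.dump",
#                               atom_style=[AtomStyle.Molecule])
#   first_frame = trajectory.at_index(0)  # a Universe; at_index() is also used in read_data_file below
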
def read_data_file(
        structure_file: str, atom_style: Union[List[AtomStyle], None] = None
) -> Universe:
    """
    Read a LAMMPS data file (structure) into a Universe.

    :param structure_file: Path to the structure file
    :type structure_file: str
    :param atom_style: The atom style(s) in the structure file (defaults to AtomStyle.Molecule if None)
    :type atom_style: Union[List[AtomStyle], None]
    :return: Universe object representing the molecular structure
    :rtype: Universe
    :raises FileNotFoundError: If the structure file does not exist
    """
    if not os.path.isfile(structure_file):
        raise FileNotFoundError(
            f"Structure-file '{structure_file}' not found.")
    u_s = UniverseSequence()
    if atom_style is not None:
        u_s.set_data_file_atom_style(atom_style)
    u_s.initialize_from_data_sequence([structure_file])
    universe = u_s.at_index(0)
    del u_s
    return universe
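
# Usage sketch for read_data_file (the data file name is hypothetical):
#
#   universe = read_data_file("network.data")                        # default atom style
#   universe = read_data_file("network.data", [AtomStyle.Molecule])  # explicit atom style
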
def read_averages_file(filepath, use_cache: bool = True, sep=" ") -> pd.DataFrame:
    """
    Read a file written by a `fix ave/time` command.

    Uses pandas' read_csv after detecting the columns.
    Important assumption: The first 2 or 3 lines in the file are:

    - a comment,
    - then one header indicating the columns,
    - and then either data or potentially a second header,
      if it is a sectioned file (e.g., from a `fix ave/time ... vector`)

    :param filepath: Path to the averages file
    :type filepath: str
    :param use_cache: Whether to use the cache to speed up reading & writing
    :type use_cache: bool
    :param sep: Delimiter used in the file (default is space)
    :type sep: str
    :return: DataFrame containing the parsed average data
    :rtype: pd.DataFrame
    :raises FileNotFoundError: If the averages file does not exist
    """
    if not os.path.isfile(filepath):
        raise FileNotFoundError(f"Averages-file '{filepath}' not found.")
    header_line = None
    with open(filepath, "r") as f:
        line0 = f.readline()
        line1 = f.readline()
        line2 = f.readline()
        if line2.startswith("#"):
            return read_sectioned_averages_file(filepath, use_cache=use_cache)
        header_line = line1 if line1.startswith("#") else line0
    header_line = header_line.removeprefix("#").strip()
    try:
        data = pd.read_csv(
            filepath, comment="#", names=header_line.split(), sep=sep)
    except pd.errors.EmptyDataError:
        return pd.DataFrame()
    return data
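
# Usage sketch for read_averages_file, assuming a file produced e.g. by
# `fix ave/time ... file averages.txt` (the file name is hypothetical):
#
#   averages = read_averages_file("averages.txt")
#   # Column names come from the header line written by LAMMPS, e.g. "TimeStep"
#   # plus one column per averaged quantity. Sectioned (vector-style) files are
#   # delegated to read_sectioned_averages_file automatically.
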
def read_sectioned_averages_file(
        filepath, use_cache: bool = True) -> pd.DataFrame:
    """
    Read a file written by a `fix ave/time` command with multiple sections.

    Use the section delimiter columns together with pandas' groupby() to restore
    the original sections.

    :param filepath: Path to the sectioned averages file
    :type filepath: str
    :param use_cache: Whether to use the cache to speed up reading & writing
    :type use_cache: bool
    :return: DataFrame containing the parsed sectioned data
    :rtype: pd.DataFrame
    :raises FileNotFoundError: If the file does not exist
    :raises ValueError: If the file format is not recognized as a proper sectioned averages file
    """
    if not os.path.isfile(filepath):
        raise FileNotFoundError(f"Averages-file '{filepath}' not found.")
    cache_suffix = "sectionedavg-cache.pickle"
    cache_content = load_cache(filepath, cache_suffix)
    if cache_content is not None and use_cache:
        return cache_content
    data = {}
    with open(filepath, "r") as f:
        f.readline()  # discard line 0
        line1 = f.readline()
        line2 = f.readline()
        if not line2.startswith("#"):
            raise ValueError(
                "The file '{}' was not detected to be a proper sectioned averages file.".format(
                    filepath
                )
            )
            # return readSectionedAveragesFile(filepath)
        header_line1 = line1.removeprefix("#").strip()
        header_line2 = line2.removeprefix("#").strip()
        header_line1_split = header_line1.split()
        header_line2_split = header_line2.split()
        if len(header_line1_split) == len(header_line2_split):
            raise ValueError(
                "Cannot read this file, as we cannot distinguish between section header and main data"
            )
        current_data = []
        current_key = None
        for line in f:
            split_line = line.split()
            if current_key is None:
                assert len(split_line) == len(header_line1.split())
                current_key = line
                continue
            if len(split_line) == len(header_line1_split):
                data[current_key] = current_data
                current_data = []
                current_key = line
            else:
                assert len(split_line) == len(header_line2_split)
                current_data.append(split_line)
        data[current_key] = current_data
    # convert all the data to a dataframe
    dfs_to_concat = []
    if header_line1_split is None:
        raise ValueError("Did not find a useable header line.")
    for key in data.keys():
        split_key = key.split()
        local_dataframe = pd.DataFrame(data[key], columns=header_line2_split)
        for i, col in enumerate(header_line1_split):
            local_dataframe[col] = split_key[i]
        dfs_to_concat.append(local_dataframe)
    df = pd.concat(dfs_to_concat, ignore_index=True)
    # convert all columns of DataFrame
    df = df.apply(pd.to_numeric, errors="ignore")
    do_cache(df, filepath, cache_suffix)
    return df
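
# Usage sketch for read_sectioned_averages_file (the file name is hypothetical).
# The section-header columns (e.g. the time step) are broadcast onto every row,
# so grouping by such a column restores the individual sections:
#
#   sectioned = read_sectioned_averages_file("ave_time_vector.txt")
#   for step, section in sectioned.groupby("TimeStep"):  # column name depends on the fix output
#       ...  # each `section` corresponds to one block written by the fix
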
def read_histogram_file(filepath, use_cache: bool = True) -> pd.DataFrame:
    """
    Read a file written by `fix ave/histo` or similar.

    This is a wrapper around read_sectioned_averages_file for histogram data.

    :param filepath: Path to the histogram file
    :type filepath: str
    :param use_cache: Whether to use the cache to speed up reading & writing
    :type use_cache: bool
    :return: DataFrame containing the parsed histogram data
    :rtype: pd.DataFrame
    :see: :func:`~pylimer_tools.io.read_lammps_output_file.read_sectioned_averages_file`
    """
    return read_sectioned_averages_file(filepath, use_cache)
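
# Usage sketch for read_histogram_file (the file name is hypothetical):
#
#   histogram = read_histogram_file("bond_length_histo.txt")
#   # Same sectioned layout as read_sectioned_averages_file, i.e. one block of
#   # bins per output time step.
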
def read_correlation_file(
        filepath, group_key="Timestep", use_cache: bool = True
) -> pd.DataFrame:
    """
    Read a file written by a `fix ave/correlate{/long}` command.

    :param filepath: Path to the correlation file
    :type filepath: str
    :param group_key: The key that denotes a new section
    :type group_key: str
    :param use_cache: Whether to use the cache to speed up reading & writing
    :type use_cache: bool
    :return: DataFrame containing the correlation data.
        Use the group_key with the DataFrame's groupby() to restore the original sections.
    :rtype: pd.DataFrame
    :raises FileNotFoundError: If the correlation file does not exist
    """
    if not os.path.isfile(filepath):
        raise FileNotFoundError(f"Correlation-file '{filepath}' not found.")
    cache_suffix = "{}-correlation-cache.pickle".format(
        group_key if isinstance(group_key, str) else "g"
    )
    cache_content = load_cache(filepath, cache_suffix)
    if cache_content is not None and use_cache:
        return cache_content
    data = {}
    header_line = None
    with open(filepath, "r") as f:
        current_data = []
        current_key = None
        header_line = f.readline()
        if header_line.startswith("#"):
            # in LAMMPS files, there is a title line that does not exist in our DPD output,
            # -> this line is needed for LAMMPS
            header_line = f.readline()
        cols = header_line.removeprefix("#").strip().split()
        normal_line_len = len(cols)
        lines_interpreted = 0

        def is_group_key(line):
            # if (isinstance(group_key, list)):
            #     return np.any([x in line for x in group_key])
            # else:
            return group_key in line

        for line in f:
            if (line.startswith("#") or len(line.strip()) == 0) and not is_group_key(
                line
            ):
                if lines_interpreted == 0:
                    header_line = line
                continue
            if line == header_line:
                continue
            split = line.removeprefix("#").strip().split()
            if len(split) == 2 or is_group_key(line):
                if current_key is not None and len(current_data) > 0:
                    data[current_key] = current_data
                    current_data = []
                # new key
                current_key = line
            elif len(split) == normal_line_len or normal_line_len is None:
                # normal_line_len = len(split)
                current_data.append(split)
            else:
                raise ValueError(
                    "Did not expect {} split values on line with content {} in correlation file {}".format(
                        len(split), line, filepath
                    )
                )
            lines_interpreted += 1
        if current_key is not None and len(current_data) > 0:
            data[current_key] = current_data
    cols.append(group_key)
    correlated_data_assembled = []
    for key in data.keys():
        assert group_key in str(key)
        compiled_regex = re.compile(r"{}:? ([\d]+)".format(group_key))
        results = compiled_regex.search(key)
        if results is None:
            warnings.warn(
                "Did not find {} with number in {} when reading {}".format(
                    group_key, key, filepath
                )
            )
        assert results is not None
        timestep = int(results.group(1))
        for row in data[key]:
            row.append(timestep)
            assert len(row) == len(cols)
            correlated_data_assembled.append(row)
    correlated_data = pd.DataFrame(correlated_data_assembled, columns=cols)
    # convert all columns of DataFrame
    correlated_data = correlated_data.apply(pd.to_numeric, errors="ignore")
    do_cache(correlated_data, filepath, cache_suffix)
    return correlated_data
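
# Usage sketch for read_correlation_file (the file name is hypothetical). The
# group_key column is appended to every row, so groupby(group_key) restores the
# per-timestep correlation windows:
#
#   correlations = read_correlation_file("stress_autocorrelation.txt")
#   for timestep, window in correlations.groupby("Timestep"):
#       ...  # one correlation window per output timestep
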