# Source code for labinform.datasafe.datasafe

"""Datasafe module for the labinform package.

The datasafe is a key feature of labinform which serves to safely store data.
Functionality includes directory generation and checksum creation.
"""

import hashlib
import os
import shutil
import tarfile
import oyaml as yaml

import labinform.datasafe.manifest as manifest


class Error(Exception):
    """Base class for exceptions in this module."""
class NoSuchDirectoryError(Error):
    """Raised when an invalid path is set."""
class IncorrectLoiError(Error):
    """Raised when an incorrect LOI is provided."""
class DirectoryNotEmptyError(Error):
    """Raised when data is pushed to a non-empty directory."""
class NoPathForThisLoiError(Error):
    """Raised when the path corresponding to a given LOI doesn't exist."""
class NoChecksumFilePresentError(Error):
    """Raised when a checksum file cannot be read because it doesn't exist."""
class Datasafe:
    r"""Data handler for moving data in the context of a datasafe.

    The operations performed include generation of a directory structure,
    storing data in and retrieving data from these directories as well as
    verifying the integrity of and providing general information about the
    data stored.

    Attributes
    ----------
    checksum_name: :class:`str`
        Name used for checksum files covering all files of a dataset

    checksum_data_name: :class:`str`
        Name used for checksum files covering only the data of a dataset

    manifest_name: :class:`str`
        Name of the manifest file

    basic_loi: :class:`str`
        First part for LOIs, i.e. something like "42.xxxx/"

    data_movement_name: :class:`str`
        The name that should be used for the \*.tgz archive for moving data.

    metadata_extensions: :class:`list`
        Extensions of metadata files

    """

    def __init__(self):
        self.checksum_name = None
        self.checksum_data_name = None
        self.manifest_name = None
        self.basic_loi = None
        self.data_movement_name = None
        self.metadata_extensions = [".info"]
        self._path = None

    @property
    def path(self):
        """Get or set the path of the datasafe's top level directory.

        The directory is checked for existence and set as path only in case
        it exists.

        Raises
        ------
        NoSuchDirectoryError
            Raised on assignment if the given path is not an existing
            directory.

        """
        return self._path

    @path.setter
    def path(self, path=""):
        if not self.verify_path(path):
            raise NoSuchDirectoryError
        self._path = path

    @staticmethod
    def verify_path(path=""):
        """Verify if a path is correct.

        Static method which works for any path, not just the datasafe root
        path.

        Parameters
        ----------
        path: :class:`str`
            path that should be checked

        Returns
        -------
        path_okay: :class:`bool`
            result of the path check, i.e. whether the path is an existing
            directory

        """
        return os.path.isdir(path)

    def verify_own_path(self):
        """Verify if the path set as instance attribute is a correct path.

        Wrapper around :meth:`verify_path` specifically for checking the
        root path of the datasafe.

        Returns
        -------
        path_okay: :class:`bool`
            result of the path check; ``False`` if no path has been set

        """
        if self._path is None:
            return False
        return self.verify_path(self.path)

    def loi_to_path(self, loi=""):
        """Retrieve a file's datasafe directory path from the data's LOI.

        Retrieves the data's path (including the datasafe's root path) which
        is included in the LOI. The LOI needs to consist of exactly seven
        parts separated by "/"; the first two are skipped, the remaining
        five are appended to the datasafe's root path.

        Parameters
        ----------
        loi: :class:`str`
            LOI from which the path should be retrieved

        Returns
        -------
        path: :class:`str`
            path retrieved from the LOI

        Raises
        ------
        IncorrectLoiError
            Raised if the LOI doesn't consist of seven "/"-separated parts.

        NoPathForThisLoiError
            Raised if the resulting path is not an existing directory.

        """
        loi_parts = loi.split("/")
        if len(loi_parts) != 7:
            raise IncorrectLoiError
        path = os.path.join(self.path, *loi_parts[2:])
        if not self.has_dir(path):
            raise NoPathForThisLoiError
        return path

    @staticmethod
    def add_directory(path):
        """Create a directory at a specified path.

        Missing parent directories are created as well. Nothing happens if
        the path already exists.

        Parameters
        ----------
        path: :class:`str`
            path of the directory that should be created

        """
        if not os.path.exists(path):
            # exist_ok guards against the directory appearing between the
            # existence check and its creation.
            os.makedirs(path, exist_ok=True)

    @staticmethod
    def dir_is_empty(path=""):
        """Check whether a directory is empty.

        Parameters
        ----------
        path: :class:`str`
            path of the directory which should be checked

        Returns
        -------
        is_empty: :class:`bool`
            whether the directory has no entries

        Raises
        ------
        NoSuchDirectoryError
            Raised if the path doesn't exist or is not a directory.

        """
        try:
            dir_content = os.listdir(path)
        except (FileNotFoundError, NotADirectoryError) as error:
            raise NoSuchDirectoryError from error
        return not dir_content

    @staticmethod
    def has_dir(path=""):
        """Check whether a directory exists.

        Parameters
        ----------
        path: :class:`str`
            path of the directory which should be checked

        Returns
        -------
        has_dir: :class:`bool`
            whether the path is an existing directory

        """
        return os.path.isdir(path)

    @staticmethod
    def increment(number=0):
        """Increment an integer by one.

        Parameters
        ----------
        number: :class:`int`
            integer that should be incremented

        Returns
        -------
        incremented: :class:`int`
            the given number plus one

        """
        return number + 1

    @staticmethod
    def find_highest(path=""):
        """Find the highest number used as name within a directory.

        For a given path, find the entry whose name is an integer with the
        highest value. Entries whose names are not integers are ignored.

        Parameters
        ----------
        path: :class:`str`
            path of the directory that should be searched

        Returns
        -------
        highest: :class:`int`
            highest number found; 0 if there is no numbered entry

        Raises
        ------
        NoSuchDirectoryError
            Raised if the path doesn't exist or is not a directory.

        """
        try:
            dir_content = os.listdir(path)
        except (FileNotFoundError, NotADirectoryError) as error:
            raise NoSuchDirectoryError from error
        numbers = []
        for entry in dir_content:
            try:
                numbers.append(int(entry))
            except ValueError:
                continue
        return max(numbers, default=0)

    def generate(self, experiment="", sample_id=""):
        """Generate directory structure and return identifier.

        Verify to what extent the relevant directory structure is present
        and create directories as required. The measurement number for a
        given sample is one higher than the highest number already present,
        starting with 1. Return a unique identifier for the respective
        measurement and sample, including the directory path.

        Parameters
        ----------
        experiment: :class:`str`
            type of experiment performed, e.g. 'cwepr'

        sample_id: :class:`str`
            unique identifier for the sample measured

        Returns
        -------
        loi: :class:`str`
            unique LOI including the information provided

        """
        path_for_loi = self._add_directory_for_generation("", experiment)
        path_for_loi = self._add_directory_for_generation(path_for_loi,
                                                          sample_id)
        sample_path = os.path.join(self.path, path_for_loi)
        # find_highest returns 0 if there is no numbered entry yet, hence
        # the first measurement gets the number 1.
        number = str(self.increment(self.find_highest(sample_path)))
        for directory in (number, "data", "raw"):
            path_for_loi = self._add_directory_for_generation(path_for_loi,
                                                              directory)
        # LOIs always use "/" as separator, regardless of the platform.
        return self.basic_loi + path_for_loi.replace(os.sep, "/")

    def _add_directory_for_generation(self, path, directory):
        """Append a directory to a relative path and make sure it exists.

        Parameters
        ----------
        path: :class:`str`
            path relative to the datasafe's root path

        directory: :class:`str`
            name of the directory to append

        Returns
        -------
        new_path: :class:`str`
            relative path including the appended directory

        """
        new_path = os.path.join(path, directory)
        complete_path = os.path.join(self.path, new_path)
        if not self.has_dir(complete_path):
            self.add_directory(complete_path)
        return new_path

    def make_tgz(self, path=""):
        r"""Pack directory content to \*.tgz file.

        Pack all files in directory to a \*.tgz file without the folder
        itself. Subdirectories are not included. The archive is created
        within the directory and named according to
        :attr:`data_movement_name`.

        Parameters
        ----------
        path: :class:`str`
            path of the directory containing the files

        """
        complete_file_name = os.path.join(path, self.data_movement_name)
        with tarfile.open(complete_file_name, "w:gz") as tar:
            for entry in sorted(os.listdir(path)):
                if entry == self.data_movement_name:
                    # Never pack the archive into itself.
                    continue
                entry_path = os.path.join(path, entry)
                if os.path.isfile(entry_path):
                    tar.add(entry_path, arcname=entry)

    def _unpack_tgz(self, path=""):
        r"""Unpack \*.tgz file.

        .. note::
            Due to safety risks it is not recommended to unpack random data
            archives. Consequently this method is private and does only
            unpack archives with a specific name to prevent accidental
            misuse.

        Parameters
        ----------
        path: :class:`str`
            path of the directory containing the tgz archive.

        """
        complete_file_name = os.path.join(path, self.data_movement_name)
        with tarfile.open(complete_file_name, "r:gz") as tar:
            if hasattr(tarfile, "data_filter"):
                # Extraction filters reject members that would end up
                # outside of the target directory.
                tar.extractall(path=path, filter="data")
            else:
                # NOTE(review): no extraction filter available in this
                # Python version; only unpack archives created by make_tgz.
                tar.extractall(path=path)

    def push(self, data="", loi="", check_empty=True):
        """Move data (one file) into the datasafe.

        The file is copied to the directory specified by the LOI. Before
        copying, the emptiness of the target directory is verified if
        requested. The file's checksums before and after copying are
        compared.

        Parameters
        ----------
        data: :class:`str`
            data (file) to be moved

        loi: :class:`str`
            unique identifier providing a directory path

        check_empty: :class:`bool`
            whether an error should be raised if the LOI points to a
            non-empty directory.

        Returns
        -------
        comparison: :class:`bool`
            Is the checksum identical before and after pushing?

        Raises
        ------
        IncorrectLoiError
            Raised by :meth:`loi_to_path` for an incorrectly formatted LOI.

        NoPathForThisLoiError
            Raised by :meth:`loi_to_path` if the target directory doesn't
            exist.

        DirectoryNotEmptyError
            Raised if the target directory is not empty and ``check_empty``
            is set.

        """
        checksum_before = self.make_checksum_for_file(data)
        # loi_to_path only returns existing directories, hence no further
        # check for existence is necessary.
        target_path = self.loi_to_path(loi)
        if check_empty and not self.dir_is_empty(target_path):
            raise DirectoryNotEmptyError
        shutil.copy(data, target_path)
        # shutil.copy names the copy after the base name of the source.
        final_path = os.path.join(target_path, os.path.basename(data))
        checksum_after = self.make_checksum_for_file(final_path)
        return checksum_before == checksum_after

    def multi_push(self, path="", loi=""):
        """Move data (all files in one directory) into the datasafe.

        Wrapper around :meth:`push` for moving all files in any one
        directory. The files are packed to a tgz archive before moving and
        unpacked after. Before packing and after unpacking the data's
        checksums are compared. The archive is removed from both the source
        and the target directory afterwards.

        Parameters
        ----------
        path: :class:`str`
            path of the directory which contains the files to be moved.

        loi: :class:`str`
            unique identifier providing a directory path.

        Returns
        -------
        comparison: :class:`bool`
            Is the checksum identical before and after pushing?

        Raises
        ------
        DirectoryNotEmptyError
            Raised if the target directory is not empty.

        """
        target_path = self.loi_to_path(loi)
        if not self.dir_is_empty(target_path):
            raise DirectoryNotEmptyError
        checksum_before = self.make_checksum_for_files(path)
        self.make_tgz(path)
        source_archive = os.path.join(path, self.data_movement_name)
        try:
            archive_intact = self.push(source_archive, loi, False)
        finally:
            # Don't leave the temporary archive in the source directory.
            os.remove(source_archive)
        self._unpack_tgz(target_path)
        checksum_after = self.make_checksum_for_files(target_path)
        os.remove(os.path.join(target_path, self.data_movement_name))
        return archive_intact and checksum_before == checksum_after

    def pull(self, loi="", target=""):
        """Retrieve data from the datasafe.

        Packs the files of the directory specified by the LOI to a tgz
        archive, copies the archive to the target directory and unpacks it
        there. The archive is removed from both directories afterwards.

        Parameters
        ----------
        loi: :class:`str`
            unique identifier for the data to be retrieved

        target: :class:`str`
            directory where the data should be deposited.

        Returns
        -------
        path: :class:`str`
            directory where the data was deposited.

        Raises
        ------
        IncorrectLoiError
            Raised by :meth:`loi_to_path` for an incorrectly formatted LOI.

        NoPathForThisLoiError
            Raised by :meth:`loi_to_path` if the directory doesn't exist.

        """
        source_path = self.loi_to_path(loi)
        self.make_tgz(source_path)
        source_archive = os.path.join(source_path, self.data_movement_name)
        try:
            shutil.copy(source_archive, target)
        finally:
            # Don't leave the temporary archive within the datasafe.
            os.remove(source_archive)
        self._unpack_tgz(target)
        os.remove(os.path.join(target, self.data_movement_name))
        return target

    def index(self, loi=""):
        """Retrieve meta information about a dataset from the datasafe.

        Reads the manifest file (named according to :attr:`manifest_name`)
        from the directory specified by the LOI.

        Parameters
        ----------
        loi: :class:`str`
            unique identifier for the dataset for which the meta information
            should be retrieved.

        Returns
        -------
        manifest_dict: :class:`collections.OrderedDict`
            retrieved meta information (Manifest.yaml) as ordered dict

        Raises
        ------
        IncorrectLoiError
            Raised by :meth:`loi_to_path` for an incorrectly formatted LOI.

        NoPathForThisLoiError
            Raised by :meth:`loi_to_path` if the directory doesn't exist.

        """
        path = self.loi_to_path(loi)
        complete_file_name = os.path.join(path, self.manifest_name)
        with open(complete_file_name, "r") as opened_file:
            manifest_dict = yaml.safe_load(opened_file)
        return manifest_dict

    @staticmethod
    def _update_hash_from_file(digest, file_name, chunk_size=65536):
        """Feed the content of a file chunk by chunk into a hash object.

        Reading in chunks keeps the memory footprint small for large files.

        Parameters
        ----------
        digest:
            hash object (as created by :mod:`hashlib`) to be updated

        file_name: :class:`str`
            path of the file whose content should be hashed

        chunk_size: :class:`int`
            number of bytes read at once

        """
        with open(file_name, "rb") as opened_file:
            for chunk in iter(lambda: opened_file.read(chunk_size), b""):
                digest.update(chunk)

    def make_checksum_for_files(self, path="", with_meta=False,
                                ignore_control_files=True):
        """Create a cryptographic hash (currently MD5) for multiple files.

        All files in the directory are sorted by name and included in the
        checksum with the option to exclude control files, i.e. the manifest
        file, the checksum files and the archive used for moving data.
        Subdirectories are not included.

        Parameters
        ----------
        path: :class:`str`
            path of directory which contains the files.

        with_meta: :class:`bool`
            whether to include metadata for checksum creation. Metadata
            files are those whose name ends with one of the
            :attr:`metadata_extensions`.

        ignore_control_files: :class:`bool`
            whether to ignore manifest and checksum files for checksum
            creation.

        Returns
        -------
        checksum: :class:`str`
            checksum (currently MD5)

        """
        control_files = (self.manifest_name, self.checksum_data_name,
                         self.checksum_name, self.data_movement_name)
        # Empty extensions would match every file name, hence skip them.
        metadata_suffixes = tuple(extension
                                  for extension in self.metadata_extensions
                                  if extension)
        md5 = hashlib.md5()
        for file_name in sorted(os.listdir(path)):
            complete_file_name = os.path.join(path, file_name)
            if not os.path.isfile(complete_file_name):
                continue
            if ignore_control_files is not False \
                    and file_name in control_files:
                continue
            # Comparing the end of the name (rather than the part after the
            # last dot) prevents a file simply named "info" from being
            # mistaken for a metadata file.
            if not with_meta and file_name.endswith(metadata_suffixes):
                continue
            self._update_hash_from_file(md5, complete_file_name)
        return md5.hexdigest()

    @staticmethod
    def make_checksum_for_file(path=""):
        """Create a hash (currently MD5) for a file at a given path.

        Parameters
        ----------
        path: :class:`str`
            path of file for which a checksum should be created.

        Returns
        -------
        checksum: :class:`str`
            checksum (currently MD5)

        """
        md5 = hashlib.md5()
        Datasafe._update_hash_from_file(md5, path)
        return md5.hexdigest()

    def make_checksum_file(self, path="", with_meta=False,
                           ignore_control_files=True):
        """Create a file containing a hash for files in target directory.

        Creates a checksum for the files in the given directory and writes
        it to a checksum file within this directory. The name of the
        checksum file depends on whether metadata are included
        (:attr:`checksum_name`) or not (:attr:`checksum_data_name`).

        Parameters
        ----------
        path: :class:`str`
            path to the data (files) for which a checksum should be created

        with_meta: :class:`bool`
            whether to include metadata for checksum creation.

        ignore_control_files: :class:`bool`
            whether to ignore manifest and checksum files for checksum
            creation.

        Returns
        -------
        file_name: :class:`str`
            path of the checksum file written

        """
        checksum = self.make_checksum_for_files(path, with_meta,
                                                ignore_control_files)
        if with_meta:
            file_name = os.path.join(path, self.checksum_name)
        else:
            file_name = os.path.join(path, self.checksum_data_name)
        with open(file_name, "w") as opened_file:
            opened_file.write(checksum)
        return file_name

    def make_both_checksum_files(self, path="", ignore_control_files=True):
        """Create files containing hashes for files in target directory.

        Wrapper method: Creates two checksums for the files in the given
        directory and writes each to a checksum file. One checksum includes
        metadata, one doesn't.

        Parameters
        ----------
        path: :class:`str`
            path to the data (files) for which a checksum should be created

        ignore_control_files: :class:`bool`
            whether to ignore manifest and checksum files for checksum
            creation.

        """
        self.make_checksum_file(path, True, ignore_control_files)
        self.make_checksum_file(path, False, ignore_control_files)

    def retrieve_checksum(self, loi="", with_meta=False):
        """Return checksum from checksum file for a given LOI.

        Parameters
        ----------
        loi: :class:`str`
            unique identifier pointing to a datasafe directory, where the
            dataset is located for which the checksum should be read.

        with_meta: :class:`bool`
            whether to return the checksum that includes metadata.

        Returns
        -------
        checksum: :class:`str`
            checksum from file, without surrounding whitespace

        Raises
        ------
        NoChecksumFilePresentError
            Raised if the checksum file doesn't exist.

        """
        path = self.loi_to_path(loi)
        name = self.checksum_name if with_meta else self.checksum_data_name
        file_name = os.path.join(path, name)
        if not os.path.isfile(file_name):
            raise NoChecksumFilePresentError
        with open(file_name, "r") as opened_file:
            # A trailing newline (e.g. from manual editing) must not make
            # the comparison with a freshly created checksum fail.
            checksum = opened_file.read().strip()
        return checksum

    def compare_checksum(self, loi="", with_meta=False):
        """Create local checksum and compare with checksum file.

        Parameters
        ----------
        loi: :class:`str`
            unique identifier pointing to a datasafe directory, where the
            data is stored for which the checksums should be compared.

        with_meta: :class:`bool`
            whether to compare the checksums that include metadata.

        Returns
        -------
        comparison: :class:`bool`
            result of the checksum comparison.

        Raises
        ------
        NoChecksumFilePresentError
            Raised by :meth:`retrieve_checksum` if the checksum file doesn't
            exist.

        """
        checksum_original = self.retrieve_checksum(loi, with_meta)
        path = self.loi_to_path(loi)
        checksum_control = self.make_checksum_for_files(path, with_meta)
        return checksum_control == checksum_original

    def moveto(self, data="", experiment="", sample_id=""):
        """Prepare directory in datasafe and move data there.

        This is a wrapper function which calls :meth:`generate` to generate
        a directory structure if necessary, writes the checksum files and
        the manifest file to the directory containing the data and finally
        calls :meth:`multi_push` to move all files of this directory to the
        datasafe.

        Parameters
        ----------
        data: :class:`str`
            directory containing the data (files) that should be moved
            inside the datasafe.

        experiment: :class:`str`
            type of experiment performed, e.g. 'cwepr'

        sample_id: :class:`str`
            unique identifier for the sample measured

        Returns
        -------
        results: :class:`list`
            list containing the generated LOI and the result of the checksum
            comparison

        """
        loi = self.generate(experiment, sample_id)
        self.make_both_checksum_files(data)
        manifest_writer = manifest.ManifestWriter()
        manifest_writer.metadata_extensions = self.metadata_extensions
        manifest_writer.set_properties(self)
        manifest_writer.write(data, loi)
        comparison = self.multi_push(data, loi)
        return [loi, comparison]