"""Datasafe module for the labinform package.
The datasafe is a key feature of labinform which serves to safely store data.
Functionality includes directory generation and checksum creation.
"""
import hashlib
import os
import shutil
import tarfile
import oyaml as yaml
import labinform.datasafe.manifest as manifest
[docs]
class Error(Exception):
    """Common ancestor of all exceptions defined in this module.

    Catching this class catches every datasafe-specific error.
    """
[docs]
class NoSuchDirectoryError(Error):
    """Signal that a path does not point to an existing directory."""
[docs]
class IncorrectLoiError(Error):
    """Signal that a LOI handed to the datasafe is not correctly formed."""
[docs]
class DirectoryNotEmptyError(Error):
    """Signal an attempt to push data into a directory that is not empty."""
[docs]
class NoPathForThisLoiError(Error):
    """Signal that no directory exists for the path encoded in a LOI."""
[docs]
class NoChecksumFilePresentError(Error):
    """Signal that a checksum file to be read does not exist."""
[docs]
class Datasafe:
    r"""Data handler for moving data in the context of a datasafe.

    The operations performed include generation of a directory structure,
    storing data in and retrieving data from these directories as well as
    verifying the integrity of and providing general information about the
    data stored.

    Attributes
    ----------
    checksum_name: :class:`str`
        Name used for checksum files covering all files of a dataset

    checksum_data_name: :class:`str`
        Name used for checksum files covering only the data of a dataset

    manifest_name: :class:`str`
        Name of the manifest file

    basic_loi: :class:`str`
        First part for LOIs, i.e. something like "42.xxxx/"

    data_movement_name: :class:`str`
        The name that should be used for the \*.tgz archive for moving data.

    metadata_extensions: :class:`list`
        Extensions of metadata files

    """

    # Number of bytes read per step when hashing a file; keeps memory usage
    # constant regardless of file size.
    _HASH_CHUNK_SIZE = 65536

    def __init__(self):
        self.checksum_name = None
        self.checksum_data_name = None
        self.manifest_name = None
        self.basic_loi = None
        self.data_movement_name = None
        self.metadata_extensions = [".info"]
        self._path = None

    @property
    def path(self):
        """Get or set the path of the datasafe's top level directory.

        The directory is checked for existence and set as path only in
        case it exists.

        Raises
        ------
        NoSuchDirectoryError
            On assignment, if the value is not an existing directory.

        """
        return self._path

    @path.setter
    def path(self, path=""):
        if not self.verify_path(path):
            raise NoSuchDirectoryError(
                "No such directory: {!r}".format(path))
        self._path = path

    @staticmethod
    def verify_path(path=""):
        """Verify if a path is correct.

        Static method which works for any path not just the datasafe root
        path.

        Parameters
        ----------
        path: :class:`str`
            path that should be checked

        Returns
        -------
        path_okay: :class:`bool`
            result of the path check, i.e. whether the path is an existing
            directory

        """
        return os.path.isdir(path)

    def verify_own_path(self):
        """Verify if the path set as instance attribute is a correct path.

        Wrapper around :meth:`verify_path` specifically for checking the
        root path of the datasafe.

        Returns
        -------
        path_okay: :class:`bool`
            result of the path check; ``False`` if no path has been set

        """
        if self._path is None:
            return False
        return self.verify_path(self.path)

    def loi_to_path(self, loi=""):
        """Retrieve a file's datasafe directory path from the data's LOI.

        The LOI is split at "/" and has to consist of exactly seven parts.
        The first two parts are discarded, the remaining five are appended
        to the datasafe's root path, separated by "/".

        Parameters
        ----------
        loi: :class:`str`
            LOI from which the path should be retrieved

        Returns
        -------
        path: :class:`str`
            path retrieved from the LOI

        Raises
        ------
        IncorrectLoiError
            If the LOI does not consist of seven "/"-separated parts.

        NoPathForThisLoiError
            If the resulting path is not an existing directory.

        """
        loi_parts = loi.split("/")
        if len(loi_parts) != 7:
            raise IncorrectLoiError(
                "LOI must consist of 7 parts: {!r}".format(loi))
        path = "/".join([self.path] + loi_parts[2:])
        if not self.has_dir(path):
            raise NoPathForThisLoiError(
                "No directory for LOI {!r}: {!r}".format(loi, path))
        return path

    @staticmethod
    def add_directory(path):
        """Create a directory at a specified path

        Missing parent directories are created as well. Nothing is done if
        anything (directory or file) already exists at the path.

        Parameters
        ----------
        path: :class:`str`
            path of the directory that should be created

        """
        if not os.path.exists(path):
            os.makedirs(path)

    @staticmethod
    def dir_is_empty(path=""):
        """Check whether a directory is empty.

        Parameters
        ----------
        path: :class:`str`
            path of the directory which should be checked

        Returns
        -------
        is_empty: :class:`bool`
            whether the directory has no entries

        Raises
        ------
        NoSuchDirectoryError
            If the path does not exist.

        """
        try:
            dir_content = os.listdir(path)
        except FileNotFoundError as error:
            raise NoSuchDirectoryError(
                "No such directory: {!r}".format(path)) from error
        return not dir_content

    @staticmethod
    def has_dir(path=""):
        """Check whether a directory exists.

        Parameters
        ----------
        path: :class:`str`
            path of the directory which should be checked

        Returns
        -------
        has_dir: :class:`bool`
            whether the path is an existing directory

        """
        return os.path.isdir(path)

    @staticmethod
    def increment(number=0):
        """Increment an integer by one.

        Parameters
        ----------
        number: :class:`int`
            integer that should be incremented

        Returns
        -------
        incremented: :class:`int`
            the number plus one

        """
        return number + 1

    @staticmethod
    def find_highest(path=""):
        """Find the highest number used as entry name in a directory.

        Every entry of the directory whose name can be converted to an
        integer is taken into account; all other entries are skipped.

        Parameters
        ----------
        path: :class:`str`
            path of the directory that should be searched

        Returns
        -------
        highest: :class:`int`
            highest number found, 0 if the directory contains no numbered
            entry (including the case of an empty directory)

        Raises
        ------
        NoSuchDirectoryError
            If the path does not exist.

        """
        try:
            dir_content = os.listdir(path)
        except FileNotFoundError as error:
            raise NoSuchDirectoryError(
                "No such directory: {!r}".format(path)) from error
        numbers = []
        for entry in dir_content:
            try:
                numbers.append(int(entry))
            except ValueError:
                # Entry is not numbered and therefore not of interest.
                continue
        return max(numbers, default=0)

    def generate(self, experiment="", sample_id=""):
        """Generate directory structure and return identifier.

        Verify to what extent the relevant directory structure is present and
        create directories as required. In this context the
        measurement number for a given sample is - in case of consecutive
        measurements - automatically increased.

        The directories created below the datasafe root are
        ``<experiment>/<sample_id>/<number>/data/raw``.

        Parameters
        ----------
        experiment: :class:`str`
            type of experiment performed, e.g. 'cwepr'

        sample_id: :class:`str`
            unique identifier for the sample measured

        Returns
        -------
        loi: :class:`str`
            unique LOI: :attr:`basic_loi` followed by the relative path of
            the directory created, using "/" as separator

        """
        path_for_loi = self._add_directory_for_generation("", experiment)
        path_for_loi = self._add_directory_for_generation(path_for_loi,
                                                          sample_id)
        sample_path = os.path.join(self.path, path_for_loi)
        # find_highest returns 0 for a directory without numbered entries,
        # hence the first measurement automatically gets the number 1.
        number = str(self.increment(self.find_highest(sample_path)))
        for directory in (number, "data", "raw"):
            path_for_loi = self._add_directory_for_generation(path_for_loi,
                                                              directory)
        # LOIs always use "/", independent of the separator of the platform.
        return self.basic_loi + path_for_loi.replace(os.sep, "/")

    def _add_directory_for_generation(self, path, directory):
        """Append a directory to a relative path and make sure it exists.

        Parameters
        ----------
        path: :class:`str`
            path relative to the datasafe root

        directory: :class:`str`
            name of the directory to append

        Returns
        -------
        new_path: :class:`str`
            extended path, still relative to the datasafe root

        """
        new_path = os.path.join(path, directory)
        complete_path = os.path.join(self.path, new_path)
        if not self.has_dir(complete_path):
            self.add_directory(complete_path)
        return new_path

    def make_tgz(self, path=""):
        r"""Pack directory content to \*.tgz file.

        Pack all files in directory to a \*.tgz file without the folder
        itself. The archive is created in the same directory and named
        :attr:`data_movement_name`. Subdirectories are not packed.

        Parameters
        ----------
        path: :class:`str`
            path of the directory containing the files

        """
        archive_name = os.path.join(path, self.data_movement_name)
        with tarfile.open(archive_name, "w:gz") as tar:
            for entry in sorted(os.listdir(path)):
                # The archive lives in the directory being packed and
                # already exists at this point; it must not pack itself.
                if entry == self.data_movement_name:
                    continue
                entry_path = os.path.join(path, entry)
                if os.path.isfile(entry_path):
                    tar.add(entry_path, arcname=entry)

    def _unpack_tgz(self, path=""):
        r"""Unpack \*.tgz file

        .. note::
            Due to safety risks it is not recommended to unpack random data
            archives. Consequently this method is private and does only
            unpack archives with a specific name to prevent accidental
            misuse.

        The archive named :attr:`data_movement_name` is unpacked into the
        directory it is located in.

        Parameters
        ----------
        path: :class:`str`
            path of the directory containing the tgz archive.

        """
        archive_name = os.path.join(path, self.data_movement_name)
        with tarfile.open(archive_name, "r:gz") as tar:
            # A member named like the archive would overwrite the archive
            # while it is being read, hence such members are skipped.
            members = [member for member in tar.getmembers()
                       if member.name != self.data_movement_name]
            if hasattr(tarfile, "data_filter"):
                # Extraction filters reject members that would end up
                # outside the destination; not present in all versions.
                tar.extractall(path=path, members=members, filter="data")
            else:
                tar.extractall(path=path, members=members)

    def push(self, data="", loi="", check_empty=True):
        """Copy data (one file) into the datasafe.

        Before copying the existence of the target directory (as specified
        in the LOI) as well as its emptiness are verified. The checksums of
        the original file and of its copy are compared. The original file
        is left in place.

        Parameters
        ----------
        data: :class:`str`
            data (file) to be copied

        loi: :class:`str`
            unique identifier providing a directory path

        check_empty: :class:`bool`
            whether an error should be raised if the LOI points to an
            non-empty directory.

        Returns
        -------
        comparison: :class:`bool`
            Is the checksum identical before and after pushing?

        Raises
        ------
        NoSuchDirectoryError
            If the target directory does not exist.

        DirectoryNotEmptyError
            If ``check_empty`` is set and the target directory is not empty.

        """
        checksum_before = self.make_checksum_for_file(data)
        target_path = self.loi_to_path(loi)
        if not self.has_dir(target_path):
            raise NoSuchDirectoryError(
                "No such directory: {!r}".format(target_path))
        if check_empty and not self.dir_is_empty(target_path):
            raise DirectoryNotEmptyError(
                "Directory not empty: {!r}".format(target_path))
        # shutil.copy returns the path of the file it created, which spares
        # reconstructing the file name by hand.
        final_path = shutil.copy(data, target_path)
        checksum_after = self.make_checksum_for_file(final_path)
        return checksum_before == checksum_after

    def multi_push(self, path="", loi=""):
        """Copy data (all files in one directory) into the datasafe.

        Wrapper around :meth:`push` for transferring all files in any one
        directory. The files are packed to a tgz archive before the transfer
        and unpacked after. Before packing and after unpacking the data's
        checksums are compared. The archive is removed from both the source
        and the target directory afterwards.

        Parameters
        ----------
        path: :class:`str`
            path of the directory which contains the files to be copied.

        loi: :class:`str`
            unique identifier providing a directory path.

        Returns
        -------
        comparison: :class:`bool`
            Is the checksum identical before and after pushing?

        Raises
        ------
        DirectoryNotEmptyError
            If the target directory is not empty.

        """
        target_path = self.loi_to_path(loi)
        if not self.dir_is_empty(target_path):
            raise DirectoryNotEmptyError(
                "Directory not empty: {!r}".format(target_path))
        checksum_before = self.make_checksum_for_files(path)
        self.make_tgz(path)
        source_archive = os.path.join(path, self.data_movement_name)
        try:
            self.push(source_archive, loi, False)
        finally:
            # The archive only serves the transfer; don't leave it behind.
            os.remove(source_archive)
        self._unpack_tgz(target_path)
        checksum_after = self.make_checksum_for_files(target_path)
        os.remove(os.path.join(target_path, self.data_movement_name))
        return checksum_before == checksum_after

    def pull(self, loi="", target=""):
        """Retrieve data from the datasafe.

        Packs the files located in the datasafe directory specified by the
        LOI, copies the archive to the target directory and unpacks it
        there. The archive is removed from both directories afterwards; the
        data in the datasafe is left untouched.

        Parameters
        ----------
        loi: :class:`str`
            unique identifier for the data to be retrieved

        target: :class:`str`
            directory where the data should be deposited.

        Returns
        -------
        path: :class:`str`
            directory where the data was deposited.

        """
        source_path = self.loi_to_path(loi)
        self.make_tgz(source_path)
        source_archive = os.path.join(source_path, self.data_movement_name)
        try:
            shutil.copy(source_archive, target)
        finally:
            # Keep the datasafe free of transport archives.
            os.remove(source_archive)
        self._unpack_tgz(target)
        os.remove(os.path.join(target, self.data_movement_name))
        return target

    def index(self, loi=""):
        """Retrieve meta information about a dataset from the datasafe.

        Retrieves meta information (Manifest.yaml file) for a
        dataset in the datasafe if present at the target directory (as
        specified in the LOI), raises an exception otherwise.

        Parameters
        ----------
        loi: :class:`str`
            unique identifier for the dataset for which the meta information
            should be retrieved.

        Returns
        -------
        manifest_dict: :class:`collections.OrderedDict`
            retrieved meta information (Manifest.yaml) as ordered dict

        """
        path = self.loi_to_path(loi)
        complete_file_name = os.path.join(path, self.manifest_name)
        with open(complete_file_name, "r") as opened_file:
            manifest_dict = yaml.safe_load(opened_file)
        return manifest_dict

    @staticmethod
    def _update_hash_from_file(hash_object, path):
        """Feed the content of a file chunk by chunk into a hash object.

        Parameters
        ----------
        hash_object:
            object providing an ``update`` method, e.g. from :mod:`hashlib`

        path: :class:`str`
            path of the file to be read

        """
        chunk_size = Datasafe._HASH_CHUNK_SIZE
        with open(path, "rb") as opened_file:
            for chunk in iter(lambda: opened_file.read(chunk_size), b""):
                hash_object.update(chunk)

    def make_checksum_for_files(self, path="", with_meta=False,
                                ignore_control_files=True):
        """Create a cryptographic hash (currently MD5) for multiple files.

        The content of all files in the directory is fed into one hash in
        the order of the sorted file names. Subdirectories are skipped.
        Optionally, control files (manifest file, checksum files and the
        archive used for moving data) and metadata files are excluded.

        Parameters
        ----------
        path: :class:`str`
            path of directory which contains the files.

        with_meta: :class:`bool`
            whether to include metadata for checksum creation.

        ignore_control_files: :class:`bool`
            whether to ignore manifest and checksum files for checksum
            creation.

        Returns
        -------
        checksum: :class:`str`
            checksum (currently MD5)

        """
        control_files = (self.manifest_name, self.checksum_data_name,
                         self.checksum_name, self.data_movement_name)
        md5 = hashlib.md5()
        for file_name in sorted(os.listdir(path)):
            complete_file_name = os.path.join(path, file_name)
            if not os.path.isfile(complete_file_name):
                continue
            if ignore_control_files and file_name in control_files:
                continue
            extension = "." + file_name.split(".")[-1]
            if not with_meta and extension in self.metadata_extensions:
                continue
            self._update_hash_from_file(md5, complete_file_name)
        return md5.hexdigest()

    @staticmethod
    def make_checksum_for_file(path=""):
        """Create a hash (currently MD5) for a file at a given path.

        Parameters
        ----------
        path: :class:`str`
            path of file for which a checksum should be created.

        Returns
        -------
        checksum: :class:`str`
            checksum (currently MD5)

        """
        md5 = hashlib.md5()
        Datasafe._update_hash_from_file(md5, path)
        return md5.hexdigest()

    def make_checksum_file(self, path="", with_meta=False,
                           ignore_control_files=True):
        """Create a file containing a hash for files in target directory.

        Creates a checksum for the files in the directory and writes it to
        a checksum file in the same directory. The file is named
        :attr:`checksum_name` if metadata are included and
        :attr:`checksum_data_name` otherwise.

        Parameters
        ----------
        path: :class:`str`
            path to the data (files) for which a checksum should be created

        with_meta: :class:`bool`
            whether to include metadata for checksum creation.

        ignore_control_files: :class:`bool`
            whether to ignore manifest and checksum files for checksum
            creation.

        Returns
        -------
        file_name: :class:`str`
            path of the checksum file written

        """
        checksum = self.make_checksum_for_files(path, with_meta,
                                                ignore_control_files)
        if with_meta:
            file_name = os.path.join(path, self.checksum_name)
        else:
            file_name = os.path.join(path, self.checksum_data_name)
        with open(file_name, "w") as opened_file:
            opened_file.write(checksum)
        return file_name

    def make_both_checksum_files(self, path="", ignore_control_files=True):
        """Create files containing hashes for files in target directory.

        Wrapper method: Creates two checksum files via
        :meth:`make_checksum_file`. One checksum includes metadata, one
        doesn't.

        Parameters
        ----------
        path: :class:`str`
            path to the data (files) for which a checksum should be created

        ignore_control_files: :class:`bool`
            whether to ignore manifest and checksum files for checksum
            creation.

        """
        self.make_checksum_file(path, True, ignore_control_files)
        self.make_checksum_file(path, False, ignore_control_files)

    def retrieve_checksum(self, loi="", with_meta=False):
        """Return checksum from checksum file for a given LOI.

        Parameters
        ----------
        loi: :class:`str`
            unique identifier pointing to a datasafe directory, where the
            dataset is located for which the checksum should be read.

        with_meta: :class:`bool`
            whether to return the checksum that includes metadata.

        Returns
        -------
        checksum: :class:`str`
            checksum from file, without surrounding whitespace

        Raises
        ------
        NoChecksumFilePresentError
            If the checksum file does not exist.

        """
        path = self.loi_to_path(loi)
        name = self.checksum_name if with_meta else self.checksum_data_name
        file_name = os.path.join(path, name)
        if not os.path.isfile(file_name):
            raise NoChecksumFilePresentError(
                "No checksum file: {!r}".format(file_name))
        with open(file_name, "r") as opened_file:
            # Tolerate a trailing newline, e.g. from manual editing.
            checksum = opened_file.read().strip()
        return checksum

    def compare_checksum(self, loi="", with_meta=False):
        """Create local checksum and compare with checksum file.

        Parameters
        ----------
        loi: :class:`str`
            unique identifier pointing to a datasafe directory, where the
            data is stored for which the checksums should be compared.

        with_meta: :class:`bool`
            whether to compare the checksums that include metadata.

        Returns
        -------
        comparison: :class:`bool`
            result of the checksum comparison.

        """
        checksum_original = self.retrieve_checksum(loi, with_meta)
        path = self.loi_to_path(loi)
        checksum_control = self.make_checksum_for_files(path, with_meta)
        return checksum_control == checksum_original

    def moveto(self, data="", experiment="", sample_id=""):
        """Prepare directory in datasafe and move data there.

        This is a wrapper function which calls :meth:`generate` to generate
        a directory structure, creates both checksum files and a manifest
        for the data and finally calls :meth:`multi_push` to transfer
        the files to the datasafe.

        The result of the checksum comparison performed by
        :meth:`multi_push` is returned together with the LOI.

        Parameters
        ----------
        data: :class:`str`
            path of the directory containing the files that should be
            transferred to the datasafe.

        experiment: :class:`str`
            type of experiment performed, e.g. 'cwepr'

        sample_id: :class:`str`
            unique identifier for the sample measured

        Returns
        -------
        results: :class:`list`
            list containing the generated LOI and the result of the checksum
            comparison

        """
        loi = self.generate(experiment, sample_id)
        self.make_both_checksum_files(data)
        manifest_writer = manifest.ManifestWriter()
        manifest_writer.metadata_extensions = self.metadata_extensions
        manifest_writer.set_properties(self)
        manifest_writer.write(data, loi)
        comparison = self.multi_push(data, loi)
        return [loi, comparison]