Commit 57575f95 authored by Jonathan Hartman

Update Main to 0.3.4

parent b4c003e1
Showing 857 additions and 135 deletions
......@@ -233,4 +233,12 @@ trigger-biodiv:
- if: '$CI_COMMIT_MESSAGE =~ /trigger-biodiv/i'
variables:
PROJECT_NAME: "Biodiversity"
VERSION: "dev"
trigger-nz:
extends: .trigger-template
rules:
- if: '$CI_COMMIT_MESSAGE =~ /trigger-nz/i'
variables:
PROJECT_NAME: "new_zealand"
VERSION: "dev"
\ No newline at end of file
[PROJECT]
name = "Example Project"
csv_field_delimiter = ","
[DIRECTORIES]
root = ""
......
......@@ -42,7 +42,9 @@ filterwarnings = [
log_cli = false
log_cli_level = "INFO"
markers = [
"slow: marks tests as slow (deselect with '-m \"not slow\"')"
"slow: marks tests as slow (deselect with '-m \"not slow\"')",
"deprecated: marks tests as deprecated (deselect with '-m \"not deprecated\"')",
"no_dir_setup: marks tests as not requiring a directory setup (deselect with '-m \"not no_dir_setup\"')",
]
python_classes = "Test_"
......
......@@ -69,10 +69,13 @@ def check_for_new_files() -> None:
# Check the local folder for data files
for data_file in data_files:
logger.info("Inspecting file %s", data_file)
my_resource = DataFile.create(data_file)
if my_resource.is_valid:
state.add_datafile(my_resource)
else:
try:
my_resource = DataFile.create(data_file)
if my_resource.is_valid:
state.add_datafile(my_resource)
else:
state.add_testcase(DataFileTestCase(data_file, "invalid"))
except NotImplementedError:
state.add_testcase(DataFileTestCase(data_file, "invalid"))
testsuites = state.xml.filter("credentials")
......
# flake8: noqa
# pylint: skip-file
from .find_file_location import find_file_location
from .idata_reader import IDataReader
......@@ -17,6 +17,7 @@ import json
import logging
import time
from .register_sources import register_all_sources
from .state.current_state import CurrentState
from .utils.logging import setup_logging
from .utils.file_io import find_non_serializable_values
......@@ -42,7 +43,7 @@ def get_data_descriptions() -> None:
state = CurrentState.load()
all_descriptions = {}
for data_file in state.files_to_process:
for data_file in state.schema_assignment.data_files:
all_descriptions[data_file.filepath] = {
"file_info": data_file.description,
"fields": data_file.field_descriptions,
......@@ -76,4 +77,5 @@ if __name__ == "__main__":
if args.debug:
args.n_processes = 1
register_all_sources()
get_data_descriptions()
# flake8: noqa
# pylint: skip-file
from .file_auditor import FileAuditor
"""
Contains the FileAuditor class.
This class is used to read in a json file containing file metadata and compare it to the actual
files currently present in any available locations. It can then be used to return a list of files
that have been added, removed, or modified since the last time the json file was updated. We can
also use it to update the json file with the current file metadata.
Example JSON file format:
[
{
"filepath": "file1",
"source": {
"type": "local_directory",
"location": "./my_dir"
},
"size_in_bytes": 12345,
"last_modified": "2020-01-01T00:00:00Z"
},
{
"filepath": "file2",
"source": {
"type": "s3_bucket",
"bucket_name": "my_bucket",
},
"size_in_bytes": 67890,
"last_modified": "2020-01-01T00:00:00Z"
}
]
Example usage:
file_auditor = FileAuditor()
"""
from dataclasses import dataclass
import logging
from ..source.reader.ijson_data_reader import IJSONDataReader
from ..source import ISource, find_file_location
from ..objects.resource import Resource, remove_duplicate_resources
from ..utils.string_filter import StringFilter
logger = logging.getLogger(__name__)
@dataclass
class FileAuditor:
"""
Class for auditing files in one or more sources and comparing them to a json file containing
file metadata.
"""
filepath: str = "file_metadata.json"
file_patterns: list[str] = None
ignore_patterns: list[str] = None
reader: IJSONDataReader = None
unchanged_files: list[Resource] = None
new_files: list[Resource] = None
removed_files: list[Resource] = None
modified_files: list[Resource] = None
@property
def file_metadata(self) -> list[dict]:
"""
The file metadata read from the metadata file.
Returns:
list[dict]:
The file metadata as a list of dictionaries (empty if no metadata is available).
"""
if self.reader is None:
return []
try:
return self.reader.read_dict()
except FileNotFoundError:
return []
@property
def all_existing_files(self) -> list[Resource]:
"""
A list of all files in the file metadata.
Returns:
list:
A list of all files in the file metadata.
"""
if self.file_metadata is None:
return []
return [Resource.from_dict(metadata) for metadata in self.file_metadata]
@property
def files_to_process(self) -> list[Resource]:
"""
A list of all files that need to be processed.
Returns:
list:
A list of all files that need to be processed.
"""
return self.new_files + self.modified_files
@property
def all_files(self) -> list[Resource]:
"""
Gets a list of all files that the Auditor has discovered.
This is the union of the unchanged, new, removed, and modified files. If we have not
searched for files yet, this will return an empty list and log a warning.
"""
if self.unchanged_files is None:
logger.warning(
"You must search for files before you can get a list of all files."
)
return []
return (
self.unchanged_files
+ self.new_files
+ self.removed_files
+ self.modified_files
)
def __post_init__(self):
try:
if self.reader is None:
self.reader = find_file_location(self.filepath)
# Check that the reader is a JSON reader
if self.reader.format != "json":
raise ValueError(
f"FileAuditor reader must be a JSON reader, not {self.reader.format}"
)
except FileNotFoundError:
logger.info("No file metadata found. Initializing empty file metadata.")
if self.ignore_patterns is None:
self.ignore_patterns = ["*/schemas.json"]
self.filter = StringFilter(
pattern=self.file_patterns, ignore_pattern=self.ignore_patterns
)
self.unchanged_files = []
self.new_files = []
self.removed_files = []
self.modified_files = []
def get_files_to_process_with_pattern(self, pattern: str) -> list[Resource]:
"""
Get the files that need to be processed that match the given pattern.
Args:
pattern (str): The pattern to match.
Returns:
list:
A list of files that need to be processed that match the given pattern.
"""
my_filter = StringFilter(pattern=pattern, ignore_pattern=self.ignore_patterns)
filepaths = my_filter.filter_list(
[file.filepath for file in self.files_to_process]
)
return [file for file in self.files_to_process if file.filepath in filepaths]
def get_existing_files_as_resources(self, source: ISource) -> list[Resource]:
"""
Given a source, get a list of all files that are in the source and are also in the file
metadata.
All Objects will be returned as Resources.
Args:
source (ISource): The source to search for files.
Returns:
list[Resource]:
A list of all files that are in the source and are also in the file metadata.
"""
existing_filepaths = self.filter.filter_list(
[metadata["filepath"] for metadata in self.file_metadata]
)
existing_files = [
Resource.from_dict(metadata)
for metadata in self.file_metadata
if metadata["source"] == source.to_json()
and metadata["filepath"] in existing_filepaths
]
logger.debug("Found %d existing files in metadata.", len(existing_files))
return existing_files
def get_files_in_source_as_resources(self, source: ISource) -> list[Resource]:
"""
Given a source, get a list of all files that are in the source.
All Objects will be returned as Resources.
Args:
source (ISource): The source to search for files.
Returns:
list[Resource]:
A list of all files that are in the source.
"""
filepaths_in_source = self.filter.filter_list(source.list_files())
files_in_source = [
Resource(filepath=source_file, source=source)
for source_file in source.list_files()
if source_file in filepaths_in_source
]
logger.debug("Found %d files in %s.", len(files_in_source), source)
return files_in_source
def search_all(self, sources: list[ISource]):
"""
Search all given sources for files that have been added, removed, or modified since the last
time the json file was updated.
Args:
sources (list):
A list of sources to search for files.
"""
for source in sources:
self.search(source)
def search(self, source: ISource) -> None:
"""
Search the given source for files that have been added, removed, or modified since the last
time the json file was updated.
Args:
source (ISource): The source to search for files.
Returns:
None:
The results are stored in the unchanged_files, new_files, removed_files, and
modified_files attributes.
"""
existing_files = self.get_existing_files_as_resources(source)
files_in_source = self.get_files_in_source_as_resources(source)
self.unchanged_files = self._find_unchanged_files(
existing_files, files_in_source
)
self.modified_files = self._find_modified_files(existing_files, files_in_source)
self.new_files = self._find_new_files(existing_files, files_in_source)
self.removed_files = self._find_removed_files(existing_files, files_in_source)
def _find_unchanged_files(
self, existing_files: list[Resource], files_in_source: list[Resource]
) -> list[Resource]:
"""
Scan through the existing files and find the ones that have not been modified.
"""
unchanged_files = set(self.unchanged_files)
for file in existing_files:
if file in files_in_source:
unchanged_files.add(file)
return remove_duplicate_resources(list(unchanged_files))
def _find_modified_files(
self, existing_files: list[Resource], files_in_source: list[Resource]
) -> list[Resource]:
"""
Scan through the existing files and find the ones that have been modified.
"""
modified_files = set(self.modified_files)
for existing_file in existing_files:
for source_file in files_in_source:
logger.debug("Comparing %s to %s.", existing_file, source_file)
if existing_file.is_similar_to(source_file):
logger.debug("%s has been modified.", existing_file)
modified_files.add(source_file)
return remove_duplicate_resources(list(modified_files))
def _find_new_files(
self, existing_files: list[Resource], files_in_source: list[Resource]
) -> list[Resource]:
"""
Scan through the files in the source and find the ones that have been added.
"""
new_files = set(self.new_files)
for file in files_in_source:
if file not in existing_files and file not in self.modified_files:
logger.debug("%s is a new file.", file)
new_files.add(file)
# import pdb
# pdb.set_trace()
return remove_duplicate_resources(list(new_files))
def _find_removed_files(
self, existing_files: list[Resource], files_in_source: list[Resource]
) -> list[Resource]:
"""
Scan through the existing files and find the ones that have been removed.
"""
removed_files = set(self.removed_files)
for file in existing_files:
# Check if the file is in the modified files list, in which case it was modified rather than removed
in_modified = Resource.contains_modified_resource(file, self.modified_files)
if file not in files_in_source and not in_modified:
logger.debug("%s has been removed.", file)
removed_files.add(file)
return remove_duplicate_resources(list(removed_files))
def write_json(self, source: ISource) -> None:
"""
Write the file metadata to a json file.
Args:
source (ISource): The source to write the file metadata to.
Returns:
None
"""
# Combine all the lists of files into one list
all_files = (
self.unchanged_files
+ self.new_files
+ self.removed_files
+ self.modified_files
)
# Convert the list of FileEntry objects to a list of dictionaries
file_metadata = [file.to_dict() for file in all_files]
# Write the file metadata to a json file
source.write_json(self.filepath, file_metadata)
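
For orientation, a minimal usage sketch of the audit workflow described in the FileAuditor module docstring above. The absolute package path and the LocalDirectory constructor argument are assumptions for illustration; only methods defined in this file are used.

# Hypothetical usage sketch; "fair_ds_ap42" and LocalDirectory("./my_dir") are assumed names.
from fair_ds_ap42.file_auditor import FileAuditor
from fair_ds_ap42.source import LocalDirectory

source = LocalDirectory("./my_dir")              # assumed constructor signature
auditor = FileAuditor(file_patterns=["*.csv"])   # audit only CSV files

# Compare the files visible in the source against the file_metadata.json baseline
auditor.search(source)
print("new:", auditor.new_files)
print("modified:", auditor.modified_files)
print("removed:", auditor.removed_files)

# Persist the refreshed metadata so the next audit has an up-to-date baseline
auditor.write_json(source)
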
......@@ -36,9 +36,17 @@ import logging
from pathlib import Path
import time
from .file_auditor import FileAuditor
from .source import available_sources
from .register_sources import register_all_sources, get_all_sources
from .settings import settings
from .state.current_state import CurrentState
from .objects.schema_assignment import SchemaAssignment
from .objects.schema_assignment import (
SchemaAssignment,
SchemaAssignmentLoader,
SchemaAssignmentManager,
)
from .utils.logging import setup_logging
from .utils.s3_utils import validate_credentials
......@@ -100,6 +108,30 @@ def get_data_files_in_schemas_json(schema_json):
return files
def get_schema_assignment(filepath, potential_sources=None) -> SchemaAssignment:
"""
Given a filepath and a list of potential sources, load the schema assignment
Args:
filepath (str): The path to the schema assignment file
potential_sources (list): A list of sources to search for the file in
Returns:
SchemaAssignment: The loaded schema assignment
"""
try:
# Locate which source contains this object
source = available_sources.find_file(filepath)
# Get a reader for this file
my_reader = source.get_reader(filepath)
except FileNotFoundError:
my_reader = None
# Load the object
return SchemaAssignmentLoader.from_reader(my_reader, potential_sources)
###############################################################################
# Main Function
###############################################################################
......@@ -107,6 +139,10 @@ def main():
"""
Main function for enclosing all operations in this script
"""
# Register all sources
register_all_sources()
# Load in the state object
state = CurrentState.load()
......@@ -117,22 +153,27 @@ def main():
# Load in any data we might need from the schemas assignment file.
# Ensure that it's valid itself, and that the files it references are
# present.
my_schema_assignment = SchemaAssignment.from_file(settings.directories.schemas_file)
my_schema_assignment.find_additional_files(
directories=settings.directories.data,
schemas_directory=settings.directories.schema_directory,
my_schema_assignment = get_schema_assignment(
settings.directories.schemas_file, get_all_sources()
)
# For any data files that are in the state object that do not yet have a schema assigned,
# either locate an existing schema or create a new one
for data_file in state.data_files:
if data_file.schema is None:
my_schema_assignment.locate_compatible_schema(data_file)
# Audit all of the files we can see in our sources
my_file_auditor = FileAuditor(
file_patterns=settings.file_patterns.all_file_patterns
)
my_file_auditor.search_all(get_all_sources())
schema_assignment_manager = SchemaAssignmentManager(my_schema_assignment)
schema_assignment_manager.update_schema_assignment(
my_file_auditor,
schema_file_patterns=settings.file_patterns.schema_file_patterns,
data_file_patterns=settings.file_patterns.data_file_patterns,
)
state.schema_assignment = my_schema_assignment
settings.directories.xml_report.parent.mkdir(parents=True, exist_ok=True)
state.testsuites_to_xml(settings.directories.xml_report.as_posix())
Path(settings.directories.xml_report).parent.mkdir(parents=True, exist_ok=True)
state.testsuites_to_xml(settings.directories.xml_report)
testsuites = state.xml.filter("credentials|locate_data|file_validation")
testsuites.to_file(settings.directories.xml_report)
......
......@@ -19,6 +19,7 @@ import time
import flask
from flask.templating import render_template
from .register_sources import register_all_sources
from .state.current_state import CurrentState
from .settings import settings
from .utils.logging import setup_logging
......@@ -272,17 +273,18 @@ def build_pages() -> None:
move_static_files("fair_ds_ap42.static", Path("out/pages"))
state = CurrentState.load()
state_dict = state.as_dict(settings.project.page_root_url)
# Audit all of the files we can see in our sources
data_file_dicts = []
for data_file in state.schema_assignment.data_files:
data_file_dicts.append(data_file.as_dict(settings.project.page_root_url))
# DELETE ME LATER
with open("out/state.json", "w", encoding="utf-8") as file_obj:
json.dump(state_dict, file_obj, indent=2)
schema_file_dicts = []
for schema_file in state.schema_assignment.schemas:
schema_file_dicts.append(schema_file.as_dict(settings.project.page_root_url))
datafile_links = {
file["filepath"]: file["htmlpath"] for file in state_dict["data_files"]
}
datafile_links = {file["filepath"]: file["htmlpath"] for file in data_file_dicts}
schemafile_links = {
file["filepath"]: file["htmlpath"] for file in state_dict["schema_files"]
file["filepath"]: file["htmlpath"] for file in schema_file_dicts
}
navbar_links = {
......@@ -301,10 +303,10 @@ def build_pages() -> None:
build_main_page(root_directory, state, navbar_links)
build_about_page(root_directory, navbar_links)
for datafile in state_dict["data_files"]:
for datafile in data_file_dicts:
build_datafile_page(root_directory, datafile, navbar_links)
for schemafile in state_dict["schema_files"]:
for schemafile in schema_file_dicts:
build_schema_page(root_directory, schemafile, navbar_links)
......@@ -316,4 +318,5 @@ if __name__ == "__main__":
# Parse command line args
args = _parse_cli()
register_all_sources()
build_pages()
......@@ -15,7 +15,7 @@ def parse_boolean_from_strings(
data: pd.Series,
true_values: list = None,
false_values: list = None,
coerce: bool = True
coerce: bool = True,
) -> pd.Series:
"""
Parses a series of strings into a series of booleans
......
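
The body of parse_boolean_from_strings is not part of this diff; only its signature changed. As a rough sketch of what that signature implies, assuming (not confirmed by the source) that unrecognised values become missing when coerce is true and raise otherwise:

import pandas as pd

def parse_boolean_from_strings_sketch(
    data: pd.Series,
    true_values: list = None,
    false_values: list = None,
    coerce: bool = True,
) -> pd.Series:
    # Assumed default vocabularies; the real function may differ.
    true_values = true_values or ["true", "yes", "1"]
    false_values = false_values or ["false", "no", "0"]

    def convert(value):
        text = str(value).strip().lower()
        if text in true_values:
            return True
        if text in false_values:
            return False
        if coerce:
            return None  # unparseable values become missing when coercing
        raise ValueError(f"Cannot parse boolean from {value!r}")

    return data.map(convert)
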
......@@ -79,25 +79,6 @@ def get_coordinate_extents(data: pd.Series) -> dict:
return extents
def get_distances(data: pd.Series) -> list[float]:
"""
Calculate the distances between all pairs of coordinates in a column
Args:
data (pd.Series): A column of coordinate tuples
Returns:
list[float]: A list of distances between all pairs of coordinates
"""
distances = []
for index1, coord1 in enumerate(data):
for _, coord2 in enumerate(data[index1 + 1 :], start=index1 + 1):
dist = haversine_distance(coord1, coord2)
distances.append(dist)
return distances
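
get_distances (removed above) delegates to haversine_distance, whose definition is not shown in this diff. For reference, the standard haversine great-circle distance it presumably wraps looks like this (a generic sketch, not the project's implementation):

import math

def haversine_distance_sketch(coord1: tuple, coord2: tuple) -> float:
    # Great-circle distance in kilometres between two (lat, lon) pairs given in degrees.
    lat1, lon1 = map(math.radians, coord1)
    lat2, lon2 = map(math.radians, coord2)
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    return 2 * 6371.0 * math.asin(math.sqrt(a))  # 6371 km: mean Earth radius
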
def get_coordinate_center(data: pd.Series) -> tuple:
"""
Calculate the center of a set of coordinates
......@@ -138,28 +119,15 @@ class GeodataColumnDescriber(IDataColumnDescriber):
"Percent Unique": column_data.nunique() / len(column_data),
"Missing": int(num_missing),
"Percent Missing": percent_missing,
"Extents": None,
"Central Point": None,
"Average Distance Between Points": None,
"Standard Deviation Distance Between Points": None,
"Minimum Distance Between Points": None,
"Maximum Distance Between Points": None,
}
if percent_missing < 1:
distances = get_distances(column_data)
center_lat, center_lon = get_coordinate_center(column_data)
description.update(
{
"Central Point Latitude": center_lat,
"Central Point Longitude": center_lon,
"Average Distance Between Points": round(np.mean(distances), 2),
"Standard Deviation Distance Between Points": round(
np.std(distances), 2
),
"Minimum Distance Between Points": round(min(distances), 2),
"Maximum Distance Between Points": round(max(distances), 2),
}
)
description = {**description, **get_coordinate_extents(column_data)}
......
......@@ -23,12 +23,14 @@ from ..data_column.util import create_data_column
from ..errors import DataFileTestCase
from ..resource import Resource
from ..schema import Schema
from ...source import get_source, ISource, get_s3_bucket_from_settings
from ...data_reader import IDataReader, find_file_location
from ...source import get_source, ISource
from ...source.reader.itabular_data_reader import ITabularDataReader
from ...visualizations import MissingnessMatrix, Heatmap, CorrelationPlot
from ...settings import settings
from ...settings.file_settings import FileSettings
from .datafile_metadata import DataFileMetadata
logger = logging.getLogger(__name__)
......@@ -40,7 +42,7 @@ class DataFile(Resource):
# Static Methods =========================================================
@classmethod
def create(cls, filepath: Union[str, Path], delimiter=None, encoding=None):
def create(cls, filepath: Union[str, Path]):
"""Factory method for obtaining a DataFile
Args:
......@@ -54,35 +56,31 @@ class DataFile(Resource):
"""
my_source = get_source(filepath)
try:
my_reader = find_file_location(
filepath, s3_bucket=get_s3_bucket_from_settings()
)
file_settings = settings.per_file.get_file_settings(filepath)
my_reader.delimiter = (
delimiter if delimiter is not None else file_settings.delimiter
)
my_reader.encoding = (
encoding if encoding is not None else file_settings.encoding
)
except (FileNotFoundError, NotImplementedError):
my_reader = None
my_datafile = DataFile(filepath=filepath, source=my_source, reader=my_reader)
my_datafile = DataFile(filepath=filepath, source=my_source)
logger.info("Created DataFile Resource %s", my_datafile)
return my_datafile
# Magic Methods ==========================================================
def __eq__(self, other):
if not isinstance(other, DataFile):
return False
return hash(self) == hash(other)
def __hash__(self):
return hash(self.__key__())
@classmethod
def from_resource(cls, obj: Resource) -> "DataFile":
"""
Convert a Resource object to a DataFile object
Args:
obj (Resource):
A Resource object to convert to a DataFile
Returns:
DataFile
"""
return DataFile(
filepath=obj.filepath,
source=obj.source,
size_in_bytes=obj.size_in_bytes,
last_modified=obj.last_modified,
file_settings=settings.per_file.get_file_settings(obj.filepath),
)
# Magic Methods ==========================================================
def __key__(self):
return (
self.filepath,
......@@ -92,6 +90,14 @@ class DataFile(Resource):
self.last_modified,
)
def __hash__(self):
return hash(self.__key__())
def __eq__(self, other):
if isinstance(other, DataFile):
return self.__key__() == other.__key__()
return NotImplemented
def __repr__(self):
return str(self)
......@@ -102,7 +108,15 @@ class DataFile(Resource):
return my_str
# Constructor ============================================================
def __init__(self, filepath: str, source: ISource, reader: IDataReader):
def __init__(
self,
filepath: str,
source: ISource,
size_in_bytes: int = None,
last_modified: str = None,
file_settings: FileSettings = None,
metadata: DataFileMetadata = None,
):
"""
Constructor
......@@ -114,10 +128,21 @@ class DataFile(Resource):
format (Format):
The format of this object (e.g. csv, parquet)
"""
super().__init__(filepath=filepath, source=source, reader=reader)
self.reader = reader
super().__init__(
filepath=filepath,
source=source,
size_in_bytes=size_in_bytes,
last_modified=last_modified,
)
if file_settings is None:
file_settings = settings.per_file.get_file_settings(self.filepath)
if isinstance(self.reader, ITabularDataReader):
self.reader.delimiter = file_settings.delimiter
self.schema = None
self._description = None
self.metadata = metadata
self._field_descriptions = None
self._correlations = Correlation(self, "pearson")
self.validator = DataFileValidator(datafile=self)
......@@ -137,15 +162,21 @@ class DataFile(Resource):
@property
def description(self) -> dict[str, Any]:
"""Returns metadata about this file"""
if not self._description:
self._description = self._get_description()
return self._description
if self.metadata is None:
self.metadata = DataFileMetadata.from_reader(self.reader)
return {
"observations": self.metadata.length,
"num_fields": self.metadata.num_fields,
}
@property
def field_descriptions(self) -> dict[str, dict]:
"""Returns metadata about each column in this file"""
if not self._field_descriptions:
self._field_descriptions = self._get_field_descriptions()
self._field_descriptions = {
field: self.get_column(field).description for field in self.fields
}
return self._field_descriptions
@property
......@@ -231,15 +262,6 @@ class DataFile(Resource):
logger.debug("datafile page url: %s", url)
return url
# Private Methods ========================================================
def _get_description(self) -> dict[str, Any]:
obs = max(desc["Count"] for _, desc in self.field_descriptions.items())
return {"observations": obs, "num_fields": len(self.fields)}
def _get_field_descriptions(self):
return {field: self.get_column(field).description for field in self.fields}
# Class Methods ==========================================================
def as_dict(self, root_url: str = "") -> dict:
"""Returns a dictionary representation of this object"""
......@@ -292,6 +314,10 @@ class DataFile(Resource):
None
"""
self.schema = schema
if self.schema is None:
logger.warning("A none value was passed to assign_schema")
return
logger.info("assigning schema %s to %s", schema.filepath, self.filepath)
if not self.compatible_schema(schema):
my_error = (
......@@ -417,7 +443,8 @@ class DataFile(Resource):
df[field.name],
field.true_values,
field.false_values,
coerce=False)
coerce=False,
)
return df
......
from ...source.reader.itabular_data_reader import ITabularDataReader
from dataclasses import dataclass
@dataclass
class DataFileMetadata:
fields: list[str]
length: int
@property
def num_fields(self):
return len(self.fields)
@classmethod
def from_reader(cls, reader: ITabularDataReader):
fields = reader.get_fields()
length = reader.read_column(fields[0]).shape[0]
return cls(fields, length)
......@@ -35,7 +35,7 @@ class SchemasAssignmentFileTestCase(BaseTestCase):
testcase_id="schema_assignment_file_missing",
name="No Schema Assignment File Present",
fail_type="WARNING",
msg=f"No Schema Assignment File Present: {filename}",
msg="No Schema Assignment File Present",
testsuite_id="locate_data",
)
elif case == "invalid":
......
......@@ -3,3 +3,4 @@
from .resource import Resource
from .resource_list import ResourceList
from .duplicate_handler import remove_duplicate_resources
"""
This module contains the functions for handling duplicate resources.
"""
from . import Resource
from ...source import LocalDirectory
def remove_duplicate_resources(resources: list[Resource]) -> list[Resource]:
"""
Scan through the list of resources and identify which ones are the same file. Remove
whichever one is lower in the directory tree.
Args:
resources (list[Resource]): The list of resources to scan through
Returns:
list[Resource]: The list of resources with duplicates removed
"""
unique_resources = []
for resource in resources:
is_duplicate = False
for existing_resource in unique_resources.copy():
if type(resource.source) is not type(existing_resource.source):
continue
if isinstance(resource.source, LocalDirectory):
is_relative = resource.source.is_child_or_parent_of(
existing_resource.source
)
is_file_duplicate = resource.filepath.endswith(
existing_resource.filepath
) or existing_resource.filepath.endswith(resource.filepath)
if is_relative and is_file_duplicate:
is_duplicate = True
break
if not is_duplicate:
unique_resources.append(resource)
return unique_resources
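
A small illustration of the intended behaviour, assuming LocalDirectory takes the directory path as its only argument (hypothetical signature) and that ./data/raw/file1.csv exists so each Resource's reader can be created:

# Hypothetical example (run in this module's context): the same file reachable
# through a parent directory source and a child directory source.
parent = Resource(filepath="raw/file1.csv", source=LocalDirectory("./data"))
child = Resource(filepath="file1.csv", source=LocalDirectory("./data/raw"))

# Both resources point at the same file on disk, so only the first one seen is kept.
unique = remove_duplicate_resources([parent, child])
assert len(unique) == 1
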
......@@ -5,16 +5,12 @@ This interface represents a generic resource that we want to process in some way
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass
import datetime
@dataclass(kw_only=True)
class IResource(ABC):
"""Represents a data resource that can be accessed"""
filepath: str
@property
@abstractmethod
def size_in_bytes(self) -> int:
......
......@@ -7,10 +7,33 @@ This could be a local file or a remote file.
from dataclasses import dataclass
import datetime
from pathlib import Path
import random
import string
from .iresource import IResource
from ...source import ISource
from ...data_reader import IDataReader
from ...source import ISource, VirtualSource
from ...source.source_factory import SourceFactory
def generate_random_filename(length=8, extension="file"):
"""
Generate a random filename.
Used when we have a VirtualSource.
Args:
length (int):
The length of the random string to generate.
extension (str):
The extension to use for the filename.
Returns:
str:
The random filename.
"""
random_chars = "".join(random.choice(string.ascii_letters) for _ in range(length))
filename = f"{random_chars}.{extension}"
return filename
@dataclass(kw_only=True)
......@@ -19,22 +42,93 @@ class Resource(IResource):
Represents a generic resource that we want to process in some way.
"""
filepath: str
source: ISource
reader: IDataReader = None
size_in_bytes: int = None
last_modified: datetime.datetime = None
@property
def size_in_bytes(self) -> int:
if self.reader is None:
return 0
return self.reader.get_size_in_bytes()
@property
def last_modified(self) -> datetime.datetime:
if self.reader is None:
return datetime.datetime.now()
return self.reader.get_date_last_modified()
@classmethod
def from_dict(cls, data: dict) -> "Resource":
"""
Class method to create a Resource from a dictionary.
The dictionary should have the following structure:
{
"filepath": "file1.csv",
"source": {
"type": "local_directory",
"location": "./my_dir",
},
"size_in_bytes": 100,
"last_modified": "2020-01-01T00:00:00Z",
}
Args:
data (dict):
The dictionary to create the Resource from.
Returns:
Resource:
The Resource created from the dictionary.
"""
source = SourceFactory.from_dict(**data["source"])
date = datetime.datetime.strptime(data["last_modified"], "%Y-%m-%dT%H:%M:%SZ")
return cls(
filepath=data["filepath"],
source=source,
size_in_bytes=data["size_in_bytes"],
last_modified=date,
)
@classmethod
def contains_modified_resource(
cls, resource: "Resource", resources: list["Resource"]
) -> bool:
"""
Class method to check if the provided list of Resources contains a modified version of the
provided Resource.
Args:
resource (Resource):
The Resource to check for.
resources (list[Resource]):
The list of Resources to check in.
Returns:
bool:
True if the list contains a modified version of the provided Resource, False
otherwise.
"""
for entry in resources:
if resource.is_similar_to(entry):
return True
return False
def __post_init__(self):
# Ensure filepath is a string, not a Path object
if isinstance(self.filepath, Path):
self.filepath = self.filepath.as_posix()
if isinstance(self.source, VirtualSource):
self.filepath = generate_random_filename()
self.reader = self.source.get_reader(self.filepath)
if self.size_in_bytes is None:
self.size_in_bytes = self.reader.get_size_in_bytes()
if self.last_modified is None:
self.last_modified = self.reader.get_date_last_modified()
def __hash__(self) -> int:
# Create a hash from the object attributes.
# Combine the hashes of the source's JSON values into a single value
source = hash(sum(hash(str(value)) for value in self.source.to_json().values()))
# Create a hash from the filepath, source, size_in_bytes, and last_modified
return hash((self.filepath, source, self.size_in_bytes, self.last_modified))
@property
def exists(self) -> bool:
......@@ -42,9 +136,6 @@ class Resource(IResource):
return False
return self.reader.exists()
def __post_init__(self):
self.filepath = Path(self.filepath).as_posix() if self.filepath else None
def is_similar_to(self, other: IResource) -> bool:
"""
Check if this resource is similar to the provided resource.
......@@ -56,8 +147,28 @@ class Resource(IResource):
Returns:
bool
"""
assert isinstance(other, IResource)
if not isinstance(other, Resource):
return False
return (
self.filepath == other.filepath
and self.reader.source == other.reader.source
and self.source == other.source
and (
self.size_in_bytes != other.size_in_bytes
or self.last_modified != other.last_modified
)
)
def to_dict(self) -> dict:
"""
Get a dictionary representation of the Resource.
Returns:
dict:
A dictionary representation of the Resource.
"""
return {
"filepath": self.filepath,
"source": self.source.to_json(),
"size_in_bytes": self.size_in_bytes,
"last_modified": self.last_modified.strftime("%Y-%m-%dT%H:%M:%S"),
}
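
With to_dict and from_dict in place, the metadata round trip used by FileAuditor can be sketched as follows. my_source is a placeholder for any configured ISource, the referenced file is assumed to exist, and SourceFactory.from_dict is assumed to be able to rebuild the source from its to_json output:

# Round-trip sketch: serialise a Resource and rebuild it from its dictionary form.
original = Resource(filepath="data/file1.csv", source=my_source)
metadata = original.to_dict()
restored = Resource.from_dict(metadata)
assert restored.filepath == original.filepath
assert restored.size_in_bytes == original.size_in_bytes
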
"""
Factory Class for generating Resource objects.
Usage:
my_source = ISource()
# Create a ResourceFactory
factory = ResourceFactory(
source=my_source,
data_file_patterns=["*.csv"],
schema_file_patterns=["*.json"])
# Create a Resource from a filepath
resource = factory.get_resource("my_file.csv")
>>> Resource(filepath="my_file.csv", ...)
# Create a Resource from a filepath with a specific type
resource = factory.get_resource_by_extension("my_file.csv")
>>> DataFile(filepath="my_file.csv", ...)
"""
from pathlib import Path
from ...source import ISource
from ...objects import DataFile, Schema
from .resource import Resource, IResource
class ResourceFactory:
"""
Factory class for creating Resource objects from a filepath and a Source.
"""
def __init__(
self,
source: ISource,
data_file_patterns: list[str] = None,
schema_file_patterns: list[str] = None,
):
self.source = source
self.data_file_patterns = (
data_file_patterns if data_file_patterns else ["*.csv"]
)
self.schema_file_patterns = (
schema_file_patterns if schema_file_patterns else ["*.json"]
)
def _check_if_filepath_exists(self, filepath):
if not self.source.exists(filepath):
raise FileNotFoundError(
f"File {filepath} does not exist in source {self.source}."
)
def get_resource(self, filepath: str) -> Resource:
"""
Given a filepath, return a Resource object from the Factory Source
Args:
filepath (str): The filepath of the Resource to create
Returns:
Resource: The Resource object created from the filepath
Raises:
FileNotFoundError: If the filepath does not exist in the Source
"""
self._check_if_filepath_exists(filepath)
my_resource = Resource(
filepath=filepath,
source=self.source,
size_in_bytes=self.source.get_size_in_bytes(filepath),
last_modified=self.source.get_date_last_modified(filepath),
)
return my_resource
def get_datafile(self, filepath: str) -> DataFile:
"""
Given a filepath, return a Datafile object from the Factory Source
Args:
filepath (str): The filepath of the Resource to create
Returns:
DataFile: The DataFile object created from the filepath
Raises:
FileNotFoundError: If the filepath does not exist in the Source
"""
self._check_if_filepath_exists(filepath)
my_datafile = DataFile(
filepath=filepath,
source=self.source,
size_in_bytes=self.source.get_size_in_bytes(filepath),
last_modified=self.source.get_date_last_modified(filepath),
)
return my_datafile
def get_schema(self, filepath: str) -> Schema:
"""
Given a filepath, return a Schema object from the Factory Source
Args:
filepath (str): The filepath of the Resource to create
Returns:
Schema: The Schema object created from the filepath
Raises:
FileNotFoundError: If the filepath does not exist in the Source
"""
self._check_if_filepath_exists(filepath)
my_schema = Schema(
filepath=filepath,
source=self.source,
size_in_bytes=self.source.get_size_in_bytes(filepath),
last_modified=self.source.get_date_last_modified(filepath),
)
return my_schema
def get_resource_by_extension(self, filepath: str) -> IResource:
"""
Given a filepath, return a Resource object from the Factory Source. If the filepath matches
a pattern in the data_file_patterns, return a DataFile object. If the filepath matches a
pattern in the schema_file_patterns, return a Schema object. Otherwise, return a normal
Resource object.
Args:
filepath (str): The filepath of the Resource to create
Returns:
Resource: The Resource object created from the filepath
Raises:
FileNotFoundError: If the filepath does not exist in the Source
"""
if not self.source.exists(filepath):
raise FileNotFoundError(
f"File {filepath} does not exist in source {self.source}."
)
# Check if the filepath matches one of our data file patterns
for pattern in self.data_file_patterns:
if Path(filepath).match(pattern):
return self.get_datafile(filepath)
# Check if the filepath matches one of our schema file patterns
for pattern in self.schema_file_patterns:
if Path(filepath).match(pattern):
return self.get_schema(filepath)
# In all other cases, try to create a normal Resource object
return self.get_resource(filepath)