Skip to content
Snippets Groups Projects

New: Create a Metadata Extraction Cronjob

Merged Benedikt Heinrichs requested to merge Issue/1788-extractionCronjob into dev
Files
39
using Coscine.Configuration;
using Coscine.Database.Models;
using Coscine.Metadata;
using Coscine.ResourceTypes;
using Coscine.ResourceTypes.Base;
using Coscine.ResourceTypes.Base.Models;
using Org.OpenAPITools.Api;
using Org.OpenAPITools.Model;
using VDS.RDF.Query;
using VDS.RDF;
using MetadataExtractorCron.Util;
using VDS.RDF.Parsing;
using System.Globalization;
using System.Security.Cryptography;
namespace MetadataExtractorCron.Extractors;
/// <summary>
/// Metadata extractor that iterates over every resource with activated metadata extraction,
/// sends each file below the size limit to the metadata extraction service and writes the
/// returned graphs, together with hash and provenance triples, to the RDF store.
/// </summary>
public class CoscineMetadataExtractor : IMetadataExtractor
{
    private readonly string _resourceUrlPrefix = "https://purl.org/coscine/resources";
    private readonly IConfiguration _configuration;
    private readonly DefaultApi _apiClient;
    private readonly RdfStoreConnector _rdfStoreConnector;
    private readonly MetadataGraphsCreator _metadataGraphsCreator;

    // Files whose reported body size exceeds this number of bytes are skipped.
    private const int maxFileSizeInBytes = 16 * 1000 * 1000;

    // NOTE(review): "metatadataextraction" looks like a typo, but this IRI is written to the store
    // as a predicate, so correcting it here would change the emitted data - confirm before touching.
    private const string metadataExtractionVersionUrl = "https://purl.org/coscine/terms/metatadataextraction#version";
    private const string dcatdistributionUrl = "http://www.w3.org/ns/dcat#distribution";
    private const string partOfUri = "http://purl.org/dc/terms/isPartOf";
    private const string aUri = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type";
    private const string dctermsModifiedUri = "http://purl.org/dc/terms/modified";
    private const string rdfSourceUri = "http://www.w3.org/ns/ldp#RDFSource";
    private const string trellisGraphUri = "http://www.trellisldp.org/ns/trellis#PreferServerManaged";
    private const string isMetadataOfUri = "http://purl.org/fdp/fdp-o#isMetadataOf";
    private const string hashTypeUri = "http://www.ebu.ch/metadata/ontologies/ebucore/ebucore#hashType";
    private const string hashFunctionUri = "http://www.ebu.ch/metadata/ontologies/ebucore/ebucore#hashFunction";
    private const string hashValueUri = "http://www.ebu.ch/metadata/ontologies/ebucore/ebucore#hashValue";
    private const string hexBinaryUri = "http://www.w3.org/2001/XMLSchema#hexBinary";

    /// <summary>
    /// Reads the extractor service URL and the RDF store URL from the Consul configuration and
    /// creates the API client, the store connector and the graph creator from them.
    /// </summary>
    public CoscineMetadataExtractor()
    {
        _configuration = new ConsulConfiguration();
        _apiClient = new DefaultApi(
            _configuration.GetStringAndWait(
                "coscine/local/metadataextractor/url",
                "https://metadataextractor.otc.coscine.dev/"
            )
        );
        _rdfStoreConnector = new RdfStoreConnector(_configuration.GetStringAndWait("coscine/local/virtuoso/additional/url"));
        _metadataGraphsCreator = new MetadataGraphsCreator(_rdfStoreConnector);
    }

    /// <summary>
    /// Runs the extraction for all resources whose extraction entry is activated.
    /// </summary>
    /// <remarks>
    /// Failures while extracting or storing the metadata of a single file are logged and do not
    /// stop the run; failures anywhere else propagate to the caller.
    /// </remarks>
    /// <returns>A task that completes when all resources have been processed.</returns>
    public async Task PerformExtraction()
    {
        // The returned version is not needed here (it is fetched again when metadata is stored);
        // the call itself is kept so a failing version request still aborts the run up front.
        await _apiClient.GetVersionWorkerAsync();

        var metadataExtractionModel = new MetadataExtractionModel();
        var resourceModel = new ResourceModel();

        foreach (var extraction in metadataExtractionModel.GetAllWhere((candidate) => candidate.Activated))
        {
            var resourceId = extraction.ResourceId;
            var resourceIdText = resourceId.ToString();
            Console.WriteLine($"Working on resource {resourceId}");

            var resource = resourceModel.GetByIdIncludingDeleted(resourceId);
            if (resource is null)
            {
                Console.WriteLine($"Resource {resourceId} could not be found");
                continue;
            }

            var resourceTypeDefinition = ResourceTypeFactory.Instance.GetResourceType(resource);
            if (resourceTypeDefinition == null)
            {
                Console.WriteLine($"Broken resource type for resource {resourceId}");
                continue;
            }

            var resourceTypeOptions = resourceModel.GetResourceTypeOptions(resourceId);
            var fileInfos = await ReceiveAllFiles(resourceTypeDefinition, resourceIdText, resourceTypeOptions);

            foreach (var file in fileInfos.Where((fileInfo) => fileInfo.HasBody))
            {
                if (file.BodyBytes > maxFileSizeInBytes)
                {
                    Console.WriteLine($"Skipping {file.Key} on {resourceId} since it has a too large byte size");
                    continue;
                }

                Console.WriteLine($"Iterating over {file.Key} on {resourceId}");
                CreateMetadataSetsIfDontExist(resourceIdText, file, fileInfos);

                if (HasCurrentMetadataExtracted(resourceIdText, file))
                {
                    Console.WriteLine($"Metadata for {file.Key} on {resourceId} already exists");
                    continue;
                }

                Console.WriteLine($"Extracting metadata for {file.Key} on {resourceId}");
                try
                {
                    var extractedMetadata = await ExtractMetadata(resourceIdText, file, resourceTypeDefinition, resourceTypeOptions);
                    await StoreExtractedMetadata(resourceIdText, file, extractedMetadata, resourceTypeDefinition, resourceTypeOptions);
                }
                catch (Exception e)
                {
                    // Deliberate best effort: one broken file must not stop the remaining files.
                    Console.WriteLine($"Error extracting metadata for {file.Key} on {resourceId} with error message: {e.Message}, Inner: {(e.InnerException != null ? e.InnerException.Message : "none")}");
                }
            }
        }
    }

    /// <summary>
    /// Lists the entries below <paramref name="path"/> and recursively descends into every entry
    /// that has no body.
    /// </summary>
    /// <param name="resourceTypeDefinition">Resource type used to list the entries.</param>
    /// <param name="resourceId">Identifier of the resource whose entries are listed.</param>
    /// <param name="resourceTypeOptions">Options forwarded to the resource type.</param>
    /// <param name="path">Path to start from; the empty string is used for the first call.</param>
    /// <returns>All listed entries, including the body-less entries that were descended into.</returns>
    private async Task<IEnumerable<ResourceEntry>> ReceiveAllFiles(BaseResourceType resourceTypeDefinition, string resourceId, Dictionary<string, string>? resourceTypeOptions, string path = "")
    {
        // Materialise once: the listing is consumed twice below.
        var currentFileInfos = (await resourceTypeDefinition.ListEntries(resourceId, path, resourceTypeOptions)).ToList();
        var fileInfos = new List<ResourceEntry>(currentFileInfos);

        foreach (var folder in currentFileInfos.Where((currentFileInfo) => !currentFileInfo.HasBody))
        {
            fileInfos.AddRange(await ReceiveAllFiles(resourceTypeDefinition, resourceId, resourceTypeOptions, folder.Key));
        }

        return fileInfos;
    }

    /// <summary>
    /// Creates the metadata graphs for <paramref name="entry"/> when the store contains no graph
    /// whose name contains the entry's graph name (with trailing slash).
    /// </summary>
    /// <param name="resourceId">Identifier of the resource the entry belongs to.</param>
    /// <param name="entry">Entry to create the graphs for.</param>
    /// <param name="fileInfos">All entries of the resource, forwarded to the graph creator.</param>
    private void CreateMetadataSetsIfDontExist(string resourceId, ResourceEntry entry, IEnumerable<ResourceEntry> fileInfos)
    {
        var newFileGraphName = EnsureTrailingSlash(BuildFileGraphName(resourceId, entry));
        if (ListGraphs(newFileGraphName).Any())
        {
            return;
        }

        Console.WriteLine($"Creating graphs for {newFileGraphName} since they did not exist before!");
        _metadataGraphsCreator.CreateGraphs(resourceId, entry, fileInfos);
    }

    /// <summary>
    /// Queries the store for the names of all non-empty graphs whose name contains
    /// <paramref name="id"/>.
    /// </summary>
    /// <param name="id">Text that has to occur in the graph name.</param>
    /// <returns>The matching graph names; results that are not URI nodes are ignored.</returns>
    private IEnumerable<Uri> ListGraphs(string id)
    {
        var cmdString = new SparqlParameterizedString
        {
            CommandText = @"SELECT DISTINCT ?g
                WHERE { GRAPH ?g { ?s ?p ?o }
                FILTER(contains(str(?g), @graph)) }"
        };
        // Bound as a literal parameter instead of being concatenated into the query text.
        cmdString.SetLiteral("graph", id);

        var resultSet = _rdfStoreConnector.QueryEndpoint.QueryWithResultSet(cmdString.ToString());
        var graphs = new List<Uri>();
        foreach (SparqlResult r in resultSet)
        {
            if (r.Value("g") is UriNode uriNode)
            {
                graphs.Add(uriNode.Uri);
            }
        }
        return graphs;
    }

    /// <summary>
    /// Checks whether an extracted version already exists for the most recent data version of
    /// <paramref name="entry"/>.
    /// </summary>
    /// <param name="resourceId">Identifier of the resource the entry belongs to.</param>
    /// <param name="entry">Entry to check.</param>
    /// <returns>
    /// <c>true</c> when both versions are found and the extracted version URI contains, but is
    /// not equal to, the data version URI; otherwise <c>false</c>.
    /// </returns>
    private bool HasCurrentMetadataExtracted(string resourceId, ResourceEntry entry)
    {
        var newFileGraphName = EnsureTrailingSlash(BuildFileGraphName(resourceId, entry));
        var existingGraphs = ListGraphs(newFileGraphName);

        var recentDataVersion = VersionUtil.GetRecentDataVersion(existingGraphs);
        var recentDataExtractedVersion = VersionUtil.GetRecentDataExtractedVersion(existingGraphs);

        return
            recentDataExtractedVersion != null
            && recentDataVersion != null
            && recentDataExtractedVersion.AbsoluteUri.Contains(recentDataVersion.AbsoluteUri)
            && recentDataExtractedVersion.AbsoluteUri != recentDataVersion.AbsoluteUri;
    }

    /// <summary>
    /// Loads the content of <paramref name="entry"/> and sends it to the metadata extraction service.
    /// </summary>
    /// <param name="resourceId">Identifier of the resource the entry belongs to.</param>
    /// <param name="entry">Entry whose content is sent.</param>
    /// <param name="resourceTypeDefinition">Resource type used to load the entry.</param>
    /// <param name="resourceTypeOptions">Options forwarded to the resource type.</param>
    /// <returns>The first output returned by the extraction service.</returns>
    /// <exception cref="InvalidOperationException">
    /// The entry could not be loaded, or the service returned no output.
    /// </exception>
    private async Task<MetadataOutput> ExtractMetadata(string resourceId, ResourceEntry entry, BaseResourceType resourceTypeDefinition, Dictionary<string, string>? resourceTypeOptions)
    {
        // Disposed when the method exits, i.e. after the upload has been awaited.
        using var loadedEntry = await resourceTypeDefinition.LoadEntry(resourceId, entry.Key, resourceTypeOptions);
        if (loadedEntry is null)
        {
            throw new InvalidOperationException("The resulting stream of the loaded entry is null.");
        }

        var extractedOutputs = await _apiClient.PostMetadataExtractorWorkerAsync(
            loadedEntry,
            $"{resourceId}/{entry.Key}",
            null,
            entry.Created?.ToString("o", CultureInfo.InvariantCulture),
            entry.Modified?.ToString("o", CultureInfo.InvariantCulture)
        );

        return extractedOutputs?.FirstOrDefault()
            ?? throw new InvalidOperationException($"The metadata extractor returned no output for {resourceId}/{entry.Key}.");
    }

    /// <summary>
    /// Stores the extracted metadata of <paramref name="entry"/>: hash triples for the data,
    /// the extracted graphs, their registration in the trellis graph and the provenance triples
    /// in the entry's data and metadata graphs.
    /// </summary>
    /// <remarks>
    /// Everything that can fail without touching the store (version lookup, parsing of the
    /// returned TriG) is done before the first write, so a failure there leaves the store unchanged.
    /// </remarks>
    /// <param name="resourceId">Identifier of the resource the entry belongs to.</param>
    /// <param name="entry">Entry the metadata was extracted from.</param>
    /// <param name="extractedMetadata">Output of the extraction service; its metadata is parsed as TriG.</param>
    /// <param name="resourceTypeDefinition">Resource type used to load the entry for hashing.</param>
    /// <param name="resourceTypeOptions">Options forwarded to the resource type.</param>
    /// <exception cref="InvalidOperationException">
    /// No recent data version or no recent metadata version exists for the entry.
    /// </exception>
    private async Task StoreExtractedMetadata(string resourceId, ResourceEntry entry, MetadataOutput extractedMetadata, BaseResourceType resourceTypeDefinition, Dictionary<string, string>? resourceTypeOptions)
    {
        var metadataExtractorVersion = (await _apiClient.GetVersionWorkerAsync())._Version;

        var newFileGraphName = BuildFileGraphName(resourceId, entry);
        var newFileGraphNameAddon = EnsureTrailingSlash(newFileGraphName);

        var existingGraphs = ListGraphs(newFileGraphNameAddon);
        var recentDataVersion = VersionUtil.GetRecentDataVersion(existingGraphs);
        var recentMetadataVersion = VersionUtil.GetRecentMetadataVersion(existingGraphs);

        if (recentDataVersion is null)
        {
            throw new InvalidOperationException("The recent data version is null and can't be used.");
        }
        if (recentMetadataVersion is null)
        {
            throw new InvalidOperationException("The recent metadata version is null and can't be used.");
        }

        var recentDataExtractedVersion = new Uri(recentDataVersion.AbsoluteUri + "&extracted=true");
        var recentMetadataExtractedVersion = new Uri(recentMetadataVersion.AbsoluteUri + "&extracted=true");

        // Parse and rename in memory first; a malformed response must not leave partial data behind.
        var tripleStore = new TripleStore();
        tripleStore.LoadFromString(extractedMetadata.Metadata, new TriGParser(TriGSyntax.Recommendation));
        FormatResultMetadata(tripleStore, recentDataExtractedVersion, recentMetadataExtractedVersion);

        // First write. Has to happen before the data graph is fetched below so that the fetched
        // graph already contains the hash triples.
        await CreateHashData(resourceId, entry, resourceTypeDefinition, resourceTypeOptions, newFileGraphNameAddon, recentDataVersion);

        GraphStorer.StoreGraphs(tripleStore.Graphs, _rdfStoreConnector);

        var trellisGraph = _rdfStoreConnector.GetGraph(trellisGraphUri);
        var triples = new List<Triple>();
        AddToTrellis(trellisGraph, rdfSourceUri, newFileGraphName, recentDataExtractedVersion.AbsoluteUri, triples);
        AddToTrellis(trellisGraph, rdfSourceUri, newFileGraphName, recentMetadataExtractedVersion.AbsoluteUri, triples);
        GraphStorer.AddToGraph(trellisGraph, triples, _rdfStoreConnector);

        // Built from the slash-terminated name, exactly like CreateHashData does, so both
        // methods always address the same data graph.
        var newDataFileGraphName = $"{newFileGraphNameAddon}@type=data";
        var newMetadataFileGraphName = $"{newFileGraphNameAddon}@type=metadata";

        var dataGraph = CreateOrGetGraph(newDataFileGraphName);
        var metadataGraph = CreateOrGetGraph(newMetadataFileGraphName);

        dataGraph.Assert(new Triple(
            dataGraph.CreateUriNode(new Uri(newDataFileGraphName)),
            dataGraph.CreateUriNode(new Uri(dcatdistributionUrl)),
            dataGraph.CreateUriNode(recentDataExtractedVersion)
        ));
        dataGraph.Assert(new Triple(
            dataGraph.CreateUriNode(recentDataExtractedVersion),
            dataGraph.CreateUriNode(new Uri(metadataExtractionVersionUrl)),
            dataGraph.CreateLiteralNode(metadataExtractorVersion)
        ));

        metadataGraph.Assert(new Triple(
            metadataGraph.CreateUriNode(new Uri(newMetadataFileGraphName)),
            metadataGraph.CreateUriNode(new Uri(dcatdistributionUrl)),
            metadataGraph.CreateUriNode(recentMetadataExtractedVersion)
        ));
        metadataGraph.Assert(new Triple(
            metadataGraph.CreateUriNode(recentMetadataExtractedVersion),
            metadataGraph.CreateUriNode(new Uri(metadataExtractionVersionUrl)),
            metadataGraph.CreateLiteralNode(metadataExtractorVersion)
        ));
        metadataGraph.Assert(new Triple(
            metadataGraph.CreateUriNode(recentMetadataVersion),
            metadataGraph.CreateUriNode(new Uri(isMetadataOfUri)),
            metadataGraph.CreateUriNode(recentDataVersion)
        ));

        var provenanceGraphs = new List<IGraph> { dataGraph, metadataGraph };
        GraphStorer.StoreGraphs(provenanceGraphs, _rdfStoreConnector);
    }

    /// <summary>
    /// Hashes the content of <paramref name="entry"/> with SHA-512 and adds the hash triples to
    /// the entry's data graph.
    /// </summary>
    /// <param name="resourceId">Identifier of the resource the entry belongs to.</param>
    /// <param name="entry">Entry whose content is hashed.</param>
    /// <param name="resourceTypeDefinition">Resource type used to load the entry.</param>
    /// <param name="resourceTypeOptions">Options forwarded to the resource type.</param>
    /// <param name="newFileGraphNameAddon">Graph name of the entry, ending with a slash.</param>
    /// <param name="recentDataVersion">Data version the hash is attached to; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="recentDataVersion"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The entry could not be loaded.</exception>
    private async Task CreateHashData(string resourceId, ResourceEntry entry, BaseResourceType resourceTypeDefinition, Dictionary<string, string>? resourceTypeOptions, string newFileGraphNameAddon, Uri? recentDataVersion)
    {
        // Without a data version there is no subject to attach the hash to; fail before any I/O.
        ArgumentNullException.ThrowIfNull(recentDataVersion);

        var dataGraphName = $"{newFileGraphNameAddon}@type=data";
        var dataGraph = CreateOrGetGraph(dataGraphName);

        using var loadedEntry = await resourceTypeDefinition.LoadEntry(resourceId, entry.Key, resourceTypeOptions);
        if (loadedEntry is null)
        {
            throw new InvalidOperationException("The resulting stream of the loaded entry is null, when trying to hash the data.");
        }

        // NOTE(review): the value is Base64 text but is typed as xsd:hexBinary below. Left as is
        // because the stored format may be relied on elsewhere - confirm which one is intended.
        var sha512Hash = Convert.ToBase64String(HashUtil.HashData(loadedEntry, HashAlgorithmName.SHA512));

        // Every call creates a new hash node, identified by a fresh GUID.
        var hashGraphId = new Uri($"{recentDataVersion.AbsoluteUri}&hash={Guid.NewGuid()}");
        var dataGraphSubject = dataGraph.CreateUriNode(recentDataVersion);
        var hashSubject = dataGraph.CreateUriNode(hashGraphId);

        var hashTriples = new List<Triple>
        {
            new Triple(
                dataGraphSubject,
                dataGraph.CreateUriNode(new Uri(hashTypeUri)),
                hashSubject),
            new Triple(
                hashSubject,
                dataGraph.CreateUriNode(new Uri(hashFunctionUri)),
                dataGraph.CreateLiteralNode("SHA512")),
            new Triple(
                hashSubject,
                dataGraph.CreateUriNode(new Uri(hashValueUri)),
                dataGraph.CreateLiteralNode(sha512Hash, new Uri(hexBinaryUri))),
        };

        GraphStorer.AddToGraph(dataGraph, hashTriples, _rdfStoreConnector);
    }

    /// <summary>
    /// Renames every graph of <paramref name="tripleStore"/> that is not already one of the two
    /// target graphs: graphs whose name contains "type=data" become
    /// <paramref name="dataExtractGraph"/>, all others become <paramref name="metadataExtractGraph"/>.
    /// </summary>
    /// <param name="tripleStore">Store holding the parsed extraction result; modified in place.</param>
    /// <param name="dataExtractGraph">Target name for data graphs.</param>
    /// <param name="metadataExtractGraph">Target name for all other graphs.</param>
    private static void FormatResultMetadata(TripleStore tripleStore, Uri dataExtractGraph, Uri metadataExtractGraph)
    {
        // Iterate over a snapshot because the store is modified inside the loop.
        foreach (var graph in tripleStore.Graphs.ToArray())
        {
            if (graph.BaseUri == dataExtractGraph || graph.BaseUri == metadataExtractGraph)
            {
                continue;
            }

            // NOTE(review): a graph without a BaseUri would fail on the next lines - confirm the
            // extractor never returns triples outside a named graph.
            tripleStore.Remove(graph.BaseUri);
            graph.BaseUri = graph.BaseUri.AbsoluteUri.Contains("type=data")
                ? dataExtractGraph
                : metadataExtractGraph;
            // "true" merges into a graph of the same name that is already in the store.
            tripleStore.Add(graph, true);
        }
    }

    /// <summary>
    /// Registers <paramref name="graphUri"/> as part of <paramref name="thePartUri"/> in the
    /// trellis graph, unless that relation is already present.
    /// </summary>
    /// <param name="trellisGraph">Graph that is checked and extended in memory.</param>
    /// <param name="ldpAssignment">Type IRI assigned to <paramref name="graphUri"/>.</param>
    /// <param name="thePartUri">IRI that <paramref name="graphUri"/> is declared a part of.</param>
    /// <param name="graphUri">IRI of the graph to register.</param>
    /// <param name="triples">Receives every triple that was newly added to <paramref name="trellisGraph"/>.</param>
    private static void AddToTrellis(IGraph trellisGraph, string ldpAssignment, string thePartUri, string graphUri, ICollection<Triple> triples)
    {
        var setGraphNode = trellisGraph.CreateUriNode(new Uri(graphUri));
        var setThePartNode = trellisGraph.CreateUriNode(new Uri(thePartUri));

        var partOfTriple = new Triple(
            setGraphNode,
            trellisGraph.CreateUriNode(new Uri(partOfUri)),
            setThePartNode
        );
        if (trellisGraph.ContainsTriple(partOfTriple))
        {
            return;
        }

        triples.Add(partOfTriple);
        trellisGraph.Assert(partOfTriple);

        var assignmentTriple = new Triple(
            setGraphNode,
            trellisGraph.CreateUriNode(new Uri(aUri)),
            trellisGraph.CreateUriNode(new Uri(ldpAssignment))
        );
        triples.Add(assignmentTriple);
        trellisGraph.Assert(assignmentTriple);

        AddModifiedDate(trellisGraph, graphUri, triples);
    }

    /// <summary>
    /// Returns the graph named <paramref name="graphUrl"/> from the store, or a new empty
    /// in-memory graph with that name when the store has none.
    /// </summary>
    /// <param name="graphUrl">Name of the graph.</param>
    /// <returns>The existing or the newly created graph.</returns>
    private IGraph CreateOrGetGraph(string graphUrl)
    {
        if (_rdfStoreConnector.HasGraph(graphUrl))
        {
            return _rdfStoreConnector.GetGraph(graphUrl);
        }

        return new Graph()
        {
            BaseUri = new Uri(graphUrl)
        };
    }

    /// <summary>
    /// Adds a dcterms:modified triple with the current UTC time for <paramref name="root"/>,
    /// unless the graph already has one for that subject.
    /// </summary>
    /// <param name="graph">Graph that is checked and extended in memory.</param>
    /// <param name="root">IRI of the subject.</param>
    /// <param name="triples">Receives the triple when it was newly added.</param>
    private static void AddModifiedDate(IGraph graph, string root, ICollection<Triple> triples)
    {
        var dcTermsModifiedNode = graph.CreateUriNode(new Uri(dctermsModifiedUri));
        var rootNode = graph.CreateUriNode(new Uri(root));
        if (graph.GetTriplesWithSubjectPredicate(rootNode, dcTermsModifiedNode).Any())
        {
            return;
        }

        var modifiedTriple = new Triple(
            rootNode,
            dcTermsModifiedNode,
            graph.CreateLiteralNode(
                DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture),
                new Uri(XmlSpecsHelper.XmlSchemaDataTypeDateTime)
            )
        );
        triples.Add(modifiedTriple);
        graph.Assert(modifiedTriple);
    }

    /// <summary>
    /// Builds the graph name of an entry: resource URL prefix, resource id and entry key joined by slashes.
    /// </summary>
    private string BuildFileGraphName(string resourceId, ResourceEntry entry)
    {
        return $"{_resourceUrlPrefix}/{resourceId}/{entry.Key}";
    }

    /// <summary>
    /// Returns <paramref name="value"/> with a single "/" appended when it does not already end with one.
    /// </summary>
    private static string EnsureTrailingSlash(string value)
    {
        // Char overload: ordinal comparison, independent of the current culture.
        return value.EndsWith('/') ? value : value + "/";
    }
}
\ No newline at end of file
Loading