Select Git revision
-
Petar Hristov authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
BaseRdfStoreConnector.cs 18.24 KiB
using Coscine.Metadata.Util;
using Polly;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using VDS.RDF;
using VDS.RDF.Query;
using VDS.RDF.Storage;
using VDS.RDF.Update;
using VDS.RDF.Writing;
namespace Coscine.Metadata
{
/// <summary>
/// Provides the base RDF functionality
/// </summary>
public class BaseRdfStoreConnector : IDisposable
{
/// <summary>
/// The SPARQL Update endpoint. Assigned by the constructor from the configured endpoint URL.
/// </summary>
public SparqlRemoteUpdateEndpoint UpdateEndpoint { get; init; }
/// <summary>
/// The SPARQL Query endpoint. Assigned by the constructor from the configured endpoint URL.
/// </summary>
public SparqlRemoteEndpoint QueryEndpoint { get; init; }
/// <summary>
/// A connector for writing and reading SPARQL. The constructor builds it from
/// <see cref="QueryEndpoint"/> and <see cref="UpdateEndpoint"/>.
/// </summary>
public ReadWriteSparqlConnector ReadWriteSparqlConnector { get; init; }
/// <summary>
/// How many items can be retrieved at a time from the metadata store.
/// Used as the page size (SPARQL LIMIT) when a graph is read page by page.
/// </summary>
public int QUERY_LIMIT { get; } = 1000;
/// <summary>
/// Default base constructor. Creates the query endpoint, the update endpoint and the
/// combined read/write connector, all pointing at the same SPARQL endpoint URL.
/// </summary>
/// <param name="sparqlEndpoint">URL of the SPARQL Query and Update endpoint</param>
/// <exception cref="ArgumentNullException"><paramref name="sparqlEndpoint"/> is null.</exception>
/// <exception cref="UriFormatException"><paramref name="sparqlEndpoint"/> is not a valid URI.</exception>
public BaseRdfStoreConnector(string sparqlEndpoint = "http://localhost:8890/sparql")
{
    // The URL is taken verbatim. Passing it through string.Format (as a format string
    // without arguments) would throw a FormatException for URLs containing '{' or '}'.
    var endpointUri = new Uri(sparqlEndpoint);

    UpdateEndpoint = new SparqlRemoteUpdateEndpoint(endpointUri);
    QueryEndpoint = new SparqlRemoteEndpoint(endpointUri);
    ReadWriteSparqlConnector = new ReadWriteSparqlConnector(QueryEndpoint, UpdateEndpoint);
}
/// <summary>
/// Checks whether the metadata store has the given graph.
/// Convenience overload that converts the name to a <see cref="Uri"/> first.
/// </summary>
/// <param name="graphName">Graph URI formatted as string</param>
/// <returns>The result of the <see cref="Uri"/> based overload</returns>
public bool HasGraph(string graphName) => HasGraph(new Uri(graphName));
/// <summary>
/// Checks whether the metadata store has the given graph by sending a SPARQL ASK
/// query that looks for any triple inside that graph.
/// </summary>
/// <param name="graphUri">URI of the graph to look for</param>
/// <returns>The boolean answer of the ASK query</returns>
public bool HasGraph(Uri graphUri)
{
    var askQuery = new SparqlParameterizedString();
    askQuery.CommandText = "ASK WHERE { GRAPH @graph { ?s ?p ?o } }";
    askQuery.SetUri("graph", graphUri);

    var askResult = WrapRequest(() => QueryEndpoint.QueryWithResultSet(askQuery.ToString()));

    return askResult.Result;
}
/// <summary>
/// Loads a graph from the metadata store.
/// Convenience overload that converts the name to a <see cref="Uri"/> first.
/// </summary>
/// <param name="graphName">Graph URI formatted as string</param>
/// <returns>The graph produced by the <see cref="Uri"/> based overload</returns>
public IGraph GetGraph(string graphName) => GetGraph(new Uri(graphName));
/// <summary>
/// Returns a graph from the metadata store.
/// Returns an empty graph if the graph does not exist or its size could not be determined.
/// </summary>
/// <remarks>
/// The triples are fetched page by page (<see cref="QUERY_LIMIT"/> per request).
/// A page whose request ultimately fails is skipped, so the returned graph may be
/// incomplete in that case.
/// </remarks>
/// <param name="graphUri">URI of the graph to load</param>
/// <returns>A graph whose base URI is <paramref name="graphUri"/></returns>
public IGraph GetGraph(Uri graphUri)
{
    var graph = new Graph
    {
        BaseUri = graphUri
    };

    // Get the number of triples inside the graph
    var countQuery = new SparqlParameterizedString
    {
        CommandText = "SELECT COUNT(*) AS ?count WHERE { GRAPH @graph { ?s ?p ?o } }"
    };
    countQuery.SetUri("graph", graphUri);
    using var countResult = WrapRequest(() => QueryEndpoint.QueryWithResultSet(countQuery.ToString()));

    // WrapRequest hands back null once all retries have failed, and the binding is not
    // guaranteed to be a literal; both cases fall through to "return the empty graph"
    // instead of throwing NullReferenceException / InvalidCastException.
    var countNode = countResult?.FirstOrDefault()?.Value("count") as ILiteralNode;
    if (!long.TryParse(countNode?.Value, out var numberOfResult))
    {
        return graph;
    }

    // Iterates over the results because a query limit exists (pagination).
    // The offset is a long so it cannot overflow before reaching the (long) count.
    for (long offset = 0; offset < numberOfResult; offset += QUERY_LIMIT)
    {
        var pageQuery = new SparqlParameterizedString
        {
            CommandText = $"SELECT ?s ?p ?o WHERE {{ GRAPH @graph {{ ?s ?p ?o }} }} LIMIT {QUERY_LIMIT} OFFSET {offset}"
        };
        pageQuery.SetUri("graph", graphUri);

        using var page = WrapRequest(() => QueryEndpoint.QueryWithResultSet(pageQuery.ToString()));
        if (page is null)
        {
            // Request failed after all retries: skip this page (best effort).
            continue;
        }

        foreach (var row in page)
        {
            graph.Assert(row.Value("s"), row.Value("p"), row.Value("o"));
        }
    }

    return graph;
}
/// <summary>
/// Creates an empty graph which has an assertion and retraction list.
/// Convenience overload that converts the name to a <see cref="Uri"/> first.
/// </summary>
/// <param name="graphName">Graph URI formatted as string</param>
/// <returns>The graph created by the <see cref="Uri"/> based overload</returns>
public SmallUpdateGraph GetEmptySmallUpdateGraph(string graphName) =>
    GetEmptySmallUpdateGraph(new Uri(graphName));
/// <summary>
/// Creates an empty graph which has an assertion and retraction list.
/// Nothing is sent to the metadata store.
/// </summary>
/// <param name="graphUri">URI assigned as the base URI of the new graph</param>
/// <returns>A new <see cref="SmallUpdateGraph"/> with its base URI set</returns>
public SmallUpdateGraph GetEmptySmallUpdateGraph(Uri graphUri)
{
    var emptyGraph = new SmallUpdateGraph();
    emptyGraph.BaseUri = graphUri;
    return emptyGraph;
}
/// <summary>
/// Writes the given graph to the metadata store.
/// A <see cref="SmallUpdateGraph"/> is applied incrementally (its assert list is added,
/// then its retract list is removed). Any other graph replaces the stored graph:
/// an existing graph is cleared first and all triples are added afterwards.
/// </summary>
/// <param name="graph">Graph to store; its base URI identifies the target graph</param>
public void AddGraph(IGraph graph)
{
    if (graph is not SmallUpdateGraph incrementalGraph)
    {
        // Full replacement: wipe what is stored under this URI, then upload everything.
        if (HasGraph(graph.BaseUri))
        {
            ClearGraph(graph.BaseUri);
        }

        AddToGraph(graph, graph.Triples);
        return;
    }

    // Incremental update: only transmit the recorded changes.
    AddToGraph(graph, incrementalGraph.AssertList);
    RemoveFromGraph(graph, incrementalGraph.RetractList);
}
/// <summary>
/// Task based variant of <see cref="AddGraph(IGraph)"/>: runs the synchronous
/// method through <see cref="Task.Run(Action)"/>.
/// </summary>
/// <param name="graph">Graph to store</param>
/// <returns>A task that completes when the graph has been written</returns>
public Task AddGraphAsync(IGraph graph) => Task.Run(() => AddGraph(graph));
/// <summary>
/// Adds the given triples to the given graph in the metadata store.
/// The triples are sent in chunks of 100, one update request per chunk.
/// </summary>
/// <param name="graph">Graph whose base URI identifies the target graph</param>
/// <param name="triples">Triples to add</param>
public void AddToGraph(IGraph graph, IEnumerable<Triple> triples)
{
    // A single request with all triples can become too large, hence the chunking.
    const int chunkSize = 100;

    foreach (var additions in triples.Chunk(chunkSize))
    {
        var removals = new List<Triple>();
        WrapRequest(() => ReadWriteSparqlConnector.UpdateGraph(graph.BaseUri, additions, removals));
    }
}
/// <summary>
/// Task based variant of <see cref="AddToGraph(IGraph, IEnumerable{Triple})"/>: runs the
/// synchronous method through <see cref="Task.Run(Action)"/>.
/// </summary>
/// <param name="graph">Graph whose base URI identifies the target graph</param>
/// <param name="triples">Triples to add</param>
/// <returns>A task that completes when all triples have been sent</returns>
public Task AddToGraphAsync(IGraph graph, IEnumerable<Triple> triples) =>
    Task.Run(() => AddToGraph(graph, triples));
/// <summary>
/// Alias for <see cref="AddGraph(IGraph)"/>.
/// </summary>
/// <param name="graph">Graph to store</param>
public void UpdateGraph(IGraph graph) => AddGraph(graph);
/// <summary>
/// Alias for <see cref="AddGraphAsync(IGraph)"/>.
/// </summary>
/// <param name="graph">Graph to store</param>
/// <returns>A task that completes when the graph has been written</returns>
public async Task UpdateGraphAsync(IGraph graph) => await AddGraphAsync(graph);
/// <summary>
/// Parses the given RDF string into a new in-memory graph.
/// Nothing is sent to the metadata store.
/// </summary>
/// <param name="data">Serialized RDF</param>
/// <param name="parser">Reader matching the serialization format of <paramref name="data"/></param>
/// <returns>The graph filled by the parser</returns>
public IGraph ParseGraph(string data, IRdfReader parser)
{
    IGraph parsedGraph = new Graph();
    var dataReader = new StringReader(data);

    parser.Load(parsedGraph, dataReader);

    return parsedGraph;
}
/// <summary>
/// Serializes a graph to a string using the given writer.
/// </summary>
/// <param name="graph">Graph to serialize</param>
/// <param name="writer">Writer that determines the output format</param>
/// <returns>The serialized graph</returns>
public string SerializeGraph(IGraph graph, BaseRdfWriter writer)
{
    // Fully qualified: System.IO (imported by this file) also declares a StringWriter.
    var serialized = VDS.RDF.Writing.StringWriter.Write(graph, writer);
    return serialized;
}
/// <summary>
/// Clears the given graph.
/// Convenience overload that converts the name to a <see cref="Uri"/> first.
/// </summary>
/// <param name="graphName">Graph URI formatted as string</param>
public void ClearGraph(string graphName) => ClearGraph(new Uri(graphName));
/// <summary>
/// Clears the given graph by sending a SPARQL <c>CLEAR GRAPH</c> command.
/// </summary>
/// <param name="graphUri">URI of the graph to clear</param>
public void ClearGraph(Uri graphUri)
{
    var clearCommand = new SparqlParameterizedString
    {
        CommandText = "CLEAR GRAPH @graph"
    };
    clearCommand.SetUri("graph", graphUri);

    // QueryRaw returns the raw HTTP response. Its content is not needed, but it must be
    // disposed so the underlying connection is released (null when all retries failed).
    using var response = WrapRequest(() => QueryEndpoint.QueryRaw(clearCommand.ToString()));
}
/// <summary>
/// Task based variant of <see cref="ClearGraph(Uri)"/>: runs the synchronous method
/// through <see cref="Task.Run(Action)"/>.
/// </summary>
/// <param name="graphUri">URI of the graph to clear</param>
/// <returns>A task that completes when the clear command has been sent</returns>
public Task ClearGraphAsync(Uri graphUri) => Task.Run(() => ClearGraph(graphUri));
/// <summary>
/// Deletes the given graph from the metadata store.
/// Convenience overload that converts the name to a <see cref="Uri"/> first.
/// </summary>
/// <param name="graphName">Graph URI formatted as string</param>
public void DeleteGraph(string graphName) => DeleteGraph(new Uri(graphName));
/// <summary>
/// Deletes the given graph from the metadata store through the read/write connector.
/// </summary>
/// <param name="graphUri">URI of the graph to delete; its absolute form is passed to the connector</param>
public void DeleteGraph(Uri graphUri)
{
    Action deleteRequest = () => ReadWriteSparqlConnector.DeleteGraph(graphUri.AbsoluteUri);
    WrapRequest(deleteRequest);
}
/// <summary>
/// Task based variant of <see cref="DeleteGraph(Uri)"/>: runs the synchronous method
/// through <see cref="Task.Run(Action)"/>.
/// </summary>
/// <param name="graphUri">URI of the graph to delete</param>
/// <returns>A task that completes when the graph has been deleted</returns>
public Task DeleteGraphAsync(Uri graphUri) => Task.Run(() => DeleteGraph(graphUri));
/// <summary>
/// Retracts the given triples from the given graph in the metadata store.
/// The triples are sent in chunks of 100, one update request per chunk.
/// </summary>
/// <param name="graph">Graph whose base URI identifies the target graph</param>
/// <param name="triples">Triples to remove</param>
public void RemoveFromGraph(IGraph graph, IEnumerable<Triple> triples)
{
    // A single request with all triples can become too large, hence the chunking.
    const int chunkSize = 100;

    foreach (var removals in triples.Chunk(chunkSize))
    {
        var additions = new List<Triple>();
        WrapRequest(() => ReadWriteSparqlConnector.UpdateGraph(graph.BaseUri, additions, removals));
    }
}
/// <summary>
/// Task based variant of <see cref="RemoveFromGraph(IGraph, IEnumerable{Triple})"/>: runs the
/// synchronous method through <see cref="Task.Run(Action)"/>.
/// </summary>
/// <param name="graph">Graph whose base URI identifies the target graph</param>
/// <param name="triples">Triples to remove</param>
/// <returns>A task that completes when all triples have been retracted</returns>
public Task RemoveFromGraphAsync(IGraph graph, IEnumerable<Triple> triples) =>
    Task.Run(() => RemoveFromGraph(graph, triples));
/// <summary>
/// Creates a named graph in the metadata store.
/// Convenience overload that converts the name to a <see cref="Uri"/> first.
/// </summary>
/// <param name="graphName">Graph URI formatted as string</param>
public void CreateNamedGraph(string graphName) => CreateNamedGraph(new Uri(graphName));
/// <summary>
/// Creates a named graph in the metadata store by sending a SPARQL <c>CREATE GRAPH</c> command.
/// </summary>
/// <param name="graphUri">URI of the graph to create</param>
public void CreateNamedGraph(Uri graphUri)
{
    var createCommand = new SparqlParameterizedString
    {
        CommandText = "CREATE GRAPH @graph"
    };
    createCommand.SetUri("graph", graphUri);

    // QueryRaw returns the raw HTTP response. Its content is not needed, but it must be
    // disposed so the underlying connection is released (null when all retries failed).
    using var response = WrapRequest(() => QueryEndpoint.QueryRaw(createCommand.ToString()));
}
/// <summary>
/// Parses the given string data and stores the result as the given graph in the
/// metadata store. An existing graph is cleared first; otherwise the graph is created.
/// </summary>
/// <param name="graphUri">URI of the graph the parsed triples are stored in</param>
/// <param name="data">Serialized RDF</param>
/// <param name="parser">Reader matching the serialization format of <paramref name="data"/></param>
public void ParseAndAddGraph(Uri graphUri, string data, IRdfReader parser)
{
    // Parse before touching the store, so malformed data cannot leave the
    // existing graph cleared with nothing to replace it.
    var graph = ParseGraph(data, parser);

    // AddGraph addresses the store by the graph's base URI. Without this assignment the
    // triples would go to whatever base URI the data declared (or none at all), not to
    // the graph that is cleared/created below.
    graph.BaseUri = graphUri;

    if (HasGraph(graphUri))
    {
        ClearGraph(graphUri);
    }
    else
    {
        CreateNamedGraph(graphUri);
    }

    AddGraph(graph);
}
/// <summary>
/// Parses the given string data and stores the result in the metadata store under the
/// base URI declared by the data. An existing graph is cleared first; otherwise the
/// graph is created.
/// </summary>
/// <param name="data">Serialized RDF that declares a base URI</param>
/// <param name="parser">Reader matching the serialization format of <paramref name="data"/></param>
/// <exception cref="ArgumentException">The parsed data does not declare a base URI.</exception>
public void ParseAndAddGraph(string data, IRdfReader parser)
{
    var graph = ParseGraph(data, parser);

    // Fail with a descriptive error instead of a NullReferenceException when the
    // data carries no base URI and the target graph is therefore unknown.
    var graphUri = graph.BaseUri
        ?? throw new ArgumentException("The RDF data does not declare a base URI, so the target graph cannot be determined.", nameof(data));

    // Use the Uri directly: a Uri -> string -> Uri round trip can alter escaped characters.
    if (HasGraph(graphUri))
    {
        ClearGraph(graphUri);
    }
    else
    {
        CreateNamedGraph(graphUri);
    }

    AddGraph(graph);
}
/// <summary>
/// Queries triples by an optional subject, an optional predicate and a URI object.
/// The object is bound to the query as a URI.
/// </summary>
/// <param name="subject">Subject URI, or null to leave the subject open</param>
/// <param name="predicate">Predicate URI, or null to leave the predicate open</param>
/// <param name="obj">Object URI</param>
/// <returns>The triples built from the query results</returns>
public IEnumerable<Triple> GetTriplesByObject(Uri? subject, Uri? predicate, Uri obj) =>
    GetTriplesByObjectWrapper(subject, predicate, query => query.SetUri("o", obj));
/// <summary>
/// Queries triples by an optional subject, an optional predicate and a literal object.
/// The object is bound to the query as a literal.
/// </summary>
/// <param name="subject">Subject URI, or null to leave the subject open</param>
/// <param name="predicate">Predicate URI, or null to leave the predicate open</param>
/// <param name="obj">Object literal value</param>
/// <returns>The triples built from the query results</returns>
public IEnumerable<Triple> GetTriplesByObject(Uri? subject, Uri? predicate, string obj) =>
    GetTriplesByObjectWrapper(subject, predicate, query => query.SetLiteral("o", obj));
/// <summary>
/// Shared implementation of the <c>GetTriplesByObject</c> overloads: selects all triples
/// whose object equals the bound <c>@o</c> parameter and, when given, whose subject and
/// predicate equal the supplied URIs.
/// </summary>
/// <param name="subject">Subject URI, or null to leave the subject open</param>
/// <param name="predicate">Predicate URI, or null to leave the predicate open</param>
/// <param name="objWrapper">Callback that binds the <c>@o</c> parameter (as URI or literal)</param>
/// <returns>The matching triples; empty when nothing matches or the request failed after all retries</returns>
private IEnumerable<Triple> GetTriplesByObjectWrapper(Uri? subject, Uri? predicate, Action<SparqlParameterizedString> objWrapper)
{
    // The constraints are applied to the projected variables themselves. A second,
    // separate pattern with constants would leave ?s/?p/?o unconstrained and return
    // unrelated triples. sameTerm gives the same exact-term matching that a constant
    // inside a triple pattern has.
    var cmdString = new SparqlParameterizedString
    {
        CommandText = "SELECT ?s ?p ?o WHERE { ?s ?p ?o . " +
            ((subject is not null) ? "FILTER(sameTerm(?s, @s)) " : string.Empty) +
            ((predicate is not null) ? "FILTER(sameTerm(?p, @p)) " : string.Empty) +
            "FILTER(sameTerm(?o, @o)) }"
    };
    if (subject is not null)
    {
        cmdString.SetUri("s", subject);
    }
    if (predicate is not null)
    {
        cmdString.SetUri("p", predicate);
    }
    objWrapper(cmdString);

    using var resultSet = WrapRequest(() => QueryEndpoint.QueryWithResultSet(cmdString.ToString()));

    var triples = new List<Triple>();
    if (resultSet is null)
    {
        // WrapRequest hands back null once all retries have failed.
        return triples;
    }

    foreach (SparqlResult r in resultSet)
    {
        triples.Add(new Triple(r.Value("s"), r.Value("p"), r.Value("o")));
    }
    return triples;
}
/// <summary>
/// Lists the graphs whose URI contains the given identifier.
/// Only graphs holding at least one triple are matched by the query.
/// </summary>
/// <param name="identifier">Text that must be contained in the graph URI</param>
/// <returns>The URIs of the matching graphs; empty when nothing matches or the request failed after all retries</returns>
public IEnumerable<Uri> ListGraphs(string identifier)
{
    var cmdString = new SparqlParameterizedString
    {
        CommandText = @"SELECT DISTINCT ?g
WHERE { GRAPH ?g { ?s ?p ?o }
FILTER(contains(str(?g), @graph)) }"
    };
    cmdString.SetLiteral("graph", identifier);

    using var resultSet = WrapRequest(() => QueryEndpoint.QueryWithResultSet(cmdString.ToString()));

    var graphs = new List<Uri>();
    if (resultSet is null)
    {
        // WrapRequest hands back null once all retries have failed.
        return graphs;
    }

    foreach (SparqlResult r in resultSet)
    {
        // Match on the interface so any URI node implementation is accepted;
        // non-URI bindings are skipped.
        if (r.Value("g") is IUriNode uriNode)
        {
            graphs.Add(uriNode.Uri);
        }
    }
    return graphs;
}
/// <summary>
/// Creates the graph if needed by sending <c>CREATE SILENT GRAPH</c>
/// (SILENT disables the "error" for an existing graph).
/// </summary>
/// <param name="graphIri">Graph IRI formatted as string</param>
public void EnsureGraph(string graphIri)
{
    var commandString = new SparqlParameterizedString
    {
        CommandText = "CREATE SILENT GRAPH @graphIri"
    };
    commandString.SetUri("graphIri", new Uri(graphIri));

    // QueryRaw returns the raw HTTP response. Its content is not needed, but it must be
    // disposed so the underlying connection is released (null when all retries failed).
    using var response = WrapRequest(() => QueryEndpoint.QueryRaw(commandString.ToString()));
}
/// <summary>
/// Runs a Virtuoso request with retries, since such requests sometimes just fail.
/// Any exception triggers a retry (up to 5 retries, 200 ms apart).
/// </summary>
/// <param name="action">The request to execute</param>
public static void WrapRequest(Action action)
{
    var retryPolicy = Policy
        .Handle<Exception>()
        .WaitAndRetry(5, _ => TimeSpan.FromMilliseconds(200));

    retryPolicy.Execute(() => action());
}
/// <summary>
/// Runs a Virtuoso request with retries, since such requests sometimes just fail.
/// Any exception triggers a retry (up to 5 retries, 200 ms apart).
/// </summary>
/// <remarks>
/// The outcome is captured rather than executed directly, and only its result is
/// returned. Callers in this class (e.g. the paging loop of GetGraph) check the
/// returned value for null.
/// NOTE(review): presumably a request that still fails after the last retry yields
/// the default value of <typeparamref name="W"/> instead of an exception — confirm
/// against the Polly documentation of ExecuteAndCapture.
/// </remarks>
/// <typeparam name="W">Type of the request result</typeparam>
/// <param name="function">The request to execute</param>
/// <returns>The result of the captured execution</returns>
public static W WrapRequest<W>(Func<W> function)
{
    var retryPolicy = Policy
        .Handle<Exception>()
        .WaitAndRetry(5, _ => TimeSpan.FromMilliseconds(200));

    var outcome = retryPolicy.ExecuteAndCapture(() => function());

    return outcome.Result;
}
/// <summary>
/// Disposes the read/write connector held by this instance and
/// suppresses finalization of this object.
/// </summary>
public void Dispose()
{
    this.ReadWriteSparqlConnector.Dispose();

    GC.SuppressFinalize(this);
}
}
}