Select Git revision
-
Marcel Nellesen authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
ElasticsearchSearchClient.cs 14.07 KiB
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace SemanticSearchImplementation
{
    /// <summary>
    /// Implements necessary functions to use Elasticsearch as a search engine.
    /// </summary>
    /// <inheritdoc cref="ISearchClient"/>
    public class ElasticsearchSearchClient : ISearchClient
    {
        // API endpoints
        private const string ALIASES = "_aliases";
        private const string SEARCH = "_search";
        private const string MAPPING = "_mapping";
        private const string BULK = "_bulk";

        // Delay between polls while waiting for a just-indexed document to become searchable.
        private static readonly TimeSpan IndexPollDelay = TimeSpan.FromMilliseconds(100);

        // One HttpClient shared by all instances. Its default headers are configured exactly once
        // here: adding them in the instance constructor appended a duplicate Accept value for every
        // client created and mutated shared state that concurrent requests may be reading.
        private static readonly HttpClient client = CreateHttpClient();

        private readonly string baseUrl;
        private string _index;

        /// <summary>
        /// Creates a client for the Elasticsearch node reachable at <c>http://{server}:{port}/</c>.
        /// </summary>
        /// <param name="server">Host name of the Elasticsearch node.</param>
        /// <param name="port">HTTP port of the Elasticsearch node.</param>
        public ElasticsearchSearchClient(string server = "localhost", string port = "9200")
        {
            baseUrl = $"http://{server}:{port}/";
        }

        /// <summary>
        /// Builds the shared HttpClient and sets its default request headers once.
        /// </summary>
        private static HttpClient CreateHttpClient()
        {
            var httpClient = new HttpClient();
            httpClient.DefaultRequestHeaders.Add("Accept", "application/json");
            return httpClient;
        }

        /// <summary>
        /// Sets the index used by the mapping, id-lookup and bulk operations of this client.
        /// (Searches always go through the default alias.)
        /// </summary>
        /// <param name="index">Name of the index to operate on.</param>
        public void ChangeIndex(string index)
        {
            _index = index;
        }

        /// <summary>
        /// Reads the field mapping of the current index.
        /// </summary>
        /// <returns>Field name (key) and its mapped Elasticsearch type (value). Fields without a
        /// "type" entry map to null; an index without mapped properties yields an empty dictionary.</returns>
        /// <exception cref="HttpRequestException">Elasticsearch returned a non-success status code.</exception>
        public async Task<IDictionary<string, string>> GetMappingAsync()
        {
            IDictionary<string, string> mapping = new Dictionary<string, string>();

            JObject jObject;
            using (var response = await client.GetAsync(baseUrl + _index + "/" + MAPPING))
            {
                response.EnsureSuccessStatusCode();
                jObject = JObject.Parse(await response.Content.ReadAsStringAsync());
            }

            // The response is keyed by the concrete index name. Fall back to the first entry so a
            // name that is not a literal key of the response (e.g. an alias) still resolves.
            var indexMapping = jObject[_index] ?? (jObject.First as JProperty)?.Value;
            var fieldMappings = indexMapping?["mappings"]?["properties"] as JObject;
            if (fieldMappings == null)
            {
                return mapping;
            }

            foreach (var prop in fieldMappings)
            {
                mapping.Add(prop.Key, (string)prop.Value["type"]);
            }
            return mapping;
        }

        /// <summary>
        /// Queries the document ID for a metadata graph.
        /// </summary>
        /// <param name="graphName">ID of the metadata graph.</param>
        /// <returns>ID of the Elasticsearch document, or null unless exactly one document matches.</returns>
        /// <exception cref="HttpRequestException">Elasticsearch returned a non-success status code.</exception>
        private async Task<string> GetIdFromGraphNameAsync(string graphName)
        {
            var content = new JObject
            {
                new JProperty("query", new JObject
                {
                    new JProperty("term", new JObject
                    {
                        new JProperty(ElasticsearchIndexMapper.LABEL_GRAPHNAME, new JObject
                        {
                            new JProperty("value", graphName)
                        })
                    })
                })
            };

            JObject jObject;
            using (var body = CreateJsonContent(content))
            using (var response = await client.PostAsync(baseUrl + _index + "/" + SEARCH, body))
            {
                // Fail loudly instead of reporting "not found": callers poll on a null result.
                response.EnsureSuccessStatusCode();
                jObject = JObject.Parse(await response.Content.ReadAsStringAsync());
            }

            var hits = jObject["hits"];
            if ((int?)hits?["total"]?["value"] == 1)
            {
                return (string)hits["hits"][0]["_id"];
            }
            return null;
        }

        /// <summary>
        /// Logs details of an unsuccessful HTTP response to the console; successful responses are ignored.
        /// </summary>
        /// <param name="response">The response of a HTTP request.</param>
        private static async Task HandleResponseAsync(HttpResponseMessage response)
        {
            if (response.IsSuccessStatusCode)
            {
                return;
            }
            // TODO: surface errors to callers instead of only logging them.
            Console.WriteLine(response.StatusCode);
            // Print the response body itself; printing response.Content only shows its type name.
            Console.WriteLine(response.Content != null ? await response.Content.ReadAsStringAsync() : string.Empty);
            Console.WriteLine(response.RequestMessage);
        }

        /// <summary>
        /// Creates UTF-8 "application/json" request content from a JSON object.
        /// </summary>
        private static HttpContent CreateJsonContent(JObject content)
        {
            return new StringContent(content.ToString(), Encoding.UTF8, "application/json");
        }

        /// <summary>
        /// Creates UTF-8 "application/json" request content from an already serialized string.
        /// </summary>
        private static HttpContent CreateJsonContent(string content)
        {
            return new StringContent(content, Encoding.UTF8, "application/json");
        }

        /// <summary>
        /// Serializes a JSON token onto a single line, as each line of a bulk request body must be.
        /// </summary>
        private static string ToSingleLine(JToken json)
        {
            // Line breaks inside string values are escaped by the serializer, so only the
            // formatting line breaks are removed here.
            return json.ToString().Replace("\n", "").Replace("\r", "");
        }

        /// <summary>
        /// Builds a bulk action line such as <c>{"update":{"_id":"..."}}</c>.
        /// </summary>
        /// <param name="action">Bulk action name ("index", "update" or "delete").</param>
        /// <param name="id">Target document ID; left out of the line when null or empty.</param>
        private static string BulkActionLine(string action, string id)
        {
            var metadata = new JObject();
            if (!string.IsNullOrEmpty(id))
            {
                // Built through JObject so the ID is JSON-escaped.
                metadata.Add("_id", id);
            }
            return ToSingleLine(new JObject(new JProperty(action, metadata)));
        }

        /// <summary>
        /// Creates a new index with the given settings/mappings body.
        /// </summary>
        /// <param name="content">Request body for the index creation.</param>
        /// <param name="index">Name of the index to create.</param>
        /// <exception cref="HttpRequestException">Elasticsearch returned a non-success status code.</exception>
        public async Task CreateIndexAsync(JObject content, string index)
        {
            using (var body = CreateJsonContent(content))
            using (var response = await client.PutAsync(baseUrl + index, body))
            {
                response.EnsureSuccessStatusCode();
            }
        }

        /// <summary>
        /// Indexes the given documents into the current index with a single bulk request.
        /// No _id is sent, so each document is indexed as a new document.
        /// </summary>
        /// <param name="documents">Documents to index.</param>
        public async Task AddDocumentsAsync(IEnumerable<JObject> documents)
        {
            var contentList = new List<string>();
            foreach (var document in documents)
            {
                contentList.Add(BulkActionLine("index", null));
                contentList.Add(ToSingleLine(document));
            }
            await BulkRequestAsync(contentList);
        }

        /// <summary>
        /// Atomically moves the default alias from one index to another.
        /// </summary>
        /// <param name="from">Index the alias is removed from.</param>
        /// <param name="to">Index the alias is added to.</param>
        /// <exception cref="HttpRequestException">Elasticsearch returned a non-success status code.</exception>
        public async Task SwitchAliasAsync(string from, string to)
        {
            var jObject = new JObject
            {
                new JProperty("actions", new JArray
                {
                    new JObject
                    {
                        new JProperty("remove", new JObject
                        {
                            new JProperty("alias", ElasticsearchIndexMapper.DEFAULT_ALIAS_NAME),
                            new JProperty("index", from)
                        })
                    },
                    new JObject
                    {
                        new JProperty("add", new JObject
                        {
                            new JProperty("alias", ElasticsearchIndexMapper.DEFAULT_ALIAS_NAME),
                            new JProperty("index", to)
                        })
                    }
                })
            };
            using (var body = CreateJsonContent(jObject))
            using (var response = await client.PostAsync(baseUrl + ALIASES, body))
            {
                response.EnsureSuccessStatusCode();
            }
        }

        /// <summary>
        /// Deletes an index; failures are only logged.
        /// </summary>
        /// <param name="index">Name of the index to delete.</param>
        public async Task DeleteIndexAsync(string index)
        {
            using (var response = await client.DeleteAsync(baseUrl + index))
            {
                await HandleResponseAsync(response);
            }
        }

        /// <summary>
        /// Indexes the document of <paramref name="graphName"/> (replacing it if it already exists)
        /// and partially updates the already existing documents of all other given graphs.
        /// </summary>
        /// <param name="graphName">ID of the metadata graph that is added.</param>
        /// <param name="documents">IDs of metadata graphs (key) and their content as JSON object (value).</param>
        public async Task AddDocumentAsync(string graphName, IDictionary<string, JObject> documents)
        {
            var contentList = new List<string>();
            foreach (var document in documents)
            {
                if (string.Equals(document.Key, graphName))
                {
                    // Reuse the existing _id (if any) so the document is overwritten instead of duplicated.
                    var id = await GetIdFromGraphNameAsync(document.Key);
                    contentList.Add(BulkActionLine("index", id));
                    contentList.Add(ToSingleLine(document.Value));
                }
                else
                {
                    await AddUpdateOfExistingDocumentAsync(contentList, document);
                }
            }
            await BulkRequestAsync(contentList);
        }

        /// <summary>
        /// Appends an "update" action plus partial document for an already indexed graph to a bulk content list.
        /// </summary>
        /// <param name="contentList">Bulk content lines; two lines are appended.</param>
        /// <param name="document">ID of a metadata graph (key) and its content as JSON object (value).</param>
        private async Task AddUpdateOfExistingDocumentAsync(List<string> contentList, KeyValuePair<string, JObject> document)
        {
            var id = await GetIdFromGraphNameAsync(document.Key);
            // A document indexed moments ago may not be searchable yet, so poll until its _id shows up.
            // NOTE(review): this waits indefinitely if the graph is never indexed.
            while (string.IsNullOrEmpty(id))
            {
                Console.WriteLine($"Wait for elasticsearch _id of graph {document.Key}");
                // Non-blocking wait; Thread.Sleep would tie up a thread inside async code.
                await Task.Delay(IndexPollDelay);
                id = await GetIdFromGraphNameAsync(document.Key);
            }
            contentList.Add(BulkActionLine("update", id));
            contentList.Add(ToSingleLine(new JObject(new JProperty("doc", document.Value))));
        }

        /// <summary>
        /// Deletes the document of <paramref name="graphName"/> and partially updates the existing
        /// documents of the other given graphs in one bulk request.
        /// </summary>
        /// <param name="graphName">ID of the metadata graph whose document is deleted.</param>
        /// <param name="documents">IDs of metadata graphs (key) and their content as JSON object (value).</param>
        public async Task DeleteDocumentAsync(string graphName, IDictionary<string, JObject> documents)
        {
            var contentList = new List<string>();
            var id = await GetIdFromGraphNameAsync(graphName);
            // A delete action with an empty _id cannot target any document, so only add it when resolved.
            if (!string.IsNullOrEmpty(id))
            {
                contentList.Add(BulkActionLine("delete", id));
            }
            foreach (var document in documents)
            {
                await AddUpdateOfExistingDocumentAsync(contentList, document);
            }
            await BulkRequestAsync(contentList);
        }

        /// <summary>
        /// Executes a bulk request against the current index with the given list of content lines.
        /// Does nothing when the list is empty; failures are only logged.
        /// </summary>
        /// <param name="contentList">Content rows for the bulk request, one JSON document per row.</param>
        /// <returns>A task that represents the asynchronous save operation.</returns>
        private async Task BulkRequestAsync(IEnumerable<string> contentList)
        {
            var lines = new List<string>(contentList);
            // Elasticsearch rejects a bulk request without actions.
            if (lines.Count == 0)
            {
                return;
            }
            // Newline-delimited body; the final line must be terminated as well.
            var payload = string.Join("\n", lines) + "\n";
            // Bulk request because constructed additional triples could change other documents.
            using (var body = CreateJsonContent(payload))
            using (var response = await client.PostAsync(baseUrl + _index + "/" + BULK, body))
            {
                if (!response.IsSuccessStatusCode)
                {
                    await HandleResponseAsync(response);
                    return;
                }
                // A successful bulk response can still report failed items via its "errors" flag.
                var result = JObject.Parse(await response.Content.ReadAsStringAsync());
                if ((bool?)result["errors"] == true)
                {
                    Console.WriteLine(result.ToString());
                }
            }
        }

        /// <summary>
        /// Searches the default alias for metadata graphs visible to the given projects.
        /// </summary>
        /// <param name="query">The user query.</param>
        /// <param name="projects">Projects whose non-public metadata may be returned.</param>
        /// <param name="advanced">Use "query_string" instead of "simple_query_string".</param>
        /// <param name="size">Maximum number of hits.</param>
        /// <param name="from">Offset of the first hit.</param>
        /// <param name="sorting">JSON array used as "sort" clause; omitted when null or blank.</param>
        /// <returns>Graph IDs (key) and their score (value); empty on a failed request.</returns>
        public async Task<IDictionary<string, double>> SearchAsync(string query, IEnumerable<string> projects, bool advanced, int size, int from, string sorting)
        {
            var searchType = advanced ? "query_string" : "simple_query_string";

            var queryJObject = new JObject
            {
                new JProperty("bool", new JObject
                {
                    new JProperty("must", new JObject
                    {
                        new JProperty(searchType, new JObject
                        {
                            new JProperty("query", query)
                        })
                    }),
                    new JProperty("filter", new JObject
                    {
                        CreateVisibilityFilter(projects)
                    })
                })
            };

            var content = new JObject
            {
                {"size", size},
                {"from", from},
                // Keep computing _score when sorting by other fields; otherwise hits carry a null score.
                {"track_scores", true},
                {"_source", ElasticsearchIndexMapper.LABEL_GRAPHNAME},
                {"query", queryJObject}
            };
            if (!string.IsNullOrWhiteSpace(sorting))
            {
                content["sort"] = JArray.Parse(sorting);
            }

            return await Search(content);
        }

        /// <summary>
        /// Creates the visibility filter for search to allow a user only to see public metadata or metadata of own projects.
        /// </summary>
        /// <param name="projects">An enumerator containing the projects which are allowed.</param>
        /// <returns>A JProperty containing the specified visibility filter.</returns>
        private static JProperty CreateVisibilityFilter(IEnumerable<string> projects)
        {
            return new JProperty("bool", new JObject
            {
                new JProperty("must", new JObject
                {
                    new JProperty("bool", new JObject
                    {
                        // Either condition suffices: belongs to an allowed project OR is public.
                        new JProperty("should", new JArray
                        {
                            new JObject
                            {
                                new JProperty("terms", new JObject
                                {
                                    new JProperty(ElasticsearchIndexMapper.LABEL_BELONGS_TO_PROJECT, new JArray(projects))
                                })
                            },
                            new JObject
                            {
                                new JProperty("term", new JObject
                                {
                                    new JProperty(ElasticsearchIndexMapper.LABEL_IS_PUBLIC, true)
                                })
                            }
                        })
                    })
                })
            });
        }

        /// <summary>
        /// Runs the search against the default alias and handles the response.
        /// </summary>
        /// <param name="content">A JSON object containing the body of the search request.</param>
        /// <returns>The task result contains a dictionary containing the IDs of the found metadata graphs (key)
        /// and the corresponding ranking (value); empty when the request failed.</returns>
        private async Task<IDictionary<string, double>> Search(JObject content)
        {
            using (var body = CreateJsonContent(content))
            using (var response = await client.PostAsync(baseUrl + ElasticsearchIndexMapper.DEFAULT_ALIAS_NAME + "/" + SEARCH, body))
            {
                if (!response.IsSuccessStatusCode)
                {
                    // TODO: report malformed queries to the caller
                    // (["error"]["root_cause"]["reason"], ["error"]["root_cause"]["type"]).
                    await HandleResponseAsync(response);
                    return new Dictionary<string, double>();
                }
                var jObject = JObject.Parse(await response.Content.ReadAsStringAsync());
                return GetSearchResults(jObject);
            }
        }

        /// <summary>
        /// Filters the plain search results of a search request.
        /// </summary>
        /// <param name="results">Plain JSON result of a search request.</param>
        /// <returns>A dictionary containing the ID of the metadata graphs (key) and the corresponding rankings (value).
        /// If a graph appears in several hits, the first (highest-ranked) hit wins; a missing score becomes 0.</returns>
        private static IDictionary<string, double> GetSearchResults(JObject results)
        {
            IDictionary<string, double> graphNames = new Dictionary<string, double>();
            var hits = results["hits"]?["hits"] as JArray;
            if (hits == null)
            {
                return graphNames;
            }
            foreach (var hit in hits)
            {
                var graphName = (string)hit["_source"]?[ElasticsearchIndexMapper.LABEL_GRAPHNAME];
                // Dictionary.Add would throw on a missing or repeated graph name.
                if (graphName == null || graphNames.ContainsKey(graphName))
                {
                    continue;
                }
                graphNames.Add(graphName, (double?)hit["_score"] ?? 0.0);
            }
            return graphNames;
        }
    }
}