using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace SemanticSearchImplementation
{
    /// <summary>
    /// Implements necessary functions to use Elasticsearch as a search engine.
    /// </summary>
    /// <inheritdoc cref="ISearchClient"/>
    public class ElasticsearchSearchClient : ISearchClient
    {
        // API
        private const string ALIASES = "_aliases";
        private const string SEARCH = "_search";
        private const string MAPPING = "_mapping";
        private const string BULK = "_bulk";

        // Delay between two lookups while waiting for a document to become searchable.
        private const int IdPollDelayMilliseconds = 100;

        // Shared by all instances; configured exactly once in CreateHttpClient.
        private static readonly HttpClient client = CreateHttpClient();

        private readonly string baseUrl;
        private string _index;

        /// <summary>
        /// Creates a client that talks to the Elasticsearch server at <c>http://{server}:{port}/</c>.
        /// </summary>
        /// <param name="server">Host name of the Elasticsearch server.</param>
        /// <param name="port">Port of the Elasticsearch server.</param>
        public ElasticsearchSearchClient(string server = "localhost", string port = "9200")
        {
            baseUrl = $"http://{server}:{port}/";
        }

        /// <summary>
        /// Creates the shared HTTP client and sets its default "Accept" header.
        /// </summary>
        /// <remarks>
        /// The header is set here rather than in the instance constructor so that creating several
        /// search clients does not append the same header value to the shared client again and again.
        /// </remarks>
        /// <returns>The configured HTTP client.</returns>
        private static HttpClient CreateHttpClient()
        {
            var httpClient = new HttpClient();
            httpClient.DefaultRequestHeaders.Add("Accept", "application/json");
            return httpClient;
        }

        /// <summary>
        /// Sets the index used by the mapping, lookup and bulk requests of this client.
        /// </summary>
        /// <param name="index">Name of the index.</param>
        public void ChangeIndex(string index)
        {
            _index = index;
        }

        /// <summary>
        /// Requests the mapping of the current index and extracts the type of every top-level property.
        /// </summary>
        /// <returns>The task result contains the property names (key) and the value of their "type" entry (value).</returns>
        public async Task<IDictionary<string, string>> GetMappingAsync()
        {
            using (var response = await client.GetAsync(baseUrl + _index + "/" + MAPPING))
            {
                JObject jObject = JObject.Parse(await response.Content.ReadAsStringAsync());
                var fieldMappings = (JObject)jObject[_index]["mappings"]["properties"];

                IDictionary<string, string> mapping = new Dictionary<string, string>();
                foreach (var prop in fieldMappings)
                {
                    mapping.Add(prop.Key, (string)prop.Value["type"]);
                }
                return mapping;
            }
        }

        /// <summary>
        /// Queries the document ID for a metadata graph.
        /// </summary>
        /// <param name="graphName">ID of the metadata graph.</param>
        /// <returns>ID of the Elasticsearch document, or <c>null</c> if the query does not match exactly one document.</returns>
        private async Task<string> GetIdFromGraphNameAsync(string graphName)
        {
            var content = new JObject()
            {
                new JProperty("query", new JObject
                {
                    new JProperty("term", new JObject
                    {
                        new JProperty(ElasticsearchIndexMapper.LABEL_GRAPHNAME, new JObject
                        {
                            new JProperty("value", graphName)
                        })
                    })
                })
            };

            using (var response = await client.PostAsync(baseUrl + _index + "/" + SEARCH, CreateJsonContent(content)))
            {
                JObject jObject = JObject.Parse(await response.Content.ReadAsStringAsync());
                if ((int)jObject["hits"]["total"]["value"] == 1)
                {
                    return (string)jObject["hits"]["hits"][0]["_id"];
                }
                return null;
            }
        }

        /// <summary>
        /// Handles response of a HTTP request.
        /// </summary>
        /// <remarks>
        /// For a non-success status code, the status code, the content object and the request message
        /// are written to the console. Nothing is thrown.
        /// </remarks>
        /// <param name="response">The response of a HTTP request.</param>
        private void HandleResponse(HttpResponseMessage response)
        {
            if (!response.IsSuccessStatusCode)
            {
                // TODO
                Console.WriteLine(response.StatusCode);
                Console.WriteLine(response.Content);
                Console.WriteLine(response.RequestMessage);
            }
        }

        // create json content from JObject
        private HttpContent CreateJsonContent(JObject content)
        {
            return CreateJsonContent(content.ToString());
        }

        // create json content from string
        private HttpContent CreateJsonContent(string content)
        {
            return new StringContent(content, Encoding.UTF8, "application/json");
        }

        /// <summary>
        /// Serializes a JSON token without line breaks, as required for one row of a bulk request.
        /// </summary>
        /// <param name="token">The token to serialize.</param>
        /// <returns>The single-line JSON text.</returns>
        private static string ToSingleLine(JToken token)
        {
            return token.ToString(Newtonsoft.Json.Formatting.None);
        }

        /// <summary>
        /// Creates the action row of a bulk request.
        /// </summary>
        /// <param name="action">Name of the bulk action ("index", "update" or "delete").</param>
        /// <param name="id">Document ID for the action; omitted from the row when null or empty.</param>
        /// <returns>The single-line JSON text of the action row.</returns>
        private static string CreateBulkAction(string action, string id)
        {
            // Build the row as JSON objects so the id is escaped correctly.
            var metadata = new JObject();
            if (!String.IsNullOrEmpty(id))
            {
                metadata.Add("_id", id);
            }
            return ToSingleLine(new JObject { new JProperty(action, metadata) });
        }

        /// <summary>
        /// Sends a PUT request for the given index with the given content as body.
        /// </summary>
        /// <param name="content">Body of the request.</param>
        /// <param name="index">Name of the index.</param>
        /// <returns>A task that represents the asynchronous operation.</returns>
        /// <exception cref="HttpRequestException">The response has a non-success status code.</exception>
        public async Task CreateIndexAsync(JObject content, string index)
        {
            using (var response = await client.PutAsync(baseUrl + index, CreateJsonContent(content)))
            {
                response.EnsureSuccessStatusCode();
            }
        }

        /// <summary>
        /// Adds the given documents to the current index with one bulk request (no explicit document IDs).
        /// </summary>
        /// <param name="documents">The documents to add.</param>
        /// <returns>A task that represents the asynchronous operation.</returns>
        public async Task AddDocumentsAsync(IEnumerable<JObject> documents)
        {
            var contentList = new List<string>();
            foreach (var document in documents)
            {
                // create new document
                contentList.Add(CreateBulkAction("index", null));
                contentList.Add(ToSingleLine(document));
            }
            await BulkRequestAsync(contentList);
        }

        /// <summary>
        /// Posts an alias update that removes the default alias from one index and adds it to another.
        /// </summary>
        /// <param name="from">Index the alias is removed from.</param>
        /// <param name="to">Index the alias is added to.</param>
        /// <returns>A task that represents the asynchronous operation.</returns>
        /// <exception cref="HttpRequestException">The response has a non-success status code.</exception>
        public async Task SwitchAliasAsync(string from, string to)
        {
            var jObject = new JObject
            {
                new JProperty("actions", new JArray
                {
                    new JObject
                    {
                        new JProperty("remove", new JObject
                        {
                            new JProperty("alias", ElasticsearchIndexMapper.DEFAULT_ALIAS_NAME),
                            new JProperty("index", from)
                        })
                    },
                    new JObject
                    {
                        new JProperty("add", new JObject
                        {
                            new JProperty("alias", ElasticsearchIndexMapper.DEFAULT_ALIAS_NAME),
                            new JProperty("index", to)
                        })
                    }
                })
            };

            using (var response = await client.PostAsync(baseUrl + ALIASES, CreateJsonContent(jObject)))
            {
                response.EnsureSuccessStatusCode();
            }
        }

        /// <summary>
        /// Sends a DELETE request for the given index. A failure is only reported to the console.
        /// </summary>
        /// <param name="index">Name of the index.</param>
        /// <returns>A task that represents the asynchronous operation.</returns>
        public async Task DeleteIndexAsync(string index)
        {
            using (var response = await client.DeleteAsync(baseUrl + index))
            {
                HandleResponse(response);
            }
        }

        /// <summary>
        /// Indexes the document of the given metadata graph and updates all other given documents
        /// with one bulk request.
        /// </summary>
        /// <param name="graphName">ID of the metadata graph whose document is (re-)indexed.</param>
        /// <param name="documents">IDs of metadata graphs (key) and their corresponding content as JSON object (value).</param>
        /// <returns>A task that represents the asynchronous operation.</returns>
        public async Task AddDocumentAsync(string graphName, IDictionary<string, JObject> documents)
        {
            var contentList = new List<string>();
            foreach (var document in documents)
            {
                if (String.Equals(document.Key, graphName))
                {
                    // use id if this document already exists, otherwise create a new document
                    var id = await GetIdFromGraphNameAsync(document.Key);
                    contentList.Add(CreateBulkAction("index", id));
                    contentList.Add(ToSingleLine(document.Value));
                }
                else
                {
                    contentList = await AddOtherDocuments(contentList, document);
                }
            }
            await BulkRequestAsync(contentList);
        }

        /// <summary>
        /// Creates the content list for a bulk request with the given documents.
        /// </summary>
        /// <remarks>
        /// Waits, without a time limit, until a document ID can be found for the metadata graph.
        /// </remarks>
        /// <param name="contentList">A list containing the content for the bulk request.</param>
        /// <param name="document">ID of a metadata graph (key) and its corresponding content as JSON object (value).</param>
        /// <returns>The task result contains the created content list for the bulk request.</returns>
        private async Task<List<string>> AddOtherDocuments(List<string> contentList, KeyValuePair<string, JObject> document)
        {
            var id = await GetIdFromGraphNameAsync(document.Key);

            // it could take some time while new document is indexed and ready for search
            while (String.IsNullOrEmpty(id))
            {
                Console.WriteLine($"Wait for elasticsearch _id of graph {document.Key}");
                // Task.Delay instead of Thread.Sleep: do not block the thread while waiting.
                await Task.Delay(IdPollDelayMilliseconds);
                id = await GetIdFromGraphNameAsync(document.Key);
            }

            // update existing document
            contentList.Add(CreateBulkAction("update", id));
            contentList.Add("{\"doc\":" + ToSingleLine(document.Value) + "}");
            return contentList;
        }

        /// <summary>
        /// Deletes the document of the given metadata graph and updates all given documents
        /// with one bulk request.
        /// </summary>
        /// <param name="graphName">ID of the metadata graph whose document is deleted.</param>
        /// <param name="documents">IDs of metadata graphs (key) and their corresponding content as JSON object (value).</param>
        /// <returns>A task that represents the asynchronous operation.</returns>
        public async Task DeleteDocumentAsync(string graphName, IDictionary<string, JObject> documents)
        {
            var contentList = new List<string>();

            // Only emit the delete action when a document exists; an action with an empty
            // "_id" is not a valid delete.
            var id = await GetIdFromGraphNameAsync(graphName);
            if (!String.IsNullOrEmpty(id))
            {
                contentList.Add(CreateBulkAction("delete", id));
            }

            foreach (var document in documents)
            {
                contentList = await AddOtherDocuments(contentList, document);
            }
            await BulkRequestAsync(contentList);
        }

        /// <summary>
        /// Executes a bulk request with the given list of content.
        /// </summary>
        /// <remarks>
        /// No request is sent when the list is empty. A failure is only reported to the console.
        /// </remarks>
        /// <param name="contentList">An enumerator of content rows for the bulk request.</param>
        /// <returns>A task that represents the asynchronous save operation.</returns>
        private async Task BulkRequestAsync(IEnumerable<string> contentList)
        {
            var rows = contentList as ICollection<string> ?? new List<string>(contentList);
            if (rows.Count == 0)
            {
                return;
            }

            var content = String.Join("\n", rows) + "\n";

            // bulk request because constructed additional triples could change other documents..
            using (var response = await client.PostAsync(baseUrl + _index + "/" + BULK, CreateJsonContent(content)))
            {
                HandleResponse(response);
            }
        }

        /// <summary>
        /// Builds the search request for the given query and runs it against the default alias.
        /// </summary>
        /// <param name="query">The query string.</param>
        /// <param name="projects">An enumerator containing the projects which are allowed.</param>
        /// <param name="advanced">If true "query_string" is used, otherwise "simple_query_string".</param>
        /// <param name="size">Value of the "size" entry of the request.</param>
        /// <param name="from">Value of the "from" entry of the request.</param>
        /// <param name="sorting">JSON array (as text) used as the "sort" entry of the request.</param>
        /// <returns>The task result contains a dictionary containing the IDs of the found metadata graphs (key)
        /// and the corresponding ranking (value).</returns>
        public async Task<IDictionary<string, double>> SearchAsync(string query, IEnumerable<string> projects, bool advanced, int size, int from, string sorting)
        {
            // track_total_hits = true around query to get total value of results
            var searchType = advanced ? "query_string" : "simple_query_string";

            JObject queryJObject = new JObject()
            {
                new JProperty("bool", new JObject
                {
                    new JProperty("must", new JObject
                    {
                        new JProperty(searchType, new JObject
                        {
                            new JProperty("query", query)
                        })
                    }),
                    new JProperty("filter", new JObject
                    {
                        CreateVisibilityFilter(projects)
                    })
                })
            };

            JObject content = new JObject()
            {
                {"size", size},
                {"from", from},
                {"sort", JArray.Parse(sorting)},
                {"_source", ElasticsearchIndexMapper.LABEL_GRAPHNAME},
                {"query", queryJObject}
            };

            return await Search(content);
        }

        /// <summary>
        /// Creates the visibility filter for search to allow a user only to see public metadata or metadata of own projects.
        /// </summary>
        /// <param name="projects">An enumerator containing the projects which are allowed.</param>
        /// <returns>A JProperty containing the specified visibility filter.</returns>
        private JProperty CreateVisibilityFilter(IEnumerable<string> projects)
        {
            return new JProperty("bool", new JObject
            {
                new JProperty("must", new JObject
                {
                    new JProperty("bool", new JObject
                    {
                        new JProperty("should", new JArray
                        {
                            new JObject
                            {
                                new JProperty("terms", new JObject
                                {
                                    new JProperty(ElasticsearchIndexMapper.LABEL_BELONGS_TO_PROJECT, new JArray(projects))
                                })
                            },
                            new JObject
                            {
                                new JProperty("term", new JObject
                                {
                                    new JProperty(ElasticsearchIndexMapper.LABEL_IS_PUBLIC, true)
                                })
                            }
                        })
                    })
                })
            });
        }

        /// <summary>
        /// Runs the search and handles the response.
        /// </summary>
        /// <param name="content">A JSON object containing the body of the search request.</param>
        /// <returns>The task result contains a dictionary containing the IDs of the found metadata graphs (key)
        /// and the corresponding ranking (value). It is empty if the request was not successful.</returns>
        private async Task<IDictionary<string, double>> Search(JObject content)
        {
            var url = baseUrl + ElasticsearchIndexMapper.DEFAULT_ALIAS_NAME + "/" + SEARCH;
            using (var response = await client.PostAsync(url, CreateJsonContent(content)))
            {
                if (response.IsSuccessStatusCode)
                {
                    JObject jObject = JObject.Parse(await response.Content.ReadAsStringAsync());
                    return GetSearchResults(jObject);
                }

                // TODO: for malformed query
                // ["error"]["root_cause"]["reason"]
                // ["error"]["root_cause"]["type"]
                HandleResponse(response);
                return new Dictionary<string, double>();
            }
        }

        /// <summary>
        /// Filters the plain search results of a search request.
        /// </summary>
        /// <param name="results">Plain JSON result of a search request.</param>
        /// <returns>A dictionary containing the ID of the metadata graphs (key) and the corresponding rankings (value).
        /// A hit without a score gets the ranking 0.</returns>
        private IDictionary<string, double> GetSearchResults(JObject results)
        {
            IDictionary<string, double> graphNames = new Dictionary<string, double>();
            if ((int)results["hits"]["total"]["value"] > 0)
            {
                foreach (var result in results["hits"]["hits"])
                {
                    // "_score" can be null (e.g. when the request sorts by a field and scores are
                    // not tracked); a plain cast to double would throw in that case.
                    var score = (double?)result["_score"] ?? 0.0;
                    graphNames.Add((string)result["_source"][ElasticsearchIndexMapper.LABEL_GRAPHNAME], score);
                }
            }
            return graphNames;
        }
    }
}