Some checks failed
build-packages / meilisearch-dotnet-packages (push) Has been cancelled
126 lines
4.8 KiB
C#
126 lines
4.8 KiB
C#
using System.Collections.ObjectModel;
|
|
using System.Collections.Specialized;
|
|
using Meilisearch;
|
|
using meilisearch.NET.Exceptions;
|
|
using meilisearch.NET.Interfaces;
|
|
using meilisearch.NET.Services.ProcessManagement;
|
|
using Microsoft.Extensions.Logging;
|
|
|
|
namespace meilisearch.NET.Services.DocumentManagement;
|
|
|
|
public class DocumentManager:IDocumentManager
|
|
{
|
|
private readonly ILogger<DocumentManager> _logger;
|
|
private readonly MeiliSearchProcessManager _meiliSearchProcessManager;
|
|
private readonly MeilisearchClient _client;
|
|
|
|
private const int THRESHOLD = 100;
|
|
private ObservableCollection<KeyValuePair<string,IDocument>> _documentCollection;
|
|
|
|
public DocumentManager(MeilisearchClient client, ILogger<DocumentManager> logger, MeiliSearchProcessManager meiliSearchProcessManager)
|
|
{
|
|
_meiliSearchProcessManager = meiliSearchProcessManager;
|
|
_logger = logger;
|
|
_client = client;
|
|
_documentCollection = new ObservableCollection<KeyValuePair<string,IDocument>>();
|
|
_documentCollection.CollectionChanged += CheckIfNeedDocumentSync;
|
|
}
|
|
|
|
public async Task AddDocumentAsync(string repositoryId, IDocument document, bool autoCommit = false)
|
|
{
|
|
if (!_meiliSearchProcessManager.IsProcessRunning())
|
|
throw new ProcessNotRunningException();
|
|
|
|
_logger.LogTrace($"Adding document '{document.Id}' to repository '{repositoryId}'...");
|
|
_documentCollection.Add(new KeyValuePair<string, IDocument>(repositoryId, document));
|
|
_logger.LogInformation($"Document {document.Id} added to collection.");
|
|
if (autoCommit)
|
|
{
|
|
await SyncDocumentsToServerAsync();
|
|
}
|
|
}
|
|
public void AddDocument(string repositoryId, IDocument document, bool autoCommit = false)
|
|
{
|
|
if (!_meiliSearchProcessManager.IsProcessRunning())
|
|
throw new ProcessNotRunningException();
|
|
|
|
_logger.LogTrace($"Adding document '{document.Id}' to repository '{repositoryId}'...");
|
|
_documentCollection.Add(new KeyValuePair<string, IDocument>(repositoryId, document));
|
|
_logger.LogInformation($"Document {document.Id} added to collection.");
|
|
if (autoCommit)
|
|
{
|
|
SyncDocumentsToServer();
|
|
}
|
|
}
|
|
public void SyncDocumentsToServer()
|
|
{
|
|
if (!_meiliSearchProcessManager.IsProcessRunning())
|
|
throw new ProcessNotRunningException();
|
|
|
|
var grouped = _documentCollection.GroupBy(pair => pair.Key)
|
|
.ToDictionary(group => group.Key, group => group.Select(pair => pair.Value).ToList());
|
|
foreach (var repository in grouped)
|
|
{
|
|
var repositoryIndex = _client.GetIndexAsync(repository.Key).Result;
|
|
var documents = _documentCollection.ToList();
|
|
_documentCollection.Clear();
|
|
var result = RetryAsync(() => repositoryIndex.AddDocumentsAsync(repository.Value, "id")).Result;
|
|
}
|
|
}
|
|
public async Task SyncDocumentsToServerAsync()
|
|
{
|
|
if (!_meiliSearchProcessManager.IsProcessRunning())
|
|
throw new ProcessNotRunningException();
|
|
|
|
var grouped = _documentCollection.GroupBy(pair => pair.Key)
|
|
.ToDictionary(group => group.Key, group => group.Select(pair => pair.Value).ToList());
|
|
foreach (var repository in grouped)
|
|
{
|
|
var repositoryIndex = await _client.GetIndexAsync(repository.Key);
|
|
var documents = _documentCollection.ToList();
|
|
_documentCollection.Clear();
|
|
var result = RetryAsync(() => repositoryIndex.AddDocumentsAsync(repository.Value, "id")).Result;
|
|
}
|
|
}
|
|
|
|
#region Private Methods
|
|
|
|
private void CheckIfNeedDocumentSync(object? sender, NotifyCollectionChangedEventArgs e)
|
|
{
|
|
CheckIfNeedDocumentSync(THRESHOLD);
|
|
}
|
|
|
|
private void CheckIfNeedDocumentSync(int? threshold = null)
|
|
{
|
|
threshold = threshold ?? 0;
|
|
if(_documentCollection.Count>=threshold)
|
|
{
|
|
_logger.LogInformation("Threshold reached, syncing metadata to server.");
|
|
SyncDocumentsToServer();
|
|
}
|
|
}
|
|
|
|
private async Task<T> RetryAsync<T>(Func<Task<T>> action, int maxRetries = 3, int delayMilliseconds = 1000)
|
|
{
|
|
int retryCount = 0;
|
|
while (true)
|
|
{
|
|
try
|
|
{
|
|
return await action();
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
retryCount++;
|
|
if (retryCount >= maxRetries)
|
|
{
|
|
_logger.LogError($"Operation failed after {maxRetries} retries: {ex.Message}");
|
|
throw;
|
|
}
|
|
_logger.LogWarning($"Operation failed, retrying {retryCount}/{maxRetries}...");
|
|
await Task.Delay(delayMilliseconds);
|
|
}
|
|
}
|
|
}
|
|
#endregion
|
|
} |