using DynamicData;
using System.Linq.Expressions;
using System.Text.Json.Serialization;
using System.Text.Json;
using System.Reactive.Linq;
using System.Net.Http.Json;
using Microsoft.EntityFrameworkCore;
using Polly;
using Polly.Bulkhead;

namespace ZymonicServices;

/// <summary>
/// Base class for a Zymonic "filter": a server-side search whose results are cached in a
/// local database and exposed as a DynamicData observable cache. A search first serves
/// cached rows, then (unless the cache is fresh) pages results down from the API in
/// parallel, upserting each page into the DB and refreshing the in-memory cache as it goes.
/// </summary>
/// <remarks>
/// NOTE(review): the generic parameter list and every generic type argument in this file
/// were reconstructed from the constraint clauses and call sites — the original text had
/// its angle-bracket contents stripped. Confirm the exact type arguments (in particular
/// the SourceCache key type, assumed to be the string <c>uid</c>) against the repository.
/// </remarks>
public abstract class ZymonicFilter<TSearch, TSearchAPI, TResult, TFilterResults, TFilterResultsAPI>
    where TSearch : class, IZymonicFilterSearch, new()
    where TSearchAPI : class, IZymonicFilterApiRequest, new()
    where TResult : class, IZymonicFilterResult
    where TFilterResults : class, IZymonicFilterResults
    where TFilterResultsAPI : class, IZymonicFilterApiResponse
{
    // https://github.com/reactivemarbles/DynamicData?tab=readme-ov-file#the-observable-cachedd
    // Keyed on the result's uid — assumed to be a string; TODO confirm key type.
    private readonly SourceCache<TResult, string> _items = new SourceCache<TResult, string>(result => result.uid);

    protected SourceCache<TResult, string> results
    {
        get { return _items; }
    }

    public TSearch searchParameters { get; set; } = new TSearch();

    private IZymonicDbContextFactory _contextFactory;
    private IZymonicSettings _settings;
    private IZymonicAA _authService;
    private IZymonicLogger _logger;

    /// <summary>Total results reported by the server for the current search.</summary>
    public int resultCount { get; set; }

    /// <summary>Total pages reported by the server for the current search.</summary>
    public int pageCount { get; set; }

    /// <summary>Maximum number of concurrent page-download tasks.</summary>
    public int maximumFetchTasks { get; set; } = 1;

    /// <summary>Maximum number of rows loaded into the in-memory cache.</summary>
    public int CacheResultLimit { get; set; } = 100;

    /// <summary>Age beyond which the DB cache is considered stale.</summary>
    public int MaxCacheAgeMinutes { get; set; } = 30;

    private int cacheResultCount { get; set; }

    // One HttpClient per requested timeout, reused across calls (avoids socket exhaustion).
    private Dictionary<int, HttpClient> singleHttpClient = new Dictionary<int, HttpClient>();
    private ZymonicFilterStatus? filterStatus;
    private ZymonicEventListener listener;
    private AsyncBulkheadPolicy _bulkheadPolicy;

    /// <summary>
    /// Wires up dependencies and applies any settings overrides for fetch concurrency,
    /// cache size and cache age. Invalid (non-integer) override values are logged and
    /// ignored rather than crashing construction.
    /// </summary>
    public ZymonicFilter(IZymonicLogger logger, IZymonicDbContextFactory contextFactory, IZymonicSettings settings, IZymonicAA authService, AsyncBulkheadPolicy bulkheadPolicy)
    {
        _contextFactory = contextFactory;
        _settings = settings;
        _authService = authService;
        _logger = logger;
        _bulkheadPolicy = bulkheadPolicy;

        // add trace/debug level logging and switch to that.
        listener = new ZymonicEventListener(_logger, ZymonicEventListener.NetworkingEvents);

        // Check for settings that override the defaults
        ApplyIntSettingOverride("MaximumFetchTasks", value => maximumFetchTasks = value);
        ApplyIntSettingOverride("CacheResultLimit", value => CacheResultLimit = value);
        ApplyIntSettingOverride("MaxCacheAgeMinutes", value => MaxCacheAgeMinutes = value);
    }

    // Looks up an integer settings override and applies it; logs and skips unparseable values
    // (the original int.Parse would have thrown from the constructor).
    private void ApplyIntSettingOverride(string setting, Action<int> apply)
    {
        string? settingValue = GetSettingOverride(setting);
        if (settingValue is null)
        {
            return;
        }
        if (int.TryParse(settingValue, out int parsed))
        {
            apply(parsed);
        }
        else
        {
            _logger.LogWarning("{Filter} setting {Setting} override value {SettingValue} is not a valid integer - ignored", this.GetType().Name, setting, settingValue);
        }
    }

    /// <summary>
    /// Returns a settings override for this filter, preferring a filter-specific key
    /// ("{FilterName}_{setting}") over the app-wide key ("{setting}"); null if neither is set.
    /// </summary>
    private string? GetSettingOverride(string setting)
    {
        string? settingValue = _settings.Setting($"{this.GetType().Name}_{setting}");
        if (settingValue is not null)
        {
            _logger.LogDebug("{Filter} setting {Setting} has been overriden by a filter specific value {SettingValue}", this.GetType().Name, setting, settingValue);
            return settingValue;
        }
        // Fall back to the app-wide setting. Only log the app-level message when the value
        // actually came from the app-level key (the original logged both messages at once).
        settingValue = _settings.Setting(setting);
        if (settingValue is not null)
        {
            _logger.LogDebug("{Filter} setting {Setting} has been overriden by app specific value {SettingValue}", this.GetType().Name, setting, settingValue);
        }
        return settingValue;
    }

    // We expose the Connect() since we are interested in a stream of changes.
    public IObservable<IChangeSet<TResult, string>> Connect() => _items.Connect();

    /// <summary>
    /// Runs a search: serves cached rows immediately, then (unless the cache is fresh,
    /// <paramref name="localOnly"/> is set, or enough rows are already cached) pages the
    /// remaining results down from the API. Emits a <see cref="ZymonicSearchProgress"/>
    /// after each stored page; errors from the API are surfaced via the progress struct.
    /// </summary>
    public IObservable<ZymonicSearchProgress> Search(TSearch search, bool debugMode = false, bool requireAuthenticated = true, int SufficientResults = 0, int TimeoutSeconds = 120, bool localOnly = false)
    {
        return Observable.Create<ZymonicSearchProgress>(async (observer, cancellationToken) =>
        {
            try
            {
                searchParameters = search;
                IZymonicDbContext context = _contextFactory.CreateDbContext();

                // Refresh the cache as soon as the user has initiated a search
                RefreshCache(searchParameters);

                // Sleep here if the DB cache had data in - allows time for cancellation by a subsequent search term
                // but user should still see the cached data
                if (cacheResultCount > 0)
                {
                    _logger.LogInformation("{Filter} has {CacheResultCount} results cached so sleeping in case user starts counting", this.GetType().Name, cacheResultCount);
                    await Task.Delay(TimeSpan.FromMilliseconds(500));
                    cancellationToken.ThrowIfCancellationRequested();
                }

                bool cacheOverride = false;
                // NOTE(review): strictly-greater comparison means SufficientResults == cacheResultCount
                // still triggers a refresh - confirm that is intended.
                if (localOnly || (SufficientResults > 0 && cacheResultCount > SufficientResults))
                {
                    _logger.LogInformation("{Filter} has {CacheResultCount} results cached so returning", this.GetType().Name, cacheResultCount);
                    cacheOverride = true;
                }

                int page = 0;
                int resultsRetrieved = 0;

                // TODO-Phase2 Limit cache size
                // TODO-Phase2 Understand why in the case of till settings no results appear in the connected resultset until the second search
                if (!cacheOverride && !IsCacheFresh(searchParameters))
                {
                    _logger.LogInformation("{Filter} Cache is not fresh - running search", this.GetType().Name);
                    // Trigger the API call to update the collection and DB
                    try
                    {
                        // Get the starting time
                        DateTime startTime = DateTime.UtcNow;
                        CacheUpdateStarted(searchParameters, startTime);

                        // Mark the results as not yet updated so stale rows can be deleted after a full download.
                        context.Set<TResult>().Where(whereClause(searchParameters)).ExecuteUpdate(setters => setters.SetProperty(result => result.UpdateComplete, false));

                        // Get the first results and counts
                        int firstPage = Interlocked.Increment(ref page);
                        List<TResult> apiResults = await GetApiResults(debugMode, requireAuthenticated, cancellationToken, firstPage, TimeoutSeconds).ConfigureAwait(false);
                        Interlocked.Add(ref resultsRetrieved, apiResults.Count);
                        await StoreApiResults(apiResults, cancellationToken).ConfigureAwait(true);
                        RefreshCache(searchParameters);
                        observer.OnNext(new ZymonicSearchProgress(resultsRetrieved, resultCount, null, DateTime.UtcNow));

                        var downloadTasks = new List<Task<List<TResult>>>();
                        for (int i = 0; i < maximumFetchTasks; i++)
                        {
                            if (pageCount > page)
                            {
                                downloadTasks.Add(Task.Run(() =>
                                {
                                    // Use the value returned by Increment: the original incremented and
                                    // then re-read 'page' non-atomically, so two tasks could request the
                                    // same page (or skip one) when running concurrently.
                                    int nextPage = Interlocked.Increment(ref page);
                                    return GetApiResults(debugMode, requireAuthenticated, cancellationToken, nextPage, TimeoutSeconds);
                                }));
                            }
                        }

                        while (downloadTasks.Any())
                        {
                            Task<List<TResult>> finishedTask = await Task.WhenAny(downloadTasks);
                            downloadTasks.Remove(finishedTask);
                            apiResults = await finishedTask;
                            Interlocked.Add(ref resultsRetrieved, apiResults.Count);
                            await StoreApiResults(apiResults, cancellationToken).ConfigureAwait(true);
                            cancellationToken.ThrowIfCancellationRequested();
                            RefreshCache(searchParameters);
                            observer.OnNext(new ZymonicSearchProgress(resultsRetrieved, resultCount, null, DateTime.UtcNow));
                            _logger.LogInformation("Page count is {PageCount} current page is {page}", pageCount, page);
                            if (pageCount > page)
                            {
                                // Run get token with a longer margin here to ensure that we don't try and refresh the token in
                                // a background task and then fail to write to the DB
                                await _authService.GetToken(60).ConfigureAwait(true);
                                downloadTasks.Add(Task.Run(() =>
                                {
                                    int nextPage = Interlocked.Increment(ref page);
                                    return GetApiResults(debugMode, requireAuthenticated, cancellationToken, nextPage, TimeoutSeconds);
                                }));
                            }
                        }

                        // If we've completed a full download remove any records not updated in this run
                        CacheUpdated(searchParameters);
                        _logger.LogInformation("{Filter} deleting any records not updated after our search started at {StartTime} UTC", this.GetType().Name, startTime);
                        context.Set<TResult>().Where(whereClause(searchParameters)).Where(result => result.dateUpdated < startTime).ExecuteDelete();
                        // Actually easy solution is to have a limit of cache size that gives an allowed number of results and then
                        // remove the oldest records

                        // Save any changes from update complete or deletion of old records
                        await context.SaveChangesAsync().ConfigureAwait(false);
                    }
                    catch (UnauthenticatedFilterResponseException e)
                    {
                        // Update our settings etc. with any post-logout tasks
                        _authService.Logout();
                        _logger.LogWarning("{Filter} Server responded with not authenticated unexpectedly.", this.GetType().Name);
                        observer.OnNext(new ZymonicSearchProgress(0, 0, e));
                    }
                    catch (TaskCanceledException e)
                    {
                        _logger.LogInformation("{Filter} Search Cancelled", this.GetType().Name);
                        observer.OnNext(new ZymonicSearchProgress(0, 0, e));
                    }
                    catch (TokenException e)
                    {
                        // Update our settings etc. with any post-logout tasks
                        _authService.Logout();
                        _logger.LogWarning("{Filter} Server responded with unable to refresh token.", this.GetType().Name);
                        observer.OnNext(new ZymonicSearchProgress(0, 0, e));
                    }
                }
                else
                {
                    // Template now names the argument that was previously passed without a placeholder.
                    _logger.LogInformation("{Filter} not refreshing cache (max age {MaxCacheAgeMinutes} minutes)", this.GetType().Name, MaxCacheAgeMinutes);
                    observer.OnNext(new ZymonicSearchProgress(cacheResultCount, cacheResultCount, null, GetCacheLastUpdated(searchParameters)));
                }

                // NOTE that if you combine this observable with a search term observable then this Oncompleted is ignored because there may be more search terms coming
                observer.OnCompleted();
            }
            catch (Exception e)
            {
                // Last-ditch handler: log (at Error, not Information) and swallow so the
                // observable pipeline is not torn down by an unexpected fault.
                _logger.LogError(e, "{Filter} Error", this.GetType().Name);
            }
        });
    }

    // Returns a cached HttpClient for the given timeout, creating it on first use.
    // Reusing clients avoids per-request socket exhaustion.
    private HttpClient SingleHttpClient(int TimeoutSeconds = 120)
    {
        if (!singleHttpClient.TryGetValue(TimeoutSeconds, out HttpClient? client) || client is null)
        {
            client = new HttpClient(new ZymonicAPIHandler(_logger, _authService, new HttpLoggingHandler(_logger)))
            {
                BaseAddress = new Uri(_settings.BaseUri()),
                Timeout = TimeSpan.FromSeconds(TimeoutSeconds)
            };
            singleHttpClient[TimeoutSeconds] = client;
        }
        return client;
    }

    // Reloads the in-memory SourceCache from the DB for the given search, truncated to
    // CacheResultLimit, and records the total row count in cacheResultCount.
    private void RefreshCache(TSearch search)
    {
        IZymonicDbContext context = _contextFactory.CreateDbContext();
        _items.Clear();
        List<TResult> cachedResults = context.Set<TResult>().Include(p => p.ZZid).Where(whereClause(search)).ToList();
        _items.Edit(innerCache => { innerCache.AddOrUpdate(cachedResults.Take(CacheResultLimit).ToList()); });
        cacheResultCount = cachedResults.Count;
        // TODO-Phase2: the original also computed the UpdateComplete subset here but never
        // used it - removed the dead local; reinstate when complete-search tracking lands.
    }

    // True when the most recent completed search for this filter (matching signature, or a
    // parameterless search) finished within MaxCacheAgeMinutes.
    private bool IsCacheFresh(TSearch search)
    {
        bool cacheFresh = false;
        // lookup the filter status
        IZymonicDbContext context = _contextFactory.CreateDbContext();
        filterStatus = context.Set<ZymonicFilterStatus>()
            .OrderByDescending(x => x.SearchCompleted)
            .FirstOrDefault(x => x.ZName == this.GetType().Name && (x.SearchSignature == SearchParamSignature(search) || x.searchIsParameterless == true));
        if (filterStatus is null)
        {
            _logger.LogDebug("{Filter} Cache is not fresh - no filter status found", this.GetType().Name);
            return false;
        }
        var cacheExpiresAt = filterStatus.SearchCompleted.Add(TimeSpan.FromMinutes(MaxCacheAgeMinutes));
        _logger.LogDebug("{Filter} Cache last updated {LastUpdated} and expires at {ExpiresAt}", this.GetType().Name, filterStatus.SearchCompleted, cacheExpiresAt);
        if (cacheExpiresAt > DateTime.UtcNow)
        {
            cacheFresh = true;
        }
        return cacheFresh;
    }

    // Completion time of the last search with this exact signature, or null if none.
    // NOTE(review): unlike IsCacheFresh this ignores parameterless searches - confirm intended.
    private DateTime? GetCacheLastUpdated(TSearch search)
    {
        IZymonicDbContext context = _contextFactory.CreateDbContext();
        filterStatus = context.Set<ZymonicFilterStatus>().FirstOrDefault(x => x.ZName == this.GetType().Name && x.SearchSignature == SearchParamSignature(search));
        if (filterStatus is null)
        {
            return null;
        }
        return filterStatus.SearchCompleted;
    }

    // Records (or creates) the filter-status row marking the start of a cache refresh.
    private void CacheUpdateStarted(TSearch search, DateTime? startTime = null)
    {
        IZymonicDbContext context = _contextFactory.CreateDbContext();
        filterStatus = context.Set<ZymonicFilterStatus>().FirstOrDefault(x => x.ZName == this.GetType().Name && x.SearchSignature == SearchParamSignature(search));
        if (filterStatus is null)
        {
            _logger.LogDebug("{Filter} Cache update started - no filter status found creating new one.", this.GetType().Name);
            filterStatus = new ZymonicFilterStatus
            {
                ZName = this.GetType().Name,
                SearchStarted = startTime ?? DateTime.UtcNow,
                SearchSignature = SearchParamSignature(search),
                searchIsParameterless = search.SearchIsParameterless
            };
            context.Set<ZymonicFilterStatus>().Add(filterStatus);
        }
        else
        {
            _logger.LogDebug("{Filter} Cache update started - filter status found and updated with start time", this.GetType().Name);
            filterStatus.SearchStarted = startTime ?? DateTime.UtcNow;
        }
        context.SaveChanges();
    }

    // Stamps the filter-status row with the completion time of a full cache refresh.
    private void CacheUpdated(TSearch search, DateTime? endTime = null)
    {
        IZymonicDbContext context = _contextFactory.CreateDbContext();
        filterStatus = context.Set<ZymonicFilterStatus>().FirstOrDefault(x => x.ZName == this.GetType().Name && x.SearchSignature == SearchParamSignature(search));
        if (filterStatus is not null)
        {
            filterStatus.SearchCompleted = endTime ?? DateTime.UtcNow;
            context.SaveChanges();
        }
    }

    // Stable fingerprint of the search parameters: SHA-256 of the JSON serialization, base64-encoded.
    private string SearchParamSignature(TSearch search)
    {
        string json = JsonSerializer.Serialize(search);
        string hash = Convert.ToBase64String(System.Security.Cryptography.SHA256.HashData(System.Text.Encoding.UTF8.GetBytes(json)));
        return hash;
    }

    // Upserts one page of API results into the DB cache, stamping each row's dateUpdated
    // so the post-download sweep can delete rows the server no longer returns.
    private async Task StoreApiResults(List<TResult> apiResults, CancellationToken cancellationToken)
    {
        IZymonicDbContext context = _contextFactory.CreateDbContext();
        int Updates = 0;
        int Adds = 0;
        _logger.LogDebug("{Filter} StoreApiResults Running on thread {Thread}", this.GetType().Name, System.Threading.Thread.CurrentThread.ManagedThreadId.ToString());
        foreach (var result in apiResults)
        {
            cancellationToken.ThrowIfCancellationRequested();
            // Update the timestamp
            result.dateUpdated = DateTime.UtcNow;
            var existingResult = context.Set<TResult>().Find(result.uid);
            if (existingResult == null)
            {
                _logger.LogDebug("Adding {ResultUid} to {Filter} cache", result.uid, this.GetType().Name);
                context.Set<TResult>().Add(result);
                Adds++;
            }
            else
            {
                _logger.LogDebug("Updating {ResultUid} in {Filter} cache", result.uid, this.GetType().Name);
                context.Set<TResult>().Entry(existingResult).CurrentValues.SetValues(result);
                Updates++;
            }
        }
        _logger.LogInformation("{ResultsUpdated} Updates and {ResultsAdded} Added to {Filter} cache", Updates, Adds, this.GetType().Name);
        // We await to ensure we don't try and save multiple results at once
        await context.SaveChangesAsync().ConfigureAwait(false);
    }

    /// <summary>
    /// Fetches one page of results from the API. Updates <see cref="resultCount"/> and
    /// <see cref="pageCount"/> from page 1, and throws <see cref="WrongPageReturnedException"/>
    /// if the server answers with a different page than requested.
    /// </summary>
    private async Task<List<TResult>> GetApiResults(bool debugMode, bool requireAuthenticated, CancellationToken ct, int page = 1, int TimeoutSeconds = 120)
    {
        List<TResult> retrievedResults = [];
        _logger.LogDebug("{Filter} GetApiResults Running on thread {Thread}", this.GetType().Name, System.Threading.Thread.CurrentThread.ManagedThreadId.ToString());

        // Set-up the search parameters
        TSearchAPI searchAPI = new TSearchAPI { ZymonicHeader = new ZymonicAPIHeader(_settings.SystemName(), "filter", debugMode) };
        // Since we're now paginating we need to set-up a copy of the search parameters.
        TSearch apiSearchParameters = (TSearch)searchParameters.Clone();
        apiSearchParameters.report_pagenum = page;
        searchAPI.searchFields = apiSearchParameters;

        // Json Options. NOTE(review): converter type arguments reconstructed - the original
        // generic arguments were stripped; confirm the four numeric types against the API models.
        JsonSerializerOptions jsonOptions = new()
        {
            DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
            NumberHandling = JsonNumberHandling.AllowReadingFromString,
            Converters =
            {
                new EmptyStringToNumberConverter<int?>(),
                new EmptyStringToNumberConverter<double?>(),
                new EmptyStringToNumberConverter<decimal?>(),
                new EmptyStringToNumberConverter<float?>()
            }
        };

        try
        {
            TFilterResultsAPI? apiDecodedResponse = await ApiWrapper(searchAPI, jsonOptions, ct, TimeoutSeconds).ConfigureAwait(true);
            if (apiDecodedResponse is null || apiDecodedResponse.filterResults is null || apiDecodedResponse.filterResults.report is null)
            {
                throw new ApiResponseMissingOrEmpty();
            }
            if (requireAuthenticated && apiDecodedResponse.Authenticated != "Y")
            {
                throw new UnauthenticatedFilterResponseException();
            }
            TFilterResults apiResults = apiDecodedResponse.filterResults;
            retrievedResults = apiResults.GetResults();

            // Update the result and page counts from the first page only.
            if (page == 1)
            {
                // FirstOrDefault instead of [0]: an empty FilterResultCount list previously threw.
                resultCount = apiDecodedResponse.filterResults.FilterResultCount?.FirstOrDefault()?.Count ?? 0;
                pageCount = apiDecodedResponse.filterResults.report.navigation.last_page ?? 0;
            }
            var current_page = apiDecodedResponse.filterResults.report.navigation.current_page ?? 0;
            if (current_page != page)
            {
                _logger.LogError("Wrong page returned - requested {RequestedPage} returned {ReturnedPage}", page, current_page);
                throw new WrongPageReturnedException() { PageRequested = page, PageReturned = current_page };
            }
        }
        catch (Exception ex)
        {
            // Structured overload keeps the exception object attached to the log entry.
            _logger.LogError(ex, "{Filter} GetApiResults failed", this.GetType().Name);
            throw;
        }
        return retrievedResults;
    }

    /// <summary>
    /// Posts the search to the API through a bulkhead (concurrency limiter) wrapped in an
    /// exponential-backoff retry (10 attempts, 2^n seconds), and deserializes the response.
    /// </summary>
    public async Task<TFilterResultsAPI?> ApiWrapper(TSearchAPI search, JsonSerializerOptions jsonOptions, CancellationToken ct, int TimeoutSeconds = 120)
    {
        // NOTE(review): the handled exception types were stripped from the original source;
        // HttpRequestException + BulkheadRejectedException reconstructed from context - confirm.
        var retryPolicy = Policy
            .Handle<HttpRequestException>()
            .Or<BulkheadRejectedException>()
            .WaitAndRetryAsync(
                retryCount: 10,
                sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
                onRetry: (outcome, timespan, attempt, context) =>
                {
                    _logger.LogWarning("Retrying filter due to failure: {Attempt} - {Timespan}", attempt, timespan);
                }
            );
        var combinedPolicy = retryPolicy.WrapAsync(_bulkheadPolicy);
        var outerResponse = await combinedPolicy.ExecuteAsync(async innerCt =>
        {
            HttpResponseMessage response = await SingleHttpClient(TimeoutSeconds).PostAsJsonAsync("", search, jsonOptions, innerCt).ConfigureAwait(true);
            response.EnsureSuccessStatusCode();
            return await response.Content.ReadFromJsonAsync<TFilterResultsAPI>(jsonOptions, innerCt).ConfigureAwait(true);
        }, ct);
        return outerResponse;
    }

    /// <summary>Returns every cached result row regardless of search parameters.</summary>
    public List<TResult> GetAll()
    {
        IZymonicDbContext context = _contextFactory.CreateDbContext();
        return context.Set<TResult>().ToList();
    }

    // Override to provide the mapping between fields in your TSearch entity and in the TResult entity
    protected virtual Expression<Func<TResult, bool>> whereClause(TSearch search)
    {
        return result => true;
    }
}

/// <summary>Server answered but reported the session as unauthenticated.</summary>
public class UnauthenticatedFilterResponseException : Exception { }

/// <summary>Server returned a different page than the one requested.</summary>
public class WrongPageReturnedException : Exception
{
    public int PageRequested;
    public int PageReturned;
}

/// <summary>API response body was missing, or had no filter results / report.</summary>
public class ApiResponseMissingOrEmpty : Exception { }

/// <summary>
/// Progress snapshot emitted by <c>Search</c>: rows retrieved so far, the server-reported
/// total, an optional error, and when the cache was last updated (for cache-only answers).
/// </summary>
public readonly struct ZymonicSearchProgress
{
    public ZymonicSearchProgress(int resultsRetrieved, int totalResults, Exception? e = null, DateTime? lastUpdated = null)
    {
        LastUpdated = lastUpdated;
        ResultsRetrieved = resultsRetrieved;
        TotalResults = totalResults;
        error = e;
    }

    public int ResultsRetrieved { get; init; }
    public int TotalResults { get; init; }
    public Exception? error { get; init; }
    public DateTime? LastUpdated { get; init; }

    public override string ToString()
    {
        // Guard: TotalResults is 0 for error/cancellation progress reports - the original
        // divided unconditionally and threw DivideByZeroException.
        int percentage = TotalResults > 0 ? (100 * ResultsRetrieved) / TotalResults : 0;
        return $"Fetched {ResultsRetrieved} of {TotalResults} ({percentage}%)";
    }
}

/// <summary>
/// System.Text.Json converter that treats an empty string (or null) as the default value of
/// the numeric type <typeparamref name="T"/>, and otherwise parses strings/numbers into
/// int, double, decimal or float (nullable or not).
/// </summary>
public class EmptyStringToNumberConverter<T> : JsonConverter<T>
{
    // Required so Read is invoked for JSON null tokens.
    public override bool HandleNull => true;

    public override T? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
    {
        var underlyingType = Nullable.GetUnderlyingType(typeToConvert) ?? typeToConvert;
        if (reader.TokenType == JsonTokenType.Null)
        {
            return default;
        }
        if (reader.TokenType == JsonTokenType.String)
        {
            string stringValue = reader.GetString() ?? string.Empty;
            if (string.IsNullOrEmpty(stringValue))
            {
                return default;
            }
            if (underlyingType == typeof(int) && int.TryParse(stringValue, out int intValue))
            {
                return (T)(object)intValue;
            }
            if (underlyingType == typeof(double) && double.TryParse(stringValue, out double doubleValue))
            {
                return (T)(object)doubleValue;
            }
            if (underlyingType == typeof(decimal) && decimal.TryParse(stringValue, out decimal decimalValue))
            {
                return (T)(object)decimalValue;
            }
            if (underlyingType == typeof(float) && float.TryParse(stringValue, out float floatValue))
            {
                return (T)(object)floatValue;
            }
            throw new JsonException($"Cannot convert {stringValue} to {underlyingType}.");
        }
        if (reader.TokenType == JsonTokenType.Number)
        {
            if (underlyingType == typeof(int))
            {
                return (T)(object)reader.GetInt32();
            }
            if (underlyingType == typeof(double))
            {
                return (T)(object)reader.GetDouble();
            }
            if (underlyingType == typeof(decimal))
            {
                return (T)(object)reader.GetDecimal();
            }
            if (underlyingType == typeof(float))
            {
                return (T)(object)reader.GetSingle();
            }
            throw new JsonException($"Cannot convert {reader.GetDouble()} to {underlyingType}.");
        }
        throw new JsonException($"Unexpected token type {reader.TokenType} when parsing {typeof(T)}.");
    }

    public override void Write(Utf8JsonWriter writer, T value, JsonSerializerOptions options)
    {
        writer.WriteNumberValue(Convert.ToDouble(value));
    }
}