diff --git a/Cognite.Extensions/CdfMetrics.cs b/Cognite.Extensions/CdfMetrics.cs
index e45b0d9c..9fedc2e5 100644
--- a/Cognite.Extensions/CdfMetrics.cs
+++ b/Cognite.Extensions/CdfMetrics.cs
@@ -26,6 +26,8 @@ public static Summary.Child Instances(ViewIdentifier view, string operation)
             "Number and duration of sequence row requests to CDF", "endpoint");
         public static Counter NumberDatapoints { get; } = Metrics.CreateCounter(
             "extractor_utils_cdf_datapoints", "Number of data points uploaded to CDF");
+        public static Summary StreamRecords { get; } = Metrics.CreateSummary("extractor_utils_cdf_stream_records",
+            "Number and duration of requests to endpoints for streams and records", "endpoint");

         public static Counter AssetsSkipped { get; } = Metrics.CreateCounter("extractor_utils_cdf_assets_skipped",
             "Number of assets skipped due to errors");
diff --git a/Cognite.Extensions/Cognite.Extensions.csproj b/Cognite.Extensions/Cognite.Extensions.csproj
index 5065ab24..a4b0900e 100644
--- a/Cognite.Extensions/Cognite.Extensions.csproj
+++ b/Cognite.Extensions/Cognite.Extensions.csproj
@@ -28,7 +28,7 @@
-
+
diff --git a/Cognite.Extensions/CogniteUtils.cs b/Cognite.Extensions/CogniteUtils.cs
index 8fd1a51b..de208afc 100644
--- a/Cognite.Extensions/CogniteUtils.cs
+++ b/Cognite.Extensions/CogniteUtils.cs
@@ -6,6 +6,7 @@
 using System.Net.Http;
 using System.Threading;
 using System.Threading.Tasks;
+using Cognite.Extensions.DataModels;
 using Cognite.Extractor.Common;
 using CogniteSdk;
 using CogniteSdk.Alpha;
@@ -613,6 +614,7 @@ public static void AddExtensionLoggers(this IServiceProvider provider)
             RawExtensions.SetLogger(logger);
             EventExtensions.SetLogger(logger);
             SequenceExtensions.SetLogger(logger);
+            StreamRecordExtensions.SetLogger(logger);
         }
     }
diff --git a/Cognite.Extensions/StreamRecordExtensions.cs b/Cognite.Extensions/StreamRecordExtensions.cs
new file mode 100644
index 00000000..3a7b9838
--- /dev/null
+++ b/Cognite.Extensions/StreamRecordExtensions.cs
@@ -0,0 +1,99 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Cognite.Extractor.Common;
+using CogniteSdk;
+using CogniteSdk.Beta;
+using CogniteSdk.Resources.Beta;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Logging.Abstractions;
+
+namespace Cognite.Extensions
+{
+    /// <summary>
+    /// Extension utility methods for the beta streamrecords resource.
+    /// </summary>
+    public static class StreamRecordExtensions
+    {
+        private static ILogger _logger = new NullLogger();
+
+        internal static void SetLogger(ILogger logger)
+        {
+            _logger = logger;
+        }
+
+        /// <summary>
+        /// Retrieve a stream, or create it if it does not exist.
+        /// </summary>
+        /// <param name="streams">Stream resource</param>
+        /// <param name="stream">Stream to create</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Created or retrieved stream.</returns>
+        ///
+        public static async Task GetOrCreateStreamAsync(
+            this StreamRecordsResource streams,
+            StreamWrite stream,
+            CancellationToken token
+        )
+        {
+            if (stream is null) throw new ArgumentNullException(nameof(stream));
+            try
+            {
+                using (CdfMetrics.StreamRecords.WithLabels("retrieve_stream"))
+                {
+                    var res = await streams.RetrieveStreamAsync(stream.ExternalId, token).ConfigureAwait(false);
+                    return res;
+                }
+            }
+            catch (ResponseException ex) when (ex.Code == 404)
+            {
+            }
+            using (CdfMetrics.StreamRecords.WithLabels("create_stream"))
+            {
+                _logger.LogInformation("Creating new stream with ID {Stream}", stream.ExternalId);
+                return await streams.CreateStreamAsync(stream, token).ConfigureAwait(false);
+            }
+        }
+
+        /// <summary>
+        /// Insert the given stream records into <paramref name="stream"/>. The stream
+        /// must exist.
+        /// </summary>
+        /// <param name="streams">Stream resource</param>
+        /// <param name="stream">Stream to ingest into</param>
+        /// <param name="records">Stream records to insert</param>
+        /// <param name="chunkSize">Maximum number of records per request</param>
+        /// <param name="throttleSize">Maximum number of parallel requests</param>
+        /// <param name="token">Cancellation token</param>
+        public static async Task InsertRecordsAsync(
+            this StreamRecordsResource streams,
+            string stream,
+            ICollection records,
+            int chunkSize,
+            int throttleSize,
+            CancellationToken token
+        )
+        {
+            var chunks = records.ChunkBy(chunkSize);
+
+            var generators = chunks
+                .Select, Func>(
+                    chunk => async () =>
+                    {
+                        using (CdfMetrics.StreamRecords.WithLabels("ingest_instances"))
+                        {
+                            await streams.IngestAsync(stream, chunk, token).ConfigureAwait(false);
+                        }
+                    }
+                );
+            int numTasks = 0;
+            await generators
+                .RunThrottled(throttleSize, (_) =>
+                    _logger.LogDebug("{MethodName} completed {Num}/{Total} tasks", nameof(InsertRecordsAsync), ++numTasks,
+                        Math.Ceiling((double)records.Count / chunkSize)), token)
+                .ConfigureAwait(false);
+        }
+    }
+}
\ No newline at end of file
diff --git a/ExtractorUtils/Cognite/CogniteDestination.cs b/ExtractorUtils/Cognite/CogniteDestination.cs
index 6299c99d..ee104a41 100644
--- a/ExtractorUtils/Cognite/CogniteDestination.cs
+++ b/ExtractorUtils/Cognite/CogniteDestination.cs
@@ -1,5 +1,6 @@
 using System;
 using System.Collections.Generic;
+using System.IO;
 using System.Linq;
 using System.Text.Json;
 using System.Threading;
@@ -8,6 +9,7 @@
 using Cognite.Extensions.DataModels.CogniteExtractorExtensions;
 using Cognite.Extractor.StateStorage;
 using CogniteSdk;
+using CogniteSdk.Beta;
 using Microsoft.Extensions.Logging;
 using TimeRange = Cognite.Extractor.Common.TimeRange;
@@ -951,5 +953,49 @@ public async Task> InsertSequenceRowsAsync(
                 token).ConfigureAwait(false);
         }
         #endregion
+
+        #region stream_records
+        /// <summary>
+        /// Retrieve a stream, or create it if it does not exist.
+        /// </summary>
+        /// <param name="stream">Stream to create or retrieve</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Created or retrieved stream</returns>
+        ///
+        public async Task GetOrCreateStreamAsync(
+            StreamWrite stream,
+            CancellationToken token)
+        {
+            if (stream == null) throw new ArgumentNullException(nameof(stream));
+            _logger.LogDebug("Getting or creating stream with ID {Stream}", stream.ExternalId);
+            return await _client.Beta.StreamRecords.GetOrCreateStreamAsync(stream, token).ConfigureAwait(false);
+        }
+
+        /// <summary>
+        /// Insert the given stream records into <paramref name="stream"/>. The stream
+        /// must exist.
+        /// </summary>
+        /// <param name="stream">Stream to ingest into</param>
+        /// <param name="records">Stream records to insert</param>
+        /// <param name="token">Cancellation token</param>
+        ///
+        public async Task InsertRecordsAsync(
+            string stream,
+            ICollection records,
+            CancellationToken token)
+        {
+            if (stream == null) throw new ArgumentNullException(nameof(stream));
+            if (records == null) throw new ArgumentNullException(nameof(records));
+            _logger.LogDebug("Inserting {Count} records into the stream {Stream}",
+                records.Count, stream);
+            await _client.Beta.StreamRecords.InsertRecordsAsync(
+                stream,
+                records,
+                _config.CdfChunking.StreamRecords,
+                _config.CdfThrottling.StreamRecords,
+                token
+            ).ConfigureAwait(false);
+        }
+        #endregion
     }
 }
\ No newline at end of file
diff --git a/ExtractorUtils/Configuration/BaseConfig.cs b/ExtractorUtils/Configuration/BaseConfig.cs
index 8cbc3cf6..02a39097 100644
--- a/ExtractorUtils/Configuration/BaseConfig.cs
+++ b/ExtractorUtils/Configuration/BaseConfig.cs
@@ -229,6 +229,11 @@ public class ChunkingConfig
         /// Maximum number of data modeling instances per get/create instance request
         /// </summary>
         public int Instances { get; set; } = 1000;
+
+        /// <summary>
+        /// Maximum number of stream records per ingest request.
+        /// </summary>
+        public int StreamRecords { get; set; } = 1000;
     }

     ///
@@ -279,6 +284,11 @@ public class ThrottlingConfig
         /// Maximum number of parallel requests per data modeling instances operation
         /// </summary>
         public int Instances { get; set; } = 4;
+
+        /// <summary>
+        /// Maximum number of parallel requests per stream record ingest operation.
+        /// </summary>
+        public int StreamRecords { get; set; } = 4;
     }
     /// <summary>
     /// Configure automatic retries on requests to CDF.
diff --git a/version b/version
index cfc73071..5e57fb89 100644
--- a/version
+++ b/version
@@ -1 +1 @@
-1.28.0
+1.29.0
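
Usage sketch (not part of the patch above): a minimal illustration of how the new CogniteDestination stream record methods might be used, assuming an already-configured destination. The record element type name (StreamRecordWrite), the StreamWrite object initializer, and the using directives are assumptions for illustration; the concrete CogniteSdk.Beta types are not spelled out in this diff.

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Cognite.Extractor.Utils;
using CogniteSdk.Beta;

public static class StreamRecordIngestExample
{
    public static async Task RunAsync(CogniteDestination destination, CancellationToken token)
    {
        const string streamId = "my-extractor-stream";

        // Create the stream if it does not exist yet, otherwise retrieve the existing one.
        await destination.GetOrCreateStreamAsync(new StreamWrite { ExternalId = streamId }, token);

        // Records to ingest. NOTE: the element type name is assumed here; the concrete
        // CogniteSdk.Beta write type is not shown in the diff above.
        ICollection<StreamRecordWrite> records = new List<StreamRecordWrite>();

        // Chunk size and parallelism come from the new CdfChunking.StreamRecords and
        // CdfThrottling.StreamRecords settings (defaults: 1000 records, 4 parallel requests).
        await destination.InsertRecordsAsync(streamId, records, token);
    }
}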