Some utils for stream records (#493)
* Some utils for stream records

* Bump version

* Set logger
einarmo authored Nov 19, 2024
1 parent 6e0064e commit de68c22
Showing 7 changed files with 161 additions and 2 deletions.
2 changes: 2 additions & 0 deletions Cognite.Extensions/CdfMetrics.cs
@@ -26,6 +26,8 @@ public static Summary.Child Instances(ViewIdentifier view, string operation)
            "Number and duration of sequence row requests to CDF", "endpoint");
        public static Counter NumberDatapoints { get; } = Metrics.CreateCounter(
            "extractor_utils_cdf_datapoints", "Number of data points uploaded to CDF");
        public static Summary StreamRecords { get; } = Metrics.CreateSummary("extractor_utils_cdf_stream_records",
            "Number and duration of requests to endpoints for streams and records", "endpoint");

        public static Counter AssetsSkipped { get; } = Metrics.CreateCounter("extractor_utils_cdf_assets_skipped",
            "Number of assets skipped due to errors");
2 changes: 1 addition & 1 deletion Cognite.Extensions/Cognite.Extensions.csproj
@@ -28,7 +28,7 @@
  <ItemGroup>
    <PackageReference Include="Microsoft.Identity.Client" Version="4.66.1" />
    <PackageReference Include="System.Text.Json" Version="8.0.5" />
    <PackageReference Include="CogniteSdk" Version="4.9.0" />
    <PackageReference Include="CogniteSdk" Version="4.10.0" />
    <PackageReference Include="prometheus-net" Version="8.2.1" />
    <PackageReference Include="Microsoft.Extensions.Http" Version="8.0.1" />
    <PackageReference Include="Microsoft.Extensions.Http.Polly" Version="8.0.11" />
2 changes: 2 additions & 0 deletions Cognite.Extensions/CogniteUtils.cs
@@ -6,6 +6,7 @@
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Cognite.Extensions.DataModels;
using Cognite.Extractor.Common;
using CogniteSdk;
using CogniteSdk.Alpha;
@@ -613,6 +614,7 @@ public static void AddExtensionLoggers(this IServiceProvider provider)
            RawExtensions.SetLogger(logger);
            EventExtensions.SetLogger(logger);
            SequenceExtensions.SetLogger(logger);
            StreamRecordExtensions.SetLogger(logger);
        }
    }

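For context, a minimal sketch of how this logger hook is typically exercised from dependency injection. The ServiceCollection setup below is illustrative and not part of this commit, and the namespace import for AddExtensionLoggers is an assumption.

using Cognite.Extractor.Utils;                    // namespace assumed; adjust to where AddExtensionLoggers lives
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

// Build a provider with logging registered, then hand loggers to the static
// extension classes, which now include the new StreamRecordExtensions.
var services = new ServiceCollection();
services.AddLogging();

using var provider = services.BuildServiceProvider();
provider.AddExtensionLoggers();
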
99 changes: 99 additions & 0 deletions Cognite.Extensions/StreamRecordExtensions.cs
@@ -0,0 +1,99 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Cognite.Extractor.Common;
using CogniteSdk;
using CogniteSdk.Beta;
using CogniteSdk.Resources.Beta;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

namespace Cognite.Extensions
{
    /// <summary>
    /// Extension utility methods for the beta streamrecords resource.
    /// </summary>
    public static class StreamRecordExtensions
    {
        private static ILogger _logger = new NullLogger<Client>();

        internal static void SetLogger(ILogger logger)
        {
            _logger = logger;
        }

        /// <summary>
        /// Retrieve a stream, or create it if it does not exist.
        /// </summary>
        /// <param name="streams">Stream resource</param>
        /// <param name="stream">Stream to create</param>
        /// <param name="token">Cancellation token</param>
        /// <returns>Created or retrieved stream.</returns>
        /// <exception cref="ArgumentNullException"></exception>
        public static async Task<Stream> GetOrCreateStreamAsync(
            this StreamRecordsResource streams,
            StreamWrite stream,
            CancellationToken token
        )
        {
            if (stream is null) throw new ArgumentNullException(nameof(stream));
            try
            {
                using (CdfMetrics.StreamRecords.WithLabels("retrieve_stream"))
                {
                    var res = await streams.RetrieveStreamAsync(stream.ExternalId, token).ConfigureAwait(false);
                    return res;
                }
            }
            catch (ResponseException ex) when (ex.Code == 404)
            {
            }
            using (CdfMetrics.StreamRecords.WithLabels("create_stream"))
            {
                _logger.LogInformation("Creating new stream with ID {Stream}", stream.ExternalId);
                return await streams.CreateStreamAsync(stream, token).ConfigureAwait(false);
            }
        }

        /// <summary>
        /// Insert the given stream records into <paramref name="stream"/>. The stream
        /// must exist.
        /// </summary>
        /// <param name="streams">Stream resource</param>
        /// <param name="stream">Stream to ingest into</param>
        /// <param name="records">Stream records to insert</param>
        /// <param name="chunkSize">Maximum number of records per request</param>
        /// <param name="throttleSize">Maximum number of parallel requests</param>
        /// <param name="token">Cancellation token</param>
        public static async Task InsertRecordsAsync(
            this StreamRecordsResource streams,
            string stream,
            ICollection<StreamRecordWrite> records,
            int chunkSize,
            int throttleSize,
            CancellationToken token
        )
        {
            var chunks = records.ChunkBy(chunkSize);

            var generators = chunks
                .Select<IEnumerable<StreamRecordWrite>, Func<Task>>(
                    chunk => async () =>
                    {
                        using (CdfMetrics.StreamRecords.WithLabels("ingest_instances"))
                        {
                            await streams.IngestAsync(stream, chunk, token).ConfigureAwait(false);
                        }
                    }
                );
            int numTasks = 0;
            await generators
                .RunThrottled(throttleSize, (_) =>
                    _logger.LogDebug("{MethodName} completed {Num}/{Total} tasks", nameof(InsertRecordsAsync), ++numTasks,
                        Math.Ceiling((double)records.Count / chunkSize)), token)
                .ConfigureAwait(false);
        }
    }
}
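A short usage sketch for the two new extension methods, calling them directly on the beta StreamRecords resource of an already-configured CogniteSdk Client. The stream external ID is a placeholder, the records collection is assumed to be built elsewhere, the StreamWrite initializer assumes a settable ExternalId, and the chunk/throttle values simply mirror the new configuration defaults.

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Cognite.Extensions;          // StreamRecordExtensions
using CogniteSdk;
using CogniteSdk.Beta;

public static class StreamRecordIngestSketch
{
    public static async Task RunAsync(Client client, ICollection<StreamRecordWrite> records, CancellationToken token)
    {
        const string streamId = "extractor-stream";   // placeholder external ID

        // Idempotent: retrieve the stream, fall back to creating it on a 404.
        await client.Beta.StreamRecords
            .GetOrCreateStreamAsync(new StreamWrite { ExternalId = streamId }, token)
            .ConfigureAwait(false);

        // Chunked, throttled ingestion: at most 1000 records per request,
        // at most 4 requests in flight at once.
        await client.Beta.StreamRecords
            .InsertRecordsAsync(streamId, records, 1000, 4, token)
            .ConfigureAwait(false);
    }
}
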
46 changes: 46 additions & 0 deletions ExtractorUtils/Cognite/CogniteDestination.cs
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text.Json;
using System.Threading;
@@ -8,6 +9,7 @@
using Cognite.Extensions.DataModels.CogniteExtractorExtensions;
using Cognite.Extractor.StateStorage;
using CogniteSdk;
using CogniteSdk.Beta;
using Microsoft.Extensions.Logging;
using TimeRange = Cognite.Extractor.Common.TimeRange;

@@ -951,5 +953,49 @@ public async Task<CogniteResult<SequenceRowError>> InsertSequenceRowsAsync(
                token).ConfigureAwait(false);
        }
        #endregion

        #region stream_records
        /// <summary>
        /// Retrieve a stream, or create it if it does not exist.
        /// </summary>
        /// <param name="stream">Stream to create or retrieve</param>
        /// <param name="token">Cancellation token</param>
        /// <returns>Created or retrieved stream</returns>
        /// <exception cref="ArgumentNullException"></exception>
        public async Task<CogniteSdk.Beta.Stream> GetOrCreateStreamAsync(
            StreamWrite stream,
            CancellationToken token)
        {
            if (stream == null) throw new ArgumentNullException(nameof(stream));
            _logger.LogDebug("Getting or creating stream with ID {Stream}", stream.ExternalId);
            return await _client.Beta.StreamRecords.GetOrCreateStreamAsync(stream, token).ConfigureAwait(false);
        }

        /// <summary>
        /// Insert the given stream records into <paramref name="stream"/>. The stream
        /// must exist.
        /// </summary>
        /// <param name="stream">Stream to ingest into</param>
        /// <param name="records">Stream records to insert</param>
        /// <param name="token">Cancellation token</param>
        /// <exception cref="ArgumentNullException"></exception>
        public async Task InsertRecordsAsync(
            string stream,
            ICollection<StreamRecordWrite> records,
            CancellationToken token)
        {
            if (stream == null) throw new ArgumentNullException(nameof(stream));
            if (records == null) throw new ArgumentNullException(nameof(records));
            _logger.LogDebug("Inserting {Count} records into the stream {Stream}",
                records.Count, stream);
            await _client.Beta.StreamRecords.InsertRecordsAsync(
                stream,
                records,
                _config.CdfChunking.StreamRecords,
                _config.CdfThrottling.StreamRecords,
                token
            ).ConfigureAwait(false);
        }
        #endregion
    }
}
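The same flow through the new CogniteDestination wrappers, which take chunk size and parallelism from the configured CdfChunking.StreamRecords and CdfThrottling.StreamRecords values. A sketch only: the destination is assumed to be constructed elsewhere, the stream external ID is a placeholder, and the CogniteDestination namespace is an assumption.

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Cognite.Extractor.Utils;     // CogniteDestination (namespace assumed)
using CogniteSdk.Beta;

public static class DestinationStreamSketch
{
    public static async Task WriteAsync(
        CogniteDestination destination,
        ICollection<StreamRecordWrite> records,
        CancellationToken token)
    {
        // Ensure the target stream exists; "extractor-stream" is a placeholder external ID.
        await destination
            .GetOrCreateStreamAsync(new StreamWrite { ExternalId = "extractor-stream" }, token)
            .ConfigureAwait(false);

        // Chunking and throttling come from CdfChunking.StreamRecords and
        // CdfThrottling.StreamRecords in the destination's configuration.
        await destination
            .InsertRecordsAsync("extractor-stream", records, token)
            .ConfigureAwait(false);
    }
}
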
10 changes: 10 additions & 0 deletions ExtractorUtils/Configuration/BaseConfig.cs
@@ -229,6 +229,11 @@ public class ChunkingConfig
        /// Maximum number of data modeling instances per get/create instance request
        /// </summary>
        public int Instances { get; set; } = 1000;

        /// <summary>
        /// Maximum number of stream records per ingest request.
        /// </summary>
        public int StreamRecords { get; set; } = 1000;
    }

    /// <summary>
@@ -279,6 +284,11 @@ public class ThrottlingConfig
        /// Maximum number of parallel requests per data modeling instances operation
        /// </summary>
        public int Instances { get; set; } = 4;

        /// <summary>
        /// Maximum number of parallel requests per stream record ingest operation.
        /// </summary>
        public int StreamRecords { get; set; } = 4;
    }
    /// <summary>
    /// Configure automatic retries on requests to CDF.
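A sketch of tuning the new knobs in code; in practice they usually come from the extractor's configuration file instead. Only ChunkingConfig.StreamRecords and ThrottlingConfig.StreamRecords appear in this diff, so the containing CogniteConfig type and its CdfChunking/CdfThrottling properties are assumptions here.

using Cognite.Extractor.Utils;   // CogniteConfig, ChunkingConfig, ThrottlingConfig (namespace assumed)

// Both values default to the ones shown in the diff: 1000 records per request, 4 parallel requests.
var config = new CogniteConfig
{
    CdfChunking = new ChunkingConfig { StreamRecords = 500 },    // smaller ingest batches
    CdfThrottling = new ThrottlingConfig { StreamRecords = 2 },  // fewer parallel ingest requests
};
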
2 changes: 1 addition & 1 deletion version
@@ -1 +1 @@
1.28.0
1.29.0
