Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some utils for stream records #493

Merged
merged 3 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cognite.Extensions/CdfMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public static Summary.Child Instances(ViewIdentifier view, string operation)
"Number and duration of sequence row requests to CDF", "endpoint");
public static Counter NumberDatapoints { get; } = Metrics.CreateCounter(
"extractor_utils_cdf_datapoints", "Number of data points uploaded to CDF");
public static Summary StreamRecords { get; } = Metrics.CreateSummary("extractor_utils_cdf_stream_records",
"Number and duration of requests to endpoints for streams and records", "endpoint");

public static Counter AssetsSkipped { get; } = Metrics.CreateCounter("extractor_utils_cdf_assets_skipped",
"Number of assets skipped due to errors");
Expand Down
2 changes: 1 addition & 1 deletion Cognite.Extensions/Cognite.Extensions.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<ItemGroup>
<PackageReference Include="Microsoft.Identity.Client" Version="4.66.1" />
<PackageReference Include="System.Text.Json" Version="8.0.5" />
<PackageReference Include="CogniteSdk" Version="4.9.0" />
<PackageReference Include="CogniteSdk" Version="4.10.0" />
<PackageReference Include="prometheus-net" Version="8.2.1" />
<PackageReference Include="Microsoft.Extensions.Http" Version="8.0.1" />
<PackageReference Include="Microsoft.Extensions.Http.Polly" Version="8.0.11" />
Expand Down
2 changes: 2 additions & 0 deletions Cognite.Extensions/CogniteUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Cognite.Extensions.DataModels;
using Cognite.Extractor.Common;
using CogniteSdk;
using CogniteSdk.Alpha;
Expand Down Expand Up @@ -613,6 +614,7 @@ public static void AddExtensionLoggers(this IServiceProvider provider)
RawExtensions.SetLogger(logger);
EventExtensions.SetLogger(logger);
SequenceExtensions.SetLogger(logger);
StreamRecordExtensions.SetLogger(logger);
}
}

Expand Down
99 changes: 99 additions & 0 deletions Cognite.Extensions/StreamRecordExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Cognite.Extractor.Common;
using CogniteSdk;
using CogniteSdk.Beta;
using CogniteSdk.Resources.Beta;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

namespace Cognite.Extensions
{
/// <summary>
/// Extension utility methods for the beta streamrecords resource.
/// </summary>
public static class StreamRecordExtensions
{
private static ILogger _logger = new NullLogger<Client>();

internal static void SetLogger(ILogger logger)
{
_logger = logger;
}

/// <summary>
/// Retrieve a stream, or create it if it does not exist.
/// </summary>
/// <param name="streams">Stream resource</param>
/// <param name="stream">Stream to create</param>
/// <param name="token">Cancellation token</param>
/// <returns>Created or retrieved stream.</returns>
/// <exception cref="ArgumentNullException"></exception>
public static async Task<Stream> GetOrCreateStreamAsync(
this StreamRecordsResource streams,
StreamWrite stream,
CancellationToken token
)
{
if (stream is null) throw new ArgumentNullException(nameof(stream));
try
{
using (CdfMetrics.StreamRecords.WithLabels("retrieve_stream"))
{
var res = await streams.RetrieveStreamAsync(stream.ExternalId, token).ConfigureAwait(false);
return res;
}
}
catch (ResponseException ex) when (ex.Code == 404)
{
}
using (CdfMetrics.StreamRecords.WithLabels("create_stream"))
{
_logger.LogInformation("Creating new stream with ID {Stream}", stream.ExternalId);
return await streams.CreateStreamAsync(stream, token).ConfigureAwait(false);
}
}

/// <summary>
/// Insert the given stream records into <paramref name="stream"/>. The stream
/// must exist.
/// </summary>
/// <param name="streams">Stream resource</param>
/// <param name="stream">Stream to ingest into</param>
/// <param name="records">Stream records to insert</param>
/// <param name="chunkSize">Maximum number of records per request</param>
/// <param name="throttleSize">Maximum number of parallel requests</param>
/// <param name="token">Cancellation token</param>
public static async Task InsertRecordsAsync(
this StreamRecordsResource streams,
string stream,
ICollection<StreamRecordWrite> records,
int chunkSize,
int throttleSize,
CancellationToken token
)
{
var chunks = records.ChunkBy(chunkSize);

var generators = chunks
.Select<IEnumerable<StreamRecordWrite>, Func<Task>>(
chunk => async () =>
{
using (CdfMetrics.StreamRecords.WithLabels("ingest_instances"))
{
await streams.IngestAsync(stream, chunk, token).ConfigureAwait(false);
}
}
);
int numTasks = 0;
await generators
.RunThrottled(throttleSize, (_) =>
_logger.LogDebug("{MethodName} completed {Num}/{Total} tasks", nameof(InsertRecordsAsync), ++numTasks,
Math.Ceiling((double)records.Count / chunkSize)), token)
.ConfigureAwait(false);
}
}
}
46 changes: 46 additions & 0 deletions ExtractorUtils/Cognite/CogniteDestination.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text.Json;
using System.Threading;
Expand All @@ -8,6 +9,7 @@
using Cognite.Extensions.DataModels.CogniteExtractorExtensions;
using Cognite.Extractor.StateStorage;
using CogniteSdk;
using CogniteSdk.Beta;
using Microsoft.Extensions.Logging;
using TimeRange = Cognite.Extractor.Common.TimeRange;

Expand Down Expand Up @@ -951,5 +953,49 @@ public async Task<CogniteResult<SequenceRowError>> InsertSequenceRowsAsync(
token).ConfigureAwait(false);
}
#endregion

#region stream_records
/// <summary>
/// Retrieve a stream, or create it if it does not exist.
/// </summary>
/// <param name="stream">Stream to create or retrieve</param>
/// <param name="token">Cancellation token</param>
/// <returns>Created or retrieved stream</returns>
/// <exception cref="ArgumentNullException"></exception>
public async Task<CogniteSdk.Beta.Stream> GetOrCreateStreamAsync(
StreamWrite stream,
CancellationToken token)
{
if (stream == null) throw new ArgumentNullException(nameof(stream));
_logger.LogDebug("Getting or creating stream with ID {Stream}", stream.ExternalId);
return await _client.Beta.StreamRecords.GetOrCreateStreamAsync(stream, token).ConfigureAwait(false);
}

/// <summary>
/// Insert the given stream records into <paramref name="stream"/>. The stream
/// must exist.
/// </summary>
/// <param name="stream">Stream to ingest into</param>
/// <param name="records">Stream records to insert</param>
/// <param name="token">Cancellation token</param>
/// <exception cref="ArgumentNullException"></exception>
public async Task InsertRecordsAsync(
string stream,
ICollection<StreamRecordWrite> records,
CancellationToken token)
{
if (stream == null) throw new ArgumentNullException(nameof(stream));
if (records == null) throw new ArgumentNullException(nameof(records));
_logger.LogDebug("Inserting {Count} records into the stream {Stream}",
records.Count, stream);
await _client.Beta.StreamRecords.InsertRecordsAsync(
stream,
records,
_config.CdfChunking.StreamRecords,
_config.CdfThrottling.StreamRecords,
token
).ConfigureAwait(false);
}
#endregion
}
}
10 changes: 10 additions & 0 deletions ExtractorUtils/Configuration/BaseConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ public class ChunkingConfig
/// Maximum number of data modeling instances per get/create instance request
/// </summary>
public int Instances { get; set; } = 1000;

/// <summary>
/// Maximum number of stream records per ingest request.
/// </summary>
public int StreamRecords { get; set; } = 1000;
}

/// <summary>
Expand Down Expand Up @@ -279,6 +284,11 @@ public class ThrottlingConfig
/// Maximum number of parallel requests per data modeling instances operation
/// </summary>
public int Instances { get; set; } = 4;

/// <summary>
/// Maximum number of parallel requests per stream record ingest operation.
/// </summary>
public int StreamRecords { get; set; } = 4;
}
/// <summary>
/// Configure automatic retries on requests to CDF.
Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.28.0
1.29.0
Loading