Skip to content

Commit

Permalink
Add support for CDM Assets (#488)
Browse files Browse the repository at this point in the history
* Add support for CDM Assets

* passthrough view identifiers as metric labels

* add testing method that allows capturing intermediary values

* bump version

* make parent summary persistent

* cleanup
  • Loading branch information
ozangoktan authored Nov 6, 2024
1 parent ac961d7 commit f44a1f5
Show file tree
Hide file tree
Showing 11 changed files with 791 additions and 349 deletions.
4 changes: 2 additions & 2 deletions Cognite.Extensions/Assets/AssetUpdateResultHandlers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private static bool IsAffected(AssetUpdateItem item, HashSet<Identity> badValues


/// <summary>
/// Clean list of AssetCreate objects based on CogniteError object
/// Clean list of AssetUpdate objects based on CogniteError object
/// </summary>
/// <param name="error">Error that occured with a previous push</param>
/// <param name="items">Assets to clean</param>
Expand All @@ -117,7 +117,7 @@ public static IEnumerable<AssetUpdateItem> CleanFromError(
CogniteError<AssetUpdateItem> error,
IEnumerable<AssetUpdateItem> items)
{
return CleanFromErrorCommon(error, items, IsAffected, item => item, CdfMetrics.TimeSeriesUpdatesSkipped);
return CleanFromErrorCommon(error, items, IsAffected, item => item, CdfMetrics.AssetUpdatesSkipped);
}

/// <summary>
Expand Down
12 changes: 9 additions & 3 deletions Cognite.Extensions/CdfMetrics.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Prometheus;
using CogniteSdk.DataModels;
using Prometheus;

namespace Cognite.Extensions
{
Expand All @@ -8,8 +9,11 @@ static class CdfMetrics
"Number and duration of asset requests to CDF", "endpoint");
public static Summary TimeSeries { get; } = Metrics.CreateSummary("extractor_utils_cdf_timeseries_requests",
"Number and duration of time-series requests to CDF", "endpoint");
public static Summary CoreTimeSeries { get; } = Metrics.CreateSummary("extractor_utils_cdf_core_timeseries_requests",
"Number and duration of core data model time-series requests to CDF", "endpoint");
private static Summary _instances = Metrics.CreateSummary("extractor_utils_cdf_instances_requests", "Number and duration of instance requests CDF data modeling", "endpoint", "view_identifier");
public static Summary.Child Instances(ViewIdentifier view, string operation)
{
return _instances.WithLabels(operation, $"{view.Space}-{view.ExternalId}");
}
public static Summary Datapoints { get; } = Metrics.CreateSummary("extractor_utils_cdf_datapoint_requests",
"Number and duration of datapoint requests to CDF", "endpoint");
public static Summary Events { get; } = Metrics.CreateSummary("extractor_utils_cdf_event_requests",
Expand Down Expand Up @@ -42,5 +46,7 @@ static class CdfMetrics
"Number of asset updates skipped due to errors");
public static Counter TimeSeriesUpdatesSkipped { get; } = Metrics.CreateCounter("extractor_utils_cdf_timeseries_updates_skipped",
"Number of timeseries updates skipped due to errors");
public static Counter InstanceUpsertsSkipped { get; } = Metrics.CreateCounter("extractor_utils_cdf_instance_upserts_skipped",
"Number of data modeling instance upserts skipped due to errors");
}
}
6 changes: 5 additions & 1 deletion Cognite.Extensions/CogniteResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,11 @@ public enum RequestType
/// <summary>
/// Update timeseries
/// </summary>
UpdateTimeSeries
UpdateTimeSeries,
/// <summary>
/// Upsert instances to data modeling
/// </summary>
UpsertInstances,
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System.Collections.Generic;
using CogniteSdk.DataModels.Core;

namespace Cognite.Extensions.DataModels.CogniteExtractorExtensions
{
/// <summary>
/// CDM Asset extended for use with extractors
/// </summary>
public class CogniteExtractorAsset : CogniteAsset
{
/// <summary>
/// Unstructured metadata extracted from the source system.
/// </summary>
public Dictionary<string, string>? extractedData { get; set; }

/// <summary>
/// Empty Constructor.
/// </summary>
public CogniteExtractorAsset() : base() { }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using CogniteSdk;
using CogniteSdk.DataModels;
using CogniteSdk.DataModels.Core;
using CogniteSdk.Resources.DataModels;

namespace Cognite.Extensions.DataModels.CogniteExtractorExtensions
{
/// <summary>
/// Extension utility methods for <see cref="Client"/>
/// </summary>
public static class ExtractorAssetExtensions
{
/// <summary>
/// Get or create the assets with the provided <paramref name="assets"/> if they exist in CDF.
/// If one or more do not exist, use the <paramref name="buildAsset"/> function to construct
/// the missing asset objects and upload them to CDF using the chunking of items and throttling
/// passed as parameters
/// If any items fail to be created due to missing assets, duplicated externalId, duplicated
/// legacy name, or missing dataSetId, they can be removed before retrying by setting <paramref name="retryMode"/>
/// </summary>
/// <param name="resource">CogniteSdk CDM Asset resource</param>
/// <param name="assets">Asset instance ids</param>
/// <param name="buildAsset">Function that builds CogniteSdk SourcedNodeWrite objects</param>
/// <param name="chunkSize">Chunk size</param>
/// <param name="throttleSize">Throttle size</param>
/// <param name="retryMode">How to handle failed requests</param>
/// <param name="sanitationMode">The type of sanitation to apply to assets before creating</param>
/// <param name="token">Cancellation token</param>
/// <returns>A <see cref="CogniteResult{TResult, TError}"/> containing errors that occured and a list of the created and found assets</returns>
public static Task<CogniteResult<SourcedNode<T>, SourcedNodeWrite<T>>> GetOrCreateAssetsAsync<T>(
this CoreAssetResource<T> resource,
IEnumerable<InstanceIdentifier> assets,
Func<IEnumerable<InstanceIdentifier>, IEnumerable<SourcedNodeWrite<T>>> buildAsset,
int chunkSize,
int throttleSize,
RetryMode retryMode,
SanitationMode sanitationMode,
CancellationToken token) where T : CogniteExtractorAsset
{
Task<IEnumerable<SourcedNodeWrite<T>>> asyncBuildAsset(IEnumerable<InstanceIdentifier> ids)
{
return Task.FromResult(buildAsset(ids));
}
return resource.GetOrCreateAssetsAsync<T>(assets, asyncBuildAsset,
chunkSize, throttleSize, retryMode, sanitationMode, token);
}

/// <summary>
/// Get or create the assets with the provided <paramref name="assets"/> if they exist in CDF.
/// If one or more do not exist, use the <paramref name="buildAsset"/> function to construct
/// the missing asset objects and upload them to CDF using the chunking of items and throttling
/// passed as parameters
/// If any items fail to be created due to missing assets, duplicated externalId, duplicated
/// legacy name, or missing dataSetId, they can be removed before retrying by setting <paramref name="retryMode"/>
/// </summary>
/// <param name="resource">CogniteSdk CDM Asset resource</param>
/// <param name="assets">Asset instance ids</param>
/// <param name="buildAsset">Async function that builds CogniteSdk SourcedNodeWrite objects</param>
/// <param name="chunkSize">Chunk size</param>
/// <param name="throttleSize">Throttle size</param>
/// <param name="retryMode">How to handle failed requests</param>
/// <param name="sanitationMode">The type of sanitation to apply to assets before creating</param>
/// <param name="token">Cancellation token</param>
/// <returns>A <see cref="CogniteResult{TResult, TError}"/> containing errors that occured and a list of the created and found assets</returns>
public static async Task<CogniteResult<SourcedNode<T>, SourcedNodeWrite<T>>> GetOrCreateAssetsAsync<T>(
this CoreAssetResource<T> resource,
IEnumerable<InstanceIdentifier> assets,
Func<IEnumerable<InstanceIdentifier>, Task<IEnumerable<SourcedNodeWrite<T>>>> buildAsset,
int chunkSize,
int throttleSize,
RetryMode retryMode,
SanitationMode sanitationMode,
CancellationToken token) where T : CogniteExtractorAsset
{
return await DataModelUtils.GetOrCreateResourcesAsync(resource, assets, buildAsset, DataModelSanitation.CleanInstanceRequest, chunkSize, throttleSize, retryMode, sanitationMode, token).ConfigureAwait(false);
}

/// <summary>
/// Ensures that all assets in <paramref name="assetsToEnsure"/> exists in CDF.
/// Tries to create the assets and returns when all are created or have been removed
/// due to issues with the request.
/// If any items fail to be created due to missing assets, duplicated externalId, duplicated
/// legacy name, or missing dataSetId, they can be removed before retrying by setting <paramref name="retryMode"/>
/// Assets will be returned in the same order as given in <paramref name="assetsToEnsure"/>
/// </summary>
/// <param name="resource">CogniteSdk CDM Asset resource</param>
/// <param name="assetsToEnsure">List of CogniteSdk SourcedNodeWrite objects</param>
/// <param name="chunkSize">Chunk size</param>
/// <param name="throttleSize">Throttle size</param>
/// <param name="retryMode">How to do retries. Keeping duplicates is not valid for
/// this method.</param>
/// <param name="sanitationMode">The type of sanitation to apply to assets before creating</param>
/// <param name="token">Cancellation token</param>
/// <returns>A <see cref="CogniteResult{TResult, TError}"/> containing errors that occured and a list of the created assets</returns>
public static async Task<CogniteResult<SourcedNode<T>, SourcedNodeWrite<T>>> EnsureAssetsExistsAsync<T>(
this CoreAssetResource<T> resource,
IEnumerable<SourcedNodeWrite<T>> assetsToEnsure,
int chunkSize,
int throttleSize,
RetryMode retryMode,
SanitationMode sanitationMode,
CancellationToken token) where T : CogniteAssetBase
{
return await DataModelUtils.EnsureResourcesExistsAsync(resource, assetsToEnsure, DataModelSanitation.CleanInstanceRequest, chunkSize, throttleSize, retryMode, sanitationMode, token).ConfigureAwait(false);
}

/// <summary>
/// Get the assets with the provided <paramref name="ids"/>. Ignore any
/// unknown ids
/// </summary>
/// <param name="resource">CogniteSdk CDM Asset resource</param>
/// <param name="ids">List of <see cref="Identity"/> objects</param>
/// <param name="chunkSize">Chunk size</param>
/// <param name="throttleSize">Throttle size</param>
/// <param name="token">Cancellation token</param>
/// <returns></returns>
public static async Task<IEnumerable<SourcedNode<T>>> GetAssetsByIdsIgnoreErrors<T>(
this CoreAssetResource<T> resource,
IEnumerable<Identity> ids,
int chunkSize,
int throttleSize,
CancellationToken token) where T : CogniteAssetBase
{
return await DataModelUtils.GetResourcesByIdsIgnoreErrors<T, CoreAssetResource<T>>(resource, ids, chunkSize, throttleSize, token).ConfigureAwait(false);
}

/// <summary>
/// Upsert assets.
/// If any items fail to be created due to duplicated instance ids, they can be removed before retrying by setting <paramref name="retryMode"/>
/// Assets will be returned in the same order as given in <paramref name="items"/>
/// </summary>
/// <param name="resource">CogniteSdk CDM Asset resource</param>
/// <param name="items">List of asset updates</param>
/// <param name="chunkSize">Maximum number of assets per request</param>
/// <param name="throttleSize">Maximum number of parallel requests</param>
/// <param name="retryMode">How to handle retries</param>
/// <param name="sanitationMode">What kind of pre-request sanitation to perform</param>
/// <param name="token">Cancellation token</param>
/// <returns>A <see cref="CogniteResult{TResult, TError}"/> containing errors that occured and a list of the updated assets</returns>
public static async Task<CogniteResult<SlimInstance, SourcedNodeWrite<T>>> UpsertAsync<T>(
this CoreAssetResource<T> resource,
IEnumerable<SourcedNodeWrite<T>> items,
int chunkSize,
int throttleSize,
RetryMode retryMode,
SanitationMode sanitationMode,
CancellationToken token) where T : CogniteAssetBase
{
return await DataModelUtils.UpsertAsync(resource, items, DataModelSanitation.CleanInstanceRequest, chunkSize, throttleSize, retryMode, sanitationMode, token).ConfigureAwait(false);
}
}
}
Loading

0 comments on commit f44a1f5

Please sign in to comment.