Skip to content

Commit

Permalink
Add CreateSchemaWithIDAndVersion() to the sr package
Browse files Browse the repository at this point in the history
Signed-off-by: Mihai Todor <[email protected]>
  • Loading branch information
mihaitodor committed Nov 18, 2024
1 parent 81ceb1a commit 143aeab
Showing 1 changed file with 28 additions and 8 deletions.
36 changes: 28 additions & 8 deletions pkg/sr/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,13 @@ type (
// a Kafka topic, and whether this is for a key or value. For example,
// "foo-key" would be the subject for the foo topic for serializing the
// key field of a record.
Subject string `json:"subject"`
Subject string `json:"subject,omitempty"`

// Version is the version of this subject.
Version int `json:"version"`
Version int `json:"version,omitempty"`

// ID is the globally unique ID of the schema.
ID int `json:"id"`
ID int `json:"id,omitempty"`

Schema
}
Expand Down Expand Up @@ -352,18 +352,38 @@ func (cl *Client) Schemas(ctx context.Context, subject string) ([]SubjectSchema,
//
// This supports param [Normalize].
func (cl *Client) CreateSchema(ctx context.Context, subject string, s Schema) (SubjectSchema, error) {
return cl.CreateSchemaWithIDAndVersion(ctx, subject, s, -1, -1)
}

// CreateSchemaWithIDAndVersion attempts to create a schema with a fixed ID and
// version ID in the given subject. If the id is set to -1 or 0, this method is
// equivalent to CreateSchema(). If the versionID is set to -1 or 0, it will be
// omitted when creating the schema.
//
// This supports param [Normalize].
func (cl *Client) CreateSchemaWithIDAndVersion(ctx context.Context, subject string, s Schema, id, versionID int) (SubjectSchema, error) {
// POST /subjects/{subject}/versions => returns ID
// Newer SR returns the full SubjectSchema, but old does not, so we
// re-request to find the full information.
path := pathSubjectWithVersion(subject)
var id struct {
var into struct {
ID int `json:"id"`
}
if err := cl.post(ctx, path, s, &id); err != nil {
return SubjectSchema{}, err
if id == -1 {
if err := cl.post(ctx, path, s, &into); err != nil {
return SubjectSchema{}, err
}
} else {
ss := SubjectSchema{Schema: s, ID: id}
if versionID != -1 {
ss.Version = versionID
}
if err := cl.post(ctx, path, ss, &into); err != nil {
return SubjectSchema{}, err
}
}

usages, err := cl.SchemaUsagesByID(ctx, id.ID)
usages, err := cl.SchemaUsagesByID(ctx, into.ID)
if err != nil {
return SubjectSchema{}, err
}
Expand All @@ -372,7 +392,7 @@ func (cl *Client) CreateSchema(ctx context.Context, subject string, s Schema) (S
return usage, nil
}
}
return SubjectSchema{}, fmt.Errorf("created schema under id %d, but unable to find SubjectSchema", id.ID)
return SubjectSchema{}, fmt.Errorf("created schema under id %d, but unable to find SubjectSchema", into.ID)
}

// LookupSchema checks to see if a schema is already registered and if so,
Expand Down

0 comments on commit 143aeab

Please sign in to comment.