Skip to content

Commit

Permalink
Use copy on write strategy for device metadata (#489)
Browse files Browse the repository at this point in the history
* Use copy on write strategy for metadata

* let client code sync writers to avoid stale data

* add unit tests

* No need to deep copy metadata values on write

* Make default values more consistent with map data

* update context method

* Abandon constructor approach

* consistent use of named return vars in short funcs

* Benchmark example usage of metadata in parallel

* update changelog

* add new release
  • Loading branch information
joe94 authored Jun 30, 2020
1 parent 64a391c commit fa6a57d
Show file tree
Hide file tree
Showing 9 changed files with 235 additions and 187 deletions.
16 changes: 12 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,19 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]
- fixed `ConsulWatch` in xresolver by storing and watching the correct part of the url [#490](https://github.com/xmidt-org/webpa-common/pull/490)
- fixed consul service discovery to pass QueryOptions [#490](https://github.com/xmidt-org/webpa-common/pull/490)

## [v1.10.2]
### Fixed
- Fixed `ConsulWatch` in xresolver by storing and watching the correct part of the url. [#490](https://github.com/xmidt-org/webpa-common/pull/490)
- Fixed consul service discovery to pass QueryOptions. [#490](https://github.com/xmidt-org/webpa-common/pull/490)

### Changed
- Device metadata implementation is now thread-safe and optimized for reads. [#489](https://github.com/xmidt-org/webpa-common/pull/489)


## [v1.10.1]
### Fixed
- Device metadata didn't return a read-only view of its map claims resulting in data races [#483](https://github.com/xmidt-org/webpa-common/pull/483)
- Device metadata didn't return a read-only view of its map claims resulting in data races. [#483](https://github.com/xmidt-org/webpa-common/pull/483)


## [v1.10.0]
Expand Down Expand Up @@ -99,7 +106,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- The first official release. We will be better about documenting changes
moving forward.

[Unreleased]: https://github.com/xmidt-org/webpa-common/compare/v1.10.1...HEAD
[Unreleased]: https://github.com/xmidt-org/webpa-common/compare/v1.10.2...HEAD
[v1.10.2]: https://github.com/xmidt-org/webpa-common/compare/v1.10.1...v1.10.2
[v1.10.1]: https://github.com/xmidt-org/webpa-common/compare/v1.10.0...v1.10.1
[v1.10.0]: https://github.com/xmidt-org/webpa-common/compare/v1.9.0...v1.10.0
[v1.9.0]: https://github.com/xmidt-org/webpa-common/compare/v1.8.1...v1.9.0
Expand Down
6 changes: 3 additions & 3 deletions device/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ func WithIDRequest(id ID, original *http.Request) *http.Request {
}

// WithDeviceMetadata returns a new context with the given metadata as a value.
func WithDeviceMetadata(parent context.Context, metadata Metadata) context.Context {
func WithDeviceMetadata(parent context.Context, metadata *Metadata) context.Context {
return context.WithValue(parent, metadataKey, metadata)
}

// GetDeviceMetadata returns the device metadata from the context if any.
func GetDeviceMetadata(ctx context.Context) (metadata Metadata, ok bool) {
metadata, ok = ctx.Value(metadataKey).(Metadata)
func GetDeviceMetadata(ctx context.Context) (metadata *Metadata, ok bool) {
metadata, ok = ctx.Value(metadataKey).(*Metadata)
return
}
8 changes: 4 additions & 4 deletions device/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type Interface interface {

// Metadata returns a key value store object for information that's useful to guide interactions
// with a device such as security credentials.
Metadata() Metadata
Metadata() *Metadata

// CloseReason returns the metadata explaining why a device was closed. If this device
// is not closed, this method's return is undefined.
Expand Down Expand Up @@ -119,7 +119,7 @@ type device struct {
compliance convey.Compliance
conveyClosure conveymetric.Closure

metadata Metadata
metadata *Metadata

closeReason atomic.Value
}
Expand All @@ -131,7 +131,7 @@ type deviceOptions struct {
QueueSize int
ConnectedAt time.Time
Logger log.Logger
Metadata Metadata
Metadata *Metadata
}

// newDevice is an internal factory function for devices
Expand Down Expand Up @@ -309,7 +309,7 @@ func (d *device) ConveyCompliance() convey.Compliance {
return d.compliance
}

func (d *device) Metadata() Metadata {
func (d *device) Metadata() *Metadata {
return d.metadata
}

Expand Down
2 changes: 1 addition & 1 deletion device/device_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestDevice(t *testing.T) {
QueueSize: record.expectedQueueSize,
ConnectedAt: expectedConnectedAt,
Logger: logging.NewTestLogger(nil, t),
Metadata: NewDeviceMetadata(),
Metadata: new(Metadata),
})
)

Expand Down
7 changes: 3 additions & 4 deletions device/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,8 @@ func (m *manager) Connect(response http.ResponseWriter, request *http.Request, r
}

metadata, ok := GetDeviceMetadata(ctx)

if !ok {
metadata = NewDeviceMetadata()
metadata = new(Metadata)
}

cvy, cvyErr := m.conveyTranslator.FromHeader(request.Header)
Expand All @@ -164,7 +163,7 @@ func (m *manager) Connect(response http.ResponseWriter, request *http.Request, r
Logger: m.logger,
})

if len(metadata.JWTClaims()) < 1 {
if len(metadata.Claims()) < 1 {
d.errorLog.Log(logging.MessageKey(), "missing security information")
}

Expand Down Expand Up @@ -311,7 +310,7 @@ func (m *manager) readPump(d *device, r ReadCloser, closeOnce *sync.Once) {
}

deviceMetadata := event.Device.Metadata()
message.PartnerIDs = []string{deviceMetadata.JWTClaims().PartnerID()}
message.PartnerIDs = []string{deviceMetadata.PartnerIDClaim()}

if message.Type == wrp.SimpleEventMessageType {
message.SessionID = deviceMetadata.SessionID()
Expand Down
143 changes: 69 additions & 74 deletions device/metadata.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package device

import (
"encoding/json"
"sync"
"sync/atomic"

"github.com/segmentio/ksuid"
"github.com/spf13/cast"
Expand All @@ -28,107 +29,101 @@ func init() {
}

// Metadata contains information such as security credentials
// related to a device.
type Metadata map[string]interface{}

// JWTClaims returns the JWT claims attached to a device. If no claims exist,
// they are initialized appropiately.
func (m Metadata) JWTClaims() JWTClaims { // returns the type and such type has getter/setter
if jwtClaims, ok := m[JWTClaimsKey].(JWTClaims); ok {
return deepCopyMap(jwtClaims)
}
return deepCopyMap(m.initJWTClaims())
}

// SetJWTClaims sets the JWT claims attached to a device.
func (m Metadata) SetJWTClaims(jwtClaims JWTClaims) {
m[JWTClaimsKey] = jwtClaims
// related to a device. Read operations are optimized with a
// copy-on-write strategy. Client code must further synchronize concurrent
// writers to avoid stale data.
// Metadata uses an atomic.Value internally and thus it should not be copied
// after creation.
type Metadata struct {
v atomic.Value
once sync.Once
}

// SessionID returns the UUID associated with a device's current connection
// to the cluster.
func (m Metadata) SessionID() string {
if sessionID, ok := m[SessionIDKey].(string); ok {
return sessionID
}

return m.initSessionID()
}

func (m Metadata) initSessionID() string {
sessionID := ksuid.New().String()
m[SessionIDKey] = sessionID
return sessionID
// to the cluster if one has been set. The zero value is returned as default.
func (m *Metadata) SessionID() (sessionID string) {
sessionID, _ = m.loadData()[SessionIDKey].(string)
return
}

func (m Metadata) initJWTClaims() JWTClaims {
jwtClaims := JWTClaims(make(map[string]interface{}))
m.SetJWTClaims(jwtClaims)
return jwtClaims
// SetSessionID sets the UUID associated the device's current connection to the cluster.
// It uses sync.Once to ensure the sessionID is unchanged through the metadata's lifecycle.
func (m *Metadata) SetSessionID(sessionID string) {
m.once.Do(func() {
m.copyAndStore(SessionIDKey, sessionID)
})
}

// Load allows retrieving values from a device's metadata
func (m Metadata) Load(key string) interface{} {
return m[key]
// Load returns the value associated with the given key in the metadata map.
// It is not recommended modifying values returned by reference.
func (m *Metadata) Load(key string) interface{} {
return m.loadData()[key]
}

// Store allows writing values into the device's metadata given
// a key. Boolean results indicates whether the operation was successful.
// Note: operations will fail for reserved keys.
func (m Metadata) Store(key string, value interface{}) bool {
// Store updates the key value mapping in the device metadata map.
// A boolean result is given indicating whether the operation was successful.
// Operations will fail for reserved keys.
// To avoid updating keys with stale data/value, client code will need to
// synchronize the entire transaction of reading, copying, modifying and
// writing back the value.
func (m *Metadata) Store(key string, value interface{}) bool {
if reservedMetadataKeys[key] {
return false
}
m[key] = value
m.copyAndStore(key, value)
return true
}

// NewDeviceMetadata returns a metadata object ready for use.
func NewDeviceMetadata() Metadata {
return NewDeviceMetadataWithClaims(make(map[string]interface{}))
// SetClaims updates the claims associated with the device that's
// owner of the metadata.
// To avoid updating the claims with stale data, client code will need to
// synchronize the entire transaction of reading, copying, modifying and
// writing back the value.
func (m *Metadata) SetClaims(claims map[string]interface{}) {
m.copyAndStore(JWTClaimsKey, deepCopyMap(claims))
}

// NewDeviceMetadataWithClaims returns a metadata object ready for use with the
// given claims.
func NewDeviceMetadataWithClaims(claims map[string]interface{}) Metadata {
m := make(Metadata)
m.SetJWTClaims(deepCopyMap(claims))
m.initSessionID()
return m
// Claims returns the claims attached to a device. The returned map
// should not be modified to avoid any race conditions. To update the claims,
// take a look at the ClaimsCopy() function
func (m *Metadata) Claims() (claims map[string]interface{}) {
claims, _ = m.loadData()[JWTClaimsKey].(map[string]interface{})
return
}

// JWTClaims defines the interface of a device's security claims.
// One current use case is providing security credentials the device
// presented at registration time.
type JWTClaims map[string]interface{}
// ClaimsCopy returns a deep copy of the claims. Use this, along with the
// SetClaims() method to update the claims.
func (m *Metadata) ClaimsCopy() map[string]interface{} {
return deepCopyMap(m.Claims())
}

// Trust returns the device's trust level claim
// TrustClaim returns the device's trust level claim.
// By Default, a device is untrusted (trust = 0).
func (c JWTClaims) Trust() int {
if trust, ok := c[TrustClaimKey].(int); ok {
return trust
}
return 0
func (m *Metadata) TrustClaim() (trust int) {
trust, _ = m.Claims()[TrustClaimKey].(int)
return
}

// PartnerID returns the partner ID claim.
// PartnerIDClaim returns the partner ID claim.
// If no claim is found, the zero value is returned.
func (c JWTClaims) PartnerID() string {
if partnerID, ok := c[PartnerIDClaimKey].(string); ok {
return partnerID
}
return "" // no partner by default
func (m *Metadata) PartnerIDClaim() (partnerID string) {
partnerID, _ = m.Claims()[PartnerIDClaimKey].(string)
return
}

func (m *Metadata) loadData() (data map[string]interface{}) {
data, _ = m.v.Load().(map[string]interface{})
return
}

// SetTrust modifies the trust level of the device which owns these
// claims.
func (c JWTClaims) SetTrust(trust int) {
c[TrustClaimKey] = trust
func (m *Metadata) storeData(data map[string]interface{}) {
m.v.Store(data)
}

// MarshalJSON allows easy JSON representation of the JWTClaims underlying claims map.
func (c JWTClaims) MarshalJSON() ([]byte, error) {
return json.Marshal(c)
func (m *Metadata) copyAndStore(key string, val interface{}) {
data := deepCopyMap(m.loadData())
data[key] = val
m.storeData(data)
}

func deepCopyMap(m map[string]interface{}) map[string]interface{} {
Expand Down
Loading

0 comments on commit fa6a57d

Please sign in to comment.