-
Notifications
You must be signed in to change notification settings - Fork 224
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add v3 version of NATS JetStream protocol #1095
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding the samples, please take a look at how we use a protocol in https://github.com/cloudevents/sdk-go/blob/main/samples/kafka_confluent/sender/main.go
16961e8
to
6f1ff3d
Compare
6f1ff3d
to
df22aee
Compare
0e2da9c
to
2cbe682
Compare
@stephen-totty-hpe Thanks! It looks good to me. Leave it to @embano1 ~ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thx for all the hard work you put into this! Some minor comments on the Options implementation (really like it) and I think we can further reduce the complexity of this code base, especially in the sender/receiver implementations. I suggested to take a look at the HTTP/Kafka (Confluent) protocol implementation for how to simplify the internal logic of this protocol.
2cbe682
to
9d9a30e
Compare
5a16005
to
7f198df
Compare
7f198df
to
fc9cf7a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thx again for making the changes! I think this is close to merging, major feedback is now about:
- error types, I think we should use a different approach here to keep the code simple, maintainable, and easy to use for developers, such as using this approach (with exported type though)
- options, room for some developer experience improvements to avoid confusion how to use the protocol
p.subMtx.Lock() | ||
defer p.subMtx.Unlock() | ||
|
||
if p.connOwned && p.conn != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we still want to track owned connections? Currently the comment makes it sound that even if I specify a custom connection, conn would be closed (which would not be the case). I feel we can remove connOwned checks and handle connection closing for both situations
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The v2 version had this concept. The idea is that if a connection was passed in, we would use it and then exit without closing the connection (since it was opened before use).
With a URL, we create a connection and then close it when Close() is called:
p.conn.Close()
I am not sure if users would complain if we closed their "passed in connection".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if users would complain if we closed their "passed in connection".
Well, this is what I was referring to earlier: users are in full control (and it's actually what your doc string says for Close()
). So users can decide when they underlying connection (either ours or their provided) will be closed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed the comments for Close() and added a few new comments. I removed the connOwned
member and only use the url
member.
fc9cf7a
to
58271b5
Compare
p.subMtx.Lock() | ||
defer p.subMtx.Unlock() | ||
|
||
if p.connOwned && p.conn != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if users would complain if we closed their "passed in connection".
Well, this is what I was referring to earlier: users are in full control (and it's actually what your doc string says for Close()
). So users can decide when they underlying connection (either ours or their provided) will be closed.
} | ||
|
||
// validationError is returned when an invalid option is given | ||
type validationError struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: do you intentionally want to keep this internal? Fine with me to not increase the API surface for now :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left internal. I can go either way.
…samples Signed-off-by: stephen-totty-hpe <[email protected]>
58271b5
to
c73615f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you so much for your contribution and patience along the review! I really like the new NATS JS API you created here!
I think your other JS-related PRs can now be closed?
As been discussed previously, there has been a desire to implement the CloudEventSDK nats jetstream protocol using the newer jetstream package. The newer jetstream package exposes more jetstream specific functionality that should make it more flexible to use.
Much of this code is stolen from the v2 implementation and changed where necessary. It builds upon comments made in:
#1083
Under the covers, this uses a jetstream.Consumer, which can be "normal" or "ordered". This is done using consumer options WithConsumerConfig and WithOrderedConsumerConfig.
Also, the need to know the stream names upfront is not needed. Many of the internal options are available if needed, although the only required option is the ConsumerOption with the correct config.