Various testing and other fixes
dorner committed Dec 5, 2024
1 parent faa642e commit 6fe16d5
Showing 10 changed files with 57 additions and 33 deletions.
1 change: 1 addition & 0 deletions Gemfile
@@ -4,3 +4,4 @@ source 'https://rubygems.org'
 
 # Specify your gem's dependencies in boilerplate.gemspec
 gemspec
+gem 'karafka', git: 'https://github.com/dorner/karafka', branch: 'redraw'
29 changes: 15 additions & 14 deletions docs/CONFIGURATION.md
@@ -86,21 +86,22 @@ Deimos.configure do
 end
 ```
 
-| Config name              | Default       | Description |
-|--------------------------|---------------|-------------|
-| producer_class           | nil           | ActiveRecordProducer class to use for sending messages. |
-| mode                     | :time_based   | Whether to use time-based polling or state-based polling. |
-| run_every                | 60            | Amount of time in seconds to wait between runs. |
+| Config name              | Default      | Description |
+|--------------------------|--------------|-------------|
+| producer_class           | nil          | ActiveRecordProducer class to use for sending messages. |
+| producer_classes         | []           | Array of ActiveRecordProducer classes to use for sending messages. You can use this instead of `producer_class`. |
+| mode                     | :time_based  | Whether to use time-based polling or state-based polling. |
+| run_every                | 60           | Amount of time in seconds to wait between runs. |
 | timestamp_column         | `:updated_at` | Name of the column to query. Remember to add an index to this column! |
-| delay_time               | 2             | Amount of time in seconds to wait before picking up records, to allow for transactions to finish. |
-| retries                  | 1             | The number of times to retry for a *non-Kafka* error. |
-| full_table               | false         | If set to true, do a full table dump to Kafka each run. Good for very small tables. Time-based only. |
-| start_from_beginning     | true          | If false, start from the current time instead of the beginning of time if this is the first time running the poller. Time-based only. |
-| state_column             | nil           | If set, this represents the DB column to use to update publishing status. State-based only. |
-| publish_timestamp_column | nil           | If set, this represents the DB column to use to update when publishing is done. State-based only. |
-| published_state          | nil           | If set, the poller will update the `state_column` to this value when publishing succeeds. State-based only. |
-| failed_state             | nil           | If set, the poller will update the `state_column` to this value when publishing fails. State-based only. |
-| poller_class             | nil           | Poller subclass name to use for publishing to multiple kafka topics from a single poller. |
+| delay_time               | 2            | Amount of time in seconds to wait before picking up records, to allow for transactions to finish. |
+| retries                  | 1            | The number of times to retry for a *non-Kafka* error. |
+| full_table               | false        | If set to true, do a full table dump to Kafka each run. Good for very small tables. Time-based only. |
+| start_from_beginning     | true         | If false, start from the current time instead of the beginning of time if this is the first time running the poller. Time-based only. |
+| state_column             | nil          | If set, this represents the DB column to use to update publishing status. State-based only. |
+| publish_timestamp_column | nil          | If set, this represents the DB column to use to update when publishing is done. State-based only. |
+| published_state          | nil          | If set, the poller will update the `state_column` to this value when publishing succeeds. State-based only. |
+| failed_state             | nil          | If set, the poller will update the `state_column` to this value when publishing fails. State-based only. |
+| poller_class             | nil          | Poller subclass name to use for publishing to multiple kafka topics from a single poller. |

## Karafka Routing

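Pulling the table together, a poller using the new `producer_classes` option might be configured like this. This is a hedged sketch only: the producer class names are made up, and the `db_poller` block and setter style follow the configuration pattern shown in these docs rather than being copied from them.

```ruby
# Hypothetical example: WidgetProducer and GadgetProducer are illustrative
# names, not part of Deimos.
Deimos.configure do
  db_poller do
    producer_classes ['WidgetProducer', 'GadgetProducer'] # instead of producer_class
    mode :time_based
    run_every 60
    timestamp_column :updated_at # remember to index this column!
  end
end
```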
1 change: 1 addition & 0 deletions docs/UPGRADING.md
@@ -216,6 +216,7 @@ The following events have been **renamed**:
 
 ### Additional breaking changes
 * `key_config` now defaults to `{none: true}` instead of erroring out if not set.
+* `reraise_errors` now defaults to true if the Rails env is set to `test`, and false otherwise.
 * `fatal_error?` now receives a Karafka `messages` object instead of a payload hash or array of hashes.
 * `watched_attributes` has been moved from the corresponding ActiveRecord class to the ActiveRecordProducer class. The object being watched is passed into the method.
 * Removed `TestHelpers.full_integration_test!` and `kafka_test!` as Karafka does not currently support these use cases. If we need them back, we will need to put in changes to the testing library to support them.
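The new `reraise_errors` default can be pictured with a plain-Ruby sketch (no Rails or Deimos required; the method name and `:logged` return value are illustrative, not real API): errors re-raise when the environment is `test`, and are swallowed otherwise.

```ruby
# Illustrative stand-in for the reraise_errors behavior described above.
# The keyword default mirrors the new Rails.env.test? default.
def handle_error(error, env:, reraise_errors: env == 'test')
  raise error if reraise_errors

  :logged # stand-in for logging the error and carrying on
end

handle_error(RuntimeError.new('boom'), env: 'production') # => :logged
# handle_error(RuntimeError.new('boom'), env: 'test')     # raises RuntimeError
```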
2 changes: 1 addition & 1 deletion lib/deimos.rb
@@ -162,7 +162,7 @@ def karafka_config_for(topic: nil, producer: nil)
     if topic
       karafka_configs.find { |t| t.name == topic}
     elsif producer
-      karafka_configs.find { |t| t.producer_class == producer}
+      karafka_configs.find { |t| t.producer_classes.include?(producer)}
     end
   end
 
2 changes: 1 addition & 1 deletion lib/deimos/active_record_producer.rb
@@ -53,7 +53,7 @@ def send_events(records, force_send: false)
     end
 
     def config
-      Deimos.karafka_configs.find { |t| t.producer_class == self }
+      Deimos.karafka_configs.find { |t| t.producer_classes.include?(self) }
     end
 
     def encoder
3 changes: 2 additions & 1 deletion lib/deimos/ext/consumer_route.rb
@@ -19,10 +19,11 @@ module Topic
         bulk_import_id_column: :bulk_import_id,
         replace_associations: true,
         each_message: false,
+        reraise_errors: Rails.env.test?,
         bulk_import_id_generator: proc { SecureRandom.uuid },
         fatal_error: proc { false }
       )
-      if args.any?
+      if args.size.positive?
         @deimos_config.public_send("#{field}=", args[0])
       end
       @deimos_config[field]
18 changes: 13 additions & 5 deletions lib/deimos/ext/producer_route.rb
@@ -1,18 +1,26 @@
 module Deimos
   class ProducerRoute < Karafka::Routing::Features::Base
-    FIELDS = %i(producer_class payload_log disabled)
+    FIELDS = %i(producer_classes payload_log disabled)
 
-    Config = Struct.new(*FIELDS, keyword_init: true)
+    Config = Struct.new(*FIELDS, keyword_init: true) do
+      def producer_class=(val)
+        self.producer_classes = [val]
+      end
+
+      def producer_class
+        self.producer_classes.first
+      end
+    end
     module Topic
-      FIELDS.each do |field|
+      (FIELDS + [:producer_class]).each do |field|
         define_method(field) do |*args|
-          active(false) if field == :producer_class
+          active(false) if %i(producer_class producer_classes).include?(field)
           @deimos_producer_config ||= Config.new
           if args.any?
             @deimos_producer_config.public_send("#{field}=", args[0])
             _deimos_setup_transcoders if schema && namespace
           end
-          @deimos_producer_config[field]
+          @deimos_producer_config.send(field)
         end
       end
     end
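The `Config` struct above keeps the old singular `producer_class` accessors working while storing an array. The backward-compatibility pattern can be sketched in plain Ruby (no Karafka needed; `LegacyConfig` is an illustrative name, not the real class):

```ruby
# Storage is plural; the old singular accessors delegate to it.
LegacyConfig = Struct.new(:producer_classes, keyword_init: true) do
  # Old singular writer wraps the value in an array.
  def producer_class=(val)
    self.producer_classes = [val]
  end

  # Old singular reader returns the first element.
  def producer_class
    producer_classes.first
  end
end

config = LegacyConfig.new(producer_classes: [])
config.producer_class = :my_producer # legacy singular API still works
config.producer_classes              # => [:my_producer]
config.producer_class                # => :my_producer
```

Because both accessors go through the same array, code written against either API observes a consistent value.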
2 changes: 1 addition & 1 deletion lib/deimos/producer.rb
@@ -122,7 +122,7 @@ def publish_list(payloads, sync: nil, force_send: false, topic: self.topic, head
     end
 
     def karafka_config
-      Deimos.karafka_configs.find { |topic| topic.producer_class == self }
+      Deimos.karafka_configs.find { |topic| topic.producer_classes.include?(self) }
     end
 
     def topic
26 changes: 18 additions & 8 deletions lib/deimos/test_helpers.rb
@@ -98,6 +98,8 @@ def _frk_failure_message(topic, message, key=nil, partition_key=nil, was_negated
       message_key = Deimos::TestHelpers.normalize_message(key)
       hash_matcher = RSpec::Matchers::BuiltIn::Match.new(message)
       Deimos::TestHelpers.sent_messages.any? do |m|
+        message.delete(:payload_key) if message.respond_to?(:[]) && message[:payload_key].nil?
+        m[:payload].delete(:payload_key) if m.respond_to?(:[]) && m[:payload]&.respond_to?(:[]) && m[:payload][:payload_key].nil?
         hash_matcher.send(:match, message, m[:payload]) &&
           topic == m[:topic] &&
           (key.present? ? message_key == m[:key] : true) &&
@@ -171,6 +173,7 @@ def test_consume_batch(handler_class_or_topic,
       unless call_original.nil?
         puts "test_consume_batch(call_original: true) is deprecated and will be removed in the future. You can remove the call_original parameter."
       end
+      karafka.consumer_messages.clear
       consumer = nil
       topic_name = nil
       if handler_class_or_topic.is_a?(String)
@@ -183,15 +186,22 @@ def test_consume_batch(handler_class_or_topic,
 
       Deimos.karafka_config_for(topic: topic_name).each_message(single)
 
-      payloads.each_with_index do |payload, i|
-        karafka.produce(payload, {key: keys[i], partition_key: partition_keys[i], topic: consumer.topic.name})
-      end
-      if block_given?
-        allow_any_instance_of(consumer_class).to receive(:consume_batch) do
-          yield
-        end
-      end
+      # don't record messages sent with test_consume_batch
+      original_messages = karafka.produced_messages.dup
+      payloads.each_with_index do |payload, i|
+        karafka.produce(payload, {key: keys[i], partition_key: partition_keys[i], topic: consumer.topic.name})
+      end
+      if block_given?
+        allow_any_instance_of(consumer_class).to receive(:consume_batch) do
+          yield
+        end
+        consumer.consume
+      end
+
+      # sent_messages should only include messages sent by application code, not this method
+      karafka.produced_messages.clear
+      karafka.produced_messages.concat(original_messages)
 
       consumer.consume
     end
   end
 end
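The snapshot/restore dance in `test_consume_batch` above keeps messages produced by the helper itself out of the recorded list. The pattern can be sketched in plain Ruby, with a local array standing in for `karafka.produced_messages`:

```ruby
# Stand-in for karafka.produced_messages: one message from application code.
produced_messages = [{ topic: 'app-topic', payload: 'from app code' }]

original_messages = produced_messages.dup # snapshot before the helper runs
produced_messages << { topic: 'test-topic', payload: 'from the helper' }
# ... the helper consumes the extra message here ...

produced_messages.clear                   # restore the snapshot afterwards
produced_messages.concat(original_messages)

produced_messages # => [{:topic=>"app-topic", :payload=>"from app code"}]
```

`clear`/`concat` mutates the original array in place rather than reassigning it, which matters when other code holds a reference to the same array, as the Karafka testing helpers do.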
6 changes: 4 additions & 2 deletions lib/deimos/transcoder.rb
@@ -38,9 +38,11 @@ def decode_key(key)
       return nil if key.nil? || self.key_field.nil?
 
       decoded_key = self.backend.decode_key(key, self.key_field)
-      return decoded_key unless @use_schema_classes
+      return decoded_key if self.key_field || !@use_schema_classes
 
-      Utils::SchemaClass.instance(decoded_key,
+      schema_key = decoded_key.is_a?(Hash) ? decoded_key : { self.key_field => decoded_key }
+
+      Utils::SchemaClass.instance(schema_key,
                                   "#{@schema}_key",
                                   @namespace)
     end
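The key-normalization step above wraps a scalar decoded key into a hash under the configured key field, so the schema-class constructor always receives a hash. A minimal sketch (the method name is illustrative, not Deimos API):

```ruby
# A scalar key becomes { key_field => value }; a hash key passes through.
def schema_key_for(decoded_key, key_field)
  decoded_key.is_a?(Hash) ? decoded_key : { key_field => decoded_key }
end

schema_key_for({ 'id' => 1 }, 'id') # => {"id"=>1}
schema_key_for(123, 'id')           # => {"id"=>123}
```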
