Skip to content

Commit

Permalink
Fix HCS latency regression (0.97)
Browse files Browse the repository at this point in the history
Fix the HCS E2E latency regression by invoking RedisPublisher before SqlEntityListener

Signed-off-by: Steven Sheehy <[email protected]>
  • Loading branch information
steven-sheehy authored Jan 24, 2024
1 parent 99bd6a9 commit 2a527ad
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
@ConditionOnEntityRecordParser
@CustomLog
@Named
@Order(2)
@Order(1)
public class NotifyingPublisher implements BatchPublisher {

private static final String SQL = "select pg_notify('topic_message', ?)";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
@ConditionOnEntityRecordParser
@CustomLog
@Named
@Order(1)
@Order(0) // Triggering the async publishing before other operations can reduce latency
public class RedisPublisher implements BatchPublisher {

private static final String TOPIC_FORMAT = "topic.%d";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@

@CustomLog
@Named
@Order(0)
@Order(2)
@ConditionOnEntityRecordParser
@RequiredArgsConstructor
public class SqlEntityListener implements EntityListener, RecordStreamFileListener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

@CustomLog
@Named
@Order(1)
@Order(3)
@ConditionOnEntityRecordParser
public class TopicMessageLookupEntityListener implements EntityListener, RecordStreamFileListener {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@

package com.hedera.mirror.importer.parser.record.entity.redis;

import static org.assertj.core.api.Assertions.assertThat;

import com.hedera.mirror.common.domain.entity.EntityId;
import com.hedera.mirror.common.domain.topic.StreamMessage;
import com.hedera.mirror.common.domain.topic.TopicMessage;
import com.hedera.mirror.importer.parser.record.RecordStreamFileListener;
import com.hedera.mirror.importer.parser.record.entity.BatchPublisherTest;
import com.hedera.mirror.importer.parser.record.entity.ParserContext;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import reactor.core.publisher.Flux;
Expand All @@ -29,18 +34,26 @@
class RedisPublisherIntegrationTest extends BatchPublisherTest {

private final ReactiveRedisOperations<String, StreamMessage> redisOperations;
private final List<RecordStreamFileListener> streamFileListeners;

public RedisPublisherIntegrationTest(
RedisPublisher entityListener,
RedisPublisher redisPublisher,
ParserContext parserContext,
RedisProperties properties,
ReactiveRedisOperations<String, StreamMessage> redisOperations) {
super(entityListener, parserContext, properties);
ReactiveRedisOperations<String, StreamMessage> redisOperations,
List<RecordStreamFileListener> streamFileListeners) {
super(redisPublisher, parserContext, properties);
this.redisOperations = redisOperations;
this.streamFileListeners = streamFileListeners;
}

@Override
protected Flux<TopicMessage> subscribe(EntityId topicId) {
return redisOperations.listenToChannel("topic." + topicId.getId()).map(m -> (TopicMessage) m.getMessage());
}

@Test
void publishesFirst() {
assertThat(streamFileListeners).first().isEqualTo(batchPublisher);
}
}

0 comments on commit 2a527ad

Please sign in to comment.