Skip to content

Commit

Permalink
Start modelling firehose (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
lopcode authored Nov 18, 2024
1 parent 26ebeaf commit be14357
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 4 deletions.
5 changes: 4 additions & 1 deletion jetstream/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ dependencies {
implementation(platform("org.slf4j:slf4j-bom:2.0.16"))
implementation("org.slf4j:slf4j-api")
implementation("com.github.luben:zstd-jni:1.5.6-7")
implementation(platform("com.fasterxml.jackson:jackson-bom:2.18.1"))
implementation("com.fasterxml.jackson.core:jackson-core")
implementation("com.fasterxml.jackson.module:jackson-module-blackbird")
}

java {
Expand Down Expand Up @@ -82,7 +85,7 @@ publishing {

pom {
name = "bluesky-jvm-jetstream"
description = "Tools to work with Bluesky's Jetstream system"
description = "Tools to help you build Bluesky integrations in Java, Kotlin, and JVM systems"
url = "https://github.com/lopcode/bluesky-jvm"
licenses {
license {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
package com.lopcode.bluesky.jetstream;

import com.github.luben.zstd.*;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.module.blackbird.BlackbirdModule;
import com.github.luben.zstd.ZstdDecompressCtx;
import com.github.luben.zstd.ZstdDictDecompress;
import com.github.luben.zstd.ZstdException;
import com.lopcode.bluesky.jetstream.model.JetstreamEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
Expand All @@ -24,6 +31,11 @@ public class Jetstream {
private final int maxWebsocketFrameBufferSizeBytes = 16 * 1024 * 1000;
private final ByteBuffer frameBuffer = ByteBuffer.allocateDirect(maxWebsocketFrameBufferSizeBytes);
private final ByteBuffer decompressBuffer = ByteBuffer.allocateDirect(maxWebsocketFrameBufferSizeBytes);
private final JsonMapper jsonMapper = JsonMapper.builder()
.addModule(new BlackbirdModule())
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.build();

private Instant connectedAt;

public void start() throws IOException {
Expand Down Expand Up @@ -143,6 +155,12 @@ void logDecompressedMessage(
} finally {
decompressBuffer.clear();
}
logger.info("{}: {}", messageId, text);
JetstreamEvent event;
try {
event = jsonMapper.readValue(text, JetstreamEvent.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
logger.info("{}: {}", messageId, event);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package com.lopcode.bluesky.jetstream.model;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.JsonNode;

@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.EXISTING_PROPERTY,
defaultImpl = JetstreamEvent.Unknown.class,
property = "kind",
visible = true
)
@JsonSubTypes({
@JsonSubTypes.Type(value = JetstreamEvent.Commit.class, name = "commit"),
@JsonSubTypes.Type(value = JetstreamEvent.Identity.class, name = "identity"),
@JsonSubTypes.Type(value = JetstreamEvent.Account.class, name = "account")
})
public class JetstreamEvent {
@JsonProperty("did")
String did;
@JsonProperty("kind")
String kind;
@JsonProperty("time_us")
long unixTimeMicroseconds;

public static class Commit extends JetstreamEvent {
@JsonProperty("commit")
CommitInfo commit;

@Override
public String toString() {
return "Commit{" +
"commit=" + commit +
", did='" + did + '\'' +
", kind='" + kind + '\'' +
", unixTimeMicroseconds=" + unixTimeMicroseconds +
'}';
}
}

public static class CommitInfo {
@JsonProperty("rev")
String rev;
@JsonProperty("rkey")
String rkey;
@JsonProperty("operation")
String operation;
@JsonProperty("collection")
String collection;
@JsonProperty("cid")
String cid;
@JsonProperty("record")
JsonNode record; // todo: model atproto records

@Override
public String toString() {
return "CommitInfo{" +
"rev='" + rev + '\'' +
", rkey='" + rkey + '\'' +
", operation='" + operation + '\'' +
", collection='" + collection + '\'' +
", cid='" + cid + '\'' +
", record=" + record +
'}';
}
}

public static class Identity extends JetstreamEvent {
@JsonProperty("identity")
IdentityInfo identity;

@Override
public String toString() {
return "Identity{" +
"identity=" + identity +
", did='" + did + '\'' +
", kind='" + kind + '\'' +
", unixTimeMicroseconds=" + unixTimeMicroseconds +
'}';
}
}

public static class IdentityInfo {
@JsonProperty("did")
String did;
@JsonProperty("handle")
String handle;
@JsonProperty("seq")
Long seq;
@JsonProperty("time")
String time;

@Override
public String toString() {
return "IdentityInfo{" +
"did='" + did + '\'' +
", handle='" + handle + '\'' +
", seq=" + seq +
", time='" + time + '\'' +
'}';
}
}

public static class Account extends JetstreamEvent {
@JsonProperty("account")
AccountInfo account;

@Override
public String toString() {
return "Account{" +
"account=" + account +
'}';
}
}

public static class AccountInfo {
@JsonProperty("active")
boolean active;
@JsonProperty("did")
String did;
@JsonProperty("seq")
Long seq;
@JsonProperty("time")
String time;
@JsonProperty("status")
String status;

@Override
public String toString() {
return "AccountInfo{" +
"active=" + active +
", did='" + did + '\'' +
", seq=" + seq +
", time='" + time + '\'' +
", status='" + status + '\'' +
'}';
}
}

public static class Unknown extends JetstreamEvent {
@Override
public String toString() {
return "Unknown{" +
"did='" + did + '\'' +
", kind='" + kind + '\'' +
", unixTimeMicroseconds=" + unixTimeMicroseconds +
'}';
}
}
}

0 comments on commit be14357

Please sign in to comment.