diff --git a/cli/golang/tectonic.go b/cli/golang/tectonic.go index af2eba2b..1bc1a6c0 100644 --- a/cli/golang/tectonic.go +++ b/cli/golang/tectonic.go @@ -1,10 +1,15 @@ package tectonic import ( + "bufio" "bytes" + "encoding/binary" + "errors" "fmt" "net" "strconv" + "strings" + "time" "github.com/pquerna/ffjson/ffjson" ) @@ -21,109 +26,141 @@ type Delta struct { IsBid bool `json:"is_bid"` } -// Tectonic : Main type for single-instance connection to the Tectonic database -type Tectonic struct { - // Connection settings - Host string - Port uint16 // type ensures port selected is valid - Connection net.Conn +const ( + successByte = 0x1 +) - // TODO: Create authentication mechanisms in TectonicDB project, then these will be functional - Username string - Password string +var ( + ErrConnection = errors.New("Connection error") + ErrFailedWrite = errors.New("Failed to write to server") + ErrFailedRead = errors.New("Failed to read from server") + ErrFailedReadSize = errors.New("Failed to read response size") +) - CurrentDB string - CurrentSymbol string - CurrentExchange string +type TectonicError struct { + Message string } -// TectonicDB function prototypes -// **************************** -// Help() ( string, error ) done -// Ping() ( string, error ) done -// Info() ( string, error ) done -// Perf() ( string, error ) done -// BulkAdd(ticks *[]Delta) error done -// BulkAddInto(dbName string, ticks *[]Delta) error done -// Use(dbName string) error done -// Create(dbName string) error done -// Get(amount int) ( *[]Delta, error ) done -// GetFrom(amount int, dbName string, asTick bool) ( *[]Delta, error ) done -// Insert(tick *Delta) error done -// InsertInto(dbName string, tick *Delta) error done -// Count() uint64 done -// CountAll() uint64 done -// Clear() error done -// ClearAll() error done -// Flush() error done -// FlushAll() error done -// Subscribe(dbName, message chan string) error incomplete -// Unsubscribe() error incomplete -// Exists(dbName string) bool done -// -// Locally defined methods: -// **************************** -// Connect() error done -// SendMessage() ( string, error ) done -// **************************** - -// Pool : TODO -type Pool struct{} - -// DefaultTectonic : Default settings for Tectonic structure -var DefaultTectonic = Tectonic{ - Host: "127.0.0.1", - Port: 9001, +func (e *TectonicError) Error() string { + return e.Message } -// Connect : Connects Tectonic instance to the database. Run to initialize -func (t *Tectonic) Connect() error { - var ( - connectAddress = fmt.Sprintf("%s:%d", t.Host, t.Port) - connectErr error - ) +func NewTectonicError(message string) *TectonicError { + return &TectonicError{Message: message} +} + +type TectonicClient struct { + conn net.Conn + host string + port string + currentDB string +} + +func NewTectonicClient(host, port string) (*TectonicClient, error) { + addr := net.JoinHostPort(host, port) + fmt.Printf("Connecting to %s\n", addr) + + conn, err := net.Dial("tcp", addr) + if err != nil { + return nil, ErrConnection + } + + return &TectonicClient{ + conn: conn, + host: host, + port: port, + }, nil +} - t.Connection, connectErr = net.Dial("tcp", connectAddress) +func (c *TectonicClient) Cmd(command string) (string, error) { + if err := c.sendCommand(command); err != nil { + return "", err + } + + return c.readResponse() +} - return connectErr +func (c *TectonicClient) sendCommand(command string) error { + writer := bufio.NewWriter(c.conn) + + length := uint32(len(command)) + if err := binary.Write(writer, binary.BigEndian, length); err != nil { + fmt.Println(err) + return ErrFailedWrite + } + if _, err := writer.WriteString(command); err != nil { + fmt.Println(err) + return ErrFailedWrite + } + return writer.Flush() } -// SendMessage : Sends message to TectonicDB -func (t *Tectonic) SendMessage(message string) (string, error) { - var readBuf = make([]byte, (1 << 15)) +func (c *TectonicClient) readResponse() (string, error) { + successBuf := make([]byte, 1) + if _, err := c.conn.Read(successBuf); err != nil { + return "", err + } + success := successBuf[0] == successByte + + // Read response size + var size uint64 + if err := binary.Read(c.conn, binary.BigEndian, &size); err != nil { + return "", err + } + + // Read response data + buf := make([]byte, size) + if _, err := c.conn.Read(buf); err != nil { + return "", err + } + + res := string(buf) + if success { + return res, nil + } - _, _ = t.Connection.Write([]byte(message + "\n")) - _, readErr := t.Connection.Read(readBuf) + return "", c.handleServerError(res) +} - return string(readBuf), readErr +func (c *TectonicClient) handleServerError(response string) error { + if strings.HasPrefix(response, "ERR: DB") { + // More robust error parsing, handling spaces in bookName + parts := strings.SplitN(response, " ", 3) + if len(parts) < 3 { + return NewTectonicError("DB error without book name") + } + bookName := parts[2] + return NewTectonicError(fmt.Sprintf("DBNotFoundErrors", bookName)) + } + return NewTectonicError(fmt.Sprintf("ServerErrors", response)) } // Help : Return help string from Tectonic server -func (t *Tectonic) Help() (string, error) { - return t.SendMessage("HELP") +func (t *TectonicClient) Help() (string, error) { + return t.Cmd("HELP") } // Ping : Sends a ping message to the TectonicDB server -func (t *Tectonic) Ping() (string, error) { - return t.SendMessage("PING") +func (t *TectonicClient) Ping() (string, error) { + return t.Cmd("PING") } // Info : From official documentation: "Returns info about table schemas" -func (t *Tectonic) Info() (string, error) { - return t.SendMessage("INFO") +func (t *TectonicClient) Info() (string, error) { + return t.Cmd("INFO") } // Perf : From official documentation: "Returns the answercount of items over time" -func (t *Tectonic) Perf() (string, error) { - return t.SendMessage("PERF") +func (t *TectonicClient) Perf() (string, error) { + return t.Cmd("PERF") } -// BulkAdd : Batch-inserts deltas stored in an array into the TectonicDB server. If you want +// BulkInsert : Batch-inserts deltas stored in an array into the TectonicDB server. If you want // to select what data-store you want to insert the batch into, consider using the function `BulkAddInto`. -func (t *Tectonic) BulkAdd(ticks *[]Delta) error { - _, _ = t.SendMessage("BULKADD") +func (t *TectonicClient) BulkInsert(ticks []*Delta, dbName string) error { + //_, _ = t.Cmd("BULKADD ") - for _, tick := range *ticks { + for _, tick := range ticks { var ( isTrade = "f" isBid = "f" @@ -135,19 +172,20 @@ func (t *Tectonic) BulkAdd(ticks *[]Delta) error { isBid = "t" } - _, _ = t.SendMessage(fmt.Sprintf("%.3f, %d, %s, %s, %f, %f;", tick.Timestamp, tick.Seq, isTrade, isBid, tick.Price, tick.Size)) + if _, err := t.Cmd(fmt.Sprintf("INSERT %.3f, %d, %s, %s, %f, %f; INTO %s", tick.Timestamp, tick.Seq, isTrade, isBid, tick.Price, tick.Size, dbName)); err != nil { + fmt.Println(err) + } } - _, recvErr := t.SendMessage("DDAKLUB") - - return recvErr + return nil } -// BulkAddInto : Batch-inserts deltas stored in an array to the specified store -func (t *Tectonic) BulkAddInto(dbName string, ticks *[]Delta) error { - _, _ = t.SendMessage("BULKADD INTO " + dbName) - - for _, tick := range *ticks { +// BulkAdd : Batch-add deltas stored in an array to the specified store +func (t *TectonicClient) BulkAdd(dbName string, ticks []*Delta) error { + if t.currentDB != dbName { + t.Use(dbName) + } + for _, tick := range ticks { var ( isTrade = "f" isBid = "f" @@ -159,50 +197,58 @@ func (t *Tectonic) BulkAddInto(dbName string, ticks *[]Delta) error { isBid = "t" } - _, _ = t.SendMessage(fmt.Sprintf("%.3f, %d, %s, %s, %f, %f;", tick.Timestamp, tick.Seq, isTrade, isBid, tick.Price, tick.Size)) + if _, err := t.Cmd(fmt.Sprintf("ADD %.3f, %d, %s, %s, %f, %f;", tick.Timestamp, tick.Seq, isTrade, isBid, tick.Price, tick.Size)); err != nil { + fmt.Println(err) + return err + } } - _, recvErr := t.SendMessage("DDAKLUB") - - return recvErr + return nil } // Use : "Switch the current store" -func (t *Tectonic) Use(dbName string) error { - _, readErr := t.SendMessage("USE " + dbName) +func (t *TectonicClient) Use(dbName string) error { + _, readErr := t.Cmd("USE " + dbName) if readErr == nil { - t.CurrentDB = dbName + t.currentDB = dbName } return readErr } // Create : "Create store" -func (t *Tectonic) Create(dbName string) error { - _, readErr := t.SendMessage("CREATE " + dbName) +func (t *TectonicClient) Create(dbName string) error { + _, readErr := t.Cmd("CREATE " + dbName) return readErr } // Get : "Returns `amount` items from current store" -func (t *Tectonic) Get(amount uint64) (*[]Delta, error) { +func (t *TectonicClient) Get(amount uint64) ([]*Delta, error) { // We use a buffer here to make it easier to maintain var ( msgBuf = bytes.Buffer{} - msgJSON = []Delta{} + msgJSON []*Delta ) msgBuf.WriteString("GET ") msgBuf.WriteString(strconv.Itoa(int(amount))) msgBuf.WriteString(" AS JSON") - msgRecv, recvErr := t.SendMessage(msgBuf.String()) - ffjson.Unmarshal(bytes.Trim([]byte(msgRecv[9:]), "\x00"), &msgJSON) // We get back a message starting with `\uFFFE` - Trim that and all null chars in array + msgRecv, recvErr := t.Cmd(msgBuf.String()) + fmt.Println(msgRecv, recvErr) + if recvErr != nil { + return nil, recvErr + } + msgRecv = "[" + msgRecv + "]" + if err := ffjson.Unmarshal(bytes.Trim([]byte(msgRecv), "\x00"), &msgJSON); err != nil { + fmt.Println(err) + } // We get back a message starting with `\uFFFE` - Trim that and all null chars in array - return &msgJSON, recvErr + return msgJSON, recvErr } // GetFrom : Returns items from specified store -func (t *Tectonic) GetFrom(dbName string, amount uint64, asTick bool) (*[]Delta, error) { +func (t *TectonicClient) GetFrom(dbName string, amount uint64) ([]Delta, error) { // We use a buffer here to make it easier to maintain var ( msgBuf = bytes.Buffer{} @@ -214,14 +260,20 @@ func (t *Tectonic) GetFrom(dbName string, amount uint64, asTick bool) (*[]Delta, msgBuf.WriteString(dbName) msgBuf.WriteString(" AS JSON") - msgRecv, recvErr := t.SendMessage(msgBuf.String()) - ffjson.Unmarshal(bytes.Trim([]byte(msgRecv[9:]), "\x00"), &msgJSON) // We get back a message starting with `\uFFFE` - Trim that and all null chars in array + msgRecv, recvErr := t.Cmd(msgBuf.String()) + if recvErr != nil { + return nil, recvErr + } + msgRecv = "[" + msgRecv + "]" + if err := ffjson.Unmarshal(bytes.Trim([]byte(msgRecv), "\x00"), &msgJSON); err != nil { + fmt.Println(err) + } - return &msgJSON, recvErr + return msgJSON, nil } // Insert : Inserts a single tick into the currently selected datastore -func (t *Tectonic) Insert(tick *Delta) error { +func (t *TectonicClient) Insert(tick *Delta) error { var ( isTrade = "f" isBid = "f" @@ -234,13 +286,13 @@ func (t *Tectonic) Insert(tick *Delta) error { } tickString := fmt.Sprintf("%.3f, %d, %s, %s, %f, %f;", tick.Timestamp, tick.Seq, isTrade, isBid, tick.Price, tick.Size) - _, err := t.SendMessage("INSERT " + tickString) + _, err := t.Cmd("INSERT " + tickString) return err } // InsertInto : Inserts a single tick into the datastore specified by `dbName` -func (t *Tectonic) InsertInto(dbName string, tick *Delta) error { +func (t *TectonicClient) InsertInto(dbName string, tick *Delta) error { var ( isTrade = "f" isBid = "f" @@ -253,64 +305,89 @@ func (t *Tectonic) InsertInto(dbName string, tick *Delta) error { } tickString := fmt.Sprintf("%.3f, %d, %s, %s, %f, %f;", tick.Timestamp, tick.Seq, isTrade, isBid, tick.Price, tick.Size) - _, err := t.SendMessage("INSERT " + tickString + " INTO " + dbName) + _, err := t.Cmd("INSERT " + tickString + " INTO " + dbName) return err } // Count : "Count of items in current store" -func (t *Tectonic) Count() uint64 { - msg, _ := t.SendMessage("COUNT") +func (t *TectonicClient) Count() uint64 { + msg, _ := t.Cmd("COUNT") count, _ := strconv.Atoi(msg) return uint64(count) } // CountAll : "Returns total count from all stores" -func (t *Tectonic) CountAll() uint64 { - msg, _ := t.SendMessage("COUNT ALL") +func (t *TectonicClient) CountAll() uint64 { + msg, _ := t.Cmd("COUNT ALL") count, _ := strconv.Atoi(msg) return uint64(count) } // Clear : Deletes everything in current store (BE CAREFUL WITH THIS METHOD) -func (t *Tectonic) Clear() (string, error) { - return t.SendMessage("CLEAR") +func (t *TectonicClient) Clear() (string, error) { + return t.Cmd("CLEAR") } // ClearAll : "Drops everything in memory" -func (t *Tectonic) ClearAll() (string, error) { - return t.SendMessage("CLEAR ALL") +func (t *TectonicClient) ClearAll() (string, error) { + return t.Cmd("CLEAR ALL") } // Flush : "Flush current store to disk" -func (t *Tectonic) Flush() (string, error) { - return t.SendMessage("FLUSH") +func (t *TectonicClient) Flush() (string, error) { + return t.Cmd("FLUSH") } // FlushAll : "Flush everything form memory to disk" -func (t *Tectonic) FlushAll() (string, error) { - return t.SendMessage("FLUSH ALL") +func (t *TectonicClient) FlushAll() (string, error) { + return t.Cmd("FLUSH ALL") } -// TODO: Implement Subscribe/Unsubscribe. I figure it isn't used *that* much, so we -// can implement these later. -/* Subscribe : Listen to stream of events - * func (t *Tectonic) Subscribe(dbName, message chan string) (string, error) { - * - * } - * - * // Unsubscribe : Stop receiving messages from subscription - * func (t *Tectonic) Unsubscribe() (string, error) { - * - * } - */ - // Exists : Checks if datastore exists -func (t *Tectonic) Exists(dbName string) bool { - msg, _ := t.SendMessage("EXISTS " + dbName) +func (t *TectonicClient) Exists(dbName string) bool { + msg, _ := t.Cmd("EXISTS " + dbName) // EXISTS command returns `1` for an existing datastore, and `ERR:...` otherwise return msg[0] == 1 } + +func main() { + // Example use + client, err := NewTectonicClient("localhost", "9001") + if err != nil { + fmt.Println(err) + return + } + defer client.conn.Close() + + response, err := client.Cmd("HELP") + if err != nil { + fmt.Println(err) + } else { + fmt.Println(response) + } + fmt.Println(client.Exists("test")) + fmt.Println(client.Info()) + fmt.Println(client.Count()) + fmt.Println(client.Count()) + + deltas := make([]*Delta, 0, 100) + for i := 0; i < 100; i++ { + d := &Delta{ + Timestamp: float64(time.Now().UnixMilli()) / 1000, + Price: 2000 + float64(i), + Size: 100 + float64(i), + Seq: uint32(i), + IsBid: true, + IsTrade: false, + } + deltas = append(deltas, d) + } + if err := client.BulkAdd("test", deltas); err != nil { + fmt.Println(err) + } + +} diff --git a/cli/java/tectonic.java b/cli/java/tectonic.java new file mode 100644 index 00000000..e1c4c197 --- /dev/null +++ b/cli/java/tectonic.java @@ -0,0 +1,119 @@ +import java.io.*; +import java.net.Socket; + +public class TectonicClient { + private static final byte SUCCESS_BYTE = 0x1; + + private Socket conn; + private String host; + private String port; + private String currentDB; + + public TectonicClient(String host, String port) throws IOException { + this.host = host; + this.port = port; + connect(); + } + + private void connect() throws IOException { + String addr = host + ":" + port; + System.out.println("Connecting to " + addr); + conn = new Socket(host, Integer.parseInt(port)); + } + + private void reconnect() throws IOException { + if (conn != null) { + conn.close(); + } + for (int i = 0; i < 3; i++) { // Retry 3 times + try { + Thread.sleep(2000); // Sleep for 2 seconds before reconnecting + connect(); + return; + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } + } + throw new IOException("Failed to reconnect after several attempts"); + } + + private void sendCommand(String command) throws IOException { + try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(conn.getOutputStream()))) { + int length = command.length(); + writer.write(String.format("%d%s", length, command)); + writer.flush(); + } + } + + private String readResponse() throws IOException { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()))) { + int success = reader.read(); + boolean isSuccess = success == SUCCESS_BYTE; + + // Read response size + long size = readLong(reader); + + // Read response data + char[] buf = new char[(int) size]; + reader.read(buf); + + String response = new String(buf); + if (isSuccess) { + return response; + } else { + throw new TectonicError(handleServerError(response)); + } + } + } + + private long readLong(BufferedReader reader) throws IOException { + try { + StringBuilder sizeStr = new StringBuilder(); + char c; + while ((c = (char) reader.read()) != -1) { + if (Character.isDigit(c)) { + sizeStr.append(c); + } else { + break; + } + } + return Long.parseLong(sizeStr.toString()); + } catch (NumberFormatException e) { + throw new IOException("Failed to read response size", e); + } + } + + private String handleServerError(String response) { + if (response.startsWith("ERR: DB")) { + String[] parts = response.split(" ", 3); + if (parts.length < 3) { + return "DB error without book name"; + } + String bookName = parts[2]; + return String.format("DBNotFoundError: %s", bookName); + } + return String.format("ServerError: %s", response); + } + + public String cmd(String command) throws IOException { + try { + sendCommand(command); + return readResponse(); + } catch (IOException e) { + reconnect(); // Reconnect on error + throw e; + } + } + + // Other methods similar to Go implementation... + + public static void main(String[] args) { + try { + TectonicClient client = new TectonicClient("localhost", "9001"); + String result = client.cmd("INFO"); + System.out.println(result); + } catch (IOException e) { + e.printStackTrace(); + } + } +}