add NDJSON storage backend
This commit is contained in:
parent
4509611185
commit
db4574b887
2 changed files with 276 additions and 0 deletions
159
cmd/ndjson.go
Normal file
159
cmd/ndjson.go
Normal file
|
|
@ -0,0 +1,159 @@
|
||||||
|
package cmd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"slices"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
"unicode/utf8"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Entry is the in-memory representation of a stored key-value pair.
|
||||||
|
type Entry struct {
|
||||||
|
Key string
|
||||||
|
Value []byte
|
||||||
|
ExpiresAt uint64 // Unix timestamp; 0 = never expires
|
||||||
|
}
|
||||||
|
|
||||||
|
// jsonEntry is the NDJSON on-disk format.
|
||||||
|
type jsonEntry struct {
|
||||||
|
Key string `json:"key"`
|
||||||
|
Value string `json:"value"`
|
||||||
|
Encoding string `json:"encoding,omitempty"`
|
||||||
|
ExpiresAt *int64 `json:"expires_at,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// readStoreFile reads all non-expired entries from an NDJSON file.
|
||||||
|
// Returns empty slice (not error) if file does not exist.
|
||||||
|
func readStoreFile(path string) ([]Entry, error) {
|
||||||
|
f, err := os.Open(path)
|
||||||
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
now := uint64(time.Now().Unix())
|
||||||
|
var entries []Entry
|
||||||
|
scanner := bufio.NewScanner(f)
|
||||||
|
scanner.Buffer(make([]byte, 0, 64*1024), 8*1024*1024)
|
||||||
|
lineNo := 0
|
||||||
|
for scanner.Scan() {
|
||||||
|
lineNo++
|
||||||
|
line := scanner.Bytes()
|
||||||
|
if len(line) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var je jsonEntry
|
||||||
|
if err := json.Unmarshal(line, &je); err != nil {
|
||||||
|
return nil, fmt.Errorf("line %d: %w", lineNo, err)
|
||||||
|
}
|
||||||
|
entry, err := decodeJsonEntry(je)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("line %d: %w", lineNo, err)
|
||||||
|
}
|
||||||
|
// Skip expired entries
|
||||||
|
if entry.ExpiresAt > 0 && entry.ExpiresAt <= now {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
entries = append(entries, entry)
|
||||||
|
}
|
||||||
|
return entries, scanner.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// writeStoreFile atomically writes entries to an NDJSON file, sorted by key.
|
||||||
|
// Expired entries are excluded. Empty entry list writes an empty file.
|
||||||
|
func writeStoreFile(path string, entries []Entry) error {
|
||||||
|
// Sort by key for deterministic output
|
||||||
|
slices.SortFunc(entries, func(a, b Entry) int {
|
||||||
|
return strings.Compare(a.Key, b.Key)
|
||||||
|
})
|
||||||
|
|
||||||
|
tmp := path + ".tmp"
|
||||||
|
f, err := os.Create(tmp)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
f.Close()
|
||||||
|
os.Remove(tmp) // clean up on failure; no-op after successful rename
|
||||||
|
}()
|
||||||
|
|
||||||
|
w := bufio.NewWriter(f)
|
||||||
|
now := uint64(time.Now().Unix())
|
||||||
|
for _, e := range entries {
|
||||||
|
if e.ExpiresAt > 0 && e.ExpiresAt <= now {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
je := encodeJsonEntry(e)
|
||||||
|
data, err := json.Marshal(je)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("key %q: %w", e.Key, err)
|
||||||
|
}
|
||||||
|
w.Write(data)
|
||||||
|
w.WriteByte('\n')
|
||||||
|
}
|
||||||
|
if err := w.Flush(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := f.Sync(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := f.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return os.Rename(tmp, path)
|
||||||
|
}
|
||||||
|
|
||||||
|
func decodeJsonEntry(je jsonEntry) (Entry, error) {
|
||||||
|
var value []byte
|
||||||
|
switch je.Encoding {
|
||||||
|
case "", "text":
|
||||||
|
value = []byte(je.Value)
|
||||||
|
case "base64":
|
||||||
|
var err error
|
||||||
|
value, err = base64.StdEncoding.DecodeString(je.Value)
|
||||||
|
if err != nil {
|
||||||
|
return Entry{}, fmt.Errorf("decode base64 for %q: %w", je.Key, err)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return Entry{}, fmt.Errorf("unsupported encoding %q for %q", je.Encoding, je.Key)
|
||||||
|
}
|
||||||
|
var expiresAt uint64
|
||||||
|
if je.ExpiresAt != nil {
|
||||||
|
expiresAt = uint64(*je.ExpiresAt)
|
||||||
|
}
|
||||||
|
return Entry{Key: je.Key, Value: value, ExpiresAt: expiresAt}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func encodeJsonEntry(e Entry) jsonEntry {
|
||||||
|
je := jsonEntry{Key: e.Key}
|
||||||
|
if utf8.Valid(e.Value) {
|
||||||
|
je.Value = string(e.Value)
|
||||||
|
je.Encoding = "text"
|
||||||
|
} else {
|
||||||
|
je.Value = base64.StdEncoding.EncodeToString(e.Value)
|
||||||
|
je.Encoding = "base64"
|
||||||
|
}
|
||||||
|
if e.ExpiresAt > 0 {
|
||||||
|
ts := int64(e.ExpiresAt)
|
||||||
|
je.ExpiresAt = &ts
|
||||||
|
}
|
||||||
|
return je
|
||||||
|
}
|
||||||
|
|
||||||
|
// findEntry returns the index of the entry with the given key, or -1.
|
||||||
|
func findEntry(entries []Entry, key string) int {
|
||||||
|
for i, e := range entries {
|
||||||
|
if e.Key == key {
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return -1
|
||||||
|
}
|
||||||
117
cmd/ndjson_test.go
Normal file
117
cmd/ndjson_test.go
Normal file
|
|
@ -0,0 +1,117 @@
|
||||||
|
package cmd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestReadWriteRoundtrip(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
path := filepath.Join(dir, "test.ndjson")
|
||||||
|
|
||||||
|
entries := []Entry{
|
||||||
|
{Key: "alpha", Value: []byte("hello")},
|
||||||
|
{Key: "beta", Value: []byte("world"), ExpiresAt: uint64(time.Now().Add(time.Hour).Unix())},
|
||||||
|
{Key: "gamma", Value: []byte{0xff, 0xfe}}, // binary
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := writeStoreFile(path, entries); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
got, err := readStoreFile(path)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(got) != len(entries) {
|
||||||
|
t.Fatalf("got %d entries, want %d", len(got), len(entries))
|
||||||
|
}
|
||||||
|
for i := range entries {
|
||||||
|
if got[i].Key != entries[i].Key {
|
||||||
|
t.Errorf("entry %d: key = %q, want %q", i, got[i].Key, entries[i].Key)
|
||||||
|
}
|
||||||
|
if string(got[i].Value) != string(entries[i].Value) {
|
||||||
|
t.Errorf("entry %d: value mismatch", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReadStoreFileSkipsExpired(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
path := filepath.Join(dir, "test.ndjson")
|
||||||
|
|
||||||
|
entries := []Entry{
|
||||||
|
{Key: "alive", Value: []byte("yes")},
|
||||||
|
{Key: "dead", Value: []byte("no"), ExpiresAt: 1}, // expired long ago
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := writeStoreFile(path, entries); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
got, err := readStoreFile(path)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(got) != 1 || got[0].Key != "alive" {
|
||||||
|
t.Fatalf("expected only 'alive', got %v", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReadStoreFileNotExist(t *testing.T) {
|
||||||
|
got, err := readStoreFile("/nonexistent/path.ndjson")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if len(got) != 0 {
|
||||||
|
t.Fatalf("expected empty, got %d entries", len(got))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWriteStoreFileSortsKeys(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
path := filepath.Join(dir, "test.ndjson")
|
||||||
|
|
||||||
|
entries := []Entry{
|
||||||
|
{Key: "charlie", Value: []byte("3")},
|
||||||
|
{Key: "alpha", Value: []byte("1")},
|
||||||
|
{Key: "bravo", Value: []byte("2")},
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := writeStoreFile(path, entries); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
got, err := readStoreFile(path)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got[0].Key != "alpha" || got[1].Key != "bravo" || got[2].Key != "charlie" {
|
||||||
|
t.Fatalf("entries not sorted: %v", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWriteStoreFileAtomic(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
path := filepath.Join(dir, "test.ndjson")
|
||||||
|
|
||||||
|
// Write initial data
|
||||||
|
if err := writeStoreFile(path, []Entry{{Key: "a", Value: []byte("1")}}); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Overwrite — should not leave .tmp files
|
||||||
|
if err := writeStoreFile(path, []Entry{{Key: "b", Value: []byte("2")}}); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check no .tmp file remains
|
||||||
|
matches, _ := filepath.Glob(filepath.Join(dir, "*.tmp"))
|
||||||
|
if len(matches) > 0 {
|
||||||
|
t.Fatalf("leftover tmp files: %v", matches)
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue