diff --git a/cmd/ndjson.go b/cmd/ndjson.go new file mode 100644 index 0000000..fa3e829 --- /dev/null +++ b/cmd/ndjson.go @@ -0,0 +1,159 @@ +package cmd + +import ( + "bufio" + "encoding/base64" + "encoding/json" + "fmt" + "os" + "slices" + "strings" + "time" + "unicode/utf8" +) + +// Entry is the in-memory representation of a stored key-value pair. +type Entry struct { + Key string + Value []byte + ExpiresAt uint64 // Unix timestamp; 0 = never expires +} + +// jsonEntry is the NDJSON on-disk format. +type jsonEntry struct { + Key string `json:"key"` + Value string `json:"value"` + Encoding string `json:"encoding,omitempty"` + ExpiresAt *int64 `json:"expires_at,omitempty"` +} + +// readStoreFile reads all non-expired entries from an NDJSON file. +// Returns empty slice (not error) if file does not exist. +func readStoreFile(path string) ([]Entry, error) { + f, err := os.Open(path) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, err + } + defer f.Close() + + now := uint64(time.Now().Unix()) + var entries []Entry + scanner := bufio.NewScanner(f) + scanner.Buffer(make([]byte, 0, 64*1024), 8*1024*1024) + lineNo := 0 + for scanner.Scan() { + lineNo++ + line := scanner.Bytes() + if len(line) == 0 { + continue + } + var je jsonEntry + if err := json.Unmarshal(line, &je); err != nil { + return nil, fmt.Errorf("line %d: %w", lineNo, err) + } + entry, err := decodeJsonEntry(je) + if err != nil { + return nil, fmt.Errorf("line %d: %w", lineNo, err) + } + // Skip expired entries + if entry.ExpiresAt > 0 && entry.ExpiresAt <= now { + continue + } + entries = append(entries, entry) + } + return entries, scanner.Err() +} + +// writeStoreFile atomically writes entries to an NDJSON file, sorted by key. +// Expired entries are excluded. Empty entry list writes an empty file. +func writeStoreFile(path string, entries []Entry) error { + // Sort by key for deterministic output + slices.SortFunc(entries, func(a, b Entry) int { + return strings.Compare(a.Key, b.Key) + }) + + tmp := path + ".tmp" + f, err := os.Create(tmp) + if err != nil { + return err + } + defer func() { + f.Close() + os.Remove(tmp) // clean up on failure; no-op after successful rename + }() + + w := bufio.NewWriter(f) + now := uint64(time.Now().Unix()) + for _, e := range entries { + if e.ExpiresAt > 0 && e.ExpiresAt <= now { + continue + } + je := encodeJsonEntry(e) + data, err := json.Marshal(je) + if err != nil { + return fmt.Errorf("key %q: %w", e.Key, err) + } + w.Write(data) + w.WriteByte('\n') + } + if err := w.Flush(); err != nil { + return err + } + if err := f.Sync(); err != nil { + return err + } + if err := f.Close(); err != nil { + return err + } + return os.Rename(tmp, path) +} + +func decodeJsonEntry(je jsonEntry) (Entry, error) { + var value []byte + switch je.Encoding { + case "", "text": + value = []byte(je.Value) + case "base64": + var err error + value, err = base64.StdEncoding.DecodeString(je.Value) + if err != nil { + return Entry{}, fmt.Errorf("decode base64 for %q: %w", je.Key, err) + } + default: + return Entry{}, fmt.Errorf("unsupported encoding %q for %q", je.Encoding, je.Key) + } + var expiresAt uint64 + if je.ExpiresAt != nil { + expiresAt = uint64(*je.ExpiresAt) + } + return Entry{Key: je.Key, Value: value, ExpiresAt: expiresAt}, nil +} + +func encodeJsonEntry(e Entry) jsonEntry { + je := jsonEntry{Key: e.Key} + if utf8.Valid(e.Value) { + je.Value = string(e.Value) + je.Encoding = "text" + } else { + je.Value = base64.StdEncoding.EncodeToString(e.Value) + je.Encoding = "base64" + } + if e.ExpiresAt > 0 { + ts := int64(e.ExpiresAt) + je.ExpiresAt = &ts + } + return je +} + +// findEntry returns the index of the entry with the given key, or -1. +func findEntry(entries []Entry, key string) int { + for i, e := range entries { + if e.Key == key { + return i + } + } + return -1 +} diff --git a/cmd/ndjson_test.go b/cmd/ndjson_test.go new file mode 100644 index 0000000..3deb593 --- /dev/null +++ b/cmd/ndjson_test.go @@ -0,0 +1,117 @@ +package cmd + +import ( + "path/filepath" + "testing" + "time" +) + +func TestReadWriteRoundtrip(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "test.ndjson") + + entries := []Entry{ + {Key: "alpha", Value: []byte("hello")}, + {Key: "beta", Value: []byte("world"), ExpiresAt: uint64(time.Now().Add(time.Hour).Unix())}, + {Key: "gamma", Value: []byte{0xff, 0xfe}}, // binary + } + + if err := writeStoreFile(path, entries); err != nil { + t.Fatal(err) + } + + got, err := readStoreFile(path) + if err != nil { + t.Fatal(err) + } + + if len(got) != len(entries) { + t.Fatalf("got %d entries, want %d", len(got), len(entries)) + } + for i := range entries { + if got[i].Key != entries[i].Key { + t.Errorf("entry %d: key = %q, want %q", i, got[i].Key, entries[i].Key) + } + if string(got[i].Value) != string(entries[i].Value) { + t.Errorf("entry %d: value mismatch", i) + } + } +} + +func TestReadStoreFileSkipsExpired(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "test.ndjson") + + entries := []Entry{ + {Key: "alive", Value: []byte("yes")}, + {Key: "dead", Value: []byte("no"), ExpiresAt: 1}, // expired long ago + } + + if err := writeStoreFile(path, entries); err != nil { + t.Fatal(err) + } + + got, err := readStoreFile(path) + if err != nil { + t.Fatal(err) + } + + if len(got) != 1 || got[0].Key != "alive" { + t.Fatalf("expected only 'alive', got %v", got) + } +} + +func TestReadStoreFileNotExist(t *testing.T) { + got, err := readStoreFile("/nonexistent/path.ndjson") + if err != nil { + t.Fatal(err) + } + if len(got) != 0 { + t.Fatalf("expected empty, got %d entries", len(got)) + } +} + +func TestWriteStoreFileSortsKeys(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "test.ndjson") + + entries := []Entry{ + {Key: "charlie", Value: []byte("3")}, + {Key: "alpha", Value: []byte("1")}, + {Key: "bravo", Value: []byte("2")}, + } + + if err := writeStoreFile(path, entries); err != nil { + t.Fatal(err) + } + + got, err := readStoreFile(path) + if err != nil { + t.Fatal(err) + } + + if got[0].Key != "alpha" || got[1].Key != "bravo" || got[2].Key != "charlie" { + t.Fatalf("entries not sorted: %v", got) + } +} + +func TestWriteStoreFileAtomic(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "test.ndjson") + + // Write initial data + if err := writeStoreFile(path, []Entry{{Key: "a", Value: []byte("1")}}); err != nil { + t.Fatal(err) + } + + // Overwrite — should not leave .tmp files + if err := writeStoreFile(path, []Entry{{Key: "b", Value: []byte("2")}}); err != nil { + t.Fatal(err) + } + + // Check no .tmp file remains + matches, _ := filepath.Glob(filepath.Join(dir, "*.tmp")) + if len(matches) > 0 { + t.Fatalf("leftover tmp files: %v", matches) + } +}