migrate from badger to ndjson-native storage

This commit is contained in:
Lewis Wynne 2026-02-10 23:44:23 +00:00
parent db4574b887
commit 7b1356f5af
12 changed files with 442 additions and 618 deletions

View file

@ -80,7 +80,7 @@ func delStore(cmd *cobra.Command, args []string) error {
}
func executeDeletion(path string) error {
if err := os.RemoveAll(path); err != nil {
if err := os.Remove(path); err != nil {
return fmt.Errorf("cannot delete-store '%s': %v", path, err)
}
return nil

View file

@ -23,11 +23,9 @@ THE SOFTWARE.
package cmd
import (
"errors"
"fmt"
"strings"
"github.com/dgraph-io/badger/v4"
"github.com/gobwas/glob"
"github.com/spf13/cobra"
)
@ -71,7 +69,12 @@ func del(cmd *cobra.Command, args []string) error {
return fmt.Errorf("cannot remove: No such key")
}
var processed []resolvedTarget
// Group targets by store for batch deletes.
type storeTargets struct {
targets []resolvedTarget
}
byStore := make(map[string]*storeTargets)
var storeOrder []string
for _, target := range targets {
if interactive || config.Key.AlwaysPromptDelete {
var confirm string
@ -84,29 +87,37 @@ func del(cmd *cobra.Command, args []string) error {
continue
}
}
trans := TransactionArgs{
key: target.full,
readonly: false,
sync: false,
transact: func(tx *badger.Txn, k []byte) error {
if err := tx.Delete(k); errors.Is(err, badger.ErrKeyNotFound) {
return fmt.Errorf("cannot remove '%s': No such key", target.full)
if _, ok := byStore[target.db]; !ok {
byStore[target.db] = &storeTargets{}
storeOrder = append(storeOrder, target.db)
}
if err != nil {
return fmt.Errorf("cannot remove '%s': %v", target.full, err)
}
return nil
},
byStore[target.db].targets = append(byStore[target.db].targets, target)
}
if err := store.Transaction(trans); err != nil {
if len(byStore) == 0 {
return nil
}
for _, dbName := range storeOrder {
st := byStore[dbName]
p, err := store.storePath(dbName)
if err != nil {
return err
}
processed = append(processed, target)
entries, err := readStoreFile(p)
if err != nil {
return err
}
for _, t := range st.targets {
idx := findEntry(entries, t.key)
if idx < 0 {
return fmt.Errorf("cannot remove '%s': No such key", t.full)
}
entries = append(entries[:idx], entries[idx+1:]...)
}
if err := writeStoreFile(p, entries); err != nil {
return err
}
if len(processed) == 0 {
return nil
}
return autoSync()
@ -122,27 +133,24 @@ func init() {
// resolvedTarget identifies a single key slated for deletion,
// pre-resolved into its store and bare-key components.
type resolvedTarget struct {
// full is the key reference as resolved from the user's argument
// (presumably store-qualified — see resolveDeleteTargets' caller).
full string
// display is the human-readable form used in prompts and errors.
display string
// key is the bare key within its store.
key string
// db is the name of the store holding the key.
db string
}
func keyExists(store *Store, arg string) (bool, error) {
var notFound bool
trans := TransactionArgs{
key: arg,
readonly: true,
sync: false,
transact: func(tx *badger.Txn, k []byte) error {
if _, err := tx.Get(k); errors.Is(err, badger.ErrKeyNotFound) {
notFound = true
return nil
} else {
return err
}
},
}
if err := store.Transaction(trans); err != nil {
spec, err := store.parseKey(arg, true)
if err != nil {
return false, err
}
return !notFound, nil
p, err := store.storePath(spec.DB)
if err != nil {
return false, err
}
entries, err := readStoreFile(p)
if err != nil {
return false, err
}
return findEntry(entries, spec.Key) >= 0, nil
}
func resolveDeleteTargets(store *Store, exactArgs []string, globPatterns []string, separators []rune) ([]resolvedTarget, error) {
@ -158,6 +166,8 @@ func resolveDeleteTargets(store *Store, exactArgs []string, globPatterns []strin
targets = append(targets, resolvedTarget{
full: full,
display: spec.Display(),
key: spec.Key,
db: spec.DB,
})
}

View file

@ -1,205 +0,0 @@
/*
Copyright © 2025 Lewis Wynne <lew@ily.rs>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
package cmd
import (
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"strings"
"unicode/utf8"
"github.com/dgraph-io/badger/v4"
"github.com/gobwas/glob"
"github.com/spf13/cobra"
)
// dumpEntry is the NDJSON line schema produced by the export/dump
// command: one JSON object per key/value pair.
type dumpEntry struct {
// Key is the store key, emitted verbatim.
Key string `json:"key"`
// Value is the payload, encoded according to Encoding.
Value string `json:"value"`
// Encoding is "text" or "base64"; omitted from JSON when unset.
Encoding string `json:"encoding,omitempty"`
// ExpiresAt is the entry's expiry timestamp; omitted when the entry
// does not expire.
ExpiresAt *int64 `json:"expires_at,omitempty"`
}
// dumpCmd implements `export` (alias `dump`): stream all key/value
// pairs of a store to stdout as NDJSON via the dump handler.
var dumpCmd = &cobra.Command{
Use: "export [STORE]",
Short: "Dump all key/value pairs as NDJSON",
Aliases: []string{"dump"},
Args: cobra.MaximumNArgs(1),
RunE: dump,
SilenceUsage: true,
}
// dump is the RunE handler for the export/dump command. It resolves the
// target store (falling back to the configured default), validates the
// --encoding flag, compiles any --glob filters, and streams matching
// entries to stdout as NDJSON via dumpDatabase.
func dump(cmd *cobra.Command, args []string) error {
	store := &Store{}
	targetDB := "@" + config.Store.DefaultStoreName
	if len(args) == 1 {
		raw := args[0]
		name, parseErr := store.parseDB(raw, false)
		if parseErr != nil {
			return fmt.Errorf("cannot dump '%s': %v", raw, parseErr)
		}
		if _, findErr := store.FindStore(name); findErr != nil {
			var notFound errNotFound
			if errors.As(findErr, &notFound) {
				return fmt.Errorf("cannot dump '%s': %v", raw, findErr)
			}
			return findErr
		}
		targetDB = "@" + name
	}
	// All remaining failures are reported against the resolved store.
	fail := func(err error) error {
		return fmt.Errorf("cannot dump '%s': %v", targetDB, err)
	}
	mode, err := cmd.Flags().GetString("encoding")
	if err != nil {
		return fail(err)
	}
	switch mode {
	case "auto", "base64", "text":
		// valid encoding modes
	default:
		return fail(fmt.Errorf("unsupported encoding '%s'", mode))
	}
	globPatterns, err := cmd.Flags().GetStringSlice("glob")
	if err != nil {
		return fail(err)
	}
	separators, err := parseGlobSeparators(cmd)
	if err != nil {
		return fail(err)
	}
	matchers, err := compileGlobMatchers(globPatterns, separators)
	if err != nil {
		return fail(err)
	}
	return dumpDatabase(store, strings.TrimPrefix(targetDB, "@"), cmd.OutOrStdout(), DumpOptions{
		Encoding:     mode,
		Matchers:     matchers,
		GlobPatterns: globPatterns,
	})
}
// init registers the export/dump command and its flags on the root
// command.
func init() {
	flags := dumpCmd.Flags()
	flags.StringP("encoding", "e", "auto", "value encoding: auto, base64, or text")
	flags.StringSliceP("glob", "g", nil, "Filter keys with glob pattern (repeatable)")
	flags.String("glob-sep", "", fmt.Sprintf("Characters treated as separators for globbing (default %q)", defaultGlobSeparatorsDisplay()))
	rootCmd.AddCommand(dumpCmd)
}
// encodeBase64 stores v in entry as a standard-base64 string and tags
// the entry's Encoding as "base64".
func encodeBase64(entry *dumpEntry, v []byte) {
	entry.Encoding = "base64"
	entry.Value = base64.StdEncoding.EncodeToString(v)
}
// encodeText stores v in entry verbatim, tagging its Encoding as
// "text". It rejects values that are not valid UTF-8, directing the
// user toward the auto or base64 encodings.
func encodeText(entry *dumpEntry, key []byte, v []byte) error {
	if utf8.Valid(v) {
		entry.Encoding = "text"
		entry.Value = string(v)
		return nil
	}
	return fmt.Errorf("key %q contains non-UTF8 data; use --encoding=auto or base64", key)
}
// DumpOptions controls how a store is dumped to NDJSON.
type DumpOptions struct {
// Encoding selects the value encoding: "auto", "base64", or "text".
// An empty string is treated as "auto" by dumpDatabase.
Encoding string
// Matchers, when non-empty, restricts output to keys matching at
// least one compiled glob.
Matchers []glob.Glob
// GlobPatterns holds the raw pattern strings, used for error
// reporting when nothing matches.
GlobPatterns []string
}
// dumpDatabase writes entries from dbName to w as NDJSON, one JSON
// object per line. When opts.Matchers is non-empty and no entry was
// written, an error naming the glob patterns is returned.
func dumpDatabase(store *Store, dbName string, w io.Writer, opts DumpOptions) error {
targetDB := "@" + dbName
if opts.Encoding == "" {
// Default to auto: emit text for valid UTF-8 values, base64 otherwise.
opts.Encoding = "auto"
}
// matched records whether at least one entry was actually written, so
// glob filters that hit nothing can be reported afterwards.
var matched bool
trans := TransactionArgs{
key: targetDB,
readonly: true,
sync: true,
transact: func(tx *badger.Txn, k []byte) error {
it := tx.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
key := item.KeyCopy(nil)
// Skip keys that don't match the compiled glob filters (a nil
// matcher set matches everything via globMatch).
if !globMatch(opts.Matchers, string(key)) {
continue
}
expiresAt := item.ExpiresAt()
if err := item.Value(func(v []byte) error {
entry := dumpEntry{
Key: string(key),
}
if expiresAt > 0 {
// Only expiring entries carry an expires_at field.
ts := int64(expiresAt)
entry.ExpiresAt = &ts
}
switch opts.Encoding {
case "base64":
encodeBase64(&entry, v)
case "text":
if err := encodeText(&entry, key, v); err != nil {
return err
}
case "auto":
if utf8.Valid(v) {
entry.Encoding = "text"
entry.Value = string(v)
} else {
encodeBase64(&entry, v)
}
default:
return fmt.Errorf("unsupported encoding '%s'", opts.Encoding)
}
payload, err := json.Marshal(entry)
if err != nil {
return err
}
_, err = fmt.Fprintln(w, string(payload))
if err == nil {
// Count the entry only once it has been written out.
matched = true
}
return err
}); err != nil {
return err
}
}
return nil
},
}
if err := store.Transaction(trans); err != nil {
return err
}
if len(opts.Matchers) > 0 && !matched {
return fmt.Errorf("No matches for pattern %s", formatGlobPatterns(opts.GlobPatterns))
}
return nil
}

48
cmd/export.go Normal file
View file

@ -0,0 +1,48 @@
/*
Copyright © 2025 Lewis Wynne <lew@ily.rs>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
package cmd
import (
"fmt"
"github.com/spf13/cobra"
)
// exportCmd implements `export` (alias `dump`) as a thin wrapper over
// `list`: it forces the NDJSON output format and delegates to list.
var exportCmd = &cobra.Command{
Use: "export [STORE]",
Short: "Export store as NDJSON (alias for list --format ndjson)",
Aliases: []string{"dump"},
Args: cobra.MaximumNArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
// Override the shared list format; listEncoding is bound to this
// command's own --encoding flag at registration time.
listFormat = "ndjson"
return list(cmd, args)
},
SilenceUsage: true,
}
// init wires up the export command's flags and registers it on the
// root command.
func init() {
	flags := exportCmd.Flags()
	flags.StringSliceP("glob", "g", nil, "Filter keys with glob pattern (repeatable)")
	flags.String("glob-sep", "", fmt.Sprintf("Characters treated as separators for globbing (default %q)", defaultGlobSeparatorsDisplay()))
	flags.StringVarP(&listEncoding, "encoding", "e", "auto", "value encoding: auto, base64, or text")
	rootCmd.AddCommand(exportCmd)
}

View file

@ -32,7 +32,6 @@ import (
"strings"
"text/template"
"github.com/dgraph-io/badger/v4"
"github.com/spf13/cobra"
)
@ -73,24 +72,27 @@ For example:
func get(cmd *cobra.Command, args []string) error {
store := &Store{}
var v []byte
trans := TransactionArgs{
key: args[0],
readonly: true,
sync: false,
transact: func(tx *badger.Txn, k []byte) error {
item, err := tx.Get(k)
spec, err := store.parseKey(args[0], true)
if err != nil {
return err
}
v, err = item.ValueCopy(nil)
return err
},
}
if err := store.Transaction(trans); err != nil {
return fmt.Errorf("cannot get '%s': %v", args[0], err)
}
p, err := store.storePath(spec.DB)
if err != nil {
return fmt.Errorf("cannot get '%s': %v", args[0], err)
}
entries, err := readStoreFile(p)
if err != nil {
return fmt.Errorf("cannot get '%s': %v", args[0], err)
}
idx := findEntry(entries, spec.Key)
if idx < 0 {
keys := make([]string, len(entries))
for i, e := range entries {
keys[i] = e.Key
}
return fmt.Errorf("cannot get '%s': %v", args[0], suggestKey(spec.Key, keys))
}
v := entries[idx].Value
binary, err := cmd.Flags().GetBool("include-binary")
if err != nil {

View file

@ -23,13 +23,15 @@ THE SOFTWARE.
package cmd
import (
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"strconv"
"unicode/utf8"
"github.com/dgraph-io/badger/v4"
"github.com/jedib0t/go-pretty/v6/table"
"github.com/jedib0t/go-pretty/v6/text"
"github.com/spf13/cobra"
@ -43,11 +45,11 @@ func (e *formatEnum) String() string { return string(*e) }
func (e *formatEnum) Set(v string) error {
switch v {
case "table", "tsv", "csv", "html", "markdown":
case "table", "tsv", "csv", "html", "markdown", "ndjson":
*e = formatEnum(v)
return nil
default:
return fmt.Errorf("must be one of \"table\", \"tsv\", \"csv\", \"html\", or \"markdown\"")
return fmt.Errorf("must be one of \"table\", \"tsv\", \"csv\", \"html\", \"markdown\", or \"ndjson\"")
}
}
@ -60,6 +62,7 @@ var (
listTTL bool
listHeader bool
listFormat formatEnum = "table"
listEncoding string
)
type columnKind int
@ -126,8 +129,52 @@ func list(cmd *cobra.Command, args []string) error {
return fmt.Errorf("cannot ls '%s': %v", targetDB, err)
}
showValues := !listNoValues
dbName := targetDB[1:] // strip leading '@'
p, err := store.storePath(dbName)
if err != nil {
return fmt.Errorf("cannot ls '%s': %v", targetDB, err)
}
entries, err := readStoreFile(p)
if err != nil {
return fmt.Errorf("cannot ls '%s': %v", targetDB, err)
}
// Filter by glob
var filtered []Entry
for _, e := range entries {
if globMatch(matchers, e.Key) {
filtered = append(filtered, e)
}
}
if len(matchers) > 0 && len(filtered) == 0 {
return fmt.Errorf("cannot ls '%s': No matches for pattern %s", targetDB, formatGlobPatterns(globPatterns))
}
output := cmd.OutOrStdout()
// NDJSON format: emit JSON lines directly
if listFormat.String() == "ndjson" {
enc := listEncoding
if enc == "" {
enc = "auto"
}
for _, e := range filtered {
je, err := encodeJsonEntryWithEncoding(e, enc)
if err != nil {
return fmt.Errorf("cannot ls '%s': %v", targetDB, err)
}
data, err := json.Marshal(je)
if err != nil {
return fmt.Errorf("cannot ls '%s': %v", targetDB, err)
}
fmt.Fprintln(output, string(data))
}
return nil
}
// Table-based formats
showValues := !listNoValues
tw := table.NewWriter()
tw.SetOutputMirror(output)
tw.SetStyle(table.StyleDefault)
@ -141,61 +188,24 @@ func list(cmd *cobra.Command, args []string) error {
tw.AppendHeader(headerRow(columns))
}
var matchedCount int
trans := TransactionArgs{
key: targetDB,
readonly: true,
sync: true,
transact: func(tx *badger.Txn, k []byte) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchSize = 10
opts.PrefetchValues = showValues
it := tx.NewIterator(opts)
defer it.Close()
var valueBuf []byte
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
key := string(item.KeyCopy(nil))
if !globMatch(matchers, key) {
continue
}
matchedCount++
for _, e := range filtered {
var valueStr string
if showValues {
if err := item.Value(func(v []byte) error {
valueBuf = append(valueBuf[:0], v...)
return nil
}); err != nil {
return fmt.Errorf("cannot ls '%s': %v", targetDB, err)
valueStr = store.FormatBytes(listBinary, e.Value)
}
valueStr = store.FormatBytes(listBinary, valueBuf)
}
row := make(table.Row, 0, len(columns))
for _, col := range columns {
switch col {
case columnKey:
row = append(row, key)
row = append(row, e.Key)
case columnValue:
row = append(row, valueStr)
case columnTTL:
row = append(row, formatExpiry(item.ExpiresAt()))
row = append(row, formatExpiry(e.ExpiresAt))
}
}
tw.AppendRow(row)
}
return nil
},
}
if err := store.Transaction(trans); err != nil {
return err
}
if len(matchers) > 0 && matchedCount == 0 {
return fmt.Errorf("cannot ls '%s': No matches for pattern %s", targetDB, formatGlobPatterns(globPatterns))
}
applyColumnWidths(tw, columns, output)
renderTable(tw)
@ -300,14 +310,42 @@ func renderTable(tw table.Writer) {
}
}
// encodeJsonEntryWithEncoding encodes an Entry to jsonEntry respecting
// the encoding mode. "base64" always base64-encodes the value; "text"
// requires the value to be valid UTF-8; any other mode (i.e. "auto")
// defers to encodeJsonEntry.
func encodeJsonEntryWithEncoding(e Entry, mode string) (jsonEntry, error) {
	var je jsonEntry
	switch mode {
	case "base64":
		je = jsonEntry{
			Key:      e.Key,
			Encoding: "base64",
			Value:    base64.StdEncoding.EncodeToString(e.Value),
		}
	case "text":
		if !utf8.Valid(e.Value) {
			return jsonEntry{}, fmt.Errorf("key %q contains non-UTF8 data; use --encoding=auto or base64", e.Key)
		}
		je = jsonEntry{
			Key:      e.Key,
			Encoding: "text",
			Value:    string(e.Value),
		}
	default: // "auto"
		return encodeJsonEntry(e), nil
	}
	// Attach the expiry only when the entry actually expires.
	if e.ExpiresAt > 0 {
		ts := int64(e.ExpiresAt)
		je.ExpiresAt = &ts
	}
	return je, nil
}
func init() {
listCmd.Flags().BoolVarP(&listBinary, "binary", "b", false, "include binary data in text output")
listCmd.Flags().BoolVar(&listNoKeys, "no-keys", false, "suppress the key column")
listCmd.Flags().BoolVar(&listNoValues, "no-values", false, "suppress the value column")
listCmd.Flags().BoolVarP(&listTTL, "ttl", "t", false, "append a TTL column when entries expire")
listCmd.Flags().BoolVar(&listHeader, "header", false, "include header row")
listCmd.Flags().VarP(&listFormat, "format", "o", "output format (table|tsv|csv|markdown|html)")
listCmd.Flags().VarP(&listFormat, "format", "o", "output format (table|tsv|csv|markdown|html|ndjson)")
listCmd.Flags().StringSliceP("glob", "g", nil, "Filter keys with glob pattern (repeatable)")
listCmd.Flags().String("glob-sep", "", fmt.Sprintf("Characters treated as separators for globbing (default %q)", defaultGlobSeparatorsDisplay()))
listCmd.Flags().StringVarP(&listEncoding, "encoding", "e", "auto", "value encoding for ndjson format: auto, base64, or text")
rootCmd.AddCommand(listCmd)
}

145
cmd/mv.go
View file

@ -26,7 +26,6 @@ import (
"fmt"
"strings"
"github.com/dgraph-io/badger/v4"
"github.com/spf13/cobra"
)
@ -48,11 +47,15 @@ var mvCmd = &cobra.Command{
}
func cp(cmd *cobra.Command, args []string) error {
copyMode = true
return mv(cmd, args)
return mvImpl(cmd, args, true)
}
func mv(cmd *cobra.Command, args []string) error {
keepSource, _ := cmd.Flags().GetBool("copy")
return mvImpl(cmd, args, keepSource)
}
func mvImpl(cmd *cobra.Command, args []string, keepSource bool) error {
store := &Store{}
interactive, err := cmd.Flags().GetBool("interactive")
@ -70,33 +73,40 @@ func mv(cmd *cobra.Command, args []string) error {
return err
}
var srcVal []byte
var srcMeta byte
var srcExpires uint64
fromRef := fromSpec.Full()
toRef := toSpec.Full()
var destExists bool
if promptOverwrite {
existsErr := store.Transaction(TransactionArgs{
key: toRef,
readonly: true,
transact: func(tx *badger.Txn, k []byte) error {
if _, err := tx.Get(k); err == nil {
destExists = true
return nil
} else if err == badger.ErrKeyNotFound {
return nil
// Read source
srcPath, err := store.storePath(fromSpec.DB)
if err != nil {
return fmt.Errorf("cannot move '%s': %v", fromSpec.Key, err)
}
return err
},
})
if existsErr != nil {
return fmt.Errorf("cannot move '%s': %v", fromSpec.Key, existsErr)
srcEntries, err := readStoreFile(srcPath)
if err != nil {
return fmt.Errorf("cannot move '%s': %v", fromSpec.Key, err)
}
srcIdx := findEntry(srcEntries, fromSpec.Key)
if srcIdx < 0 {
return fmt.Errorf("cannot move '%s': No such key", fromSpec.Key)
}
srcEntry := srcEntries[srcIdx]
sameStore := fromSpec.DB == toSpec.DB
// Check destination for overwrite prompt
dstPath := srcPath
dstEntries := srcEntries
if !sameStore {
dstPath, err = store.storePath(toSpec.DB)
if err != nil {
return fmt.Errorf("cannot move '%s': %v", fromSpec.Key, err)
}
dstEntries, err = readStoreFile(dstPath)
if err != nil {
return fmt.Errorf("cannot move '%s': %v", fromSpec.Key, err)
}
}
if promptOverwrite && destExists {
dstIdx := findEntry(dstEntries, toSpec.Key)
if promptOverwrite && dstIdx >= 0 {
var confirm string
fmt.Printf("overwrite '%s'? (y/n)\n", toSpec.Display())
if _, err := fmt.Scanln(&confirm); err != nil {
@ -107,66 +117,53 @@ func mv(cmd *cobra.Command, args []string) error {
}
}
readErr := store.Transaction(TransactionArgs{
key: fromRef,
readonly: true,
transact: func(tx *badger.Txn, k []byte) error {
item, err := tx.Get(k)
if err != nil {
return fmt.Errorf("cannot move '%s': %v", fromSpec.Key, err)
}
srcMeta = item.UserMeta()
srcExpires = item.ExpiresAt()
return item.Value(func(v []byte) error {
srcVal = append(srcVal[:0], v...)
return nil
})
},
})
if readErr != nil {
return readErr
// Write destination entry
newEntry := Entry{
Key: toSpec.Key,
Value: srcEntry.Value,
ExpiresAt: srcEntry.ExpiresAt,
}
writeErr := store.Transaction(TransactionArgs{
key: toRef,
readonly: false,
sync: false,
transact: func(tx *badger.Txn, k []byte) error {
entry := badger.NewEntry(k, srcVal).WithMeta(srcMeta)
if srcExpires > 0 {
entry.ExpiresAt = srcExpires
if sameStore {
// Both source and dest in same file
if dstIdx >= 0 {
dstEntries[dstIdx] = newEntry
} else {
dstEntries = append(dstEntries, newEntry)
}
return tx.SetEntry(entry)
},
})
if writeErr != nil {
return writeErr
if !keepSource {
// Remove source - find it again since indices may have changed
idx := findEntry(dstEntries, fromSpec.Key)
if idx >= 0 {
dstEntries = append(dstEntries[:idx], dstEntries[idx+1:]...)
}
if copyMode {
return autoSync()
}
if err := store.Transaction(TransactionArgs{
key: fromRef,
readonly: false,
sync: false,
transact: func(tx *badger.Txn, k []byte) error {
return tx.Delete(k)
},
}); err != nil {
if err := writeStoreFile(dstPath, dstEntries); err != nil {
return err
}
} else {
// Different stores
if dstIdx >= 0 {
dstEntries[dstIdx] = newEntry
} else {
dstEntries = append(dstEntries, newEntry)
}
if err := writeStoreFile(dstPath, dstEntries); err != nil {
return err
}
if !keepSource {
srcEntries = append(srcEntries[:srcIdx], srcEntries[srcIdx+1:]...)
if err := writeStoreFile(srcPath, srcEntries); err != nil {
return err
}
}
}
return autoSync()
}
var (
copyMode bool = false
)
func init() {
mvCmd.Flags().BoolVar(&copyMode, "copy", false, "Copy instead of move (keeps source)")
mvCmd.Flags().Bool("copy", false, "Copy instead of move (keeps source)")
mvCmd.Flags().BoolP("interactive", "i", false, "Prompt before overwriting destination")
rootCmd.AddCommand(mvCmd)
cpCmd.Flags().BoolP("interactive", "i", false, "Prompt before overwriting destination")

View file

@ -24,14 +24,12 @@ package cmd
import (
"bufio"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"os"
"strings"
"github.com/dgraph-io/badger/v4"
"github.com/gobwas/glob"
"github.com/spf13/cobra"
)
@ -78,11 +76,10 @@ func restore(cmd *cobra.Command, args []string) error {
defer closer.Close()
}
db, err := store.open(dbName)
p, err := store.storePath(dbName)
if err != nil {
return fmt.Errorf("cannot restore '%s': %v", displayTarget, err)
}
defer db.Close()
decoder := json.NewDecoder(bufio.NewReaderSize(reader, 8*1024*1024))
@ -92,9 +89,15 @@ func restore(cmd *cobra.Command, args []string) error {
}
promptOverwrite := interactive || config.Key.AlwaysPromptOverwrite
restored, err := restoreEntries(decoder, db, restoreOpts{
drop, err := cmd.Flags().GetBool("drop")
if err != nil {
return fmt.Errorf("cannot restore '%s': %v", displayTarget, err)
}
restored, err := restoreEntries(decoder, p, restoreOpts{
matchers: matchers,
promptOverwrite: promptOverwrite,
drop: drop,
})
if err != nil {
return fmt.Errorf("cannot restore '%s': %v", displayTarget, err)
@ -123,55 +126,49 @@ func restoreInput(cmd *cobra.Command) (io.Reader, io.Closer, error) {
return f, f, nil
}
// decodeEntryValue converts a dumpEntry's string payload back to raw
// bytes: an empty or "text" encoding is taken verbatim, "base64" is
// decoded, and any other encoding is rejected.
func decodeEntryValue(entry dumpEntry) ([]byte, error) {
	if entry.Encoding == "base64" {
		return base64.StdEncoding.DecodeString(entry.Value)
	}
	if entry.Encoding == "" || entry.Encoding == "text" {
		return []byte(entry.Value), nil
	}
	return nil, fmt.Errorf("unsupported encoding %q", entry.Encoding)
}
// restoreOpts configures restoreEntries' behavior.
type restoreOpts struct {
// matchers, when non-empty, restricts restoration to matching keys.
matchers []glob.Glob
// promptOverwrite asks for confirmation before replacing an
// existing key.
promptOverwrite bool
// drop discards the store's existing entries before restoring.
drop bool
}
func restoreEntries(decoder *json.Decoder, db *badger.DB, opts restoreOpts) (int, error) {
wb := db.NewWriteBatch()
defer wb.Cancel()
func restoreEntries(decoder *json.Decoder, storePath string, opts restoreOpts) (int, error) {
var existing []Entry
if !opts.drop {
var err error
existing, err = readStoreFile(storePath)
if err != nil {
return 0, err
}
}
entryNo := 0
restored := 0
for {
var entry dumpEntry
if err := decoder.Decode(&entry); err != nil {
var je jsonEntry
if err := decoder.Decode(&je); err != nil {
if err == io.EOF {
break
}
return 0, fmt.Errorf("entry %d: %w", entryNo+1, err)
}
entryNo++
if entry.Key == "" {
if je.Key == "" {
return 0, fmt.Errorf("entry %d: missing key", entryNo)
}
if !globMatch(opts.matchers, entry.Key) {
if !globMatch(opts.matchers, je.Key) {
continue
}
if opts.promptOverwrite {
exists, err := keyExistsInDB(db, entry.Key)
entry, err := decodeJsonEntry(je)
if err != nil {
return 0, fmt.Errorf("entry %d: %v", entryNo, err)
return 0, fmt.Errorf("entry %d: %w", entryNo, err)
}
if exists {
idx := findEntry(existing, entry.Key)
if opts.promptOverwrite && idx >= 0 {
fmt.Printf("overwrite '%s'? (y/n)\n", entry.Key)
var confirm string
if _, err := fmt.Scanln(&confirm); err != nil {
@ -181,30 +178,20 @@ func restoreEntries(decoder *json.Decoder, db *badger.DB, opts restoreOpts) (int
continue
}
}
}
value, err := decodeEntryValue(entry)
if err != nil {
return 0, fmt.Errorf("entry %d: %w", entryNo, err)
}
writeEntry := badger.NewEntry([]byte(entry.Key), value)
if entry.ExpiresAt != nil {
if *entry.ExpiresAt < 0 {
return 0, fmt.Errorf("entry %d: expires_at must be >= 0", entryNo)
}
writeEntry.ExpiresAt = uint64(*entry.ExpiresAt)
}
if err := wb.SetEntry(writeEntry); err != nil {
return 0, fmt.Errorf("entry %d: %w", entryNo, err)
if idx >= 0 {
existing[idx] = entry
} else {
existing = append(existing, entry)
}
restored++
}
if err := wb.Flush(); err != nil {
if restored > 0 || opts.drop {
if err := writeStoreFile(storePath, existing); err != nil {
return 0, err
}
}
return restored, nil
}
@ -213,21 +200,6 @@ func init() {
restoreCmd.Flags().StringSliceP("glob", "g", nil, "Restore keys matching glob pattern (repeatable)")
restoreCmd.Flags().String("glob-sep", "", fmt.Sprintf("Characters treated as separators for globbing (default %q)", defaultGlobSeparatorsDisplay()))
restoreCmd.Flags().BoolP("interactive", "i", false, "Prompt before overwriting existing keys")
restoreCmd.Flags().Bool("drop", false, "Drop existing entries before restoring (full replace)")
rootCmd.AddCommand(restoreCmd)
}
// keyExistsInDB reports whether key is present in db without reading
// its value. A missing key is not treated as an error.
func keyExistsInDB(db *badger.DB, key string) (bool, error) {
	exists := false
	viewErr := db.View(func(tx *badger.Txn) error {
		switch _, err := tx.Get([]byte(key)); err {
		case nil:
			exists = true
			return nil
		case badger.ErrKeyNotFound:
			// Absent key: report false with no error.
			return nil
		default:
			return err
		}
	})
	return exists, viewErr
}

View file

@ -62,7 +62,7 @@ func init() {
listStoresCmd.GroupID = "stores"
delStoreCmd.GroupID = "stores"
dumpCmd.GroupID = "stores"
exportCmd.GroupID = "stores"
restoreCmd.GroupID = "stores"
rootCmd.AddGroup(&cobra.Group{ID: "git", Title: "Git commands:"})

View file

@ -26,8 +26,8 @@ import (
"fmt"
"io"
"strings"
"time"
"github.com/dgraph-io/badger/v4"
"github.com/spf13/cobra"
)
@ -81,12 +81,18 @@ func set(cmd *cobra.Command, args []string) error {
return fmt.Errorf("cannot set '%s': %v", args[0], err)
}
if promptOverwrite {
exists, err := keyExists(store, spec.Full())
p, err := store.storePath(spec.DB)
if err != nil {
return fmt.Errorf("cannot set '%s': %v", args[0], err)
}
if exists {
entries, err := readStoreFile(p)
if err != nil {
return fmt.Errorf("cannot set '%s': %v", args[0], err)
}
idx := findEntry(entries, spec.Key)
if promptOverwrite && idx >= 0 {
fmt.Printf("overwrite '%s'? (y/n)\n", spec.Display())
var confirm string
if _, err := fmt.Scanln(&confirm); err != nil {
@ -96,23 +102,23 @@ func set(cmd *cobra.Command, args []string) error {
return nil
}
}
}
trans := TransactionArgs{
key: args[0],
readonly: false,
sync: false,
transact: func(tx *badger.Txn, k []byte) error {
entry := badger.NewEntry(k, value)
entry := Entry{
Key: spec.Key,
Value: value,
}
if ttl != 0 {
entry = entry.WithTTL(ttl)
}
return tx.SetEntry(entry)
},
entry.ExpiresAt = uint64(time.Now().Add(ttl).Unix())
}
if err := store.Transaction(trans); err != nil {
return err
if idx >= 0 {
entries[idx] = entry
} else {
entries = append(entries, entry)
}
if err := writeStoreFile(p, entries); err != nil {
return fmt.Errorf("cannot set '%s': %v", args[0], err)
}
return autoSync()

View file

@ -32,7 +32,6 @@ import (
"unicode/utf8"
"github.com/agnivade/levenshtein"
"github.com/dgraph-io/badger/v4"
gap "github.com/muesli/go-app-paths"
"golang.org/x/term"
)
@ -50,46 +49,6 @@ func (err errNotFound) Error() string {
type Store struct{}
type TransactionArgs struct {
key string
readonly bool
sync bool
transact func(tx *badger.Txn, key []byte) error
}
func (s *Store) Transaction(args TransactionArgs) error {
spec, err := s.parseKey(args.key, true)
if err != nil {
return err
}
db, err := s.open(spec.DB)
if err != nil {
return err
}
defer db.Close()
if args.sync {
err = db.Sync()
if err != nil {
return err
}
}
tx := db.NewTransaction(!args.readonly)
defer tx.Discard()
if err := args.transact(tx, []byte(spec.Key)); err != nil {
return err
}
if args.readonly {
return nil
}
return tx.Commit()
}
func (s *Store) Print(pf string, includeBinary bool, vs ...[]byte) {
s.PrintTo(os.Stdout, pf, includeBinary, vs...)
}
@ -118,20 +77,39 @@ func (s *Store) formatBytes(includeBinary bool, v []byte) string {
return string(v)
}
// storePath resolves the on-disk .ndjson file for the named store,
// falling back to the configured default store when name is empty.
// The resolved path is validated to remain inside the data directory.
func (s *Store) storePath(name string) (string, error) {
	storeName := name
	if storeName == "" {
		storeName = config.Store.DefaultStoreName
	}
	dir, err := s.path()
	if err != nil {
		return "", err
	}
	target := filepath.Join(dir, storeName+".ndjson")
	if err := ensureSubpath(dir, target); err != nil {
		return "", err
	}
	return target, nil
}
func (s *Store) AllStores() ([]string, error) {
path, err := s.path()
dir, err := s.path()
if err != nil {
return nil, err
}
dirs, err := os.ReadDir(path)
entries, err := os.ReadDir(dir)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, err
}
var stores []string
for _, e := range dirs {
if e.IsDir() {
stores = append(stores, e.Name())
for _, e := range entries {
if e.IsDir() || filepath.Ext(e.Name()) != ".ndjson" {
continue
}
stores = append(stores, strings.TrimSuffix(e.Name(), ".ndjson"))
}
return stores, nil
}
@ -141,12 +119,12 @@ func (s *Store) FindStore(k string) (string, error) {
if err != nil {
return "", err
}
path, err := s.path(n)
p, err := s.storePath(n)
if err != nil {
return "", err
}
info, statErr := os.Stat(path)
if strings.TrimSpace(n) == "" || os.IsNotExist(statErr) || (statErr == nil && !info.IsDir()) {
_, statErr := os.Stat(p)
if strings.TrimSpace(n) == "" || os.IsNotExist(statErr) {
suggestions, err := s.suggestStores(n)
if err != nil {
return "", err
@ -156,7 +134,7 @@ func (s *Store) FindStore(k string) (string, error) {
if statErr != nil {
return "", statErr
}
return path, nil
return p, nil
}
func (s *Store) parseKey(raw string, defaults bool) (KeySpec, error) {
@ -180,27 +158,12 @@ func (s *Store) parseDB(v string, defaults bool) (string, error) {
return strings.ToLower(db), nil
}
func (s *Store) open(name string) (*badger.DB, error) {
if name == "" {
name = config.Store.DefaultStoreName
}
path, err := s.path(name)
if err != nil {
return nil, err
}
return badger.Open(badger.DefaultOptions(path).WithLoggingLevel(badger.ERROR))
}
func (s *Store) path(args ...string) (string, error) {
func (s *Store) path() (string, error) {
if override := os.Getenv("PDA_DATA"); override != "" {
if err := os.MkdirAll(override, 0o750); err != nil {
return "", err
}
target := filepath.Join(append([]string{override}, args...)...)
if err := ensureSubpath(override, target); err != nil {
return "", err
}
return target, nil
return override, nil
}
scope := gap.NewVendorScope(gap.User, "pda", "stores")
dir, err := scope.DataPath("")
@ -210,11 +173,7 @@ func (s *Store) path(args ...string) (string, error) {
if err := os.MkdirAll(dir, 0o750); err != nil {
return "", err
}
target := filepath.Join(append([]string{dir}, args...)...)
if err := ensureSubpath(dir, target); err != nil {
return "", err
}
return target, nil
return dir, nil
}
func (s *Store) suggestStores(target string) ([]string, error) {
@ -236,6 +195,19 @@ func (s *Store) suggestStores(target string) ([]string, error) {
return suggestions, nil
}
// suggestKey returns an errNotFound carrying keys that are close to
// target, where "close" means a Levenshtein distance no greater than a
// third of the target's length, clamped to the range [1, 4].
func suggestKey(target string, keys []string) error {
	threshold := len(target) / 3
	if threshold < 1 {
		threshold = 1
	}
	if threshold > 4 {
		threshold = 4
	}
	var suggestions []string
	for _, candidate := range keys {
		if levenshtein.ComputeDistance(target, candidate) <= threshold {
			suggestions = append(suggestions, candidate)
		}
	}
	return errNotFound{suggestions}
}
func ensureSubpath(base, target string) error {
absBase, err := filepath.Abs(base)
if err != nil {
@ -278,22 +250,17 @@ func formatExpiry(expiresAt uint64) string {
// Keys returns all keys for the provided store name (or default if empty).
// Keys are returned in lowercase to mirror stored key format.
func (s *Store) Keys(dbName string) ([]string, error) {
db, err := s.open(dbName)
p, err := s.storePath(dbName)
if err != nil {
return nil, err
}
defer db.Close()
tx := db.NewTransaction(false)
defer tx.Discard()
it := tx.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
var keys []string
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
keys = append(keys, string(item.Key()))
entries, err := readStoreFile(p)
if err != nil {
return nil, err
}
keys := make([]string, len(entries))
for i, e := range entries {
keys[i] = e.Key
}
return keys, nil
}

View file

@ -1,9 +1,7 @@
package cmd
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"os"
@ -68,29 +66,31 @@ func writeGitignore(repoDir string) error {
return nil
}
// snapshotDB copies a store's .ndjson file into the VCS directory.
func snapshotDB(store *Store, repoDir, db string) error {
targetDir := filepath.Join(repoDir, storeDirName)
if err := os.MkdirAll(targetDir, 0o750); err != nil {
return err
}
target := filepath.Join(targetDir, fmt.Sprintf("%s.ndjson", db))
f, err := os.Create(target)
srcPath, err := store.storePath(db)
if err != nil {
return err
}
defer f.Close()
opts := DumpOptions{
Encoding: "auto",
data, err := os.ReadFile(srcPath)
if err != nil {
if os.IsNotExist(err) {
return nil
}
if err := dumpDatabase(store, db, f, opts); err != nil {
return err
}
return f.Sync()
target := filepath.Join(targetDir, db+".ndjson")
return os.WriteFile(target, data, 0o640)
}
// exportAllStores writes every Badger store to ndjson files under repoDir/stores
// exportAllStores copies every store's .ndjson file to repoDir/stores
// and removes stale snapshot files for deleted stores.
func exportAllStores(store *Store, repoDir string) error {
stores, err := store.AllStores()
@ -288,6 +288,8 @@ func currentBranch(dir string) (string, error) {
return branch, nil
}
// restoreAllSnapshots copies .ndjson files from VCS snapshot dir into store paths,
// and removes local stores that are not in the snapshot.
func restoreAllSnapshots(store *Store, repoDir string) error {
targetDir := filepath.Join(repoDir, storeDirName)
entries, err := os.ReadDir(targetDir)
@ -310,12 +312,18 @@ func restoreAllSnapshots(store *Store, repoDir string) error {
dbName := strings.TrimSuffix(e.Name(), ".ndjson")
snapshotDBs[dbName] = struct{}{}
dbPath, err := store.FindStore(dbName)
if err == nil {
_ = os.RemoveAll(dbPath)
srcPath := filepath.Join(targetDir, e.Name())
data, err := os.ReadFile(srcPath)
if err != nil {
return fmt.Errorf("restore %q: %w", dbName, err)
}
if err := restoreSnapshot(store, filepath.Join(targetDir, e.Name()), dbName); err != nil {
dstPath, err := store.storePath(dbName)
if err != nil {
return fmt.Errorf("restore %q: %w", dbName, err)
}
if err := os.WriteFile(dstPath, data, 0o640); err != nil {
return fmt.Errorf("restore %q: %w", dbName, err)
}
}
@ -328,11 +336,11 @@ func restoreAllSnapshots(store *Store, repoDir string) error {
if _, ok := snapshotDBs[db]; ok {
continue
}
dbPath, err := store.FindStore(db)
p, err := store.storePath(db)
if err != nil {
return err
}
if err := os.RemoveAll(dbPath); err != nil {
if err := os.Remove(p); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("remove store '%s': %w", db, err)
}
}
@ -346,32 +354,13 @@ func wipeAllStores(store *Store) error {
return err
}
for _, db := range dbs {
path, err := store.FindStore(db)
p, err := store.storePath(db)
if err != nil {
return err
}
if err := os.RemoveAll(path); err != nil {
if err := os.Remove(p); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("remove store '%s': %w", db, err)
}
}
return nil
}
// restoreSnapshot streams the ndjson snapshot file at path into the
// named badger store via restoreEntries.
func restoreSnapshot(store *Store, path string, dbName string) error {
	snap, err := os.Open(path)
	if err != nil {
		return err
	}
	defer snap.Close()

	db, err := store.open(dbName)
	if err != nil {
		return err
	}
	defer db.Close()

	dec := json.NewDecoder(bufio.NewReader(snap))
	if _, err := restoreEntries(dec, db, restoreOpts{}); err != nil {
		return err
	}
	return nil
}