refactor(VCS)!: moves over to sync style -- WIP/unstable

This commit is contained in:
Lewis Wynne 2025-12-19 16:11:58 +00:00
parent 1bd862e5e0
commit 9c82ee70d4
7 changed files with 282 additions and 287 deletions

View file

@ -76,8 +76,7 @@ func delDb(cmd *cobra.Command, args []string) error {
if err := executeDeletion(path); err != nil { if err := executeDeletion(path); err != nil {
return err return err
} }
msg := fmt.Sprintf("rm-db @%s", dbName) return autoSync()
return autoCommit(store, []string{dbName}, msg)
} }
func executeDeletion(path string) error { func executeDeletion(path string) error {

View file

@ -119,8 +119,7 @@ func del(cmd *cobra.Command, args []string) error {
dbs = append(dbs, spec.DB) dbs = append(dbs, spec.DB)
labels = append(labels, t.display) labels = append(labels, t.display)
} }
msg := fmt.Sprintf("rm %s", strings.Join(labels, ", ")) return autoSync()
return autoCommit(store, dbs, msg)
} }
func init() { func init() {

View file

@ -27,9 +27,12 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io"
"strings"
"unicode/utf8" "unicode/utf8"
"github.com/dgraph-io/badger/v4" "github.com/dgraph-io/badger/v4"
"github.com/gobwas/glob"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -96,75 +99,13 @@ func dump(cmd *cobra.Command, args []string) error {
return fmt.Errorf("cannot dump '%s': %v", targetDB, err) return fmt.Errorf("cannot dump '%s': %v", targetDB, err)
} }
var matched bool opts := DumpOptions{
trans := TransactionArgs{ Encoding: mode,
key: targetDB, IncludeSecret: includeSecret,
readonly: true, Matchers: matchers,
sync: true, GlobPatterns: globPatterns,
transact: func(tx *badger.Txn, k []byte) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchSize = 64
it := tx.NewIterator(opts)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
key := item.KeyCopy(nil)
if !globMatch(matchers, string(key)) {
continue
} }
meta := item.UserMeta() return dumpDatabase(store, strings.TrimPrefix(targetDB, "@"), cmd.OutOrStdout(), opts)
isSecret := meta&metaSecret != 0
if isSecret && !includeSecret {
continue
}
expiresAt := item.ExpiresAt()
if err := item.Value(func(v []byte) error {
entry := dumpEntry{
Key: string(key),
Secret: isSecret,
}
if expiresAt > 0 {
ts := int64(expiresAt)
entry.ExpiresAt = &ts
}
switch mode {
case "base64":
encodeBase64(&entry, v)
case "text":
if err := encodeText(&entry, key, v); err != nil {
return fmt.Errorf("cannot dump '%s': %v", targetDB, err)
}
case "auto":
if utf8.Valid(v) {
entry.Encoding = "text"
entry.Value = string(v)
} else {
encodeBase64(&entry, v)
}
}
payload, err := json.Marshal(entry)
if err != nil {
return fmt.Errorf("cannot dump '%s': %v", targetDB, err)
}
fmt.Fprintln(cmd.OutOrStdout(), string(payload))
matched = true
return nil
}); err != nil {
return fmt.Errorf("cannot dump '%s': %v", targetDB, err)
}
}
return nil
},
}
if err := store.Transaction(trans); err != nil {
return err
}
if len(matchers) > 0 && !matched {
return fmt.Errorf("cannot dump '%s': No matches for pattern %s", targetDB, formatGlobPatterns(globPatterns))
}
return nil
} }
func init() { func init() {
@ -188,3 +129,91 @@ func encodeText(entry *dumpEntry, key []byte, v []byte) error {
entry.Encoding = "text" entry.Encoding = "text"
return nil return nil
} }
// DumpOptions controls how a database is dumped to NDJSON.
type DumpOptions struct {
Encoding string
IncludeSecret bool
Matchers []glob.Glob
GlobPatterns []string
}
// dumpDatabase writes entries from dbName to w as NDJSON.
func dumpDatabase(store *Store, dbName string, w io.Writer, opts DumpOptions) error {
targetDB := "@" + dbName
if opts.Encoding == "" {
opts.Encoding = "auto"
}
var matched bool
trans := TransactionArgs{
key: targetDB,
readonly: true,
sync: true,
transact: func(tx *badger.Txn, k []byte) error {
it := tx.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
key := item.KeyCopy(nil)
if !globMatch(opts.Matchers, string(key)) {
continue
}
meta := item.UserMeta()
isSecret := meta&metaSecret != 0
if isSecret && !opts.IncludeSecret {
continue
}
expiresAt := item.ExpiresAt()
if err := item.Value(func(v []byte) error {
entry := dumpEntry{
Key: string(key),
Secret: isSecret,
}
if expiresAt > 0 {
ts := int64(expiresAt)
entry.ExpiresAt = &ts
}
switch opts.Encoding {
case "base64":
encodeBase64(&entry, v)
case "text":
if err := encodeText(&entry, key, v); err != nil {
return err
}
case "auto":
if utf8.Valid(v) {
entry.Encoding = "text"
entry.Value = string(v)
} else {
encodeBase64(&entry, v)
}
default:
return fmt.Errorf("unsupported encoding '%s'", opts.Encoding)
}
payload, err := json.Marshal(entry)
if err != nil {
return err
}
_, err = fmt.Fprintln(w, string(payload))
if err == nil {
matched = true
}
return err
}); err != nil {
return err
}
}
return nil
},
}
if err := store.Transaction(trans); err != nil {
return err
}
if len(opts.Matchers) > 0 && !matched {
return fmt.Errorf("No matches for pattern %s", formatGlobPatterns(opts.GlobPatterns))
}
return nil
}

View file

@ -142,8 +142,7 @@ func mv(cmd *cobra.Command, args []string) error {
} }
if copy { if copy {
msg := fmt.Sprintf("cp %s -> %s", fromSpec.Display(), toSpec.Display()) return autoSync()
return autoCommit(store, []string{fromSpec.DB, toSpec.DB}, msg)
} }
if err := store.Transaction(TransactionArgs{ if err := store.Transaction(TransactionArgs{
@ -157,8 +156,7 @@ func mv(cmd *cobra.Command, args []string) error {
return err return err
} }
msg := fmt.Sprintf("mv %s -> %s", fromSpec.Display(), toSpec.Display()) return autoSync()
return autoCommit(store, []string{fromSpec.DB, toSpec.DB}, msg)
} }
var ( var (

View file

@ -165,8 +165,7 @@ func restore(cmd *cobra.Command, args []string) error {
} }
fmt.Fprintf(cmd.ErrOrStderr(), "Restored %d entries into @%s\n", restored, dbName) fmt.Fprintf(cmd.ErrOrStderr(), "Restored %d entries into @%s\n", restored, dbName)
msg := fmt.Sprintf("restore @%s (%d entries)", dbName, restored) return autoSync()
return autoCommit(store, []string{dbName}, msg)
} }
func restoreInput(cmd *cobra.Command) (io.Reader, io.Closer, error) { func restoreInput(cmd *cobra.Command) (io.Reader, io.Closer, error) {

View file

@ -124,8 +124,8 @@ func set(cmd *cobra.Command, args []string) error {
} }
valSummary := summarizeValue(value) valSummary := summarizeValue(value)
msg := fmt.Sprintf("set %s: %s", spec.Display(), valSummary) _ = fmt.Sprintf("set %s: %s", spec.Display(), valSummary) // placeholder for future messaging
return autoCommit(store, []string{spec.DB}, msg) return autoSync()
} }
func init() { func init() {

View file

@ -2,8 +2,8 @@ package cmd
import ( import (
"bufio" "bufio"
"bytes"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io" "io"
"os" "os"
@ -11,7 +11,6 @@ import (
"path/filepath" "path/filepath"
"strings" "strings"
"time" "time"
"unicode/utf8"
"github.com/dgraph-io/badger/v4" "github.com/dgraph-io/badger/v4"
gap "github.com/muesli/go-app-paths" gap "github.com/muesli/go-app-paths"
@ -31,86 +30,26 @@ var vcsInitCmd = &cobra.Command{
RunE: vcsInit, RunE: vcsInit,
} }
var vcsGitignoreCmd = &cobra.Command{ var vcsSyncCmd = &cobra.Command{
Use: "gitignore", Use: "sync",
Short: "generates a suitable .gitignore file", Short: "export, commit, pull, restore, and push changes",
SilenceUsage: true,
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
repoDir, err := ensureVCSInitialized()
if err != nil {
return err
}
return writeGitignore(repoDir, rewrite)
},
}
var vcsSnapshotCmd = &cobra.Command{
Use: "snapshot",
Short: "commit a snapshot into the vcs",
SilenceUsage: true, SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
repoDir, err := ensureVCSInitialized()
if err != nil {
return err
}
store := &Store{} store := &Store{}
stores, err := store.AllStores() repoDir, err := preGitHook(store)
if err != nil { if err != nil {
return err return err
} }
for _, db := range stores { msg := fmt.Sprintf("sync: %s", time.Now().UTC().Format(time.RFC3339))
if err := snapshotDB(store, repoDir, db); err != nil {
return fmt.Errorf("snapshot %q: %w", db, err)
}
}
if err := runGit(repoDir, "add", storeDirName); err != nil { if err := runGit(repoDir, "add", storeDirName); err != nil {
return err return err
} }
if err := runGit(repoDir, "commit", "-m", msg); err != nil {
message := fmt.Sprintf("snapshot: %s", time.Now().UTC().Format(time.RFC3339))
if err := runGit(repoDir, "commit", "-m", message); err != nil {
fmt.Println(err.Error())
return nil
}
return nil
},
}
var vcsLogCmd = &cobra.Command{
Use: "log",
Short: "show git log for pda snapshots",
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
repoDir, err := ensureVCSInitialized()
if err != nil {
return err
}
return runGit(repoDir, "log", "--oneline", "--graph", "--decorate")
},
}
var vcsPullCmd = &cobra.Command{
Use: "pull",
Short: "pull snapshots from remote and restore into local store",
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
repoDir, err := ensureVCSInitialized()
if err != nil {
return err
}
clean, err := cmd.Flags().GetBool("clean")
if err != nil {
return err return err
} }
pulled := false
hasUpstream, err := repoHasUpstream(repoDir) hasUpstream, err := repoHasUpstream(repoDir)
if err != nil { if err != nil {
return err return err
@ -119,50 +58,56 @@ var vcsPullCmd = &cobra.Command{
if err := runGit(repoDir, "pull"); err != nil { if err := runGit(repoDir, "pull"); err != nil {
return err return err
} }
} pulled = true
} else {
store := &Store{}
if clean {
fmt.Printf("this will remove all existing stores before restoring from version control. continue? (y/n)\n")
var confirm string
if _, err := fmt.Scanln(&confirm); err != nil {
return fmt.Errorf("cannot clean stores: %w", err)
}
if strings.ToLower(confirm) != "y" {
return fmt.Errorf("pull aborted; stores not removed")
}
if err := wipeAllStores(store); err != nil {
return err
}
}
if err := restoreAllSnapshots(store, repoDir); err != nil {
return err
}
return nil
},
}
var vcsPushCmd = &cobra.Command{
Use: "push",
Short: "push local snapshots to remote",
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
repoDir, err := ensureVCSInitialized()
if err != nil {
return err
}
hasUpstream, err := repoHasUpstream(repoDir)
if err != nil {
return err
}
if !hasUpstream {
hasOrigin, err := repoHasRemote(repoDir, "origin") hasOrigin, err := repoHasRemote(repoDir, "origin")
if err != nil { if err != nil {
return err return err
} }
if !hasOrigin { if hasOrigin {
return fmt.Errorf("no upstream configured; set a remote before pushing") branch, err := currentBranch(repoDir)
if err != nil {
return err
} }
if branch == "" {
branch = "main"
}
fmt.Printf("running: git pull origin %s\n", branch)
if err := runGit(repoDir, "pull", "origin", branch); err != nil {
return err
}
pulled = true
} else {
fmt.Println("no remote configured; skipping pull")
}
}
if pulled {
conflicted, err := hasMergeConflicts(repoDir)
if err != nil {
return err
}
if conflicted {
return fmt.Errorf("git pull left merge conflicts; resolve and re-run sync")
}
if err := restoreAllSnapshots(store, repoDir); err != nil {
return err
}
}
hasUpstream, err = repoHasUpstream(repoDir)
if err != nil {
return err
}
if hasUpstream {
return runGit(repoDir, "push")
}
hasOrigin, err := repoHasRemote(repoDir, "origin")
if err != nil {
return err
}
if hasOrigin {
branch, err := currentBranch(repoDir) branch, err := currentBranch(repoDir)
if err != nil { if err != nil {
return err return err
@ -173,25 +118,18 @@ var vcsPushCmd = &cobra.Command{
fmt.Printf("running: git push -u origin %s\n", branch) fmt.Printf("running: git push -u origin %s\n", branch)
return runGit(repoDir, "push", "-u", "origin", branch) return runGit(repoDir, "push", "-u", "origin", branch)
} }
return runGit(repoDir, "push")
fmt.Println("no remote configured; skipping push")
return nil
}, },
} }
var ( const storeDirName = "stores"
storeDirName string = "stores"
rewrite bool = false
)
func init() { func init() {
vcsInitCmd.Flags().Bool("clean", false, "Remove existing VCS directory before initialising") vcsInitCmd.Flags().Bool("clean", false, "Remove existing VCS directory before initialising")
vcsCmd.AddCommand(vcsInitCmd) vcsCmd.AddCommand(vcsInitCmd)
vcsGitignoreCmd.Flags().BoolVarP(&rewrite, "rewrite", "r", false, "Rewrite existing .gitignore, if present") vcsCmd.AddCommand(vcsSyncCmd)
vcsCmd.AddCommand(vcsGitignoreCmd)
vcsCmd.AddCommand(vcsSnapshotCmd)
vcsCmd.AddCommand(vcsLogCmd)
vcsPullCmd.Flags().Bool("clean", false, "Remove all existing stores before restoring snapshots")
vcsCmd.AddCommand(vcsPullCmd)
vcsCmd.AddCommand(vcsPushCmd)
rootCmd.AddCommand(vcsCmd) rootCmd.AddCommand(vcsCmd)
} }
@ -200,6 +138,7 @@ func vcsInit(cmd *cobra.Command, args []string) error {
if err != nil { if err != nil {
return err return err
} }
store := &Store{}
clean, err := cmd.Flags().GetBool("clean") clean, err := cmd.Flags().GetBool("clean")
if err != nil { if err != nil {
@ -220,6 +159,21 @@ func vcsInit(cmd *cobra.Command, args []string) error {
if err := os.RemoveAll(repoDir); err != nil { if err := os.RemoveAll(repoDir); err != nil {
return fmt.Errorf("cannot clean vcs dir: %w", err) return fmt.Errorf("cannot clean vcs dir: %w", err)
} }
dbs, err := store.AllStores()
if err == nil && len(dbs) > 0 {
fmt.Printf("remove all existing stores? (y/n)\n")
var confirm string
if _, err := fmt.Scanln(&confirm); err != nil {
return fmt.Errorf("cannot clean stores: %w", err)
}
if strings.ToLower(confirm) != "y" {
return fmt.Errorf("aborted cleaning stores")
}
if err := wipeAllStores(store); err != nil {
return fmt.Errorf("cannot clean stores: %w", err)
}
}
} }
if err := os.MkdirAll(filepath.Join(repoDir), 0o750); err != nil { if err := os.MkdirAll(filepath.Join(repoDir), 0o750); err != nil {
return err return err
@ -244,7 +198,7 @@ func vcsInit(cmd *cobra.Command, args []string) error {
return nil return nil
} }
return vcsGitignoreCmd.RunE(cmd, args) return writeGitignore(repoDir)
} }
func vcsRepoRoot() (string, error) { func vcsRepoRoot() (string, error) {
@ -273,9 +227,23 @@ func ensureVCSInitialized() (string, error) {
return repoDir, nil return repoDir, nil
} }
func writeGitignore(repoDir string, rewrite bool) error { func preGitHook(store *Store) (string, error) {
repoDir, err := ensureVCSInitialized()
if err != nil {
return "", err
}
if store == nil {
store = &Store{}
}
if err := exportAllStores(store, repoDir); err != nil {
return "", err
}
return repoDir, nil
}
func writeGitignore(repoDir string) error {
path := filepath.Join(repoDir, ".gitignore") path := filepath.Join(repoDir, ".gitignore")
if _, err := os.Stat(path); os.IsNotExist(err) || rewrite { if _, err := os.Stat(path); os.IsNotExist(err) {
content := strings.Join([]string{ content := strings.Join([]string{
"# generated by pda", "# generated by pda",
"*", "*",
@ -310,55 +278,58 @@ func snapshotDB(store *Store, repoDir, db string) error {
} }
defer f.Close() defer f.Close()
trans := TransactionArgs{ opts := DumpOptions{
key: "@" + db, Encoding: "auto",
readonly: true, IncludeSecret: false,
sync: true,
transact: func(tx *badger.Txn, k []byte) error {
it := tx.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
key := item.KeyCopy(nil)
meta := item.UserMeta()
isSecret := meta&metaSecret != 0
expiresAt := item.ExpiresAt()
if err := item.Value(func(v []byte) error {
entry := dumpEntry{
Key: string(key),
Secret: isSecret,
} }
if expiresAt > 0 { if err := dumpDatabase(store, db, f, opts); err != nil {
ts := int64(expiresAt)
entry.ExpiresAt = &ts
}
if utf8.Valid(v) {
entry.Encoding = "text"
entry.Value = string(v)
} else {
encodeBase64(&entry, v)
}
payload, err := json.Marshal(entry)
if err != nil {
return err
}
_, err = fmt.Fprintln(f, string(payload))
return err
}); err != nil {
return err
}
}
return nil
},
}
if err := store.Transaction(trans); err != nil {
return err return err
} }
return f.Sync() return f.Sync()
} }
// exportAllStores writes every Badger store to ndjson files under repoDir/stores
// and removes stale snapshot files for deleted databases.
func exportAllStores(store *Store, repoDir string) error {
stores, err := store.AllStores()
if err != nil {
return err
}
targetDir := filepath.Join(repoDir, storeDirName)
if err := os.MkdirAll(targetDir, 0o750); err != nil {
return err
}
current := make(map[string]struct{})
for _, db := range stores {
current[db] = struct{}{}
if err := snapshotDB(store, repoDir, db); err != nil {
return fmt.Errorf("snapshot %q: %w", db, err)
}
}
entries, err := os.ReadDir(targetDir)
if err != nil {
return err
}
for _, e := range entries {
if e.IsDir() || filepath.Ext(e.Name()) != ".ndjson" {
continue
}
dbName := strings.TrimSuffix(e.Name(), ".ndjson")
if _, ok := current[dbName]; ok {
continue
}
if err := os.Remove(filepath.Join(targetDir, e.Name())); err != nil && !os.IsNotExist(err) {
return err
}
}
return nil
}
func runGit(dir string, args ...string) error { func runGit(dir string, args ...string) error {
cmd := exec.Command("git", args...) cmd := exec.Command("git", args...)
cmd.Dir = dir cmd.Dir = dir
@ -420,6 +391,8 @@ func restoreAllSnapshots(store *Store, repoDir string) error {
} }
return err return err
} }
snapshotDBs := make(map[string]struct{})
for _, e := range entries { for _, e := range entries {
if e.IsDir() { if e.IsDir() {
continue continue
@ -428,10 +401,35 @@ func restoreAllSnapshots(store *Store, repoDir string) error {
continue continue
} }
dbName := strings.TrimSuffix(e.Name(), ".ndjson") dbName := strings.TrimSuffix(e.Name(), ".ndjson")
snapshotDBs[dbName] = struct{}{}
dbPath, err := store.FindStore(dbName)
if err == nil {
_ = os.RemoveAll(dbPath)
}
if err := restoreSnapshot(store, filepath.Join(targetDir, e.Name()), dbName); err != nil { if err := restoreSnapshot(store, filepath.Join(targetDir, e.Name()), dbName); err != nil {
return fmt.Errorf("restore %q: %w", dbName, err) return fmt.Errorf("restore %q: %w", dbName, err)
} }
} }
localDBs, err := store.AllStores()
if err != nil {
return err
}
for _, db := range localDBs {
if _, ok := snapshotDBs[db]; ok {
continue
}
dbPath, err := store.FindStore(db)
if err != nil {
return err
}
if err := os.RemoveAll(dbPath); err != nil {
return fmt.Errorf("remove db '%s': %w", db, err)
}
}
return nil return nil
} }
@ -512,49 +510,22 @@ func restoreSnapshot(store *Store, path string, dbName string) error {
return nil return nil
} }
func autoCommit(store *Store, dbs []string, message string) error { // hasMergeConflicts returns true if there are files with unresolved merge
// conflicts in the working tree.
func hasMergeConflicts(dir string) (bool, error) {
cmd := exec.Command("git", "diff", "--name-only", "--diff-filter=U")
cmd.Dir = dir
out, err := cmd.Output()
if err != nil {
return false, err
}
return len(bytes.TrimSpace(out)) > 0, nil
}
func autoSync() error {
if !config.Git.AutoCommit { if !config.Git.AutoCommit {
return nil return nil
} }
// Reuse the sync command end-to-end (export, commit, pull/restore, push).
repoDir, err := ensureVCSInitialized() return vcsSyncCmd.RunE(vcsSyncCmd, []string{})
if err != nil {
return err
}
unique := make(map[string]struct{})
for _, db := range dbs {
if db == "" {
db = config.Store.DefaultStoreName
}
unique[db] = struct{}{}
}
for db := range unique {
if err := snapshotOrRemoveDB(store, repoDir, db); err != nil {
return err
}
}
if err := runGit(repoDir, "add", storeDirName); err != nil {
return err
}
return runGit(repoDir, "commit", "--allow-empty", "-m", message)
}
func snapshotOrRemoveDB(store *Store, repoDir, db string) error {
_, err := store.FindStore(db)
var nf errNotFound
if errors.As(err, &nf) {
snapPath := filepath.Join(repoDir, storeDirName, fmt.Sprintf("%s.ndjson", db))
if rmErr := os.Remove(snapPath); rmErr != nil && !os.IsNotExist(rmErr) {
return rmErr
}
return nil
}
if err != nil {
return err
}
return snapshotDB(store, repoDir, db)
} }