From 9c82ee70d4088d8df60e670427907dc5e511c34a Mon Sep 17 00:00:00 2001 From: lew Date: Fri, 19 Dec 2025 16:11:58 +0000 Subject: [PATCH] refactor(VCS)!: moves over to sync style -- WIP/unstable --- cmd/del-db.go | 3 +- cmd/del.go | 3 +- cmd/dump.go | 165 ++++++++++++--------- cmd/mv.go | 6 +- cmd/restore.go | 3 +- cmd/set.go | 4 +- cmd/vcs.go | 385 +++++++++++++++++++++++-------------------------- 7 files changed, 282 insertions(+), 287 deletions(-) diff --git a/cmd/del-db.go b/cmd/del-db.go index 87fd27d..e563f33 100644 --- a/cmd/del-db.go +++ b/cmd/del-db.go @@ -76,8 +76,7 @@ func delDb(cmd *cobra.Command, args []string) error { if err := executeDeletion(path); err != nil { return err } - msg := fmt.Sprintf("rm-db @%s", dbName) - return autoCommit(store, []string{dbName}, msg) + return autoSync() } func executeDeletion(path string) error { diff --git a/cmd/del.go b/cmd/del.go index 5a59da1..a2e6072 100644 --- a/cmd/del.go +++ b/cmd/del.go @@ -119,8 +119,7 @@ func del(cmd *cobra.Command, args []string) error { dbs = append(dbs, spec.DB) labels = append(labels, t.display) } - msg := fmt.Sprintf("rm %s", strings.Join(labels, ", ")) - return autoCommit(store, dbs, msg) + return autoSync() } func init() { diff --git a/cmd/dump.go b/cmd/dump.go index 781d3bc..fe0f7f0 100644 --- a/cmd/dump.go +++ b/cmd/dump.go @@ -27,9 +27,12 @@ import ( "encoding/json" "errors" "fmt" + "io" + "strings" "unicode/utf8" "github.com/dgraph-io/badger/v4" + "github.com/gobwas/glob" "github.com/spf13/cobra" ) @@ -96,75 +99,13 @@ func dump(cmd *cobra.Command, args []string) error { return fmt.Errorf("cannot dump '%s': %v", targetDB, err) } - var matched bool - trans := TransactionArgs{ - key: targetDB, - readonly: true, - sync: true, - transact: func(tx *badger.Txn, k []byte) error { - opts := badger.DefaultIteratorOptions - opts.PrefetchSize = 64 - it := tx.NewIterator(opts) - defer it.Close() - for it.Rewind(); it.Valid(); it.Next() { - item := it.Item() - key := item.KeyCopy(nil) - if !globMatch(matchers, string(key)) { - continue - } - meta := item.UserMeta() - isSecret := meta&metaSecret != 0 - if isSecret && !includeSecret { - continue - } - expiresAt := item.ExpiresAt() - if err := item.Value(func(v []byte) error { - entry := dumpEntry{ - Key: string(key), - Secret: isSecret, - } - if expiresAt > 0 { - ts := int64(expiresAt) - entry.ExpiresAt = &ts - } - switch mode { - case "base64": - encodeBase64(&entry, v) - case "text": - if err := encodeText(&entry, key, v); err != nil { - return fmt.Errorf("cannot dump '%s': %v", targetDB, err) - } - case "auto": - if utf8.Valid(v) { - entry.Encoding = "text" - entry.Value = string(v) - } else { - encodeBase64(&entry, v) - } - } - payload, err := json.Marshal(entry) - if err != nil { - return fmt.Errorf("cannot dump '%s': %v", targetDB, err) - } - fmt.Fprintln(cmd.OutOrStdout(), string(payload)) - matched = true - return nil - }); err != nil { - return fmt.Errorf("cannot dump '%s': %v", targetDB, err) - } - } - return nil - }, + opts := DumpOptions{ + Encoding: mode, + IncludeSecret: includeSecret, + Matchers: matchers, + GlobPatterns: globPatterns, } - - if err := store.Transaction(trans); err != nil { - return err - } - - if len(matchers) > 0 && !matched { - return fmt.Errorf("cannot dump '%s': No matches for pattern %s", targetDB, formatGlobPatterns(globPatterns)) - } - return nil + return dumpDatabase(store, strings.TrimPrefix(targetDB, "@"), cmd.OutOrStdout(), opts) } func init() { @@ -188,3 +129,91 @@ func encodeText(entry *dumpEntry, key []byte, v []byte) error { entry.Encoding = "text" return nil } + +// DumpOptions controls how a database is dumped to NDJSON. +type DumpOptions struct { + Encoding string + IncludeSecret bool + Matchers []glob.Glob + GlobPatterns []string +} + +// dumpDatabase writes entries from dbName to w as NDJSON. +func dumpDatabase(store *Store, dbName string, w io.Writer, opts DumpOptions) error { + targetDB := "@" + dbName + if opts.Encoding == "" { + opts.Encoding = "auto" + } + + var matched bool + trans := TransactionArgs{ + key: targetDB, + readonly: true, + sync: true, + transact: func(tx *badger.Txn, k []byte) error { + it := tx.NewIterator(badger.DefaultIteratorOptions) + defer it.Close() + for it.Rewind(); it.Valid(); it.Next() { + item := it.Item() + key := item.KeyCopy(nil) + if !globMatch(opts.Matchers, string(key)) { + continue + } + meta := item.UserMeta() + isSecret := meta&metaSecret != 0 + if isSecret && !opts.IncludeSecret { + continue + } + expiresAt := item.ExpiresAt() + if err := item.Value(func(v []byte) error { + entry := dumpEntry{ + Key: string(key), + Secret: isSecret, + } + if expiresAt > 0 { + ts := int64(expiresAt) + entry.ExpiresAt = &ts + } + switch opts.Encoding { + case "base64": + encodeBase64(&entry, v) + case "text": + if err := encodeText(&entry, key, v); err != nil { + return err + } + case "auto": + if utf8.Valid(v) { + entry.Encoding = "text" + entry.Value = string(v) + } else { + encodeBase64(&entry, v) + } + default: + return fmt.Errorf("unsupported encoding '%s'", opts.Encoding) + } + payload, err := json.Marshal(entry) + if err != nil { + return err + } + _, err = fmt.Fprintln(w, string(payload)) + if err == nil { + matched = true + } + return err + }); err != nil { + return err + } + } + return nil + }, + } + + if err := store.Transaction(trans); err != nil { + return err + } + + if len(opts.Matchers) > 0 && !matched { + return fmt.Errorf("No matches for pattern %s", formatGlobPatterns(opts.GlobPatterns)) + } + return nil +} diff --git a/cmd/mv.go b/cmd/mv.go index 39c02b5..d902b3c 100644 --- a/cmd/mv.go +++ b/cmd/mv.go @@ -142,8 +142,7 @@ func mv(cmd *cobra.Command, args []string) error { } if copy { - msg := fmt.Sprintf("cp %s -> %s", fromSpec.Display(), toSpec.Display()) - return autoCommit(store, []string{fromSpec.DB, toSpec.DB}, msg) + return autoSync() } if err := store.Transaction(TransactionArgs{ @@ -157,8 +156,7 @@ func mv(cmd *cobra.Command, args []string) error { return err } - msg := fmt.Sprintf("mv %s -> %s", fromSpec.Display(), toSpec.Display()) - return autoCommit(store, []string{fromSpec.DB, toSpec.DB}, msg) + return autoSync() } var ( diff --git a/cmd/restore.go b/cmd/restore.go index 933b0ec..b42c983 100644 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -165,8 +165,7 @@ func restore(cmd *cobra.Command, args []string) error { } fmt.Fprintf(cmd.ErrOrStderr(), "Restored %d entries into @%s\n", restored, dbName) - msg := fmt.Sprintf("restore @%s (%d entries)", dbName, restored) - return autoCommit(store, []string{dbName}, msg) + return autoSync() } func restoreInput(cmd *cobra.Command) (io.Reader, io.Closer, error) { diff --git a/cmd/set.go b/cmd/set.go index 72ef5c4..19462a9 100644 --- a/cmd/set.go +++ b/cmd/set.go @@ -124,8 +124,8 @@ func set(cmd *cobra.Command, args []string) error { } valSummary := summarizeValue(value) - msg := fmt.Sprintf("set %s: %s", spec.Display(), valSummary) - return autoCommit(store, []string{spec.DB}, msg) + _ = fmt.Sprintf("set %s: %s", spec.Display(), valSummary) // placeholder for future messaging + return autoSync() } func init() { diff --git a/cmd/vcs.go b/cmd/vcs.go index 951e7e0..f98b2b7 100644 --- a/cmd/vcs.go +++ b/cmd/vcs.go @@ -2,8 +2,8 @@ package cmd import ( "bufio" + "bytes" "encoding/json" - "errors" "fmt" "io" "os" @@ -11,7 +11,6 @@ import ( "path/filepath" "strings" "time" - "unicode/utf8" "github.com/dgraph-io/badger/v4" gap "github.com/muesli/go-app-paths" @@ -31,86 +30,26 @@ var vcsInitCmd = &cobra.Command{ RunE: vcsInit, } -var vcsGitignoreCmd = &cobra.Command{ - Use: "gitignore", - Short: "generates a suitable .gitignore file", - SilenceUsage: true, - Args: cobra.NoArgs, - RunE: func(cmd *cobra.Command, args []string) error { - repoDir, err := ensureVCSInitialized() - if err != nil { - return err - } - - return writeGitignore(repoDir, rewrite) - }, -} - -var vcsSnapshotCmd = &cobra.Command{ - Use: "snapshot", - Short: "commit a snapshot into the vcs", +var vcsSyncCmd = &cobra.Command{ + Use: "sync", + Short: "export, commit, pull, restore, and push changes", SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { - repoDir, err := ensureVCSInitialized() - if err != nil { - return err - } - store := &Store{} - stores, err := store.AllStores() + repoDir, err := preGitHook(store) if err != nil { return err } - for _, db := range stores { - if err := snapshotDB(store, repoDir, db); err != nil { - return fmt.Errorf("snapshot %q: %w", db, err) - } - } - + msg := fmt.Sprintf("sync: %s", time.Now().UTC().Format(time.RFC3339)) if err := runGit(repoDir, "add", storeDirName); err != nil { return err } - - message := fmt.Sprintf("snapshot: %s", time.Now().UTC().Format(time.RFC3339)) - if err := runGit(repoDir, "commit", "-m", message); err != nil { - fmt.Println(err.Error()) - return nil - } - - return nil - }, -} - -var vcsLogCmd = &cobra.Command{ - Use: "log", - Short: "show git log for pda snapshots", - SilenceUsage: true, - RunE: func(cmd *cobra.Command, args []string) error { - - repoDir, err := ensureVCSInitialized() - if err != nil { - return err - } - return runGit(repoDir, "log", "--oneline", "--graph", "--decorate") - }, -} - -var vcsPullCmd = &cobra.Command{ - Use: "pull", - Short: "pull snapshots from remote and restore into local store", - SilenceUsage: true, - RunE: func(cmd *cobra.Command, args []string) error { - repoDir, err := ensureVCSInitialized() - if err != nil { - return err - } - - clean, err := cmd.Flags().GetBool("clean") - if err != nil { + if err := runGit(repoDir, "commit", "-m", msg); err != nil { return err } + pulled := false hasUpstream, err := repoHasUpstream(repoDir) if err != nil { return err @@ -119,50 +58,56 @@ var vcsPullCmd = &cobra.Command{ if err := runGit(repoDir, "pull"); err != nil { return err } - } - - store := &Store{} - if clean { - fmt.Printf("this will remove all existing stores before restoring from version control. continue? (y/n)\n") - var confirm string - if _, err := fmt.Scanln(&confirm); err != nil { - return fmt.Errorf("cannot clean stores: %w", err) - } - if strings.ToLower(confirm) != "y" { - return fmt.Errorf("pull aborted; stores not removed") - } - if err := wipeAllStores(store); err != nil { - return err - } - } - if err := restoreAllSnapshots(store, repoDir); err != nil { - return err - } - return nil - }, -} - -var vcsPushCmd = &cobra.Command{ - Use: "push", - Short: "push local snapshots to remote", - SilenceUsage: true, - RunE: func(cmd *cobra.Command, args []string) error { - repoDir, err := ensureVCSInitialized() - if err != nil { - return err - } - hasUpstream, err := repoHasUpstream(repoDir) - if err != nil { - return err - } - if !hasUpstream { + pulled = true + } else { hasOrigin, err := repoHasRemote(repoDir, "origin") if err != nil { return err } - if !hasOrigin { - return fmt.Errorf("no upstream configured; set a remote before pushing") + if hasOrigin { + branch, err := currentBranch(repoDir) + if err != nil { + return err + } + if branch == "" { + branch = "main" + } + fmt.Printf("running: git pull origin %s\n", branch) + if err := runGit(repoDir, "pull", "origin", branch); err != nil { + return err + } + pulled = true + } else { + fmt.Println("no remote configured; skipping pull") } + } + + if pulled { + conflicted, err := hasMergeConflicts(repoDir) + if err != nil { + return err + } + if conflicted { + return fmt.Errorf("git pull left merge conflicts; resolve and re-run sync") + } + if err := restoreAllSnapshots(store, repoDir); err != nil { + return err + } + } + + hasUpstream, err = repoHasUpstream(repoDir) + if err != nil { + return err + } + if hasUpstream { + return runGit(repoDir, "push") + } + + hasOrigin, err := repoHasRemote(repoDir, "origin") + if err != nil { + return err + } + if hasOrigin { branch, err := currentBranch(repoDir) if err != nil { return err @@ -173,25 +118,18 @@ var vcsPushCmd = &cobra.Command{ fmt.Printf("running: git push -u origin %s\n", branch) return runGit(repoDir, "push", "-u", "origin", branch) } - return runGit(repoDir, "push") + + fmt.Println("no remote configured; skipping push") + return nil }, } -var ( - storeDirName string = "stores" - rewrite bool = false -) +const storeDirName = "stores" func init() { vcsInitCmd.Flags().Bool("clean", false, "Remove existing VCS directory before initialising") vcsCmd.AddCommand(vcsInitCmd) - vcsGitignoreCmd.Flags().BoolVarP(&rewrite, "rewrite", "r", false, "Rewrite existing .gitignore, if present") - vcsCmd.AddCommand(vcsGitignoreCmd) - vcsCmd.AddCommand(vcsSnapshotCmd) - vcsCmd.AddCommand(vcsLogCmd) - vcsPullCmd.Flags().Bool("clean", false, "Remove all existing stores before restoring snapshots") - vcsCmd.AddCommand(vcsPullCmd) - vcsCmd.AddCommand(vcsPushCmd) + vcsCmd.AddCommand(vcsSyncCmd) rootCmd.AddCommand(vcsCmd) } @@ -200,6 +138,7 @@ func vcsInit(cmd *cobra.Command, args []string) error { if err != nil { return err } + store := &Store{} clean, err := cmd.Flags().GetBool("clean") if err != nil { @@ -220,6 +159,21 @@ func vcsInit(cmd *cobra.Command, args []string) error { if err := os.RemoveAll(repoDir); err != nil { return fmt.Errorf("cannot clean vcs dir: %w", err) } + + dbs, err := store.AllStores() + if err == nil && len(dbs) > 0 { + fmt.Printf("remove all existing stores? (y/n)\n") + var confirm string + if _, err := fmt.Scanln(&confirm); err != nil { + return fmt.Errorf("cannot clean stores: %w", err) + } + if strings.ToLower(confirm) != "y" { + return fmt.Errorf("aborted cleaning stores") + } + if err := wipeAllStores(store); err != nil { + return fmt.Errorf("cannot clean stores: %w", err) + } + } } if err := os.MkdirAll(filepath.Join(repoDir), 0o750); err != nil { return err @@ -244,7 +198,7 @@ func vcsInit(cmd *cobra.Command, args []string) error { return nil } - return vcsGitignoreCmd.RunE(cmd, args) + return writeGitignore(repoDir) } func vcsRepoRoot() (string, error) { @@ -273,9 +227,23 @@ func ensureVCSInitialized() (string, error) { return repoDir, nil } -func writeGitignore(repoDir string, rewrite bool) error { +func preGitHook(store *Store) (string, error) { + repoDir, err := ensureVCSInitialized() + if err != nil { + return "", err + } + if store == nil { + store = &Store{} + } + if err := exportAllStores(store, repoDir); err != nil { + return "", err + } + return repoDir, nil +} + +func writeGitignore(repoDir string) error { path := filepath.Join(repoDir, ".gitignore") - if _, err := os.Stat(path); os.IsNotExist(err) || rewrite { + if _, err := os.Stat(path); os.IsNotExist(err) { content := strings.Join([]string{ "# generated by pda", "*", @@ -310,55 +278,58 @@ func snapshotDB(store *Store, repoDir, db string) error { } defer f.Close() - trans := TransactionArgs{ - key: "@" + db, - readonly: true, - sync: true, - transact: func(tx *badger.Txn, k []byte) error { - it := tx.NewIterator(badger.DefaultIteratorOptions) - defer it.Close() - for it.Rewind(); it.Valid(); it.Next() { - item := it.Item() - key := item.KeyCopy(nil) - meta := item.UserMeta() - isSecret := meta&metaSecret != 0 - expiresAt := item.ExpiresAt() - if err := item.Value(func(v []byte) error { - entry := dumpEntry{ - Key: string(key), - Secret: isSecret, - } - if expiresAt > 0 { - ts := int64(expiresAt) - entry.ExpiresAt = &ts - } - if utf8.Valid(v) { - entry.Encoding = "text" - entry.Value = string(v) - } else { - encodeBase64(&entry, v) - } - payload, err := json.Marshal(entry) - if err != nil { - return err - } - _, err = fmt.Fprintln(f, string(payload)) - return err - }); err != nil { - return err - } - } - return nil - }, + opts := DumpOptions{ + Encoding: "auto", + IncludeSecret: false, } - - if err := store.Transaction(trans); err != nil { + if err := dumpDatabase(store, db, f, opts); err != nil { return err } return f.Sync() } +// exportAllStores writes every Badger store to ndjson files under repoDir/stores +// and removes stale snapshot files for deleted databases. +func exportAllStores(store *Store, repoDir string) error { + stores, err := store.AllStores() + if err != nil { + return err + } + + targetDir := filepath.Join(repoDir, storeDirName) + if err := os.MkdirAll(targetDir, 0o750); err != nil { + return err + } + + current := make(map[string]struct{}) + for _, db := range stores { + current[db] = struct{}{} + if err := snapshotDB(store, repoDir, db); err != nil { + return fmt.Errorf("snapshot %q: %w", db, err) + } + } + + entries, err := os.ReadDir(targetDir) + if err != nil { + return err + } + for _, e := range entries { + if e.IsDir() || filepath.Ext(e.Name()) != ".ndjson" { + continue + } + dbName := strings.TrimSuffix(e.Name(), ".ndjson") + if _, ok := current[dbName]; ok { + continue + } + if err := os.Remove(filepath.Join(targetDir, e.Name())); err != nil && !os.IsNotExist(err) { + return err + } + } + + return nil +} + func runGit(dir string, args ...string) error { cmd := exec.Command("git", args...) cmd.Dir = dir @@ -420,6 +391,8 @@ func restoreAllSnapshots(store *Store, repoDir string) error { } return err } + snapshotDBs := make(map[string]struct{}) + for _, e := range entries { if e.IsDir() { continue @@ -428,10 +401,35 @@ func restoreAllSnapshots(store *Store, repoDir string) error { continue } dbName := strings.TrimSuffix(e.Name(), ".ndjson") + snapshotDBs[dbName] = struct{}{} + + dbPath, err := store.FindStore(dbName) + if err == nil { + _ = os.RemoveAll(dbPath) + } + if err := restoreSnapshot(store, filepath.Join(targetDir, e.Name()), dbName); err != nil { return fmt.Errorf("restore %q: %w", dbName, err) } } + + localDBs, err := store.AllStores() + if err != nil { + return err + } + for _, db := range localDBs { + if _, ok := snapshotDBs[db]; ok { + continue + } + dbPath, err := store.FindStore(db) + if err != nil { + return err + } + if err := os.RemoveAll(dbPath); err != nil { + return fmt.Errorf("remove db '%s': %w", db, err) + } + } + return nil } @@ -512,49 +510,22 @@ func restoreSnapshot(store *Store, path string, dbName string) error { return nil } -func autoCommit(store *Store, dbs []string, message string) error { +// hasMergeConflicts returns true if there are files with unresolved merge +// conflicts in the working tree. +func hasMergeConflicts(dir string) (bool, error) { + cmd := exec.Command("git", "diff", "--name-only", "--diff-filter=U") + cmd.Dir = dir + out, err := cmd.Output() + if err != nil { + return false, err + } + return len(bytes.TrimSpace(out)) > 0, nil +} + +func autoSync() error { if !config.Git.AutoCommit { return nil } - - repoDir, err := ensureVCSInitialized() - if err != nil { - return err - } - - unique := make(map[string]struct{}) - for _, db := range dbs { - if db == "" { - db = config.Store.DefaultStoreName - } - unique[db] = struct{}{} - } - - for db := range unique { - if err := snapshotOrRemoveDB(store, repoDir, db); err != nil { - return err - } - } - - if err := runGit(repoDir, "add", storeDirName); err != nil { - return err - } - - return runGit(repoDir, "commit", "--allow-empty", "-m", message) -} - -func snapshotOrRemoveDB(store *Store, repoDir, db string) error { - _, err := store.FindStore(db) - var nf errNotFound - if errors.As(err, &nf) { - snapPath := filepath.Join(repoDir, storeDirName, fmt.Sprintf("%s.ndjson", db)) - if rmErr := os.Remove(snapPath); rmErr != nil && !os.IsNotExist(rmErr) { - return rmErr - } - return nil - } - if err != nil { - return err - } - return snapshotDB(store, repoDir, db) + // Reuse the sync command end-to-end (export, commit, pull/restore, push). + return vcsSyncCmd.RunE(vcsSyncCmd, []string{}) }