feat(globs): glob support for dump/restore, extracts some shared logic

This commit is contained in:
Lewis Wynne 2025-12-17 22:18:15 +00:00
parent 9869b663e2
commit 7890e9451d
9 changed files with 141 additions and 57 deletions

View file

@ -32,10 +32,24 @@ func restore(cmd *cobra.Command, args []string) error {
}
dbName = parsed
}
displayTarget := "@" + dbName
globPatterns, err := cmd.Flags().GetStringSlice("glob")
if err != nil {
return fmt.Errorf("cannot restore '%s': %v", displayTarget, err)
}
separators, err := parseGlobSeparators(cmd)
if err != nil {
return fmt.Errorf("cannot restore '%s': %v", displayTarget, err)
}
matchers, err := compileGlobMatchers(globPatterns, separators)
if err != nil {
return fmt.Errorf("cannot restore '%s': %v", displayTarget, err)
}
reader, closer, err := restoreInput(cmd)
if err != nil {
return fmt.Errorf("cannot restore '%s': %v", dbName, err)
return fmt.Errorf("cannot restore '%s': %v", displayTarget, err)
}
if closer != nil {
defer closer.Close()
@ -43,38 +57,38 @@ func restore(cmd *cobra.Command, args []string) error {
db, err := store.open(dbName)
if err != nil {
return fmt.Errorf("cannot restore '%s': %v", dbName, err)
return fmt.Errorf("cannot restore '%s': %v", displayTarget, err)
}
defer db.Close()
scanner := bufio.NewScanner(reader)
buf := make([]byte, 1024*1024)
scanner.Buffer(buf, 8*1024*1024)
decoder := json.NewDecoder(bufio.NewReaderSize(reader, 8*1024*1024))
wb := db.NewWriteBatch()
defer wb.Cancel()
lineNo := 0
entryNo := 0
var restored int
var matched bool
for scanner.Scan() {
lineNo++
line := strings.TrimSpace(scanner.Text())
if line == "" {
continue
}
for {
var entry dumpEntry
if err := json.Unmarshal([]byte(line), &entry); err != nil {
return fmt.Errorf("cannot restore '%s': line %d: %w", dbName, lineNo, err)
if err := decoder.Decode(&entry); err != nil {
if err == io.EOF {
break
}
return fmt.Errorf("cannot restore '%s': entry %d: %w", displayTarget, entryNo+1, err)
}
entryNo++
if entry.Key == "" {
return fmt.Errorf("cannot restore '%s': line %d: missing key", dbName, lineNo)
return fmt.Errorf("cannot restore '%s': entry %d: missing key", displayTarget, entryNo)
}
if !globMatch(matchers, entry.Key) {
continue
}
value, err := decodeEntryValue(entry)
if err != nil {
return fmt.Errorf("cannot restore '%s': line %d: %w", dbName, lineNo, err)
return fmt.Errorf("cannot restore '%s': entry %d: %w", displayTarget, entryNo, err)
}
entryMeta := byte(0x0)
@ -85,23 +99,24 @@ func restore(cmd *cobra.Command, args []string) error {
writeEntry := badger.NewEntry([]byte(entry.Key), value).WithMeta(entryMeta)
if entry.ExpiresAt != nil {
if *entry.ExpiresAt < 0 {
return fmt.Errorf("cannot restore '%s': line %d: expires_at must be >= 0", dbName, lineNo)
return fmt.Errorf("cannot restore '%s': entry %d: expires_at must be >= 0", displayTarget, entryNo)
}
writeEntry.ExpiresAt = uint64(*entry.ExpiresAt)
}
if err := wb.SetEntry(writeEntry); err != nil {
return fmt.Errorf("cannot restore '%s': line %d: %w", dbName, lineNo, err)
return fmt.Errorf("cannot restore '%s': entry %d: %w", displayTarget, entryNo, err)
}
restored++
}
if err := scanner.Err(); err != nil {
return err
matched = true
}
if err := wb.Flush(); err != nil {
return fmt.Errorf("cannot restore '%s': %v", dbName, err)
return fmt.Errorf("cannot restore '%s': %v", displayTarget, err)
}
if len(matchers) > 0 && !matched {
return fmt.Errorf("cannot restore '%s': No matches for pattern", displayTarget)
}
fmt.Fprintf(cmd.ErrOrStderr(), "Restored %d entries into @%s\n", restored, dbName)
@ -140,5 +155,7 @@ func decodeEntryValue(entry dumpEntry) ([]byte, error) {
func init() {
restoreCmd.Flags().StringP("file", "f", "", "Path to an NDJSON dump (defaults to stdin)")
restoreCmd.Flags().StringSliceP("glob", "g", nil, "Restore keys matching glob pattern (repeatable)")
restoreCmd.Flags().String("glob-sep", "", fmt.Sprintf("Characters treated as separators for globbing (default %q)", defaultGlobSeparatorsDisplay()))
rootCmd.AddCommand(restoreCmd)
}