Recursive REPL ingest

This commit is contained in:
Ehmry - 2018-12-23 03:23:10 +01:00
parent 1aba9dcd42
commit e583830f73
3 changed files with 133 additions and 128 deletions

View File

@ -3,12 +3,60 @@ when not isMainModule:
import std/nre, std/os, std/strutils, std/tables, std/parseopt, std/streams, std/rdstdin import std/nre, std/os, std/strutils, std/tables, std/parseopt, std/streams, std/rdstdin
import cbor import cbor
import ./blobsets, ./blobsets/stores, ./blobsets/fsnodes import ./blobsets, ./blobsets/stores
when defined(genode): when defined(genode):
import dagfsclient import dagfsclient
else:
import ./blobsets/tcp #else:
# import ./blobsets/tcp
proc dumpMain() =
  ## Entry point of the `dump` subcommand: writes the content of each blob
  ## named on the command line to stdout.
  ##
  ## All positional arguments are collected; the first one is skipped
  ## (presumably the subcommand word itself -- confirm against `main`) and
  ## every remaining one is parsed with `toBlobId` and written chunk by
  ## chunk from the file store rooted at "/tmp/blobs".
  ## On the first recoverable failure a message is written to stderr and the
  ## process exits with status -1.
  var args = newSeq[string]()
  for kind, key, val in getopt():
    if kind == cmdArgument:
      args.add key
  if args.len > 1:
    let store = newFileStore("/tmp/blobs")
    for i in 1..args.high:
      try:
        for chunk in store.dumpBlob(args[i].toBlobId):
          write(stdout, chunk)
      except CatchableError:
        # Only recoverable errors are reported here. A Defect signals a
        # programming bug and is left to propagate with its stack trace.
        writeLine(stderr, "failed to dump '", args[i], "', ",
            getCurrentExceptionMsg())
        quit(-1)
proc insertPath(set: BlobSet; store: BlobStore; kind: PathComponent; path: string) =
  ## Recursively ingest `path` into `store` and record every file in `set`.
  ##
  ## Files (and links to files) are passed to `store.ingestFile`; the
  ## resulting id and size are inserted into `set` under the normalized path
  ## with the working-directory prefix and a leading "/" stripped, and a
  ## line with the id, the right-aligned size and that key goes to stdout.
  ## Directories (and links to directories) are walked and each entry is
  ## handled by a recursive call.
  ##
  ## Ingestion is best-effort: a recoverable error is reported on stderr and
  ## the failing entry is skipped instead of aborting the whole run.
  try:
    case kind
    of pcFile, pcLinkToFile:
      var key = normalizedPath path
      let (id, size) = store.ingestFile(key)
      # Strip the working directory only on a path-component boundary, so
      # a sibling such as "<cwd>-old/x" is not mistaken for a child of cwd.
      var cwdPrefix = getCurrentDir()
      if not cwdPrefix.endsWith($DirSep):
        cwdPrefix.add DirSep
      key.removePrefix(cwdPrefix)
      key.removePrefix("/")
      set.insert(key, id, size)
      writeLine(stdout, id, align($size, 11), " ", key)
    of pcDir, pcLinkToDir:
      for entryKind, entryPath in path.walkDir:
        set.insertPath(store, entryKind, entryPath)
  except CatchableError as e:
    # Defects (programming bugs) are deliberately not caught here.
    # The error is reported but not re-raised, so ingestion continues.
    writeLine(stderr, "failed to ingest '", path, "', ", e.msg)
proc ingestMain() =
  ## Entry point of the `ingest` subcommand.
  ##
  ## All positional arguments are collected; the first one is skipped
  ## (presumably the subcommand word itself -- confirm against `main`).
  ## Every remaining argument is normalized and ingested with `insertPath`
  ## into a fresh blob set backed by the file store at "/tmp/blobs". The set
  ## is then committed and the `setId` of the result is written to stdout.
  ##
  ## A path whose file information cannot be read (e.g. it does not exist)
  ## is reported on stderr and skipped, matching the best-effort behaviour
  ## of `insertPath`.
  var args = newSeq[string]()
  for kind, key, val in getopt():
    if kind == cmdArgument:
      args.add key
  if args.len > 1:
    let set = newBlobSet()
    let store = newFileStore("/tmp/blobs")
    for i in 1..args.high:
      let path = normalizedPath args[i]
      var pathKind: PathComponent
      try:
        pathKind = path.getFileInfo.kind
      except OSError:
        # getFileInfo raises OSError for missing or inaccessible paths;
        # one bad argument should not abort the remaining ones.
        writeLine(stderr, "failed to ingest '", path, "', ",
            getCurrentExceptionMsg())
        continue
      set.insertPath(store, pathKind, path)
    let final = store.commit set
    writeLine(stdout, final.setId)
type type
EvalError = object of CatchableError EvalError = object of CatchableError
@ -16,8 +64,9 @@ type
Env = ref EnvObj Env = ref EnvObj
AtomKind = enum AtomKind = enum
atomBlob
atomSet
atomPath atomPath
atomCid
atomString atomString
atomNum atomNum
atomSymbol atomSymbol
@ -25,11 +74,14 @@ type
Atom = object Atom = object
case kind: AtomKind case kind: AtomKind
of atomBlob:
blob: BlobId
size: BiggestInt
of atomSet:
bs: BlobSet
of atomPath: of atomPath:
path: string path: string
name: string name: string
of atomCid:
cid: Cid
of atomString: of atomString:
str: string str: string
of atomNum: of atomNum:
@ -68,14 +120,16 @@ type
store: BlobStore store: BlobStore
bindings: Table[string, NodeObj] bindings: Table[string, NodeObj]
blobs: Table[string, tuple[id: BlobId, size: BiggestInt]] blobs: Table[string, tuple[id: BlobId, size: BiggestInt]]
paths: Table[string, FsNode] sets: Table[string, BlobSet]
cids: Table[Cid, FsNode]
proc print(a: Atom; s: Stream) proc print(a: Atom; s: Stream)
proc print(ast: NodeRef; s: Stream) proc print(ast: NodeRef; s: Stream)
proc newAtom(c: Cid): Atom = proc newAtom(x: tuple[id: BlobId, size: BiggestInt]): Atom =
Atom(kind: atomCid, cid: c) Atom(kind: atomBlob, blob: x.id, size: x.size)
proc newAtom(bs: BlobSet): Atom =
Atom(kind: atomSet, bs: bs)
proc newAtomError(msg: string): Atom = proc newAtomError(msg: string): Atom =
Atom(kind: atomError, err: msg) Atom(kind: atomError, err: msg)
@ -160,28 +214,13 @@ proc getBlob(env: Env; path: string): tuple[id: BlobId, size: BiggestInt] =
if result.size != 0: if result.size != 0:
env.blobs[path] = result env.blobs[path] = result
proc getFile(env: Env; path: string): FsNode = proc getSet(env: Env; path: string): BlobSet=
result = env.paths.getOrDefault path result = env.sets.getOrDefault(path)
if result.isNil: if result.isNil:
result = env.store.addFile(path) result = newBlobSet()
assert(not result.isNil) result.insertPath(env.store, pcDir, path)
env.paths[path] = result if not result.isEmpty:
env.sets[path] = result
proc getDir(env: Env; path: string): FsNode =
result = env.paths.getOrDefault path
if result.isNil:
result = env.store.addDir(path)
assert(not result.isNil)
env.paths[path] = result
proc getUnixfs(env: Env; cid: Cid): FsNode =
assert cid.isValid
result = env.cids.getOrDefault cid
if result.isNil:
var raw = ""
env.store.get(cid, raw)
result = parseFs(raw, cid)
env.cids[cid] = result
type type
Tokens = seq[string] Tokens = seq[string]
@ -207,8 +246,12 @@ proc print(a: Atom; s: Stream) =
case a.kind case a.kind
of atomPath: of atomPath:
s.write a.path s.write a.path
of atomCid: of atomBlob:
s.write $a.cid s.write $a.blob
s.write '|'
s.write $a.size
of atomSet:
s.write "«set»"
of atomString: of atomString:
s.write '"' s.write '"'
s.write a.str s.write a.str
@ -269,8 +312,8 @@ proc readAtom(r: Reader): Atom =
elif token.contains DirSep: elif token.contains DirSep:
# TODO: memoize this, store a table of paths to atoms # TODO: memoize this, store a table of paths to atoms
newAtomPath token newAtomPath token
elif token.len == 46 or token.len > 48: elif token.len == blobVisualLen:
Atom(kind: atomCid, cid: token.toBlobId) Atom(kind: atomBlob, blob: token.toBlobId)
else: else:
Atom(kind: atomSymbol, sym: token.normalize) Atom(kind: atomSymbol, sym: token.normalize)
#except: #except:
@ -342,12 +385,13 @@ proc applyFunc(env: Env; args: NodeObj): NodeRef =
proc cborFunc(env: Env; arg: NodeObj): NodeRef = proc cborFunc(env: Env; arg: NodeObj): NodeRef =
assertArgCount(arg, 1) assertArgCount(arg, 1)
let newNode(newAtomString($toCbor(arg.atom.bs)))
a = arg.atom
ufsNode = env.getUnixfs a.cid proc commitFunc(env: Env; arg: NodeObj): NodeRef =
diag = $ufsNode.toCbor assertArgCount(arg, 1)
diag.newAtomString.newNode raiseAssert("not implemented")
#[
proc copyFunc(env: Env; args: NodeObj): NodeRef = proc copyFunc(env: Env; args: NodeObj): NodeRef =
assertArgCount(args, 3) assertArgCount(args, 3)
let let
@ -361,6 +405,7 @@ proc copyFunc(env: Env; args: NodeObj): NodeRef =
root.add(z.atom.str, dir[y.atom.str]) root.add(z.atom.str, dir[y.atom.str])
let cid = env.store.putDag(root.toCbor) let cid = env.store.putDag(root.toCbor)
cid.newAtom.newNode cid.newAtom.newNode
]#
proc consFunc(env: Env; args: NodeObj): NodeRef = proc consFunc(env: Env; args: NodeObj): NodeRef =
assertArgCount(args, 2) assertArgCount(args, 2)
@ -393,31 +438,24 @@ proc globFunc(env: Env; args: NodeObj): NodeRef =
else: else:
result = newNodeError("invalid glob argument", n) result = newNodeError("invalid glob argument", n)
proc keyFunc(env: Env; args: NodeObj): NodeRef =
  ## REPL builtin bound to "key": after `assertArgCount` has checked for a
  ## single argument, converts the string of the argument atom with `toKey`
  ## and returns the result wrapped in a new atom node.
  assertArgCount(args, 1)
  let hashed = toKey(args.atom.str)
  newNode(newAtom(hashed))
proc ingestFunc(env: Env; args: NodeObj): NodeRef = proc ingestFunc(env: Env; args: NodeObj): NodeRef =
var root = newFsRoot() let set = newBlobSet()
for n in args.walk: for n in args.walk:
returnError n returnError n
let let a = n.atom
a = n.atom case a.path.getFileInfo.kind
info = a.path.getFileInfo
case info.kind
of pcFile, pcLinkToFile: of pcFile, pcLinkToFile:
let file = env.getFile a.path result = newNode(newAtomError("not a directory"))
root.add(a.name, file)
of pcDir, pcLinkToDir: of pcDir, pcLinkToDir:
let dir = env.getDir a.path result = newNode(newAtom(env.getSet(a.path)))
root.add(a.name, dir)
let
cid = env.store.putDag(root.toCbor)
cid.newAtom.newNode
proc blobFunc(env: Env; args: NodeObj): NodeRef = proc blobFunc(env: Env; args: NodeObj): NodeRef =
assertArgCount(args, 1) assertArgCount(args, 1)
let (blob, size) = env.getBlob args.atom.path newNode(newAtom(env.getBlob args.atom.path))
result = newNodeList()
result.append blob.newAtom.newNode
result.append size.newAtom.newNode
# TODO: natural number atom
proc listFunc(env: Env; args: NodeObj): NodeRef = proc listFunc(env: Env; args: NodeObj): NodeRef =
## Standard Lisp 'list' function. ## Standard Lisp 'list' function.
@ -428,6 +466,7 @@ proc listFunc(env: Env; args: NodeObj): NodeRef =
while not result.tailRef.nextRef.isNil: while not result.tailRef.nextRef.isNil:
result.tailRef = result.tailRef.nextRef result.tailRef = result.tailRef.nextRef
#[
proc lsFunc(env: Env; args: NodeObj): NodeRef = proc lsFunc(env: Env; args: NodeObj): NodeRef =
result = newNodeList() result = newNodeList()
for n in args.walk: for n in args.walk:
@ -440,6 +479,7 @@ proc lsFunc(env: Env; args: NodeObj): NodeRef =
e.append u.cid.newAtom.newNode e.append u.cid.newAtom.newNode
e.append name.newAtomString.newNode e.append name.newAtomString.newNode
result.append e result.append e
]#
proc mapFunc(env: Env; args: NodeObj): NodeRef = proc mapFunc(env: Env; args: NodeObj): NodeRef =
assertArgCount(args, 2) assertArgCount(args, 2)
@ -448,6 +488,7 @@ proc mapFunc(env: Env; args: NodeObj): NodeRef =
for v in args.next.list: for v in args.next.list:
result.append f(env, v) result.append f(env, v)
#[
proc mergeFunc(env: Env; args: NodeObj): NodeRef = proc mergeFunc(env: Env; args: NodeObj): NodeRef =
var root = newFsRoot() var root = newFsRoot()
for n in args.walk: for n in args.walk:
@ -458,10 +499,12 @@ proc mergeFunc(env: Env; args: NodeObj): NodeRef =
root.add(name, node) root.add(name, node)
let cid = env.store.putDag(root.toCbor) let cid = env.store.putDag(root.toCbor)
cid.newAtom.newNode cid.newAtom.newNode
]#
proc pathFunc(env: Env; arg: NodeObj): NodeRef = proc pathFunc(env: Env; arg: NodeObj): NodeRef =
result = arg.atom.str.newAtomPath.newNode result = arg.atom.str.newAtomPath.newNode
#[
proc rootFunc(env: Env; args: NodeObj): NodeRef = proc rootFunc(env: Env; args: NodeObj): NodeRef =
var root = newFsRoot() var root = newFsRoot()
let let
@ -471,18 +514,13 @@ proc rootFunc(env: Env; args: NodeObj): NodeRef =
root.add(name, ufs) root.add(name, ufs)
let rootCid = env.store.putDag(root.toCbor) let rootCid = env.store.putDag(root.toCbor)
rootCid.newAtom.newNode rootCid.newAtom.newNode
]#
proc unionFunc(env: Env; args: NodeObj): NodeRef =
  ## REPL builtin bound to "union": after `assertArgCount` has checked for
  ## two arguments, passes the blob sets held by the first and second
  ## argument atoms to `env.store.union` and returns the resulting set as a
  ## new atom node.
  assertArgCount(args, 2)
  let
    lhs = args.atom.bs
    rhs = args.next.atom.bs
  newNode(newAtom(env.store.union(lhs, rhs)))
proc walkFunc(env: Env; args: NodeObj): NodeRef =
  ## REPL builtin bound to "walk": resolves the string in the second
  ## argument as a walk path starting from the node identified by the CID in
  ## the first argument.
  ##
  ## Returns an atom node holding the CID of the node that `env.store.walk`
  ## yields, or an error node when the walk yields nil.
  let startCid = args.atom.cid
  assert startCid.isValid
  let
    target = args.next.atom.str
    startNode = env.getUnixfs(startCid)
    reached = env.store.walk(startNode, target)
  if reached.isNil:
    newNodeError("no walk to '$1'" % target, args)
  else:
    newNode(newAtom(reached.cid))
## ##
# Environment # Environment
@ -497,23 +535,25 @@ proc newEnv(store: BlobStore): Env =
store: store, store: store,
bindings: initTable[string, NodeObj](), bindings: initTable[string, NodeObj](),
blobs: initTable[string, tuple[id: BlobId, size: BiggestInt]](), blobs: initTable[string, tuple[id: BlobId, size: BiggestInt]](),
paths: initTable[string, FsNode](), sets: initTable[string, BlobSet]())
cids: initTable[Cid, FsNode]())
result.bindEnv "apply", applyFunc result.bindEnv "apply", applyFunc
result.bindEnv "blob", blobFunc
result.bindEnv "cbor", cborFunc result.bindEnv "cbor", cborFunc
result.bindEnv "commit", commitFunc
result.bindEnv "cons", consFunc result.bindEnv "cons", consFunc
result.bindEnv "copy", copyFunc #result.bindEnv "copy", copyFunc
result.bindEnv "define", defineFunc result.bindEnv "define", defineFunc
result.bindEnv "glob", globFunc result.bindEnv "glob", globFunc
result.bindEnv "key", keyFunc
result.bindEnv "ingest", ingestFunc result.bindEnv "ingest", ingestFunc
result.bindEnv "list", listFunc result.bindEnv "list", listFunc
result.bindEnv "ls", lsFunc #result.bindEnv "ls", lsFunc
result.bindEnv "map", mapFunc result.bindEnv "map", mapFunc
result.bindEnv "merge", mergeFunc #result.bindEnv "merge", mergeFunc
result.bindEnv "path", pathFunc result.bindEnv "path", pathFunc
result.bindEnv "root", rootFunc #result.bindEnv "root", rootFunc
result.bindEnv "walk", walkFunc #result.bindEnv "walk", walkFunc
result.bindEnv "blob", blobFunc result.bindEnv "union", unionFunc
proc eval(ast: NodeRef; env: Env): NodeRef proc eval(ast: NodeRef; env: Env): NodeRef
@ -570,6 +610,7 @@ proc eval(ast: NodeRef; env: Env): NodeRef =
proc readLineSimple(prompt: string; line: var TaintedString): bool = proc readLineSimple(prompt: string; line: var TaintedString): bool =
stdin.readLine(line) stdin.readLine(line)
#[
when defined(genode): when defined(genode):
proc openStore(): BlobStore = proc openStore(): BlobStore =
result = newDagfsClient("repl") result = newDagfsClient("repl")
@ -586,6 +627,7 @@ else:
try: result = newTcpClient(host) try: result = newTcpClient(host)
except: except:
quit("failed to connect to store at $1 ($2)" % [host, getCurrentExceptionMsg()]) quit("failed to connect to store at $1 ($2)" % [host, getCurrentExceptionMsg()])
]#
proc replMain() = proc replMain() =
var scripted: bool var scripted: bool
@ -594,7 +636,7 @@ proc replMain() =
scripted = true scripted = true
let let
#store = openStore() #store = openStore()
store = newFileStore("/tmp/blobs") store = newNullStore() # newFileStore("/tmp/blobs")
env = newEnv(store) env = newEnv(store)
outStream = stdout.newFileStream outStream = stdout.newFileStream
readLine = if scripted: readLineSimple else: readLineFromStdin readLine = if scripted: readLineSimple else: readLineFromStdin
@ -609,51 +651,6 @@ proc replMain() =
outStream.write "\n" outStream.write "\n"
flush outStream flush outStream
proc dumpMain() =
  ## Entry point of the `dump` subcommand: writes the content of each blob
  ## named on the command line to stdout.
  ##
  ## The first positional argument is skipped; every remaining one is parsed
  ## with `toBlobId` and written chunk by chunk from the file store at
  ## "/tmp/blobs". Any failure is reported on stderr and ends the process
  ## with status -1.
  var positional = newSeq[string]()
  for kind, key, val in getopt():
    if kind != cmdArgument:
      continue
    positional.add key
  if positional.len <= 1:
    return
  let store = newFileStore("/tmp/blobs")
  for i in 1 .. positional.high:
    let arg = positional[i]
    try:
      for chunk in store.dumpBlob(arg.toBlobId):
        stdout.write(chunk)
    except:
      stderr.writeLine("failed to dump '", arg, "', ", getCurrentExceptionMsg())
      quit(-1)
proc insertPath(set: BlobSet; store: BlobStore; kind: PathComponent; path: string) =
  ## Recursively ingest `path` into `store`, recording each file in `set`.
  ##
  ## A file (or link to a file) is passed to `store.ingestFile`, inserted
  ## into `set` under `path`, and listed on stdout with its id and
  ## right-aligned size. A directory (or link to a directory) is walked and
  ## each entry is ingested recursively under its normalized path.
  ## Any exception is reported on stderr and otherwise ignored.
  try:
    if kind in {pcFile, pcLinkToFile}:
      let (blobId, blobSize) = store.ingestFile(path)
      set.insert(path, blobId, blobSize)
      stdout.writeLine(blobId, align($blobSize, 11), " ", path)
    else:
      # Remaining kinds are pcDir and pcLinkToDir.
      for entryKind, entryPath in walkDir(path):
        insertPath(set, store, entryKind, normalizedPath(entryPath))
  except:
    # Best-effort: report the failure and carry on (not re-raised).
    stderr.writeLine("failed to ingest '", path, "', ", getCurrentExceptionMsg())
proc ingestMain() =
  ## Entry point of the `ingest` subcommand.
  ##
  ## The first positional argument is skipped; every remaining one is
  ## normalized and ingested with `insertPath` into a fresh blob set, using
  ## the store returned by `newNullStore`. The set is then committed and the
  ## `setId` of the result is written to stdout.
  var positional = newSeq[string]()
  for kind, key, val in getopt():
    if kind != cmdArgument:
      continue
    positional.add key
  if positional.len <= 1:
    return
  var set = newBlobSet()
  #let store = newFileStore("/tmp/blobs")
  let store = newNullStore()
  for i in 1 .. positional.high:
    let
      target = normalizedPath(positional[i])
      targetKind = getFileInfo(target).kind
    insertPath(set, store, targetKind, target)
  let committed = store.commit(set)
  stdout.writeLine(committed.setId)
proc main() = proc main() =
var cmd = "" var cmd = ""
for kind, key, val in getopt(): for kind, key, val in getopt():
@ -663,8 +660,8 @@ proc main() =
case normalize(cmd) case normalize(cmd)
of "": of "":
quit("no subcommand specified") quit("no subcommand specified")
#of "repl": of "repl":
# replMain() replMain()
of "dump": of "dump":
dumpMain() dumpMain()
of "ingest": of "ingest":

View File

@ -12,7 +12,7 @@ const
blobLeafSize* = 1 shl 14 blobLeafSize* = 1 shl 14
## Size of blob leaves. ## Size of blob leaves.
blobLeafSizeMask* = not(not(0) shl 14) blobLeafSizeMask* = not(not(0) shl 14)
visualLen = 32 * 3 blobVisualLen* = 32 * 3
maxChunkSize* {.deprecated} = blobLeafSize maxChunkSize* {.deprecated} = blobLeafSize
@ -29,7 +29,7 @@ type
func `$`*(bh: BlobId): string = func `$`*(bh: BlobId): string =
## Convert a blob hash to a visual representation. ## Convert a blob hash to a visual representation.
const baseRune = 0x2800 const baseRune = 0x2800
result = newString(visualLen) result = newString(blobVisualLen)
var pos = 0 var pos = 0
for b in bh.data.items: for b in bh.data.items:
let r = (Rune)baseRune or b.int let r = (Rune)baseRune or b.int
@ -37,7 +37,7 @@ func `$`*(bh: BlobId): string =
func toBlobId*(s: string): BlobId = func toBlobId*(s: string): BlobId =
## Parse a visual blob hash to binary. ## Parse a visual blob hash to binary.
if s.len == visualLen: if s.len == blobVisualLen:
var var
pos: int pos: int
r: Rune r: Rune
@ -172,7 +172,7 @@ const
keyChunkBits = fastLog2 keyBits keyChunkBits = fastLog2 keyBits
keyChunkMask = not ((not 0.Key) shl (keyChunkBits)) keyChunkMask = not ((not 0.Key) shl (keyChunkBits))
func toKey(s: string): Key = func toKey*(s: string): Key =
var key: siphash.Key var key: siphash.Key
let b = sipHash(toOpenArrayByte(s, s.low, s.high), key) let b = sipHash(toOpenArrayByte(s, s.low, s.high), key)
cast[Key](b) cast[Key](b)

View File

@ -259,6 +259,14 @@ proc commit*(store: BlobStore; bs: BlobSet): BlobSet =
let (id, _) = finish stream let (id, _) = finish stream
result = BlobSet(kind: coldNode, setId: id) result = BlobSet(kind: coldNode, setId: id)
proc union*(store: BlobStore; a, b: BlobSet): BlobSet =
  ## Return the union of a set pair.
  ##
  ## Placeholder: the result is initialised to an empty set, after which the
  ## proc unconditionally fails with an assertion, so it never returns.
  result = newBlobSet()
  raiseAssert "not implemented"
# Store implementations
#
type type
FsBlobStream = ref FsBlobStreamObj FsBlobStream = ref FsBlobStreamObj
FsBlobStreamObj = object of BlobStreamObj FsBlobStreamObj = object of BlobStreamObj