# blobsets/src/blobset.nim
#
# 840 lines
# 22 KiB
# Nim

when isMainModule:
  # This file is the command-line front end and must be compiled as the
  # main module; importing it is a compile-time error.
  discard
else:
  {.error: "this module is not a library, import blobsets instead".}
import std/asyncdispatch, std/nre, std/os, std/strutils, std/tables, std/parseopt, std/streams, std/rdstdin, std/random
import cbor
import ./blobsets, ./blobsets/filestores,
./blobsets/httpservers, ./blobsets/httpstores
import os, strutils
when defined(readLine):
import rdstdin, linenoise
proc openStore(): BlobStore =
  ## Open the blob store named by the `BLOB_STORE_URL` environment
  ## variable. A value starting with "http://" selects an HTTP store;
  ## anything else is used as a file-store directory. An unset or empty
  ## variable falls back to "/tmp/blobs".
  const envKey = "BLOB_STORE_URL"
  var location = os.getEnv envKey
  if location.len == 0:
    location = "/tmp/blobs"
    #quit(envKey & " not set in environment")
  if location.startsWith("http://"):
    result = newHttpStore(location)
  else:
    result = newFileStore(location)
proc serverMain(): Future[void] =
  ## `server` subcommand: serve the file store at "/tmp/blobs" over HTTP
  ## on port 8080. Returns the future produced by `serve`.
  let backing = newFileStore("/tmp/blobs")
  result = newHttpStoreServer(backing).serve(Port(8080))
proc dumpMain() =
  ## `dump` subcommand: write the contents of every blob id given on the
  ## command line (after the subcommand name) to stdout, in order.
  ## On the first failure an error is written to stderr and the process
  ## exits with status -1.
  var args = newSeq[string]()
  for kind, key, val in getopt():
    if kind == cmdArgument:
      args.add key
  if args.len > 1:
    let store = openStore()
    # args[0] is the subcommand itself; blob ids follow it
    for i in 1..args.high:
      try:
        for chunk in store.dumpBlob(args[i].toBlobId):
          write(stdout, chunk)
      except CatchableError as e:
        # Only recoverable errors are reported here; a Defect is a bug and
        # should surface with its own trace rather than this message.
        writeLine(stderr, "failed to dump '", args[i], "', ", e.msg)
        quit(-1)
proc insertPath(store: BlobStore; bs: BlobSet; kind: PathComponent; path: string): BlobSet =
  ## Ingest `path` into `store` and return `bs` extended with it.
  ##
  ## Files (and links to files) are ingested and inserted under their
  ## normalized path with the current directory prefix and a leading "/"
  ## removed; each one is reported on stdout as "<id> <size> <path>".
  ## Directories (and links to directories) are walked recursively.
  ## Failures are written to stderr and the failing entry is skipped, so
  ## the returned set is best-effort.
  result = bs
  try:
    case kind
    of pcFile, pcLinkToFile:
      var key = normalizedPath path
      let (id, size) = waitFor store.ingestFile(key)
      key.removePrefix(getCurrentDir())
      key.removePrefix("/")
      result = waitFor insert(store, result, key, id, size)
      writeLine(stdout, id, align($size, 11), " ", key)
    of pcDir, pcLinkToDir:
      for childKind, childPath in path.walkDir:
        result = store.insertPath(result, childKind, childPath)
  except CatchableError as e:
    # Catch recoverable errors only; a bare `except` also swallowed
    # Defects (programming errors) and reported them as ingest failures.
    writeLine(stderr, "failed to ingest '", path, "', ", e.msg)
proc ingestMain() {.async.} =
  ## `ingest` subcommand: ingest every file or directory named on the
  ## command line (after the subcommand name) into the configured store,
  ## commit the combined set and print its id in hex.
  ##
  ## A path that cannot be stat'ed (for example, one that does not exist)
  ## is reported on stderr and skipped. This matches the best-effort
  ## handling `insertPath` applies inside directory trees, instead of
  ## aborting the whole run with an unhandled `OSError`.
  var args = newSeq[string]()
  for kind, key, val in getopt():
    if kind == cmdArgument:
      args.add key
  if args.len > 1:
    var bs = newBlobSet()
    let store = openStore()
    for i in 1..args.high:
      let path = normalizedPath args[i]
      try:
        bs = store.insertPath(bs, path.getFileInfo.kind, path)
      except OSError:
        writeLine(stderr, "failed to ingest '", path, "', ",
          getCurrentExceptionMsg())
    let final = await store.commit(bs)
    writeLine(stdout, final.setId.toHex)
func isShortHash(id: BlobId): bool =
  ## True when every byte of `id.data` from index 16 onward is zero (what
  ## `checkMain` reports as a "short hash"). Also true when `id.data` has
  ## no bytes past index 15.
  result = true
  for i in 16 .. id.data.high:
    if id.data[i] != 0:
      return false
proc checkMain() {.async.} =
  ## `check` subcommand: for each set id on the command line, load the set,
  ## remove every member whose blob id `isShortHash`, commit the repaired
  ## set and print its id in hex. Any failure is written to stderr and the
  ## process exits with status -1.
  randomize()
  var args = newSeq[string]()
  for kind, key, val in getopt():
    if kind == cmdArgument:
      args.add key
  if args.len > 1:
    let store = openStore()
    for i in 1..args.high:
      try:
        var bs = await store.load(args[i].toBlobId)
        let stream = newMemberStream()
        asyncCheck stream.streamMembers(store, bs)
        while true:
          let (valid, m) = await stream.read()
          if not valid: break
          if m.id.isShortHash:
            echo m.key, " has a short hash - ", m.id
            bs = await remove(store, bs, m.key)
            echo "removed ", m.key
            #close(store.openBlobStream(id, size, dataBlob))
        echo "commit repaired set"
        bs = await commit(store, bs)
        writeLine(stdout, "recovered set is ", bs.setId.toHex)
      except CatchableError:
        # recoverable errors only; Defects keep their own trace
        writeLine(stderr, "failed to check '", args[i], "', ",
          getCurrentExceptionMsg())
        quit(-1)
proc replicateMain() {.async.} =
  ## `replicate` subcommand: `replicate <dst-url> <set-id>...`
  ##
  ## For each source set id, stream every member blob from the configured
  ## store into the HTTP store at `dst-url`, rebuild the set there and
  ## check that the rebuilt set's id equals the source id. A blob or set
  ## mismatch, or any other failure, is written to stderr and the process
  ## exits with status -1.
  randomize()
  var args = newSeq[string]()
  for kind, key, val in getopt():
    if kind == cmdArgument:
      args.add key
  # args[0] is "replicate" and args[1] the destination URL. Set ids start
  # at index 2, so a single set id (args.len == 3) is a valid invocation;
  # the previous `args.len > 3` silently ignored it.
  if args.len > 2:
    let
      src = openStore()
      dst = newHttpStore(args[1])
    # One copy buffer reused for every blob, rather than one declared (and
    # zeroed) per member.
    var buf: array[0x16000, byte]
    for i in 2..args.high:
      try:
        let
          srcId = args[i].toBlobId
          srcSet = await src.load(srcId)
        var dstSet = newBlobSet()
        let stream = newMemberStream()
        asyncCheck stream.streamMembers(src, srcSet)
        while true:
          let (valid, m) = await stream.read()
          if not valid: break
          let
            readStream = src.openBlobStream(m.id, m.size, dataBlob)
            ingestStream = dst.openIngestStream(m.size, dataBlob)
          while true:
            let n = await readStream.read(addr buf, buf.len)
            if n == 0: break
            await ingestStream.ingest(addr buf, n)
          close readStream
          let (otherId, otherSize) = await ingestStream.finish()
          if otherId != m.id or otherSize != m.size:
            writeLine(stderr, "replication mismatch ",
              m.id, ":", m.size, " replicated to ",
              otherId, ":", otherSize)
            quit -1
          dstSet = await insert(dst, dstSet, m.key, m.id, m.size)
        let
          newSet = await commit(dst, dstSet)
          dstId = newSet.setId
        if dstId != srcId:
          # Explicit check instead of doAssert: an AssertionDefect is not a
          # CatchableError and would bypass the handler below.
          writeLine(stderr, "failed to replicate '", args[i],
            "', set mismatch after replication: ", dstId.toHex)
          quit -1
      except CatchableError:
        writeLine(stderr, "failed to replicate '", args[i], "', ",
          getCurrentExceptionMsg())
        quit -1
type
  # Caught by `eval`, which turns it into an error node carrying the
  # exception message.
  EvalError = object of CatchableError
  Env = ref EnvObj
  # Discriminator for `Atom`.
  AtomKind = enum
    atomBlob
    atomSet
    atomPath
    atomString
    atomKey
    atomNum
    atomSymbol
    atomError
  # A literal value of the REPL language.
  Atom = object
    case kind: AtomKind
    of atomBlob:
      blob: BlobId
      size: BiggestInt  # size reported alongside `blob` by the store
    of atomSet:
      bs: BlobSet
    of atomPath:
      path: string  # result of `expandFilename` (see `newAtomPath`)
      name: string  # last component of the path as it was written
    of atomString:
      str: string
    of atomKey:
      key: Key
    of atomNum:
      num: BiggestInt
    of atomSymbol:
      sym: string  # symbol text after `strutils.normalize`
    of atomError:
      err: string
  # Builtin function: receives the environment and the first argument
  # node; further arguments are chained through `nextRef`. A call with no
  # arguments receives an empty list node (see `eval`).
  Func = proc(env: Env; arg: NodeObj): NodeRef
  # Discriminator for `NodeObj`.
  NodeKind = enum
    nodeError
    nodeList
    nodeAtom
    nodeFunc
  NodeRef = ref NodeObj
    ## Shared handle to a node. Members of a list are chained through
    ## `nextRef`.
  NodeObj = object
    ## Node payload. A copy (`n[]`) can be modified, e.g. by clearing
    ## `nextRef`, without affecting the shared `NodeRef`.
    case kind: NodeKind
    of nodeList:
      headRef, tailRef: NodeRef  # first and last member of the list
    of nodeAtom:
      atom: Atom
    of nodeFunc:
      fun: Func
      name: string  # name the builtin is bound under (see `bindEnv`)
    of nodeError:
      errMsg: string
      errNode: NodeRef  # copy of the node the error is about
    nextRef: NodeRef  # following sibling when this node is in a chain
  # Interpreter state shared by all builtins.
  EnvObj = object
    store: BlobStore
    bindings: Table[string, NodeObj]  # symbol name -> bound node
    blobs: Table[string, tuple[id: BlobId, size: BiggestInt]]  # path cache used by `getBlob`
    sets: Table[string, BlobSet]  # path cache used by `getSet`
# Forward declarations of the printers defined further down; printing a
# NodeObj error node needs the NodeRef overload before its body exists.
proc print(a: Atom; s: Stream)
proc print(ast: NodeRef; s: Stream)
proc newAtom(id: BlobId, size: BiggestInt): Atom =
  ## Blob atom for blob `id` of `size` bytes.
  result = Atom(kind: atomBlob, blob: id, size: size)

proc newAtom(bs: BlobSet): Atom =
  ## Set atom wrapping `bs`.
  result = Atom(kind: atomSet, bs: bs)

proc newAtomError(msg: string): Atom =
  ## Error atom carrying `msg`.
  result = Atom(kind: atomError, err: msg)
proc newAtomPath(s: string): Atom =
  ## Path atom for `s`: `path` holds the expanded absolute path and `name`
  ## the last component of `s`. If `expandFilename` raises OSError (for
  ## example, when the path does not exist), an error atom is returned.
  try:
    result = Atom(kind: atomPath, path: expandFilename(s),
                  name: s.extractFilename)
  except OSError:
    result = newAtomError("invalid path '$1'" % s)
proc newAtomString(s: string): Atom =
  ## String atom holding `s`.
  result = Atom(kind: atomString, str: s)

proc newAtom(k: Key): Atom =
  ## Key atom holding `k`.
  result = Atom(kind: atomKey, key: k)

proc newAtom(i: Natural): Atom =
  ## Number atom holding the non-negative integer `i`.
  result = Atom(kind: atomNum, num: i)
proc newNodeError(msg: string; n: NodeObj): NodeRef =
  ## Error node with message `msg` that keeps a heap copy of the node `n`
  ## the error is about.
  var offender: NodeRef
  new(offender)
  offender[] = n
  result = NodeRef(kind: nodeError, errMsg: msg, errNode: offender)

proc newNode(a: Atom): NodeRef =
  ## Node wrapping atom `a`.
  result = NodeRef(kind: nodeAtom, atom: a)

proc newNodeList(): NodeRef =
  ## New empty list node.
  result = NodeRef(kind: nodeList)
proc next(n: NodeObj | NodeRef): NodeObj =
  ## Return a copy of the element chained after `n`. Asserts that one
  ## exists.
  let successor = n.nextRef
  assert(not successor.isNil, "next element is nil")
  successor[]
proc head(list: NodeObj | NodeRef): NodeObj =
  ## Return a copy of the first member of list node `list`. The copy keeps
  ## its `nextRef`, so the remaining members stay reachable from it.
  ##
  ## Raises `EvalError` for an empty list. Previously this dereferenced a
  ## nil `headRef`, so builtins such as `apply` and `cons` crashed the REPL
  ## instead of producing an error node in `eval`.
  if list.headRef.isNil:
    raise newException(EvalError, "empty list")
  result = list.headRef[]
proc `next=`(n, p: NodeRef) =
  ## Link `p` as the element that follows `n`.
  ## `n` must currently be the last node of its chain: linking onto a node
  ## that already has a successor would silently drop that tail, which the
  ## assertion guards against.
  assert(n.nextRef.isNil, "append to node that is not at the end of a list")
  n.nextRef = p
iterator list(n: NodeObj): NodeObj =
  ## Yield a copy of each member of list node `n`, first to last.
  var cursor = n.headRef
  while cursor != nil:
    yield cursor[]
    cursor = cursor.nextRef
iterator walk(n: NodeObj): NodeObj =
  ## Yield `n`, then a copy of each node chained after it via `nextRef`.
  var current = n
  while true:
    yield current
    if current.nextRef.isNil:
      break
    current = current.nextRef[]
proc append(list, n: NodeRef) =
  ## Link node `n` onto the end of list node `list`, then advance
  ## `tailRef` past anything already chained after `n`. Any `nextRef`
  ## chain hanging off `n` therefore becomes part of the list.
  if not list.headRef.isNil:
    list.tailRef.next = n
  else:
    list.headRef = n
    list.tailRef = n
  var last = list.tailRef
  while not last.nextRef.isNil:
    assert(last != last.nextRef)
    last = last.nextRef
    list.tailRef = last
proc append(list: NodeRef; n: NodeObj) =
  ## Append a heap copy of node value `n` to list node `list`. The copy
  ## keeps `n.nextRef`, so whatever is chained after `n` is appended too.
  var boxed: NodeRef
  new(boxed)
  boxed[] = n
  append(list, boxed)
template returnError(n: NodeObj) =
  ## Return early from the enclosing proc with a node wrapping `n.atom`
  ## when that atom is an error atom.
  if n.atom.kind == atomError:
    return newNode(n.atom)
proc getBlob(env: Env; path: string): tuple[id: BlobId, size: BiggestInt] =
  ## Blob id and size for the file at `path`. On a cache miss the file is
  ## ingested into the environment's store. Only results with a non-zero
  ## size are memoized in `env.blobs`; a size of zero counts as "not
  ## cached".
  result = env.blobs.getOrDefault(path)
  if result.size != 0:
    return
  result = waitFor env.store.ingestFile(path)
  if result.size != 0:
    env.blobs[path] = result
proc getSet(env: Env; path: string): BlobSet =
  ## Blob set holding the file or directory tree at `path`. On a cache
  ## miss it is built with `insertPath`; non-empty results are memoized in
  ## `env.sets`. Raises OSError if `path` cannot be stat'ed.
  result = env.sets.getOrDefault(path)
  if not result.isNil:
    return
  let fresh = env.store.insertPath(newBlobSet(), path.getFileInfo.kind, path)
  if not fresh.isEmpty:
    env.sets[path] = fresh
  result = fresh
type
  # Token list produced by `tokenizer`.
  Tokens = seq[string]
  # Reader state: `buffer` keeps input from previous lines while a form is
  # incomplete (see `read`), `tokens` is the current token list and `pos`
  # the index of the next token to consume.
  Reader = ref object
    buffer: string
    tokens: Tokens
    pos: int
proc newReader(): Reader =
  ## Reader with no buffered input and no tokens.
  new(result)
  result.buffer = ""
  result.tokens = newSeq[string]()

proc peek(r: Reader): string =
  ## Return the next token without consuming it; asserts one is left.
  assert(r.pos < r.tokens.len, $r.tokens)
  result = r.tokens[r.pos]

proc next(r: Reader): string =
  ## Consume and return the next token; asserts one is left.
  result = r.peek
  r.pos += 1
proc print(a: Atom; s: Stream) =
  ## Write the printed form of atom `a` to `s`:
  ## paths and symbols verbatim, blobs as "id|size", committed (cold) sets
  ## by id and other sets as «set», strings in double quotes, keys and
  ## numbers via `$`, errors as «message».
  case a.kind
  of atomPath:
    s.write a.path
  of atomBlob:
    s.write $a.blob
    s.write '|'
    s.write $a.size
  of atomSet:
    if a.bs.kind == coldNode:
      s.write $a.bs.setId
    else:
      s.write "«set»"
  of atomString:
    s.write '"'
    s.write a.str
    s.write '"'
  of atomKey:
    s.write $a.key
  of atomNum:
    s.write $a.num
  of atomSymbol:
    s.write a.sym
  of atomError:
    s.write "«"
    s.write a.err
    s.write "»"
proc print(ast: NodeObj; s: Stream) =
  ## Write the printed form of node `ast` to `s`: atoms via their printer,
  ## lists as a newline followed by "( a b )", functions as
  ## "#<procedure name>" and errors as «message: node».
  case ast.kind
  of nodeAtom:
    print(ast.atom, s)
  of nodeList:
    s.write "\n("
    for member in ast.list:
      s.write " "
      print(member, s)
    s.write " )"
  of nodeFunc:
    s.write "#<procedure "
    s.write ast.name
    s.write ">"
  of nodeError:
    s.write "«"
    s.write ast.errMsg
    s.write ": "
    print(ast.errNode, s)
    s.write "»"
proc print(ast: NodeRef; s: Stream) =
  ## Print the node behind `ast`, or «nil» for a nil reference.
  if not ast.isNil:
    print(ast[], s)
  else:
    s.write "«nil»"
proc readAtom(r: Reader): Atom =
  ## Consume one token and classify it as an atom:
  ## * starts with `"`: a string with the quotes stripped, or an error
  ##   atom if the token is not closed by a second `"`
  ## * contains `DirSep`: a path atom (see `newAtomPath`)
  ## * has the length of a hex or visual blob id: a set atom for that id
  ## * anything else: a symbol, normalized with `strutils.normalize`
  let token = r.next
  if token[0] == '"':
    # `token.len < 2` also rejects a lone quote, which the old check
    # accepted and then sliced out of range
    if token.len < 2 or token[^1] != '"':
      result = newAtomError("invalid string '$1'" % token)
    else:
      result = newAtomString(token[1 .. ^2])
  elif DirSep in token:
    # TODO: memoize this, store a table of paths to atoms
    result = newAtomPath(token)
  elif token.len in {blobHexLen, blobVisualLen}:
    # A token of id length need not be a valid id. `read` runs outside
    # eval's error handling, so a failure here (presumably from `toSetId`)
    # would escape the REPL; report it as an error atom instead.
    try:
      result = Atom(kind: atomSet, bs: newBlobSet(token.toSetId))
    except CatchableError:
      result = newAtomError(getCurrentExceptionMsg())
  else:
    result = Atom(kind: atomSymbol, sym: token.normalize)
proc readForm(r: Reader): NodeRef

proc readList(r: Reader): NodeRef =
  ## Read list members up to and including the closing ")" (the opening
  ## "(" has already been consumed).
  ##
  ## Returns nil when the tokens run out before the list is closed,
  ## including when a nested list is unterminated, so `read` can wait for
  ## more input. Previously a nil nested result was appended, and
  ## `append(nil)` on a still-empty list dereferences a nil `tailRef`.
  result = newNodeList()
  while true:
    if r.pos == r.tokens.len:
      return nil
    let p = r.peek
    if p[p.high] == ')':
      discard r.next
      break
    let member = r.readForm
    if member.isNil:
      return nil
    result.append member
proc readForm(r: Reader): NodeRef =
  ## Read one form: a list when the next token is "(", otherwise an atom.
  ##
  ## Returns nil when no tokens are left (e.g. whitespace-only input) or a
  ## list is unterminated; `read` treats nil as "need more input". Without
  ## the bounds check, `peek` would fail its assertion on an empty token
  ## list and abort the REPL.
  if r.pos >= r.tokens.len:
    return nil
  if r.peek[0] == '(':
    discard r.next
    result = r.readList
  else:
    result = r.readAtom.newNode
proc tokenizer(s: string): Tokens =
  ## Split `s` into reader tokens.
  ##
  ## Each regex match may begin with a run of whitespace and commas (both
  ## act as separators). That prefix is stripped together with surrounding
  ## whitespace, and empty matches are dropped. Stripping only whitespace,
  ## as before, left tokens such as ", b" for the input "a, b".
  # TODO: this sucks
  let matches = s.findAll(re"""[\s,]*(~@|[\[\]{}()'`~^@]|"(?:\\.|[^\\"])*"|;.*|[^\s\[\]{}('"`,;)]*)""")
  result = newSeqOfCap[string](matches.len)
  for m in matches:
    let t = m.strip(chars = Whitespace + {','})
    if t.len > 0:
      result.add t
proc read(r: Reader; line: string): NodeRef =
  ## Feed one input line to the reader and return the first complete form,
  ## or nil if the form is not yet complete.
  ##
  ## Pending input is kept in `r.buffer` until a form completes, then the
  ## buffer is cleared. Previously an incomplete second line replaced the
  ## buffer with just that line, losing earlier lines of a multi-line
  ## form.
  r.pos = 0
  if r.buffer.len > 0:
    r.buffer.add " "
  r.buffer.add line
  r.tokens = r.buffer.tokenizer
  result = r.readForm
  if not result.isNil:
    r.buffer.setLen 0
proc assertArgCount(args: NodeObj; len: int) =
  ## Check that the argument chain starting at `args` has exactly `len`
  ## nodes. A zero-argument call passes a single empty list node, so `len`
  ## 0 and 1 both accept exactly one node.
  ##
  ## Raises `EvalError`, which `eval` turns into an error node. The
  ## previous `doAssert` raised an AssertionDefect, which `eval` does not
  ## catch, so a wrong argument count aborted the REPL.
  var
    arg = args
    matches = true
  for _ in 2..len:
    if arg.nextRef.isNil:
      matches = false
      break
    arg = arg.next
  if not matches or not arg.nextRef.isNil:
    raise newException(EvalError,
      "wrong number of arguments, expected " & $len)
##
# Builtin functions
#
proc applyFunc(env: Env; args: NodeObj): NodeRef =
  ## `(apply f lst)`: call function `f` with the members of list `lst` as
  ## its arguments.
  ##
  ## An empty `lst` calls `f` with no arguments, using the same empty-list
  ## placeholder `eval` passes for `(f)`. Previously it dereferenced the
  ## empty list's nil head.
  assertArgCount(args, 2)
  let
    fn = args
    lst = fn.next
  if lst.headRef.isNil:
    result = fn.fun(env, NodeObj(kind: nodeList))
  else:
    result = fn.fun(env, lst.head)
proc cborFunc(env: Env; arg: NodeObj): NodeRef =
  ## `(cbor set)`: string atom holding the `$` rendering of the set's
  ## `toCbor` encoding.
  assertArgCount(arg, 1)
  let rendered = $toCbor(arg.atom.bs)
  result = newNode(newAtomString(rendered))
proc commitFunc(env: Env; arg: NodeObj): NodeRef =
  ## `(commit set)`: commit the set to the store and return the committed
  ## set as a set atom.
  assertArgCount(arg, 1)
  let committed = waitFor env.store.commit(arg.atom.bs)
  result = newNode(newAtom(committed))
#[
proc copyFunc(env: Env; args: NodeObj): NodeRef =
assertArgCount(args, 3)
let
x = args
y = x.next
z = y.next
var root = newFsRoot()
let dir = env.getUnixfs x.atom.cid
for name, node in dir.items:
root.add(name, node)
root.add(z.atom.str, dir[y.atom.str])
let cid = env.store.putDag(root.toCbor)
cid.newAtom.newNode
]#
proc consFunc(env: Env; args: NodeObj): NodeRef =
  ## `(cons x lst)`: new list whose first element is `x`, followed by
  ## copies of the members of `lst` (an empty `lst` gives `(x)`).
  ##
  ## Every element is appended detached, with `nextRef` cleared. `append`
  ## follows `nextRef` chains, so appending the still-chained `x` used to
  ## splice the `lst` argument itself into the result, producing
  ## `(x (a b) a b)`. It also crashed when `lst` was empty.
  assertArgCount(args, 2)
  result = newNodeList()
  var car = args
  car.nextRef = nil
  result.append car
  for member in args.next.list:
    var elem = member
    elem.nextRef = nil
    result.append elem
proc defineFunc(env: Env; args: NodeObj): NodeRef =
  ## `(define sym value)`: bind `value` to symbol `sym` in the environment
  ## and return a fresh copy of `value`.
  assertArgCount(args, 2)
  let value = args.next
  env.bindings[args.atom.sym] = value
  new(result)
  result[] = value
proc globFunc(env: Env; args: NodeObj): NodeRef =
  ## `(glob arg ...)`: list of path atoms. Path arguments are kept as-is;
  ## string arguments are expanded as patterns with `walkPattern`. Any
  ## other argument yields an "invalid glob argument" error node.
  result = newNodeList()
  for n in args.walk:
    let a = n.atom
    case a.kind
    of atomPath:
      # Append a detached copy: `append` follows `nextRef`, so the raw
      # argument dragged the remaining arguments into the result and they
      # were then appended a second time.
      var elem = n
      elem.nextRef = nil
      result.append elem
    of atomString:
      for match in walkPattern a.str:
        result.append match.newAtomPath.newNode
    else:
      # Stop at the first bad argument. Continuing would append to the
      # error node, which is not a list.
      return newNodeError("invalid glob argument", n)
proc hexFunc(env: Env; args: NodeObj): NodeRef =
  ## `(hex x)`: hex string of a blob id or a set. A hot set is committed
  ## first and its set id used; a cold set gives its set id; a leaf gives
  ## its blob id. Other atoms yield a "cannot convert to hex" error node.
  assertArgCount(args, 1)
  let a = args.atom
  if a.kind == atomBlob:
    result = a.blob.toHex.newAtomString.newNode
  elif a.kind == atomSet:
    var hex: string
    case a.bs.kind
    of hotNode:
      let committed = waitFor commit(env.store, a.bs)
      hex = committed.setId.toHex
    of coldNode:
      hex = a.bs.setId.toHex
    of leafNode:
      hex = a.bs.blob.toHex
    result = hex.newAtomString.newNode
  else:
    result = newNodeError("cannot convert to hex", args)
proc keyFunc(env: Env; args: NodeObj): NodeRef =
  ## `(key "name")`: convert a string atom to a key atom.
  assertArgCount(args, 1)
  result = newNode(newAtom(toKey(args.atom.str)))
proc ingestFunc(env: Env; args: NodeObj): NodeRef =
  ## `(ingest path ...)`: build a set for each path argument with `getSet`
  ## and return the union of all of them as a set atom.
  var combined: BlobSet
  for n in args.walk:
    let part = env.getSet(n.atom.path)
    if combined.isNil:
      combined = part
    else:
      combined = env.store.union(combined, part)
  result = newNode(newAtom(combined))
proc insertFunc(env: Env; args: NodeObj): NodeRef =
  ## `(insert set blob "name")`: insert `blob` into `set` under `name` and
  ## return the resulting set atom.
  assertArgCount(args, 3)
  let
    target = args.atom.bs
    blobNode = args.next
    blob = blobNode.atom
    memberName = blobNode.next.atom.str
    updated = waitFor env.store.insert(target, memberName, blob.blob, blob.size)
  doAssert(not updated.isNil)
  result = updated.newAtom.newNode
proc blobFunc(env: Env; args: NodeObj): NodeRef =
  ## `(blob path)`: blob atom for the file at a path atom (ingested, or
  ## taken from the `getBlob` cache).
  assertArgCount(args, 1)
  let entry = env.getBlob(args.atom.path)
  result = newAtom(entry.id, entry.size).newNode
proc listFunc(env: Env; args: NodeObj): NodeRef =
  ## Standard Lisp `list`: a new list holding the whole argument chain.
  ## Appending a copy of the first argument also pulls in every argument
  ## chained after it, and moves `tailRef` to the last one.
  result = newNodeList()
  result.append args
proc loadFunc(env: Env; args: NodeObj): NodeRef =
  ## `(load set)`: load the set with the argument's set id from the store.
  assertArgCount(args, 1)
  let id = args.atom.bs.setId
  let loaded = waitFor load(env.store, id)
  result = newNode(newAtom(loaded))
proc mapFunc(env: Env; args: NodeObj): NodeRef =
  ## `(map f lst)`: list of the results of calling `f` on each member of
  ## `lst`.
  ##
  ## Each member is passed detached (`nextRef` cleared) so that `f` sees a
  ## single argument. Previously members were passed still chained to
  ## their siblings, so builtins that check their argument count rejected
  ## every member except the last.
  assertArgCount(args, 2)
  result = newNodeList()
  let f = args.fun
  for v in args.next.list:
    var item = v
    item.nextRef = nil
    result.append f(env, item)
#[
proc mergeFunc(env: Env; args: NodeObj): NodeRef =
var root = newFsRoot()
for n in args.walk:
let
a = n.atom
dir = env.getUnixfs a.cid
for name, node in dir.items:
root.add(name, node)
let cid = env.store.putDag(root.toCbor)
cid.newAtom.newNode
]#
proc pathFunc(env: Env; arg: NodeObj): NodeRef =
  ## `(path "str")`: path atom for a string atom, or an error atom if the
  ## path cannot be expanded (see `newAtomPath`).
  newNode(newAtomPath(arg.atom.str))
proc randomFunc(env: Env; arg: NodeObj): NodeRef =
  ## `(random set)`: blob atom for the member `randomApply` reports,
  ## using a freshly seeded generator. Returns an empty list if the
  ## callback is never invoked.
  assertArgCount(arg, 1)
  var
    rng = initRand(rand(int.high))
    picked: NodeRef
  env.store.randomApply(arg.atom.bs, rng) do (id: BlobId; size: BiggestInt):
    picked = newNode(newAtom(id, size))
  result = if picked.isNil: newNodeList() else: picked
proc removeFunc(env: Env; args: NodeObj): NodeRef =
  ## `(remove set "name")`: remove `name` from `set` and return the
  ## resulting set atom.
  assertArgCount(args, 2)
  let
    target = args.atom.bs
    memberName = args.next.atom.str
    remaining = waitFor env.store.remove(target, memberName)
  result = remaining.newAtom.newNode
proc searchFunc(env: Env; args: NodeObj): NodeRef =
  ## `(search set "name")`: blob atom that `apply` reports for `name` in
  ## `set`, or an empty list if the callback is never invoked.
  assertArgCount(args, 2)
  var hit: NodeRef
  let
    target = args.atom.bs
    wanted = args.next.atom.str
  apply(env.store, target, wanted) do (id: BlobId; size: BiggestInt):
    hit = newNode(newAtom(id, size))
  result = if hit.isNil: newNodeList() else: hit
proc unionFunc(env: Env; args: NodeObj): NodeRef =
  ## `(union set ...)`: union of all set arguments, as a set atom.
  var merged: BlobSet
  for n in args.walk:
    let current = n.atom.bs
    if merged.isNil:
      merged = current
    else:
      merged = env.store.union(merged, current)
  result = merged.newAtom.newNode
##
# Environment
#
proc bindEnv(env: Env; name: string; fun: Func) =
  ## Bind builtin `fun` to `name` as a function node; asserts that `name`
  ## is not bound yet.
  assert(name notin env.bindings)
  env.bindings[name] = NodeObj(kind: nodeFunc, fun: fun, name: name)
proc newEnv(store: BlobStore): Env =
  ## Interpreter environment backed by `store`, with empty path caches and
  ## every builtin bound under its name. `copy` and `merge` stay unbound;
  ## their implementations are commented out in this file.
  new(result)
  result.store = store
  result.bindings = initTable[string, NodeObj]()
  result.blobs = initTable[string, tuple[id: BlobId, size: BiggestInt]]()
  result.sets = initTable[string, BlobSet]()
  bindEnv(result, "apply", applyFunc)
  bindEnv(result, "blob", blobFunc)
  bindEnv(result, "cbor", cborFunc)
  bindEnv(result, "commit", commitFunc)
  bindEnv(result, "cons", consFunc)
  #bindEnv(result, "copy", copyFunc)
  bindEnv(result, "define", defineFunc)
  bindEnv(result, "glob", globFunc)
  bindEnv(result, "hex", hexFunc)
  bindEnv(result, "ingest", ingestFunc)
  bindEnv(result, "insert", insertFunc)
  bindEnv(result, "key", keyFunc)
  bindEnv(result, "load", loadFunc)
  bindEnv(result, "list", listFunc)
  bindEnv(result, "map", mapFunc)
  #bindEnv(result, "merge", mergeFunc)
  bindEnv(result, "path", pathFunc)
  bindEnv(result, "random", randomFunc)
  bindEnv(result, "remove", removeFunc)
  bindEnv(result, "search", searchFunc)
  bindEnv(result, "union", unionFunc)
proc eval(ast: NodeRef; env: Env): NodeRef

proc eval_ast(ast: NodeRef; env: Env): NodeRef =
  ## Evaluate the parts of `ast` without applying anything:
  ## * list: a new list holding `eval` of each member. Members are unlinked
  ##   from `ast` one at a time, so `ast` ends up empty.
  ## * symbol atom bound in `env`: a fresh copy of its binding
  ## * anything else: `ast` itself
  case ast.kind
  of nodeList:
    result = newNodeList()
    var member = ast.headRef
    while not member.isNil:
      # detach the member from the list before evaluating it
      ast.headRef = member.nextRef
      member.nextRef = nil
      result.append member.eval(env)
      member = ast.headRef
  of nodeAtom:
    result = ast
    if ast.atom.kind == atomSymbol and ast.atom.sym in env.bindings:
      result = new NodeRef
      result[] = env.bindings[ast.atom.sym]
  else:
    result = ast
proc eval(ast: NodeRef; env: Env): NodeRef =
  ## Evaluate `ast` in `env`.
  ## Non-lists go through `eval_ast`; the empty list evaluates to a new
  ## empty list. For any other list, the members are evaluated and the
  ## first must be a function: it is called with the remaining members
  ## chained as arguments, or with an empty list node when there are none.
  ## A non-function head gives a "not a function" error node.
  ## EvalError, FieldError and OSError raised during evaluation become
  ## error nodes that refer to the input being processed at the time.
  var input = ast[]
  try:
    if ast.kind != nodeList:
      result = eval_ast(ast, env)
    elif ast.headRef.isNil:
      result = newNodeList()
    else:
      let head = eval_ast(ast, env).headRef
      if head.kind != nodeFunc:
        input = head[]
        result = newNodeError("not a function", input)
      else:
        input =
          if head.nextRef.isNil: NodeObj(kind: nodeList)
          else: head.next
        result = head.fun(env, input)
  except EvalError:
    result = newNodeError(getCurrentExceptionMsg(), input)
  except FieldError:
    result = newNodeError("invalid argument", input)
  except OSError:
    result = newNodeError(getCurrentExceptionMsg(), input)
proc readLineSimple(prompt: string; line: var TaintedString): bool =
  ## Line reader for scripted input: read one line from stdin into `line`
  ## without showing `prompt`. Returns false at end of input.
  result = readLine(stdin, line)
#[
when defined(genode):
proc openStore(): BlobStore =
result = newDagfsClient("repl")
else:
proc openStore(): BlobStore =
var host = ""
for kind, key, value in getopt():
if kind == cmdShortOption:
if key == "h":
if host != "":
quit "only a single store path argument is accepted"
host = value
if host == "": host = "127.0.0.1"
try: result = newTcpClient(host)
except:
quit("failed to connect to store at $1 ($2)" % [host, getCurrentExceptionMsg()])
]#
proc emptyMain() =
  ## `empty` subcommand: commit an empty set and print its id in hex.
  let store = openStore()
  let emptySet = waitFor commit(store, newBlobSet())
  echo emptySet.setId.toHex
proc replMain() =
  ## `repl` subcommand: a read-eval-print loop over the configured store.
  ## With `-s` (scripted), lines are read from stdin without a prompt;
  ## otherwise `readLineFromStdin` shows the "> " prompt. Each completed
  ## form is evaluated and its result printed to stdout.
  randomize()
  var scripted = false
  for kind, key, value in getopt():
    if kind == cmdShortOption and key == "s":
      scripted = true
  let
    store = openStore()
    env = newEnv(store)
    outStream = stdout.newFileStream
    readLine = if scripted: readLineSimple else: readLineFromStdin
  var
    reader = newReader()
    line = newStringOfCap 128
  while readLine("> ", line):
    # Skip blank and whitespace-only lines. They produce no tokens, and the
    # reader would fail its "next token exists" assertion on them.
    if line.strip.len > 0:
      let ast = reader.read(line)
      if not ast.isNil:
        ast.eval(env).print(outStream)
        outStream.write "\n"
        flush outStream
const Prompt = ">>> "  # not referenced elsewhere in this file as shown

proc getLine(prompt: string): string =
  ## Show `prompt` and return one line of user input. With `-d:readLine`
  ## this uses `readLineFromStdin` (line editing); otherwise it writes the
  ## prompt to stdout and reads a line from stdin.
  when not defined(readLine):
    # primitive fallback without line editing
    stdout.write(prompt)
    result = stdin.readLine()
  else:
    result = readLineFromStdin(prompt)
when isMainModule:
  # The first positional argument selects the subcommand; each handler
  # re-reads the command line for its own arguments.
  var subcommand = ""
  for kind, key, val in getopt():
    if kind == cmdArgument:
      subcommand = key
      break
  case subcommand.normalize
  of "": quit("no subcommand specified")
  of "check": waitFor checkMain()
  of "dump": dumpMain()
  of "empty": emptyMain()
  of "ingest": waitFor ingestMain()
  of "repl": replMain()
  of "replicate": waitFor replicateMain()
  of "server": waitFor serverMain()
  else: quit("no such subcommand " & subcommand)